Source code for formulas.functions

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2023 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides functions implementations to compile the Excel functions.

Sub-Modules:

.. currentmodule:: formulas.functions

.. autosummary::
    :nosignatures:
    :toctree: functions/

    ~comp
    ~date
    ~eng
    ~financial
    ~google
    ~info
    ~logic
    ~look
    ~math
    ~operators
    ~stat
    ~text
"""
import re
import copy
import importlib
import functools
import collections
import numpy as np
import schedula as sh
from collections.abc import Iterable
from formulas.errors import (
    RangeValueError, FoundError, BaseError, BroadcastError, InvalidRangeError
)
from formulas.tokens.operand import Error, XlError

COMPILING = sh.Token('Run')


def _init_reshape(base_shape, value):
    res, (r, c) = np.empty(base_shape, object), value.shape
    res[:, :] = getattr(value, '_default', Error.errors['#N/A'])
    r = None if r == 1 else r
    c = None if c == 1 else c
    return res, r, c


[docs] class Array(np.ndarray): _default = Error.errors['#N/A'] _collapse_value = None
[docs] def reshape(self, shape, *shapes, order='C'): try: # noinspection PyArgumentList return super(Array, self).reshape(shape, *shapes, order=order) except ValueError: res, r, c = _init_reshape(shape, self) try: res[:r, :c] = self except ValueError: res[:, :] = self.collapse(shape) return res
[docs] def collapse(self, shape): if self._collapse_value is not None and \ tuple(shape) == (1, 1) != self.shape: return self._collapse_value return np.resize(self, shape)
def __reduce__(self): reduce = super(Array, self).__reduce__() # Get the parent's __reduce__. state = { '_collapse_value': self._collapse_value, '_default': self._default }, # Additional state params to pass to __setstate__. return reduce[0], reduce[1], reduce[2] + state def __setstate__(self, state, *args, **kwargs): self.__dict__.update(state[-1]) # Set the attributes. super(Array, self).__setstate__(state[0:-1], *args, **kwargs) def __deepcopy__(self, memo=None, *args, **kwargs): obj = super(Array, self).__deepcopy__(memo, *args, **kwargs) # noinspection PyArgumentList obj._collapse_value = copy.deepcopy(self._collapse_value, memo) # noinspection PyArgumentList obj._default = copy.deepcopy(self._default, memo) return obj def __hash__(self): return hash(self.tolist())
# noinspection PyUnusedLocal
[docs] def not_implemented(*args, **kwargs): raise NotImplementedError
[docs] def replace_empty(x, empty=0): if isinstance(x, np.ndarray): obj = np.array(sh.EMPTY, dtype=object) if obj in x: x = np.where(obj == x, empty, x) elif x is sh.EMPTY: return empty return x
[docs] def is_not_empty(v): return v is not sh.EMPTY
[docs] def wrap_impure_func(func): def wrapper(compiling, *args, **kwargs): return sh.NONE if compiling else func(*args, **kwargs) return functools.update_wrapper(wrapper, func)
# noinspection PyUnusedLocal
[docs] def wrap_func(func, ranges=False): def wrapper(*args, **kwargs): # noinspection PyBroadException try: return func(*args, **kwargs) except FoundError as ex: return np.asarray([[ex.err]], object) except InvalidRangeError: return np.asarray([[Error.errors['#VALUE!']]], object) except BaseError as ex: raise ex except Exception: return np.asarray([[Error.errors['#VALUE!']]], object) if not ranges: return wrap_ranges_func(functools.update_wrapper(wrapper, func)) return functools.update_wrapper(wrapper, func)
[docs] def wrap_ranges_func(func, n_out=1): def wrapper(*args, **kwargs): try: args, kwargs = parse_ranges(*args, **kwargs) return func(*args, **kwargs) except RangeValueError: return sh.bypass(*((sh.NONE,) * n_out)) return functools.update_wrapper(wrapper, func)
[docs] def parse_ranges(*args, **kw): from ..ranges import Ranges args = tuple(v.value if isinstance(v, Ranges) else v for v in args) kw = {k: v.value if isinstance(v, Ranges) else v for k, v in kw.items()} return args, kw
SUBMODULES = [ '.info', '.logic', '.math', '.stat', '.financial', '.text', '.look', '.eng', '.date', '.comp', '.google' ] # noinspection PyDictCreation FUNCTIONS = {} FUNCTIONS['ARRAY'] = lambda *args: np.asarray(args, object).view(Array) FUNCTIONS['ARRAYROW'] = lambda *args: np.asarray(args, object).view(Array)
[docs] def get_error(*vals): # noinspection PyTypeChecker for v in flatten(vals, None, True): if isinstance(v, XlError): return v
[docs] def raise_errors(*args): # noinspection PyTypeChecker v = get_error(*args) if v: raise FoundError(err=v)
def _to_number(number): if isinstance(number, (bool, np.bool_)) and number: return np.nan try: return float(number) except (ValueError, TypeError): return np.nan @functools.lru_cache(None) def _compile_func(func): return np.frompyfunc(func, 1, 1)
[docs] def to_number(*args, **kwargs): return _compile_func(_to_number)(*args, **kwargs)
[docs] def clean_values(values): return values[values != np.array(sh.EMPTY, dtype=object)]
[docs] def is_number(number, xl_return=True, bool_return=False): if isinstance(number, (bool, np.bool_)): return bool_return elif isinstance(number, XlError): return xl_return elif number is sh.EMPTY: return False else: try: float(number) except (ValueError, TypeError): return False return True
def _text2num(value): if isinstance(value, Array) and not value.shape: value = value.tolist() if not isinstance(value, Error) and isinstance(value, str): try: return float(value) except (ValueError, TypeError): from .date import xdate, _text2datetime try: return xdate(*_text2datetime(value)[:3]) except (FoundError, AssertionError): pass return value
[docs] def convert2float(*a): return map(_convert2float, a)
def _convert2float(v): if isinstance(v, XlError): raise FoundError(err=v) if isinstance(v, bool): return int(v) if isinstance(v, str): return float(_text2num(v)) return float(v) def _convert_args(v): if isinstance(v, XlError): return v if isinstance(v, bool): return int(v) if isinstance(v, str): return float(_text2num(v)) return v @functools.lru_cache(None) def _text2num_vectorize(): return np.vectorize(_text2num, otypes=[object])
[docs] def text2num(*args, **kwargs): return _text2num_vectorize()(*args, **kwargs)
def _get_single_args(*args): res = [] for v in args: v = tuple(flatten(v, None)) if len(v) != 1 or isinstance(v[0], bool): raise FoundError(err=Error.errors['#VALUE!']) res.append(v[0]) return res _re_condition = re.compile('(?<!~)[?*]') def _xfilter(accumulator, test_range, condition, operating_range): from .operators import LOGIC_OPERATORS operator, operating_range = '=', np.asarray(operating_range) if isinstance(condition, str): for k in LOGIC_OPERATORS: if condition.startswith(k) and condition != k: operator, condition = k, condition[len(k):] break if operator == '=': it = _re_condition.findall(condition) if it: _ = lambda v: re.escape(v.replace('~?', '?').replace('~*', '*')) match = re.compile(''.join(sum(zip( map(_, _re_condition.split(condition)), tuple(map(lambda v: '.%s' % v, it)) + ('',) ), ()))).match f = lambda v: isinstance(v, str) and bool(match(v)) b = np.vectorize(f, otypes=[bool])(test_range['raw']) try: return accumulator(operating_range[b]) except FoundError as ex: return ex.err elif any(v in condition for v in ('~?', '~*')): condition = condition.replace('~?', '?').replace('~*', '*') from ..tokens.operand import Number, Error from ..errors import TokenError for token in (Number, Error): try: token = token(condition) if token.end_match == len(condition): condition = token.compile() break except TokenError: pass condition = _text2num(condition) from .operators import _get_type_id type_id, operator = _get_type_id(condition), LOGIC_OPERATORS[operator] @functools.lru_cache() def check(value): return _get_type_id(value) == type_id and operator(value, condition) if is_number(condition): if 'num' not in test_range: test_range['num'] = text2num(test_range['raw']) b = np.vectorize(check, otypes=[bool])(test_range['num']) else: b = np.vectorize(check, otypes=[bool])(test_range['raw']) try: return accumulator(operating_range[b]) except FoundError as ex: return ex.err _xfilter = np.vectorize(_xfilter, otypes=[object], excluded={0, 1, 3})
[docs] def xfilter(accumulator, test_range, condition, operating_range=None): operating_range = test_range if operating_range is None else operating_range # noinspection PyTypeChecker test_range = {'raw': replace_empty(test_range, '')} res = _xfilter(accumulator, test_range, condition, operating_range) return res.view(Array)
[docs] def flatten(v, check=is_number, drop_empty=False): if isinstance(v, np.ndarray): if drop_empty or check is is_number or check is is_not_empty: v = v[v != np.array(sh.EMPTY, dtype=object)] if not check or check is is_not_empty: yield from v.ravel() else: yield from filter(check, v.ravel()) elif not isinstance(v, str) and isinstance(v, Iterable): for el in v: yield from flatten(el, check, drop_empty) elif not check or check(v): yield v
# noinspection PyUnusedLocal
[docs] def value_return(res, *args): res._collapse_value = Error.errors['#VALUE!'] return res
[docs] def convert_nan(value, default=Error.errors['#NUM!']): return value if np.isfinite(value) else default
[docs] def convert_noshp(value): if isinstance(value, np.ndarray) and not value.shape: value = value.ravel()[0] return value
[docs] def args2vals(args): return (np.ravel(v)[0] for v in args)
[docs] def args2list(max_shape, shapes, *args): it = [] for arg, shape in zip(args, shapes): if not shape or shape[0] == 1: arg = np.tile(arg, max_shape) elif shape[0] != max_shape: raise BroadcastError() it.append(arg) return map(args2vals, zip(*it))
[docs] def wrap_ufunc( func, input_parser=lambda *a: map(float, a), check_error=get_error, args_parser=lambda *a: map(replace_empty, a), otype=Array, ranges=False, return_func=lambda res, *args: res, check_nan=True, **kw): """Helps call a numpy universal function (ufunc).""" def safe_eval(*vals): try: r = check_error(*vals) or convert_noshp(func(*input_parser(*vals))) if check_nan and not isinstance(r, (XlError, str)): r = convert_nan(r) except FoundError as ex: r = ex.err except (ValueError, TypeError): r = Error.errors['#VALUE!'] return r kw['otypes'] = kw.get('otypes', [object]) # noinspection PyUnusedLocal def wrapper(*args, **kwargs): try: args = tuple(args_parser(*args)) with np.errstate(divide='ignore', invalid='ignore'): if len(args) >= 32: shapes = [np.shape(arg) for arg in args] max_shape = max((s or (1,))[0] for s in shapes) if max_shape == 1: res = np.asarray([[ safe_eval(*args2vals(args)) ]], object).view(otype) else: res = np.asarray([safe_eval(*v) for v in args2list( max_shape, shapes, *args )], object).view(otype) else: res = np.vectorize(safe_eval, **kw)(*args) try: res = res.view(otype) except AttributeError: res = np.asarray([[res]], object).view(otype) return return_func(res, *args) except ValueError as ex: try: np.broadcast(*args) except ValueError: raise BroadcastError() raise ex return wrap_func(functools.update_wrapper(wrapper, func), ranges=ranges)
[docs] @functools.lru_cache() def get_functions(): functions = collections.defaultdict(lambda: not_implemented) for name in SUBMODULES: functions.update(importlib.import_module(name, __name__).FUNCTIONS) functions.update(FUNCTIONS) return functions