#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2018 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It provides the function implementations needed to compile Excel functions.

Sub-Modules:

.. currentmodule:: formulas.functions

.. autosummary::
    :nosignatures:
    :toctree: functions/

    ~eng
    ~financial
    ~info
    ~logic
    ~look
    ~math
    ~operators
    ~stat
    ~text
"""
import collections
import collections.abc
import functools
import importlib

import numpy as np
import schedula as sh

from formulas.errors import (
    RangeValueError, FunctionError, FoundError, BaseError, BroadcastError
)
from formulas.tokens.operand import Error, XlError
[docs]class Array(np.ndarray):
_default = Error.errors['#N/A']
_collapse_value = None
[docs] def reshape(self, shape, *shapes, order='C'):
try:
return super(Array, self).reshape(shape, *shapes, order=order)
except ValueError:
res, (r, c) = np.empty(shape, object), self.shape
res[:, :] = self._default
r = None if r == 1 else r
c = None if c == 1 else c
try:
res[:r, :c] = self
except ValueError:
res[:, :] = self.collapse(shape)
return res
[docs] def collapse(self, shape):
if self._collapse_value is not None and tuple(shape) == (
1, 1) != self.shape:
return self._collapse_value
return np.resize(self, shape)
def __reduce__(self):
reduce = super(Array, self).__reduce__() # Get the parent's __reduce__.
state = {
'_collapse_value': self._collapse_value,
'_default': self._default
}, # Additional state params to pass to __setstate__.
return reduce[0], reduce[1], reduce[2] + state
def __setstate__(self, state):
self.__dict__.update(state[-1]) # Set the attributes.
super(Array, self).__setstate__(state[0:-1])
# noinspection PyUnusedLocal
[docs]def not_implemented(*args, **kwargs):
raise FunctionError()
[docs]def replace_empty(x, empty=0):
if isinstance(x, np.ndarray):
y = x.ravel().tolist()
if sh.EMPTY in y:
y = [empty if v is sh.EMPTY else v for v in y]
return np.asarray(y, object).reshape(*x.shape)
return x
# noinspection PyUnusedLocal
[docs]def wrap_func(func, ranges=False):
def wrapper(*args, **kwargs):
# noinspection PyBroadException
try:
return func(*args, **kwargs)
except FoundError as ex:
return np.asarray([[ex.err]], object)
except BaseError as ex:
raise ex
except Exception:
return np.asarray([[Error.errors['#VALUE!']]], object)
if not ranges:
return wrap_ranges_func(functools.update_wrapper(wrapper, func))
return functools.update_wrapper(wrapper, func)
[docs]def wrap_ranges_func(func, n_out=1):
def wrapper(*args, **kwargs):
try:
args, kwargs = parse_ranges(*args, **kwargs)
return func(*args, **kwargs)
except RangeValueError:
return sh.bypass(*((sh.NONE,) * n_out))
return functools.update_wrapper(wrapper, func)
[docs]def parse_ranges(*args, **kw):
from ..ranges import Ranges
args = tuple(v.value if isinstance(v, Ranges) else v for v in args)
kw = {k: v.value if isinstance(v, Ranges) else v for k, v in kw.items()}
return args, kw
SUBMODULES = [
'.info', '.logic', '.math', '.stat', '.financial', '.text', '.look', '.eng'
]
# noinspection PyDictCreation
FUNCTIONS = {}
FUNCTIONS['ARRAY'] = lambda *args: np.asarray(args, object).view(Array)
FUNCTIONS['ARRAYROW'] = lambda *args: np.asarray(args, object).view(Array)
[docs]def get_error(*vals):
# noinspection PyTypeChecker
for v in flatten(vals, None):
if isinstance(v, XlError):
return v
[docs]def raise_errors(*args):
# noinspection PyTypeChecker
v = get_error(*args)
if v:
raise FoundError(err=v)
[docs]def is_number(number):
if isinstance(number, bool):
return False
elif not isinstance(number, Error):
try:
float(number)
except (ValueError, TypeError):
return False
return True
[docs]def flatten(l, check=is_number):
if not isinstance(l, str) and isinstance(l, collections.Iterable):
try:
for el in l:
yield from flatten(el, check)
except TypeError:
yield from flatten(l.tolist(), check)
elif not check or check(l):
yield l
[docs]def value_return(res, *args):
res._collapse_value = Error.errors['#VALUE!']
return res
[docs]def wrap_ufunc(
func, input_parser=lambda *a: map(float, a), check_error=get_error,
args_parser=lambda *a: map(replace_empty, a), otype=Array,
ranges=False, return_func=lambda res, *args: res, **kw):
"""Helps call a numpy universal function (ufunc)."""
def safe_eval(*vals):
try:
r = check_error(*vals) or func(*input_parser(*vals))
if not isinstance(r, (XlError, str)):
r = (np.isnan(r) or np.isinf(r)) and Error.errors['#NUM!'] or r
except (ValueError, TypeError):
r = Error.errors['#VALUE!']
return r
kw['otypes'] = kw.get('otypes', [object])
# noinspection PyUnusedLocal
def wrapper(*args, **kwargs):
try:
args = tuple(args_parser(*args))
with np.errstate(divide='ignore', invalid='ignore'):
res = np.vectorize(safe_eval, **kw)(*args)
try:
res = res.view(otype)
except AttributeError:
res = np.asarray([[res]], object).view(otype)
return return_func(res, *args)
except ValueError as ex:
try:
np.broadcast(*args)
except ValueError:
raise BroadcastError()
raise ex
return wrap_func(functools.update_wrapper(wrapper, func), ranges=ranges)
@functools.lru_cache()
def get_functions():
functions = collections.defaultdict(lambda: not_implemented)
for name in SUBMODULES:
functions.update(importlib.import_module(name, __name__).FUNCTIONS)
functions.update(FUNCTIONS)
return functions