Source code for formulas.functions.stat

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2023 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
Python equivalents of statistical Excel functions.
"""
import math
import functools
import numpy as np
import schedula as sh
from . import (
    raise_errors, flatten, wrap_func, Error, is_number, _text2num, xfilter,
    XlError, wrap_ufunc, replace_empty, get_error, is_not_empty, _convert_args,
    convert_nan, FoundError, value_return
)

FUNCTIONS = {}


def _convert(v):
    if isinstance(v, str):
        return 0
    if isinstance(v, bool):
        return int(v)
    return v


[docs] def xfunc(*args, func=max, check=is_number, convert=None, default=0, _raise=True): _raise and raise_errors(args) it = flatten(map(_convert_args, args), check=check) default = [] if default is None else [default] return func(list(map(convert, it) if convert else it) or default)
def _xaverage(v): if v: return sum(v) / len(v) return Error.errors['#DIV/0!'] xaverage = functools.partial(xfunc, func=_xaverage, default=None) FUNCTIONS['AVERAGE'] = wrap_func(xaverage) FUNCTIONS['AVERAGEA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=_xaverage, default=None )) FUNCTIONS['AVERAGEIF'] = wrap_func(functools.partial(xfilter, xaverage))
[docs] def xcorrel(arr1, arr2): try: arr1, arr2 = _parse_yxp(arr1, arr2) except FoundError as ex: return ex.err return np.corrcoef(arr1, arr2)[0, 1]
FUNCTIONS['CORREL'] = wrap_func(xcorrel) FUNCTIONS['COUNT'] = wrap_func(functools.partial( xfunc, func=len, _raise=False, default=None, check=functools.partial(is_number, xl_return=False) )) FUNCTIONS['COUNTA'] = wrap_func(functools.partial( xfunc, check=is_not_empty, func=len, _raise=False, default=None )) FUNCTIONS['COUNTBLANK'] = wrap_func(functools.partial( xfunc, check=lambda x: (x == '' or x is sh.EMPTY), func=len, _raise=False, default=None )) FUNCTIONS['COUNTIF'] = wrap_func(functools.partial( xfilter, len, operating_range=None ))
[docs] def xsort(values, k, large=True): err = get_error(k) if err: return err k = float(_text2num(k)) if isinstance(values, XlError): return values if 1 <= k <= len(values): if large: k = -k else: k -= 1 return values[math.floor(k)] return Error.errors['#NUM!']
def _sort_parser(values, k): if isinstance(values, XlError): raise FoundError(err=values) err = get_error(values) if err: return err, k values = np.array(tuple(flatten( values, lambda v: not isinstance(v, (str, bool)) )), float) values.sort() return values, replace_empty(k) FUNCTIONS['LARGE'] = wrap_ufunc( xsort, args_parser=_sort_parser, excluded={0}, check_error=lambda *a: None, input_parser=lambda *a: a, return_func=value_return ) FUNCTIONS['SMALL'] = wrap_ufunc( xsort, args_parser=_sort_parser, excluded={0}, check_error=lambda *a: None, input_parser=lambda values, k: (values, k, False), return_func=value_return ) FUNCTIONS['MAX'] = wrap_func(xfunc) FUNCTIONS['MAXA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty )) FUNCTIONS['MEDIAN'] = wrap_func(functools.partial( xfunc, func=lambda x: convert_nan(np.median(x) if x else np.nan), default=None )) FUNCTIONS['MIN'] = wrap_func(functools.partial(xfunc, func=min)) FUNCTIONS['MINA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=min )) def _forecast_known_filter(known_y, known_x): for v in zip(known_y, known_x): if not any(isinstance(i, (str, bool)) for i in v): yield v
[docs] def xslope(yp, xp): try: a, b = _slope_coeff(*map(np.array, _parse_yxp(yp, xp))) except FoundError as ex: return ex.err return b
FUNCTIONS['SLOPE'] = wrap_func(xslope) def _parse_yxp(yp, xp): yp, xp = tuple(flatten(yp, check=None)), tuple(flatten(xp, check=None)) if (sh.EMPTY,) == yp or (sh.EMPTY,) == xp: raise FoundError(err=Error.errors['#VALUE!']) if len(yp) != len(xp): raise FoundError(err=Error.errors['#N/A']) raise_errors(*zip(yp, xp)) yxp = tuple(_forecast_known_filter(yp, xp)) if len(yxp) <= 1: raise FoundError(err=Error.errors['#DIV/0!']) return tuple(zip(*yxp)) def _slope_coeff(yp, xp): ym, xm = yp.mean(), xp.mean() dx = xp - xm b = (dx ** 2).sum() if not b: raise FoundError(err=Error.errors['#DIV/0!']) b = (dx * (yp - ym)).sum() / b a = ym - xm * b return a, b def _args_parser_forecast(x, yp, xp): x = replace_empty(x) try: a, b = _slope_coeff(*map(np.array, _parse_yxp(yp, xp))) except FoundError as ex: return x, ex.err return x, a, b
[docs] def xforecast(x, a=None, b=None): return a + b * x
FUNCTIONS['_XLFN.FORECAST.LINEAR'] = FUNCTIONS['FORECAST'] = wrap_ufunc( xforecast, args_parser=_args_parser_forecast, excluded={1, 2}, input_parser=lambda x, a, b: (_convert_args(x), a, b), return_func=value_return ) FUNCTIONS['FORECAST.LINEAR'] = FUNCTIONS['FORECAST']
[docs] def xstdev(args, ddof=1, func=np.std): if len(args) <= ddof: return Error.errors['#DIV/0!'] return func(args, ddof=ddof)
FUNCTIONS['_XLFN.STDEV.S'] = FUNCTIONS['STDEV.S'] = wrap_func(functools.partial( xfunc, func=xstdev )) FUNCTIONS['_XLFN.STDEV.P'] = FUNCTIONS['STDEV.P'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, ddof=0), default=None )) FUNCTIONS['STDEVA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=xstdev )) FUNCTIONS['STDEVPA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, ddof=0 ), default=None )) FUNCTIONS['_XLFN.VAR.S'] = FUNCTIONS['VAR.S'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, func=np.var) )) FUNCTIONS['_XLFN.VAR.P'] = FUNCTIONS['VAR.P'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, ddof=0, func=np.var), default=None )) FUNCTIONS['VARA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, func=np.var ) )) FUNCTIONS['VARPA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, ddof=0, func=np.var ), default=None ))