Source code for formulas.functions.stat

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2025 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
Python equivalents of statistical Excel functions.
"""
import math
import functools
import numpy as np
import schedula as sh
from . import (
    raise_errors, flatten, wrap_func, Error, is_number, _text2num, xfilter,
    XlError, wrap_ufunc, replace_empty, get_error, is_not_empty, _convert_args,
    convert_nan, FoundError, xfilters
)
from statistics import NormalDist

FUNCTIONS = {}


def _convert(v):
    if isinstance(v, str):
        return 0
    if isinstance(v, bool):
        return int(v)
    return v


[docs] def xfunc(*args, func=max, check=is_number, convert=None, default=0, _raise=True): _raise and raise_errors(args) it = flatten(map(_convert_args, args), check=check) default = [] if default is None else [default] return func(list(map(convert, it) if convert else it) or default)
def _xaverage(v): if v: return sum(v) / len(v) return Error.errors['#DIV/0!'] xaverage = functools.partial(xfunc, func=_xaverage, default=None) FUNCTIONS['AVERAGE'] = wrap_func(xaverage) FUNCTIONS['AVERAGEA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=_xaverage, default=None )) FUNCTIONS['AVERAGEIF'] = wrap_func(functools.partial(xfilter, xaverage)) FUNCTIONS['AVERAGEIFS'] = wrap_func(functools.partial(xfilters, xaverage))
[docs] def xcorrel(arr1, arr2): try: arr1, arr2 = _parse_yxp(arr1, arr2) except FoundError as ex: return ex.err return np.corrcoef(arr1, arr2)[0, 1]
FUNCTIONS['CORREL'] = wrap_func(xcorrel) FUNCTIONS['COUNT'] = wrap_func(functools.partial( xfunc, func=len, _raise=False, default=None, check=functools.partial(is_number, xl_return=False) )) FUNCTIONS['COUNTA'] = wrap_func(functools.partial( xfunc, check=is_not_empty, func=len, _raise=False, default=None )) FUNCTIONS['COUNTBLANK'] = wrap_func(functools.partial( xfunc, check=lambda x: (x == '' or x is sh.EMPTY), func=len, _raise=False, default=None )) FUNCTIONS['COUNTIF'] = wrap_func(functools.partial( xfilter, len, operating_range=None )) FUNCTIONS['COUNTIFS'] = wrap_func(functools.partial( xfilters, len, None ))
[docs] def xsort(values, k, large=True): err = get_error(k) if err: return err k = float(_text2num(k)) if isinstance(values, XlError): return values if 1 <= k <= len(values): if large: k = -k else: k -= 1 return values[math.floor(k)] return Error.errors['#NUM!']
def _sort_parser(values, k): if isinstance(values, XlError): raise FoundError(err=values) err = get_error(values) if err: return err, k values = np.array(tuple(flatten( values, lambda v: not isinstance(v, (str, bool)) )), float) values.sort() return values, replace_empty(k) FUNCTIONS['LARGE'] = wrap_ufunc( xsort, args_parser=_sort_parser, excluded={0}, check_error=lambda *a: None, input_parser=lambda *a: a ) FUNCTIONS['SMALL'] = wrap_ufunc( xsort, args_parser=_sort_parser, excluded={0}, check_error=lambda *a: None, input_parser=lambda values, k: (values, k, False) ) FUNCTIONS['MAX'] = wrap_func(xfunc) FUNCTIONS['MAXA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty )) FUNCTIONS['_XLFN.MAXIFS'] = FUNCTIONS['MAXIFS'] = wrap_func( functools.partial(xfilters, xfunc) ) FUNCTIONS['MEDIAN'] = wrap_func(functools.partial( xfunc, func=lambda x: convert_nan(np.median(x) if x else np.nan), default=None )) FUNCTIONS['MIN'] = wrap_func(functools.partial(xfunc, func=min)) FUNCTIONS['MINA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=min )) FUNCTIONS['_XLFN.MINIFS'] = FUNCTIONS['MINIFS'] = wrap_func(functools.partial( xfilters, functools.partial(xfunc, func=min) )) def _forecast_known_filter(known_y, known_x): for v in zip(known_y, known_x): if not any(isinstance(i, (str, bool)) for i in v): yield v
[docs] def xslope(yp, xp): try: a, b = _slope_coeff(*map(np.array, _parse_yxp(yp, xp))) except FoundError as ex: return ex.err return b
FUNCTIONS['SLOPE'] = wrap_func(xslope) def _parse_yxp(yp, xp): yp, xp = tuple(flatten(yp, check=None)), tuple(flatten(xp, check=None)) if (sh.EMPTY,) == yp or (sh.EMPTY,) == xp: raise FoundError(err=Error.errors['#VALUE!']) if len(yp) != len(xp): raise FoundError(err=Error.errors['#N/A']) raise_errors(*zip(yp, xp)) yxp = tuple(_forecast_known_filter(yp, xp)) if len(yxp) <= 1: raise FoundError(err=Error.errors['#DIV/0!']) return tuple(zip(*yxp)) def _slope_coeff(yp, xp): ym, xm = yp.mean(), xp.mean() dx = xp - xm b = (dx ** 2).sum() if not b: raise FoundError(err=Error.errors['#DIV/0!']) b = (dx * (yp - ym)).sum() / b a = ym - xm * b return a, b def _args_parser_forecast(x, yp, xp): x = replace_empty(x) try: a, b = _slope_coeff(*map(np.array, _parse_yxp(yp, xp))) except FoundError as ex: return x, ex.err return x, a, b
[docs] def xforecast(x, a=None, b=None): return a + b * x
FUNCTIONS['_XLFN.FORECAST.LINEAR'] = FUNCTIONS['FORECAST'] = wrap_ufunc( xforecast, args_parser=_args_parser_forecast, excluded={1, 2}, input_parser=lambda x, a, b: (_convert_args(x), a, b) ) FUNCTIONS['FORECAST.LINEAR'] = FUNCTIONS['FORECAST']
[docs] def xnormdist(z, mu, sigma, cumulative=True): if isinstance(cumulative, str): if cumulative.lower() in ('true', 'false'): cumulative = cumulative.lower() == 'true' else: return Error.errors['#VALUE!'] if sigma <= 0: return Error.errors['#NUM!'] norm = NormalDist(mu=mu, sigma=sigma) return norm.cdf(z) if cumulative else norm.pdf(z)
[docs] def xnorminv(z, mu=0, sigma=1): if z <= 0.0 or z >= 1.0 or sigma <= 0: return Error.errors['#NUM!'] norm = NormalDist(mu=mu, sigma=sigma) return norm.inv_cdf(z)
FUNCTIONS['_XLFN.NORM.DIST'] = FUNCTIONS['NORM.DIST'] = wrap_ufunc( xnormdist, input_parser=lambda *a: tuple(map(_convert_args, a[:-1])) + a[-1:] ) FUNCTIONS['_XLFN.NORM.INV'] = FUNCTIONS['NORM.INV'] = wrap_ufunc( xnorminv, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.NORM.S.DIST'] = FUNCTIONS['NORM.S.DIST'] = wrap_ufunc( xnormdist, input_parser=lambda x, *a: (_convert_args(x), 0, 1) + a ) FUNCTIONS['_XLFN.NORM.S.INV'] = FUNCTIONS['NORM.S.INV'] = wrap_ufunc( xnorminv, input_parser=lambda x: (_convert_args(x),) ) _percentile_kw = { 'excluded': {0}, 'input_parser': lambda v, q: (v, _convert_args(q)), 'check_error': lambda *a: get_error(*a[::-1]), 'args_parser': lambda v, q: ( list(flatten(v, drop_empty=True)), replace_empty(q) ) }
[docs] def xpercentile(v, p, exclusive=False): if len(v) == 0 or not is_number(p) or p < 0 or p > 1: return Error.errors['#NUM!'] if exclusive: n = len(v) rank = (n + 1) * p if rank < 1 or rank > n: return Error.errors['#NUM!'] return np.percentile(v, p * 100, method=exclusive and 'weibull' or 'linear')
FUNCTIONS['_XLFN.PERCENTILE.EXC'] = FUNCTIONS['PERCENTILE.EXC'] = wrap_ufunc( functools.partial(xpercentile, exclusive=True), **_percentile_kw ) FUNCTIONS['_XLFN.PERCENTILE.INC'] = FUNCTIONS['PERCENTILE.INC'] = wrap_ufunc( xpercentile, **_percentile_kw )
[docs] def xquartile(v, q, exclusive=False): if len(v) == 0: return Error.errors['#NUM!'] if exclusive: n = len(v) rank = (n + 1) * q * 0.25 if q <= 0 or q >= 4 or rank < 1 or rank > n: return Error.errors['#NUM!'] method = 'weibull' else: if q < 0 or q > 4: return Error.errors['#NUM!'] method = 'linear' return np.quantile(v, q * 0.25, method=method)
_quartile_kw = sh.combine_dicts(_percentile_kw, { 'excluded': {0}, 'input_parser': lambda v, q: (v, np.floor(_convert_args(q))), 'check_error': lambda *a: get_error(*a[::-1]), 'args_parser': lambda v, q: ( list(flatten(v, drop_empty=True)), replace_empty(q) ) }) FUNCTIONS['_XLFN.QUARTILE.EXC'] = FUNCTIONS['QUARTILE.EXC'] = wrap_ufunc( functools.partial(xquartile, exclusive=True), **_quartile_kw ) FUNCTIONS['_XLFN.QUARTILE.INC'] = FUNCTIONS['QUARTILE.INC'] = wrap_ufunc( xquartile, **_quartile_kw )
[docs] def xstdev(args, ddof=1, func=np.std): if len(args) <= ddof: return Error.errors['#DIV/0!'] return func(args, ddof=ddof)
FUNCTIONS['_XLFN.STDEV.S'] = FUNCTIONS['STDEV.S'] = wrap_func(functools.partial( xfunc, func=xstdev )) FUNCTIONS['_XLFN.STDEV.P'] = FUNCTIONS['STDEV.P'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, ddof=0), default=None )) FUNCTIONS['STDEVA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=xstdev )) FUNCTIONS['STDEVPA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, ddof=0 ), default=None )) FUNCTIONS['_XLFN.VAR.S'] = FUNCTIONS['VAR.S'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, func=np.var) )) FUNCTIONS['_XLFN.VAR.P'] = FUNCTIONS['VAR.P'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, ddof=0, func=np.var), default=None )) FUNCTIONS['VARA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, func=np.var ) )) FUNCTIONS['VARPA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, ddof=0, func=np.var ), default=None ))