Source code for formulas.functions.stat

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2026 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
Python equivalents of statistical Excel functions.
"""
import math
import functools
import numpy as np
import schedula as sh
from collections import Counter
from . import (
    raise_errors, flatten, wrap_func, Error, is_number, _text2num, xfilter,
    XlError, wrap_ufunc, replace_empty, get_error, is_not_empty, _convert_args,
    convert_nan, FoundError, xfilters, Array
)
from statistics import NormalDist
from scipy import stats, linalg
from scipy.interpolate import interp1d

FUNCTIONS = {}


def _convert(v):
    if isinstance(v, str):
        return 0
    if isinstance(v, bool):
        return int(v)
    if not isinstance(v, (float, int)):
        return float(v)
    return v


[docs] def xfunc(*args, func=max, check=is_number, convert=None, default=0, _raise=True, parse_args=None): _raise and raise_errors(args) if parse_args: args = parse_args(args) it = flatten(map(_convert_args, args), check=check) default = [] if default is None else [default] return func(list(map(convert, it) if convert else it) or default)
def _xaverage(v): if v: return np.mean(v) return Error.errors['#DIV/0!'] def _xavedev(v): if v: mu = np.mean(v) return np.mean(np.abs(np.asarray(v) - mu)) return Error.errors['#NUM!'] def _xdevsq(v): if v: mu = np.mean(v) return np.sum((np.asarray(v) - mu) ** 2) return Error.errors['#NUM!'] def _xgeomean(v): v = np.asarray(v) if not v.size or (v <= 0.0).any(): return Error.errors['#NUM!'] # log-mean -> exp for numerical stability return np.exp(np.mean(np.log(v))) def _xharmean(v): v = np.asarray(v) if not v.size or (v <= 0.0).any(): return Error.errors['#NUM!'] return v.size / np.sum(1.0 / v) def _xkurt(v): n = len(v) if n < 4 or np.isclose(np.var(v, ddof=1), 0): return Error.errors['#DIV/0!'] # SciPy: fisher=True -> excess kurtosis; bias=False -> unbiased (Excel) return float(stats.kurtosis(v, fisher=True, bias=False, nan_policy='omit')) def _xmode_mult(v): cnt = Counter(v) maxc = max(cnt.values()) if maxc < 2: return Error.errors['#N/A'] return [k for k, c in cnt.items() if c == maxc] def _xmode_sngl(v): cnt = Counter(v) maxc = max(cnt.values()) if maxc < 2: return Error.errors['#N/A'] # choose smallest among values with count == maxc for k, c in cnt.items(): if c == maxc: return k def _xskew(bias, v): n = len(v) if n < 3 or np.isclose(np.var(v, ddof=1), 0.0): return Error.errors['#DIV/0!'] return float(stats.skew( v, bias=bias, nan_policy='omit' )) xskewp = functools.partial( xfunc, func=functools.partial(_xskew, True), default=None ) FUNCTIONS['_XLFN.SKEW.P'] = FUNCTIONS['SKEW.P'] = wrap_func(xskewp) xskew = functools.partial( xfunc, func=functools.partial(_xskew, False), default=None ) FUNCTIONS['_XLFN.SKEW'] = FUNCTIONS['SKEW'] = wrap_func(xskew) xmode_sngl = functools.partial( xfunc, func=_xmode_sngl, convert=_convert, parse_args=lambda a: (v for v in a if v is not "") ) FUNCTIONS['_XLFN.MODE.SNGL'] = FUNCTIONS['MODE.SNGL'] = wrap_func(xmode_sngl) xmode_mult = functools.partial( xfunc, func=_xmode_mult, convert=_convert, parse_args=lambda a: (v for v in a if v is not "") ) FUNCTIONS['_XLFN.MODE.MULT'] = FUNCTIONS['MODE.MULT'] = wrap_func(xmode_mult) xkurt = functools.partial(xfunc, func=_xkurt, default=None) FUNCTIONS['KURT'] = wrap_func(xkurt) xharmean = functools.partial(xfunc, func=_xharmean, default=None) FUNCTIONS['HARMEAN'] = wrap_func(xharmean) xgeomean = functools.partial(xfunc, func=_xgeomean, default=None) FUNCTIONS['GEOMEAN'] = wrap_func(xgeomean) xdevsq = functools.partial(xfunc, func=_xdevsq, default=None) FUNCTIONS['DEVSQ'] = wrap_func(xdevsq) xavedev = functools.partial(xfunc, func=_xavedev, default=None) FUNCTIONS['AVEDEV'] = wrap_func(xavedev) xaverage = functools.partial(xfunc, func=_xaverage, default=None) FUNCTIONS['AVERAGE'] = wrap_func(xaverage) FUNCTIONS['AVERAGEA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=_xaverage, default=None )) FUNCTIONS['AVERAGEIF'] = wrap_func(functools.partial(xfilter, xaverage)) FUNCTIONS['AVERAGEIFS'] = wrap_func(functools.partial(xfilters, xaverage))
[docs] def xcorrel(arr1, arr2): try: arr1, arr2 = _parse_yxp(arr1, arr2) except FoundError as ex: return ex.err return np.corrcoef(arr1, arr2)[0, 1]
FUNCTIONS['CORREL'] = wrap_func(xcorrel) FUNCTIONS['COUNT'] = wrap_func(functools.partial( xfunc, func=len, _raise=False, default=None, check=functools.partial(is_number, xl_return=False) )) FUNCTIONS['COUNTA'] = wrap_func(functools.partial( xfunc, check=is_not_empty, func=len, _raise=False, default=None )) FUNCTIONS['COUNTBLANK'] = wrap_func(functools.partial( xfunc, check=lambda x: (x == '' or x is sh.EMPTY), func=len, _raise=False, default=None )) FUNCTIONS['COUNTIF'] = wrap_func(functools.partial( xfilter, len, operating_range=None )) FUNCTIONS['COUNTIFS'] = wrap_func(functools.partial( xfilters, len, None ))
[docs] def xsort(values, k, large=True): err = get_error(k) if err: return err k = float(_text2num(k)) if isinstance(values, XlError): return values if 1 <= k <= len(values): if large: k = -k else: k -= 1 return values[math.floor(k)] return Error.errors['#NUM!']
def _sort_parser(values, k): if isinstance(values, XlError): raise FoundError(err=values) err = get_error(values) if err: return err, k values = np.array(tuple(flatten( values, lambda v: not isinstance(v, (str, bool)) )), float) values.sort() return values, replace_empty(k) FUNCTIONS['LARGE'] = wrap_ufunc( xsort, args_parser=_sort_parser, excluded={0}, check_error=lambda *a: None, input_parser=lambda *a: a ) FUNCTIONS['SMALL'] = wrap_ufunc( xsort, args_parser=_sort_parser, excluded={0}, check_error=lambda *a: None, input_parser=lambda values, k: (values, k, False) ) FUNCTIONS['MAX'] = wrap_func(xfunc) FUNCTIONS['MAXA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty )) FUNCTIONS['_XLFN.MAXIFS'] = FUNCTIONS['MAXIFS'] = wrap_func( functools.partial(xfilters, xfunc) ) FUNCTIONS['MEDIAN'] = wrap_func(functools.partial( xfunc, func=lambda x: convert_nan(np.median(x) if x else np.nan), default=None )) FUNCTIONS['MIN'] = wrap_func(functools.partial(xfunc, func=min)) FUNCTIONS['MINA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=min )) FUNCTIONS['_XLFN.MINIFS'] = FUNCTIONS['MINIFS'] = wrap_func(functools.partial( xfilters, functools.partial(xfunc, func=min) )) def _forecast_known_filter(known_y, known_x): for v in zip(known_y, known_x): if not any(isinstance(i, (str, bool)) for i in v): yield v
[docs] def xslope(yp, xp): try: a, b = _slope_coeff(*map(np.array, _parse_yxp(yp, xp))) except FoundError as ex: if ex.err is Error.errors['#NULL!']: return Error.errors['#NUM!'] return ex.err return b
[docs] def xintercept(yp, xp): try: a, b = _slope_coeff(*map(np.array, _parse_yxp(yp, xp))) except FoundError as ex: if ex.err is Error.errors['#NULL!']: return Error.errors['#NUM!'] return ex.err return a
FUNCTIONS['SLOPE'] = wrap_func(xslope) FUNCTIONS['INTERCEPT'] = wrap_func(xintercept) def _parse_yxp(yp, xp): yp, xp = tuple(flatten(yp, check=None)), tuple(flatten(xp, check=None)) if (sh.EMPTY,) == yp or (sh.EMPTY,) == xp: raise FoundError(err=Error.errors['#VALUE!']) if len(yp) != len(xp): raise FoundError(err=Error.errors['#N/A']) raise_errors(*zip(yp, xp)) yxp = tuple(_forecast_known_filter(yp, xp)) if len(yxp) <= 1: raise FoundError(err=Error.errors['#DIV/0!']) return tuple(zip(*yxp)) def _slope_coeff(yp, xp): ym, xm = yp.mean(), xp.mean() dx = xp - xm b = (dx ** 2).sum() if not b: raise FoundError(err=Error.errors['#DIV/0!']) b = (dx * (yp - ym)).sum() / b a = ym - xm * b return a, b def _args_parser_forecast(x, yp, xp): x = replace_empty(x) try: a, b = _slope_coeff(*map(np.array, _parse_yxp(yp, xp))) except FoundError as ex: return x, ex.err return x, a, b def _prepare_ets_data(values, timeline, data_completion, aggregation): if data_completion not in (0, 1): raise FoundError(err=Error.errors['#NUM!']) # 1) parse values yp, xp = _parse_yxp(values, timeline) yp = np.asarray(yp, float) xp = np.asarray(xp, float) # 2) sort by timeline idx = np.argsort(xp) xp, yp = xp[idx], yp[idx] # 3) resample time x = np.unique(xp) diffs = np.diff(x) med = np.median(diffs) rounded = np.round(diffs / med, 6) * med uniq, counts = np.unique(rounded, return_counts=True) step = uniq[np.argmax(counts)] x0 = [[]] for i, diff in enumerate(rounded): if diff == step: if x0[-1]: x0[-1][0] += 1 else: x0[-1] = [1, x[i]] elif x0[-1]: x0.append([]) x0 = float(max(x0, key=lambda x: x[0] if x else 0)[1]) i = np.where(xp == x0)[0][0] x0 = x0 - step * int(math.floor((x0 - xp[0]) / step)) n_steps = int(math.floor((xp[-1] - x0) / step)) + 1 x = x0 + step * np.arange(n_steps) y = np.full_like(x, np.nan, dtype=float) idx = np.round((xp - x[0]) / step).astype(int) if y.size < 2: raise FoundError(err=Error.errors['#NUM!']) # 4) aggregate duplicates aggr = { 0: np.mean, 1: np.sum, 2: FUNCTIONS['COUNT'], 3: FUNCTIONS['COUNTA'], 4: np.min, 5: np.max, 6: np.median }[int(aggregation)] for i in np.unique(idx): y[i] = aggr(yp[idx == i]) # 4) data completion (fill gaps) b = np.isnan(y) if b.any(): if data_completion: y[b] = interp1d( x[~b], y[~b], kind='linear', fill_value="extrapolate" )(x[b]) else: y = np.nan_to_num(y, False) return x, y, step def _train_ets_model( values, timeline, seasonality, data_completion, aggregation): x, y, step = _prepare_ets_data( values, timeline, data_completion, aggregation ) import pandas as pd from statsmodels.tsa.exponential_smoothing.ets import ETSModel # 5) seasonality seas = int(seasonality) if seas == 1: # auto seas = 0 ys = y - y.mean() denom = np.dot(ys, ys) if denom > 0.0: best_p, best_r = 0, 0.0 for p in range(2, int(min(8760, math.ceil(ys.size / 2.5)))): r = np.dot(ys[p:], ys[:-p]) if r > best_r: best_r, best_p = r, p if best_r / denom > 0.5: seas = best_p elif seas < 0 or seas > 8760: raise FoundError(err=Error.errors['#NUM!']) kw = {} if seas >= 2: kw['seasonal'] = 'add' kw['seasonal_periods'] = seas if y.size < 2 * seas: _y = y[:seas] kw['initialization_method'] = 'known' kw['initial_level'] = np.mean(_y) kw['initial_trend'] = (_y[-1] - _y[0]) / _y.size kw['initial_seasonal'] = _y - kw['initial_level'] model = ETSModel(pd.Series(y), trend='add', **kw).fit( warn_convergence=False ) return model, x, y, step
[docs] def xforecast_ets( target_date, values, timeline, seasonality=1, data_completion=1, aggregation=0): model, x, y, step = _train_ets_model( values, timeline, seasonality, data_completion, aggregation ) if target_date <= x[0] - step: return Error.errors['#NUM!'] h = int(math.floor((target_date - x[0]) / 1)) + 1 h = h or 1 x = x[0] + step * np.arange(h + 1) pred = model.get_prediction(start=0, end=h) return float(interp1d( x, pred.predicted_mean, kind='linear', fill_value="extrapolate" )(target_date))
[docs] def xforecast_ets_confint( target_date, values, timeline, confidence_level=.95, seasonality=1, data_completion=1, aggregation=0): model, x, y, step = _train_ets_model( values, timeline, seasonality, data_completion, aggregation ) if target_date <= x[0] - step: return Error.errors['#NUM!'] h = (int(math.floor((target_date - x[0]) / 1)) + 1) or 1 x = x[0] + step * np.arange(h + 1) pred = model.get_prediction(start=0, end=h) var = float(interp1d( x, pred.forecast_variance, kind='linear', fill_value="extrapolate" )(target_date)) return stats.norm.ppf((1 + float(confidence_level)) / 2) * np.sqrt(var)
[docs] def xforecast_ets_stat( values, timeline, statistic_type, seasonality=1, data_completion=1, aggregation=0 ): model, x, y, step = _train_ets_model( values, timeline, seasonality, data_completion, aggregation ) statistic_type = int(statistic_type) if statistic_type == 1: return model.alpha if statistic_type == 2: return model.beta if statistic_type == 3: return model.gamma if statistic_type == 8: return step y_true = model.model.endog y_fitted = model.fittedvalues if statistic_type == 4: num = np.mean(np.abs(y_true - y_fitted)) if np.isclose(num, 0, atol=1.e-7): return 0 m = model.model.seasonal_periods or 1 denom = np.mean(np.abs(y_true[m:] - y_true[:-m])) return num / denom if statistic_type == 5: denom = np.abs(y_true) + np.abs(y_fitted) num = 2.0 * np.abs(y_true - y_fitted) return np.mean(np.where(np.isclose(num, 0), 0, num / denom)) if statistic_type == 6: return np.mean(np.abs(y_true - y_fitted)) if statistic_type == 7: return np.sqrt(np.mean((y_true - y_fitted) ** 2)) return Error.errors['#NUM!']
[docs] def xforecast_ets_seasonality( values, timeline, data_completion=1, aggregation=0 ): model = _train_ets_model( values, timeline, 1, data_completion, aggregation )[0] seas = model.model.seasonal_periods or 1 return 0 if seas == 1 else seas
[docs] def xforecast(x, a=None, b=None): return a + b * x
FUNCTIONS['_XLFN.FORECAST.LINEAR'] = FUNCTIONS['FORECAST'] = wrap_ufunc( xforecast, args_parser=_args_parser_forecast, excluded={1, 2}, input_parser=lambda x, a, b: (_convert_args(x), a, b) ) FUNCTIONS['FORECAST.LINEAR'] = FUNCTIONS['FORECAST'] FUNCTIONS['_XLFN.FORECAST.ETS.STAT'] = FUNCTIONS[ 'FORECAST.ETS.STAT' ] = wrap_ufunc( xforecast_ets_stat, excluded={0, 1}, check_error=lambda values, timeline, statistic_type, seasonality=1, data_completion=1, aggregation=0: get_error( statistic_type, seasonality, data_completion, aggregation ), args_parser=lambda values, timeline, statistic_type, seasonality=1, data_completion=1, aggregation=0: ( replace_empty(values, ""), replace_empty(timeline, ""), replace_empty(statistic_type), replace_empty(seasonality), replace_empty(data_completion, 1), replace_empty(aggregation) ), input_parser=lambda values, timeline, statistic_type, seasonality=1, data_completion=1, aggregation=0: ( values, timeline, _convert_args(statistic_type), _convert_args(seasonality), _convert_args(data_completion), _convert_args(aggregation) ) ) FUNCTIONS['_XLFN.FORECAST.ETS.SEASONALITY'] = FUNCTIONS[ 'FORECAST.ETS.SEASONALITY' ] = wrap_ufunc( xforecast_ets_seasonality, excluded={0, 1}, check_error=lambda values, timeline, data_completion=1, aggregation=0: get_error( data_completion, aggregation ), args_parser=lambda values, timeline, data_completion=1, aggregation=0: ( replace_empty(values, ""), replace_empty(timeline, ""), replace_empty(data_completion, 1), replace_empty(aggregation) ), input_parser=lambda values, timeline, data_completion=1, aggregation=0: ( values, timeline, _convert_args(data_completion), _convert_args(aggregation) ) ) FUNCTIONS['_XLFN.FORECAST.ETS.CONFINT'] = FUNCTIONS[ 'FORECAST.ETS.CONFINT' ] = wrap_ufunc( xforecast_ets_confint, excluded={1, 2}, check_error=lambda target_date, values, timeline, confidence_level=.95, seasonality=1, data_completion=1, aggregation=0: get_error( target_date, confidence_level, seasonality, data_completion, aggregation ), args_parser=lambda target_date, values, timeline, confidence_level=.95, seasonality=1, data_completion=1, aggregation=0: ( replace_empty(target_date), replace_empty(values, ""), replace_empty(timeline, ""), replace_empty(confidence_level), replace_empty(seasonality), replace_empty(data_completion, 1), replace_empty(aggregation) ), input_parser=lambda target_date, values, timeline, confidence_level=.95, seasonality=1, data_completion=1, aggregation=0: ( _convert_args(target_date), values, timeline, _convert_args(confidence_level), _convert_args(seasonality), _convert_args(data_completion), _convert_args(aggregation) ) ) FUNCTIONS['_XLFN.FORECAST.ETS'] = FUNCTIONS['FORECAST.ETS'] = wrap_ufunc( xforecast_ets, excluded={1, 2}, check_error=lambda target_date, values, timeline, seasonality=1, data_completion=1, aggregation=0: get_error( target_date, seasonality, data_completion, aggregation ), args_parser=lambda target_date, values, timeline, seasonality=1, data_completion=1, aggregation=0: ( replace_empty(target_date), replace_empty(values, ""), replace_empty(timeline, ""), replace_empty(seasonality), replace_empty(data_completion, 1), replace_empty(aggregation) ), input_parser=lambda target_date, values, timeline, seasonality=1, data_completion=1, aggregation=0: ( _convert_args(target_date), values, timeline, _convert_args(seasonality), _convert_args(data_completion), _convert_args(aggregation) ) ) def _parse_cumulative(cumulative): if isinstance(cumulative, str): if cumulative.lower() in ('true', 'false'): cumulative = cumulative.lower() == 'true' else: raise FoundError(err=Error.errors['#VALUE!']) return cumulative
[docs] def xnormdist(z, mu, sigma, cumulative=True): if sigma <= 0: return Error.errors['#NUM!'] norm = NormalDist(mu=mu, sigma=sigma) return norm.cdf(z) if cumulative else norm.pdf(z)
[docs] def xnorminv(z, mu=0, sigma=1): if z <= 0.0 or z >= 1.0 or sigma <= 0: return Error.errors['#NUM!'] norm = NormalDist(mu=mu, sigma=sigma) return norm.inv_cdf(z)
FUNCTIONS['_XLFN.NORM.DIST'] = FUNCTIONS['NORM.DIST'] = wrap_ufunc( xnormdist, input_parser=lambda z, mu, sigma, cumulative=True: tuple( map(_convert_args, (z, mu, sigma)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['_XLFN.NORM.INV'] = FUNCTIONS['NORM.INV'] = wrap_ufunc( xnorminv, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.NORM.S.DIST'] = FUNCTIONS['NORM.S.DIST'] = wrap_ufunc( xnormdist, input_parser=lambda x, a=1: (_convert_args(x), 0, 1, _parse_cumulative(a)) ) FUNCTIONS['_XLFN.NORM.S.INV'] = FUNCTIONS['NORM.S.INV'] = wrap_ufunc( xnorminv, input_parser=lambda x: (_convert_args(x),) )
[docs] def xweibulldist(x, alfa, beta, cumulative=True): if alfa <= 0 or beta <= 0 or x < 0: return Error.errors['#NUM!'] rv = stats.weibull_min(c=alfa, loc=0.0, scale=beta) return float(rv.cdf(x) if cumulative else rv.pdf(x))
FUNCTIONS['_XLFN.WEIBULL.DIST'] = FUNCTIONS['WEIBULL.DIST'] = wrap_ufunc( xweibulldist, input_parser=lambda x, alfa, beta, cumulative=True: tuple( map(_convert_args, (x, alfa, beta)) ) + (_parse_cumulative(cumulative),) )
[docs] def xlognormdist(z, mu, sigma, cumulative=True): if z <= 0 or sigma <= 0: return Error.errors['#NUM!'] func = stats.lognorm.cdf if cumulative else stats.lognorm.pdf return func(z, s=sigma, scale=np.exp(mu))
[docs] def xlognorminv(z, mu=0, sigma=1): if z <= 0.0 or z > 1.0 or sigma <= 0: return Error.errors['#NUM!'] return stats.lognorm.ppf(z, s=sigma, scale=np.exp(mu))
FUNCTIONS['_XLFN.LOGNORM.DIST'] = FUNCTIONS['LOGNORM.DIST'] = wrap_ufunc( xlognormdist, input_parser=lambda z, mu, sigma, cumulative=True: tuple( map(_convert_args, (z, mu, sigma)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['_XLFN.LOGNORM.INV'] = FUNCTIONS['LOGNORM.INV'] = wrap_ufunc( xlognorminv, input_parser=lambda *a: tuple(map(_convert_args, a)) )
[docs] def xbetadist(x, _alpha, _beta, cumulative=True, lb=0, ub=1): if x < lb or x > ub or lb >= ub or _alpha <= 0 or _beta <= 0: return Error.errors['#NUM!'] func = stats.beta.cdf if cumulative else stats.beta.pdf return func(x, _alpha, _beta, loc=lb, scale=(ub - lb))
[docs] def xbetainv(x, _alpha, _beta, lb=0, ub=1): if x <= 0.0 or x >= 1.0 or _alpha <= 0 or _beta <= 0 or lb >= ub: return Error.errors['#NUM!'] return stats.beta.ppf(x, _alpha, _beta, loc=lb, scale=(ub - lb))
FUNCTIONS['_XLFN.BETA.DIST'] = FUNCTIONS['BETA.DIST'] = wrap_ufunc( xbetadist, input_parser=lambda *a: tuple( map(_convert_args, a[:3]) ) + tuple(map(_parse_cumulative, a[3:4])) + tuple(map(_convert_args, a[4:])) ) FUNCTIONS['_XLFN.BETA.INV'] = FUNCTIONS['BETA.INV'] = wrap_ufunc( xbetainv, input_parser=lambda *a: tuple(map(_convert_args, a)) )
[docs] def xnegbinomdist(number_f, number_s, probability_s, cumulative=True): number_f = int(number_f) number_s = int(number_s) if not (0 <= probability_s <= 1 and number_f >= 0 and number_s >= 1): return Error.errors['#NUM!'] func = stats.nbinom.cdf if cumulative else stats.nbinom.pmf return func(number_f, number_s, probability_s)
[docs] def xbinomdist(number_s, trials, probability_s, cumulative=True): trials = int(trials) number_s = int(number_s) if not (0 <= probability_s <= 1 and 0 <= number_s <= trials): return Error.errors['#NUM!'] func = stats.binom.cdf if cumulative else stats.binom.pmf return func(number_s, trials, probability_s)
[docs] def xbinomdistrange(trials, probability_s, number_s, number_s2=None): number_s2 = number_s if number_s2 is None else number_s2 trials = int(trials) number_s = int(number_s) number_s2 = int(number_s2) if not (0 <= probability_s <= 1 and ( 0 <= number_s <= trials and 0 <= number_s2 <= trials )): return Error.errors['#NUM!'] func = stats.binom.cdf return func(number_s2, trials, probability_s) - func( number_s - 1, trials, probability_s )
[docs] def xbinominv(trials, probability_s, alpha): trials = int(trials) if not (0 <= probability_s <= 1 and 0 < alpha < 1 and trials >= 0): return Error.errors['#NUM!'] return stats.binom.ppf(alpha, trials, probability_s)
FUNCTIONS['_XLFN.NEGBINOM.DIST'] = FUNCTIONS['NEGBINOM.DIST'] = wrap_ufunc( xnegbinomdist, input_parser=lambda number_s, trials, probability_s, cumulative=True: tuple( map(_convert_args, (number_s, trials, probability_s)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['_XLFN.BINOM.DIST'] = FUNCTIONS['BINOM.DIST'] = wrap_ufunc( xbinomdist, input_parser=lambda number_s, trials, probability_s, cumulative=True: tuple( map(_convert_args, (number_s, trials, probability_s)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['BINOM.DIST.RANGE'] = wrap_ufunc( xbinomdistrange, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.BINOM.DIST.RANGE'] = FUNCTIONS['BINOM.DIST.RANGE'] FUNCTIONS['_XLFN.BINOM.INV'] = FUNCTIONS['BINOM.INV'] = wrap_ufunc( xbinominv, input_parser=lambda *a: tuple(map(_convert_args, a)) )
[docs] def xchisqdist(x, deg_freedom, cumulative=True): deg_freedom = int(deg_freedom) if not (x >= 0 and 1 <= deg_freedom <= 1e10): return Error.errors['#NUM!'] func = stats.chi2.cdf if cumulative else stats.chi2.pdf return func(x, deg_freedom)
[docs] def xchisqdistrt(x, deg_freedom): deg_freedom = int(deg_freedom) if not (x >= 0 and 1 <= deg_freedom <= 1e10): return Error.errors['#NUM!'] return stats.chi2.sf(x, deg_freedom)
[docs] def xchisqinv(probability, deg_freedom): deg_freedom = int(deg_freedom) if not (0 <= probability <= 1 and 1 <= deg_freedom <= 1e10): return Error.errors['#NUM!'] return stats.chi2.ppf(probability, deg_freedom)
[docs] def xchisqinvrt(probability, deg_freedom): deg_freedom = int(deg_freedom) if not (0 <= probability <= 1 and 1 <= deg_freedom <= 1e10): return Error.errors['#NUM!'] return stats.chi2.isf(probability, deg_freedom)
def _parse_ranges(arr1, arr2, error_x_row=False, raise_diff_len=True): arr1 = tuple(flatten(arr1, None)) arr2 = tuple(flatten(arr2, None)) if raise_diff_len and len(arr1) != len(arr2): raise FoundError(err=Error.errors['#N/A']) if not error_x_row: err = get_error(arr1, arr2) if err: if err is Error.errors['#NULL!']: err = Error.errors['#NUM!'] raise FoundError(err=err) _arr1 = [] _arr2 = [] if raise_diff_len: for a, e in zip(arr1, arr2): a = a.item() if isinstance(a, np.ndarray) else a e = e.item() if isinstance(e, np.ndarray) else e if error_x_row: err = get_error(a, e) if err: if err is Error.errors['#NULL!']: err = Error.errors['#NUM!'] raise FoundError(err=err) if isinstance(a, bool) or isinstance(e, bool): continue if isinstance(a, (int, float)) and isinstance(e, (int, float)): _arr1.append(a) _arr2.append(e) else: for arr, _arr in ((arr1, _arr1), (arr2, _arr2)): for v in arr: v = v.item() if isinstance(v, np.ndarray) else v if isinstance(v, bool): continue if isinstance(v, (int, float)): _arr.append(v) return _arr1, _arr2
[docs] def xchisqtest(actual_range, expected_range): actual_range = np.atleast_2d(actual_range) r, c = actual_range.shape if r > 1 and c > 1: ddof = (r - 1) * (c - 1) elif r == 1 and c > 1: ddof = c - 1 elif r > 1 and c == 1: ddof = r - 1 else: return Error.errors['#N/A'] _actual_range, _expected_range = _parse_ranges(actual_range, expected_range) if not _actual_range: return Error.errors['#DIV/0!'] test = stats.chisquare(_actual_range, _expected_range, sum_check=False) return stats.chi2.sf(test.statistic, ddof)
FUNCTIONS['_XLFN.CHISQ.DIST'] = FUNCTIONS['CHISQ.DIST'] = wrap_ufunc( xchisqdist, input_parser=lambda x, deg_freedom, cumulative=True: tuple( map(_convert_args, (x, deg_freedom)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['_XLFN.CHISQ.INV'] = FUNCTIONS['CHISQ.INV'] = wrap_ufunc( xchisqinv, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.CHISQ.DIST.RT'] = FUNCTIONS['CHISQ.DIST.RT'] = wrap_ufunc( xchisqdistrt, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.CHISQ.INV.RT'] = FUNCTIONS['CHISQ.INV.RT'] = wrap_ufunc( xchisqinvrt, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.CHISQ.TEST'] = FUNCTIONS['CHISQ.TEST'] = wrap_func( xchisqtest )
[docs] def xconfidence_norm(alpha, standard_dev, size): """ CONFIDENCE.NORM(alpha, standard_dev, size) - alpha in (0,1) - standard_dev > 0 - size > 0 (integer) Returns the margin of error for a population mean when σ is known. """ n = int(size) if not (0 < alpha < 1) or standard_dev <= 0 or n <= 0: return Error.errors['#NUM!'] z = NormalDist().inv_cdf(1 - alpha / 2.0) return z * standard_dev / math.sqrt(n)
[docs] def xconfidence_t(alpha, standard_dev, size): """ CONFIDENCE.T(alpha, standard_dev, size) Returns the margin of error: t_{1-α/2, n-1} * sd / sqrt(n) """ n = int(size) if not (0 < alpha < 1) or standard_dev <= 0 or n < 1: return Error.errors['#NUM!'] if n == 1: return Error.errors['#DIV/0!'] tcrit = float(stats.t.ppf(1.0 - alpha / 2.0, n - 1)) return tcrit * standard_dev / math.sqrt(n)
FUNCTIONS['_XLFN.CONFIDENCE.NORM'] = FUNCTIONS['CONFIDENCE.NORM'] = wrap_ufunc( xconfidence_norm, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.CONFIDENCE.T'] = FUNCTIONS['CONFIDENCE.T'] = wrap_ufunc( xconfidence_t, input_parser=lambda *a: tuple(map(_convert_args, a)) ) def _cov_core(arr1, arr2, sample: bool): """ Excel-like covariance: - sample=False -> COVARIANCE.P (divide by N) - sample=True -> COVARIANCE.S (divide by N-1) Uses _parse_yxp to (a) align lengths, (b) drop pairs with text/bools, and (c) raise Excel-style errors. """ y, x = _parse_ranges(arr1, arr2, True) y = np.asarray(y, dtype=float) x = np.asarray(x, dtype=float) n = y.size if sample: if n <= 1: return Error.errors['#DIV/0!'] denom = n - 1 else: if n == 0: return Error.errors['#DIV/0!'] denom = n ybar = float(y.mean()) xbar = float(x.mean()) cov = float(np.dot(y - ybar, x - xbar) / denom) return cov
[docs] def xcovariance_p(arr1, arr2): return _cov_core(arr1, arr2, sample=False)
[docs] def xcovariance_s(arr1, arr2): return _cov_core(arr1, arr2, sample=True)
FUNCTIONS['_XLFN.COVARIANCE.P'] = FUNCTIONS['COVARIANCE.P'] = wrap_func( xcovariance_p ) FUNCTIONS['_XLFN.COVARIANCE.S'] = FUNCTIONS['COVARIANCE.S'] = wrap_func( xcovariance_s )
[docs] def xfdist(x, deg_freedom1, deg_freedom2, cumulative=True): deg_freedom1 = int(deg_freedom1) deg_freedom2 = int(deg_freedom2) if not (x >= 0 and deg_freedom1 >= 1 and deg_freedom2 >= 1): return Error.errors['#NUM!'] func = stats.f.cdf if cumulative else stats.f.pdf return func(x, deg_freedom1, deg_freedom2)
[docs] def xfdistrt(x, deg_freedom1, deg_freedom2): deg_freedom1 = int(deg_freedom1) deg_freedom2 = int(deg_freedom2) if not (x >= 0 and deg_freedom1 >= 1 and deg_freedom2 >= 1): return Error.errors['#NUM!'] return stats.f.sf(x, deg_freedom1, deg_freedom2)
[docs] def xfinv(probability, deg_freedom1, deg_freedom2): deg_freedom1 = int(deg_freedom1) deg_freedom2 = int(deg_freedom2) if not (0 <= probability <= 1 and deg_freedom1 >= 1 and deg_freedom2 >= 1): return Error.errors['#NUM!'] return stats.f.ppf(probability, deg_freedom1, deg_freedom2)
[docs] def xfinvrt(probability, deg_freedom1, deg_freedom2): deg_freedom1 = int(deg_freedom1) deg_freedom2 = int(deg_freedom2) if not (0 <= probability <= 1 and deg_freedom1 >= 1 and deg_freedom2 >= 1): return Error.errors['#NUM!'] return stats.f.isf(probability, deg_freedom1, deg_freedom2)
[docs] def xftest(array1, array2): _array1, _array2 = _parse_ranges(array1, array2, raise_diff_len=False) _array1, _array2 = np.asarray(_array1, float), np.asarray(_array2, float) if _array1.size < 2 or _array2.size < 2: return Error.errors['#DIV/0!'] va = float(np.var(_array1, ddof=1)) vb = float(np.var(_array2, ddof=1)) if np.isclose(va, 0) or np.isclose(vb, 0): return Error.errors['#DIV/0!'] # Put larger variance on top so F >= 1 if va >= vb: F = va / vb d1 = _array1.size - 1 d2 = _array2.size - 1 else: F = vb / va d1 = _array2.size - 1 d2 = _array1.size - 1 # One-tail (right) then two-tail p_right = float(stats.f.sf(F, d1, d2)) p_two = min(1.0, 2.0 * p_right) return p_two
FUNCTIONS['_XLFN.F.DIST'] = FUNCTIONS['F.DIST'] = wrap_ufunc( xfdist, input_parser=lambda x, deg_freedom1, deg_freedom2, cumulative=True: tuple( map(_convert_args, (x, deg_freedom1, deg_freedom2)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['_XLFN.F.INV'] = FUNCTIONS['F.INV'] = wrap_ufunc( xfinv, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.F.DIST.RT'] = FUNCTIONS['F.DIST.RT'] = wrap_ufunc( xfdistrt, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.F.INV.RT'] = FUNCTIONS['F.INV.RT'] = wrap_ufunc( xfinvrt, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.F.TEST'] = FUNCTIONS['F.TEST'] = wrap_func(xftest)
[docs] def xt_dist(x, deg_freedom, cumulative=True): deg_freedom = int(deg_freedom) if deg_freedom <= 0: if not cumulative and deg_freedom == 0: return Error.errors['#DIV/0!'] return Error.errors['#NUM!'] func = stats.t.cdf if cumulative else stats.t.pdf return func(x, deg_freedom)
[docs] def xt_distrt(x, deg_freedom): deg_freedom = int(deg_freedom) if not (1 <= deg_freedom): return Error.errors['#NUM!'] return stats.t.sf(x, deg_freedom)
[docs] def xt_dist2t(x, deg_freedom): deg_freedom = int(deg_freedom) if not (x >= 0 and 1 <= deg_freedom): return Error.errors['#NUM!'] p_right = stats.t.sf(x, deg_freedom) return min(1.0, 2.0 * p_right)
[docs] def xt_inv(probability, deg_freedom): deg_freedom = int(deg_freedom) if not (0 <= probability <= 1 and 1 <= deg_freedom): return Error.errors['#NUM!'] return stats.t.ppf(probability, deg_freedom)
[docs] def xt_inv2t(probability, deg_freedom): deg_freedom = int(deg_freedom) if not (0 <= probability <= 1 and 1 <= deg_freedom): return Error.errors['#NUM!'] return stats.t.ppf(1.0 - probability / 2., deg_freedom)
[docs] def xt_test(array1, array2, tails, ttype): if tails not in (1, 2) or ttype not in (1, 2, 3): return Error.errors['#NUM!'] a, b = _parse_ranges( array1, array2, raise_diff_len=ttype == 1, error_x_row=ttype == 1 ) a = np.asarray(a, float) b = np.asarray(b, float) # paired if ttype == 1: if a.size < 2 or b.size < 2: return Error.errors['#DIV/0!'] d = a - b n = d.size mean = float(d.mean()) sd = float(d.std(ddof=1)) if np.isclose(sd, 0): return Error.errors['#DIV/0!'] t = mean / (sd / math.sqrt(n)) df = n - 1 # equal variance (pooled) elif ttype == 2: if a.size < 2 or b.size < 2: return Error.errors['#DIV/0!'] na, nb = a.size, b.size ma, mb = float(a.mean()), float(b.mean()) va, vb = float(a.var(ddof=1)), float(b.var(ddof=1)) if np.isclose(va, 0) and np.isclose(vb, 0): return Error.errors['#DIV/0!'] sp = math.sqrt(((na - 1) * va + (nb - 1) * vb) / (na + nb - 2)) if np.isclose(sp, 0): return Error.errors['#DIV/0!'] t = (ma - mb) / (sp * math.sqrt(1.0 / na + 1.0 / nb)) df = na + nb - 2 # unequal variance (Welch) else: # ttype == 3 if a.size < 2 or b.size < 2: return Error.errors['#DIV/0!'] na, nb = a.size, b.size ma, mb = float(a.mean()), float(b.mean()) va, vb = float(a.var(ddof=1)), float(b.var(ddof=1)) denom = va / na + vb / nb if np.isclose(denom, 0): return Error.errors['#DIV/0!'] t = (ma - mb) / math.sqrt(denom) # Welch–Satterthwaite df df = (denom ** 2) / ((va * va) / ((na * na) * (na - 1)) + (vb * vb) / ( (nb * nb) * (nb - 1))) if not math.isfinite(df) or df <= 0: return Error.errors['#DIV/0!'] # p-values from |t| p_one = stats.t.sf(abs(t), df) p_two = min(1.0, 2.0 * p_one) return p_one if tails == 1 else p_two
FUNCTIONS['_XLFN.T.DIST'] = FUNCTIONS['T.DIST'] = wrap_ufunc( xt_dist, input_parser=lambda x, deg_freedom, cumulative=True: tuple( map(_convert_args, (x, deg_freedom)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['_XLFN.T.INV'] = FUNCTIONS['T.INV'] = wrap_ufunc( xt_inv, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.T.DIST.2T'] = FUNCTIONS['T.DIST.2T'] = wrap_ufunc( xt_dist2t, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.T.DIST.RT'] = FUNCTIONS['T.DIST.RT'] = wrap_ufunc( xt_distrt, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.T.INV.2T'] = FUNCTIONS['T.INV.2T'] = wrap_ufunc( xt_inv2t, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.T.TEST'] = FUNCTIONS['T.TEST'] = wrap_ufunc( xt_test, input_parser=lambda a1, a2, *a: (a1, a2) + tuple(map(_convert_args, a)), check_error=lambda *a: None, args_parser=lambda *a: a, excluded={0, 1} )
[docs] def xexpon_dist(x, rate, cumulative): """ EXPON.DIST(x, lambda, cumulative) - cumulative TRUE -> 1 - exp(-lambda * x) - cumulative FALSE -> lambda * exp(-lambda * x) - Excel errors: * x < 0 or lambda <= 0 -> #NUM! * cumulative not TRUE/FALSE -> #VALUE! """ if x < 0 or rate <= 0: return Error.errors['#NUM!'] if cumulative: return 1.0 - np.exp(-rate * x) else: return rate * np.exp(-rate * x)
FUNCTIONS['_XLFN.EXPON.DIST'] = FUNCTIONS['EXPON.DIST'] = wrap_ufunc( xexpon_dist, input_parser=lambda *a: tuple( map(_convert_args, a[:-1]) ) + tuple(map(_parse_cumulative, a[-1:])) )
[docs] def xpoisson_dist(x, mean, cumulative): if x < 0 or mean < 0: return Error.errors['#NUM!'] func = stats.poisson.cdf if cumulative else stats.poisson.pmf return func(x, mean)
FUNCTIONS['_XLFN.POISSON.DIST'] = FUNCTIONS['POISSON.DIST'] = wrap_ufunc( xpoisson_dist, input_parser=lambda *a: tuple( map(_convert_args, a[:-1]) ) + tuple(map(_parse_cumulative, a[-1:])) )
[docs] def xfisher(number): """ FISHER(number) -> 0.5 * ln((1+x)/(1-x)) Excel domain: -1 < x < 1 (else #NUM!) """ x = number if not (-1.0 < x < 1.0): return Error.errors['#NUM!'] return 0.5 * np.log((1.0 + x) / (1.0 - x))
[docs] def xfisherinv(y): """ FISHERINV(y) -> inverse Fisher transform = tanh(y) - Domain: any real y (returns value in (-1, 1)). """ return np.tanh(y)
FUNCTIONS['FISHER'] = wrap_ufunc( xfisher, input_parser=lambda x: (_convert_args(x),) ) FUNCTIONS['FISHERINV'] = wrap_ufunc( xfisherinv, input_parser=lambda y: (_convert_args(y),) )
[docs] def xphi(y): return NormalDist().pdf(y)
FUNCTIONS['_XLFN.PHI'] = FUNCTIONS['PHI'] = wrap_ufunc( xphi, input_parser=lambda x: (_convert_args(x),) )
[docs] def xgamma(number): """ GAMMA(number) Excel domain: - allowed: any real except non-positive integers - error: number in { …, -2, -1, 0 } -> #NUM! """ if number <= 0 and float(number).is_integer(): return Error.errors['#NUM!'] try: return float(math.gamma(number)) except (ValueError, OverflowError): return Error.errors['#NUM!']
[docs] def xgamma_dist(x, alpha, beta, cumulative=True): if x < 0 or alpha <= 0 or beta <= 0: return Error.errors['#NUM!'] rv = stats.gamma(alpha, loc=0.0, scale=beta) return rv.cdf(x) if cumulative else rv.pdf(x)
[docs] def xgamma_inv(probability, alpha, beta): """ GAMMA.INV(probability, alpha, beta) - 0 < probability < 1 - alpha > 0, beta > 0 """ if not (0.0 <= probability <= 1.0) or alpha <= 0 or beta <= 0: return Error.errors['#NUM!'] return stats.gamma.ppf(probability, alpha, loc=0.0, scale=beta)
[docs] def xgammaln(x): """ GAMMALN(x) and GAMMALN.PRECISE(x) Excel domain: x > 0 (else #NUM!) """ if x <= 0: return Error.errors['#NUM!'] return math.lgamma(x)
FUNCTIONS['_XLFN.GAMMA'] = FUNCTIONS['GAMMA'] = wrap_ufunc( xgamma, input_parser=lambda x: (_convert_args(x),) ) FUNCTIONS['_XLFN.GAMMA.DIST'] = FUNCTIONS['GAMMA.DIST'] = wrap_ufunc( xgamma_dist, input_parser=lambda x, alpha, beta, cumulative=True: tuple( map(_convert_args, (x, alpha, beta)) ) + (_parse_cumulative(cumulative),) ) FUNCTIONS['_XLFN.GAMMA.INV'] = FUNCTIONS['GAMMA.INV'] = wrap_ufunc( xgamma_inv, input_parser=lambda *a: tuple(map(_convert_args, a)) ) FUNCTIONS['_XLFN.GAMMALN.PRECISE'] = FUNCTIONS['GAMMALN.PRECISE'] = wrap_ufunc( xgammaln, input_parser=lambda x: (_convert_args(x),) ) FUNCTIONS['_XLFN.GAMMALN'] = FUNCTIONS['GAMMALN'] = FUNCTIONS['GAMMALN.PRECISE']
[docs] def xgauss(x): """ GAUSS(z) = Φ(z) - 0.5 (Φ is the CDF of N(0,1); result in (-0.5, 0.5)) """ return NormalDist().cdf(x) - 0.5
FUNCTIONS['_XLFN.GAUSS'] = FUNCTIONS['GAUSS'] = wrap_ufunc( xgauss, input_parser=lambda x: (_convert_args(x),) )
[docs] def xhypergeom_dist( sample_s, number_sample, population_s, number_pop, cumulative): """ EXPON.DIST(x, lambda, cumulative) - cumulative TRUE -> 1 - exp(-lambda * x) - cumulative FALSE -> lambda * exp(-lambda * x) - Excel errors: * x < 0 or lambda <= 0 -> #NUM! * cumulative not TRUE/FALSE -> #VALUE! """ k = int(sample_s) n = int(number_sample) K = int(population_s) or 1 M = int(number_pop) or 1 # Excel-like domain checks if M < 0 or n < 0 or K < 0 or K > M or n > M or k < 0: return Error.errors['#NUM!'] rv = stats.hypergeom(M, K, n) return rv.cdf(k) if cumulative else rv.pmf(k)
FUNCTIONS['_XLFN.HYPGEOM.DIST'] = FUNCTIONS['HYPGEOM.DIST'] = wrap_ufunc( xhypergeom_dist, input_parser=lambda *a: tuple( map(_convert_args, a[:-1]) ) + tuple(map(_parse_cumulative, a[-1:])) )
[docs] def xpearson(array1, array2): y, x = _parse_ranges(array1, array2, error_x_row=True) y = np.asarray(y, dtype=float) x = np.asarray(x, dtype=float) if y.size < 2: return Error.errors['#DIV/0!'] dy = y - y.mean() dx = x - x.mean() syy = float(np.dot(dy, dy)) sxx = float(np.dot(dx, dx)) if np.isclose(sxx, 0) or np.isclose(syy, 0): return Error.errors['#DIV/0!'] return float(np.dot(dx, dy) / math.sqrt(sxx * syy))
FUNCTIONS['_XLFN.PEARSON'] = FUNCTIONS['PEARSON'] = wrap_func(xpearson)
[docs] def xpercentrank(exc, vals, x, significance=3): """ PERCENTRANK.EXC(array, x, [significance]) - Returns rank of x in array as a percentage in (0,1) (excludes endpoints). - If x is outside [min(array), max(array)] -> #N/A - Rounds to 'significance' decimal places (default 3). """ if not vals: return Error.errors['#N/A'] n = len(vals) # outside range -> #N/A (exclusive) if x < vals[0] or x > vals[-1]: return Error.errors['#N/A'] # default/validate significance (decimal places to round) sig = int(significance) if sig < 1: return Error.errors['#NUM!'] # Find position (0-based) lo = int(np.searchsorted(vals, x, side='left')) hi = int(np.searchsorted(vals, x, side='right')) if lo == hi: # xx strictly between vals[lo-1] and vals[lo] -> interpolate y = vals[lo - 1] z = vals[lo] # y < xx < z guaranteed; z - y > 0 frac = (x - y) / (z - y) r = (lo - 1) + frac else: # xx equals a tie block -> average the tied ranks (0-based) r = 0.5 * (lo + (hi - 1)) if exc: pct = (r + 1) / (n + 1) else: pct = r / (n - 1) return math.trunc(pct * 10 ** sig) / 10 ** sig
_percentrank_kw = { 'excluded': {0}, 'input_parser': lambda v, q, s=3: (v, _convert_args(q), _convert_args(s)), 'check_error': lambda *a: get_error(*a[::-1]), 'args_parser': lambda v, q, s=3: (sorted(_parse_ranges( v, [], error_x_row=False, raise_diff_len=False )[0]), replace_empty(q), replace_empty(s)) } FUNCTIONS['_XLFN.PERCENTRANK.EXC'] = FUNCTIONS['PERCENTRANK.EXC'] = wrap_ufunc( functools.partial(xpercentrank, True), **_percentrank_kw ) FUNCTIONS['_XLFN.PERCENTRANK.INC'] = FUNCTIONS['PERCENTRANK.INC'] = wrap_ufunc( functools.partial(xpercentrank, False), **_percentrank_kw ) _percentile_kw = { 'excluded': {0}, 'input_parser': lambda v, q: (v, _convert_args(q)), 'check_error': lambda *a: get_error(*a[::-1]), 'args_parser': lambda v, q: ( list(flatten(v, drop_empty=True)), replace_empty(q) ) }
[docs] def xpercentile(v, p, exclusive=False): if len(v) == 0 or not is_number(p) or p < 0 or p > 1: return Error.errors['#NUM!'] if exclusive: n = len(v) rank = (n + 1) * p if rank < 1 or rank > n: return Error.errors['#NUM!'] return np.percentile(v, p * 100, method=exclusive and 'weibull' or 'linear')
FUNCTIONS['_XLFN.PERCENTILE.EXC'] = FUNCTIONS['PERCENTILE.EXC'] = wrap_ufunc( functools.partial(xpercentile, exclusive=True), **_percentile_kw ) FUNCTIONS['_XLFN.PERCENTILE.INC'] = FUNCTIONS['PERCENTILE.INC'] = wrap_ufunc( xpercentile, **_percentile_kw )
[docs] def xquartile(v, q, exclusive=False): if len(v) == 0: return Error.errors['#NUM!'] if exclusive: n = len(v) rank = (n + 1) * q * 0.25 if q <= 0 or q >= 4 or rank < 1 or rank > n: return Error.errors['#NUM!'] method = 'weibull' else: if q < 0 or q > 4: return Error.errors['#NUM!'] method = 'linear' return np.quantile(v, q * 0.25, method=method)
_quartile_kw = sh.combine_dicts(_percentile_kw, { 'excluded': {0}, 'input_parser': lambda v, q: (v, np.floor(_convert_args(q))), 'check_error': lambda *a: get_error(*a[::-1]), 'args_parser': lambda v, q: ( list(flatten(v, drop_empty=True)), replace_empty(q) ) }) FUNCTIONS['_XLFN.QUARTILE.EXC'] = FUNCTIONS['QUARTILE.EXC'] = wrap_ufunc( functools.partial(xquartile, exclusive=True), **_quartile_kw ) FUNCTIONS['_XLFN.QUARTILE.INC'] = FUNCTIONS['QUARTILE.INC'] = wrap_ufunc( xquartile, **_quartile_kw )
[docs] def xstdev(args, ddof=1, func=np.std): if len(args) <= ddof: return Error.errors['#DIV/0!'] return func(args, ddof=ddof)
FUNCTIONS['_XLFN.STDEV.S'] = FUNCTIONS['STDEV.S'] = wrap_func(functools.partial( xfunc, func=xstdev )) FUNCTIONS['_XLFN.STDEV.P'] = FUNCTIONS['STDEV.P'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, ddof=0), default=None )) FUNCTIONS['STDEVA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=xstdev )) FUNCTIONS['STDEVPA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, ddof=0 ), default=None )) FUNCTIONS['_XLFN.VAR.S'] = FUNCTIONS['VAR.S'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, func=np.var) )) FUNCTIONS['_XLFN.VAR.P'] = FUNCTIONS['VAR.P'] = wrap_func(functools.partial( xfunc, func=functools.partial(xstdev, ddof=0, func=np.var), default=None )) FUNCTIONS['VARA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, func=np.var ) )) FUNCTIONS['VARPA'] = wrap_func(functools.partial( xfunc, convert=_convert, check=is_not_empty, func=functools.partial( xstdev, ddof=0, func=np.var ), default=None ))
[docs] def xpermut(number, number_chosen): number = int(number) number_chosen = int(number_chosen) if number_chosen < 0 or number < 0 or number < number_chosen: return Error.errors['#NUM!'] return math.factorial(number) / math.factorial(number - number_chosen)
[docs] def xpermutationa(number, number_chosen): if number_chosen < 0 or number < 0: return Error.errors['#NUM!'] return int(number) ** int(number_chosen)
FUNCTIONS['_XLFN.PERMUT'] = FUNCTIONS['PERMUT'] = wrap_ufunc(xpermut) FUNCTIONS['_XLFN.PERMUTATIONA'] = FUNCTIONS['PERMUTATIONA'] = wrap_ufunc( xpermutationa )
[docs] def xrank(method, number, ref, order=0): arr = np.asarray(ref, float) b = arr == float(number) if arr.size == 0 or not b.any(): return Error.errors['#N/A'] if int(order) == 0: arr = -arr r = stats.rankdata(arr, method=method) # 1-based ranks return float(r[np.where(b)[0][0]])
# ---- Registration ---- FUNCTIONS['_XLFN.RANK.EQ'] = FUNCTIONS['RANK.EQ'] = wrap_ufunc( functools.partial(xrank, "min"), excluded={1}, args_parser=lambda number, ref, order=0: ( replace_empty(number), ref, replace_empty(order) ), input_parser=lambda number, ref, order=0: ( _convert_args(number), sorted(_parse_ranges( ref, [], error_x_row=False, raise_diff_len=False )[0]), _convert_args(order) ) ) FUNCTIONS['_XLFN.RANK.AVG'] = FUNCTIONS['RANK.AVG'] = wrap_ufunc( functools.partial(xrank, "average"), excluded={1}, args_parser=lambda number, ref, order=0: ( replace_empty(number), ref, replace_empty(order) ), input_parser=lambda number, ref, order=0: ( _convert_args(number), _parse_ranges( ref, [], error_x_row=False, raise_diff_len=False )[0], _convert_args(order) ) )
[docs] def xrsq(known_ys, known_xs): """ RSQ(known_y's, known_x's) -> r^2 """ known_ys, known_xs = _parse_ranges(known_ys, known_xs, error_x_row=True) if len(known_ys) <= 1: return Error.errors['#DIV/0!'] return stats.linregress(known_ys, known_xs)[2] ** 2
[docs] def xsteyx(known_ys, known_xs): known_ys, known_xs = _parse_ranges(known_ys, known_xs, error_x_row=True) known_xs = np.asarray(known_xs, float) known_ys = np.asarray(known_ys, float) n = known_ys.size if n <= 2: return Error.errors['#DIV/0!'] x = known_xs - known_xs.mean() y = known_ys - known_ys.mean() see = np.dot(y, y) - np.dot(x, y) ** 2 / np.dot(x, x) return np.sqrt(max(0.0, see / (n - 2)))
FUNCTIONS['RSQ'] = wrap_func(xrsq) FUNCTIONS['STEYX'] = wrap_func(xsteyx)
[docs] def xstandardize(x, mean, standard_dev): if standard_dev <= 0: return Error.errors['#NUM!'] return (x - mean) / standard_dev
FUNCTIONS['STANDARDIZE'] = wrap_ufunc( xstandardize, input_parser=lambda *a: tuple(map(_convert_args, a)) )
[docs] def xtrimmean(array, percent): """ TRIMMEAN(array, percent) - percent in [0,1); trims floor(percent*n/2) from each tail - ignores text/booleans/blanks - errors: * no numeric data -> #NUM! * percent < 0 or >= 1 -> #NUM! * trims away all data (2k >= n) -> #NUM! """ # collect finite numerics only n = array.size k = int(math.floor(percent * n / 2.0)) if not n or not (0.0 <= percent < 1.0) or 2 * k >= n: return Error.errors['#NUM!'] trimmed = array[k:n - k] return float(trimmed.mean())
FUNCTIONS['TRIMMEAN'] = wrap_ufunc( xtrimmean, excluded={0}, args_parser=lambda array, percent: ( np.sort(np.array(_parse_ranges( array, [], error_x_row=False, raise_diff_len=False )[0], dtype=float)), replace_empty(percent) ), input_parser=lambda array, percent: (array, float(_convert_args(percent))) )
[docs] def xz_test(array, x, sigma=None): """ Z.TEST(array, x, [sigma]) -> one-tailed p-value - If sigma is omitted/empty, use sample stdev (ddof=1). - Returns #N/A if no numeric data (or n<2 when sigma omitted). - Returns #DIV/0! if sigma <= 0 (or sample stdev is 0). """ array = np.array(_parse_ranges( array, [], error_x_row=False, raise_diff_len=False )[0], dtype=float) n = array.size if n <= 1: return Error.errors['#DIV/0!'] x = float(x) if sigma is None: sigma = float(np.std(array, ddof=1)) if sigma <= 0.0: return Error.errors['#NUM!'] return 1 - NormalDist().cdf((np.mean(array) - x) / (sigma / np.sqrt(n)))
FUNCTIONS['_XLFN.Z.TEST'] = FUNCTIONS['Z.TEST'] = wrap_ufunc( xz_test, excluded={0}, check_error=lambda array, x, sigma=None: get_error(x) or get_error( sigma ) or get_error(array), args_parser=lambda array, x, sigma=None: ( array, replace_empty(x), replace_empty(sigma, None) ), input_parser=lambda array, x, sigma: ( array, float(_convert_args(x)), _convert_args(sigma) ) )
[docs] def xfrequency(data_array, bins_array): """ FREQUENCY(data_array, bins_array) -> list of counts counts[i] = # of data <= bins_sorted[i] and > previous bin counts[-1] = # of data > last bin """ raise_errors(data_array, bins_array) x = np.array(_parse_ranges( data_array, [], error_x_row=False, raise_diff_len=False )[0], dtype=float) bins = np.array(_parse_ranges( bins_array, [], error_x_row=False, raise_diff_len=False )[0] or [0], dtype=float) bins.sort() res = np.zeros(bins.size + 1, dtype=int) if x.size > 0: bins, i, n = np.unique(bins, return_index=True, return_counts=True) i = np.append(i, res.size - 1) - np.append([0], n - 1) idx = np.searchsorted(bins, x, side='left') idx, counts = np.unique_counts(idx) res[i[idx]] = counts return np.atleast_2d(res).T.view(Array)
FUNCTIONS['FREQUENCY'] = wrap_func(xfrequency)
[docs] def xprob(x_range, prob_range, lower_limit, upper_limit=None): """ PROB(x_range, prob_range, lower_limit, [upper_limit]) Excel-like behavior: - Pairwise: ignore pairs where either x or p is text/boolean. - Errors: * length mismatch (before filtering) -> #N/A (via _parse_yxp) * no valid numeric pairs -> #N/A * any probability < 0 or > 1 -> #NUM! * sum(probabilities) != 1 (±tol) -> #NUM! - If upper_limit omitted -> P(X == lower_limit). - If upper_limit provided -> P(lower_limit <= X <= upper_limit). - If lower_limit > upper_limit -> 0. """ xs, ps = _parse_ranges(x_range, prob_range) x = np.asarray(xs, dtype=float) p = np.asarray(ps, dtype=float) if x.size == 0: return Error.errors['#N/A'] if np.any((p < 0.0) | (p > 1.0)) or not np.isclose(p.sum(), 1.0): return Error.errors['#NUM!'] lo = lower_limit if upper_limit is None: return p[x == lo].sum() hi = float(upper_limit) if lo > hi: return 0.0 return p[(x >= lo) & (x <= hi)].sum()
FUNCTIONS['PROB'] = wrap_ufunc( xprob, excluded={0, 1}, check_nan=False, args_parser=lambda x_range, prob_range, lower_limit, upper_limit=None: ( x_range, prob_range, replace_empty(lower_limit), replace_empty(upper_limit) ), input_parser=lambda x_range, prob_range, lower_limit, upper_limit=None: ( x_range, prob_range, float(_convert_args(lower_limit)), _convert_args(upper_limit) ) ) def _parse_linest(known_y, known_x=None, new_x=None): assert all(isinstance(v, (float, int)) for v in flatten(known_y, None)) y = np.asarray(known_y, float) if known_x is None: y = y.ravel() x = np.arange(1, y.size + 1, dtype=float) else: assert all(isinstance(v, (float, int)) for v in flatten(known_x, None)) x = np.asarray(known_x, float) if x.shape == y.shape: x = np.atleast_2d(x.ravel()) y = np.atleast_2d(y.ravel()) if new_x is None: _new_x = x else: assert all(isinstance(v, (float, int)) for v in flatten(new_x, None)) _new_x = np.asarray(new_x, float) if 1 not in y.shape or not ( (y.shape[0] == 1 and y.shape[1] == x.shape[1]) or (y.shape[1] == 1 and y.shape[0] == x.shape[0]) ) or not ( (y.shape[0] == 1 and x.shape[0] == _new_x.shape[0]) or (y.shape[1] == 1 and x.shape[1] == _new_x.shape[1]) ): raise FoundError(err=Error.errors['#REF!']) elif y.shape[0] == 1: y = y.T x = x.T _new_x = _new_x.T return x, y, _new_x def _xlinest_stats(x, y, p, const, res): yhat = x @ p ybar = np.mean(y) if const else 0 res[4, 0] = ssreg = np.sum((yhat - ybar) ** 2) resid = y - yhat res[4, 1] = sse = (resid.T @ resid).item() sstot = sse + ssreg res[2, 0] = r2 = ssreg / sstot res[3, 1] = df = x.shape[0] - x.shape[1] sigma2 = sse / df res[2, 1] = sey = np.sqrt(sigma2) XtX_inv = np.linalg.inv(x.T @ x) se = np.sqrt(np.maximum(0.0, np.diag(sigma2 * XtX_inv))) res[1, :se.size] = se res[3, 0] = f = ssreg / sse * df / x.shape[1] def _xlinest(x, y, const, _stats): if const: x = np.column_stack([np.ones(y.shape, dtype=float), x]) p = linalg.lstsq(x, y)[0] res = np.empty((5 if _stats else 1, x.shape[1] + int(not const)), object) res[:, :] = Error.errors['#N/A'] if _stats: _xlinest_stats(x, y, p, const, res) if not const: p = np.append([0], p) res[0, :] = p[::-1].ravel() return res def _xlinest_parse(known_y, known_x=None, const=True, _stats=False, new_x=None): const = _convert_args(next(flatten([const], None))) _stats = _convert_args(next(flatten([_stats], None))) if get_error(known_y, known_x, new_x, const, _stats): raise FoundError(err=Error.errors['#VALUE!']) x, y, _new_x = _parse_linest(known_y, known_x, new_x) return x, y, const, _stats, _new_x
[docs] def xlinest(known_y, known_x=None, const=True, _stats=False): x, y, const, _stats = _xlinest_parse( known_y, known_x, const, _stats )[:-1] return _xlinest(x, y, const, _stats).view(Array)
[docs] def xlogest(known_y, known_x=None, const=True, _stats=False): x, y, const, _stats = _xlinest_parse( known_y, known_x, const, _stats )[:-1] res = _xlinest(x, np.log(y), const, _stats) res[0, :] = np.exp(res[0, :].astype(float)) return res.view(Array)
FUNCTIONS['LINEST'] = wrap_func(xlinest) FUNCTIONS['LOGEST'] = wrap_func(xlogest)
[docs] def xtrend(known_y, known_x=None, new_x=None, const=True): x, y, const, _stats, new_x = _xlinest_parse( known_y, known_x, const, False, new_x ) if const: new_x = np.column_stack([np.ones(new_x.shape[0], dtype=float), new_x]) res = new_x @ _xlinest(x, y, const, _stats).T[::-1] return res.view(Array)
[docs] def xgrowth(known_y, known_x=None, new_x=None, const=True): x, y, const, _stats, new_x = _xlinest_parse( known_y, known_x, const, False, new_x ) if const: new_x = np.column_stack([np.ones(new_x.shape[0], dtype=float), new_x]) res = new_x @ _xlinest(x, np.log(y), const, _stats).T[::-1] res = np.exp(res.astype(float)) return res.view(Array)
FUNCTIONS['TREND'] = wrap_func(xtrend) FUNCTIONS['GROWTH'] = wrap_func(xgrowth)