Source code for formulas.builder

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2023 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides the AstBuilder class.
"""

import functools
import collections
import schedula as sh
from .errors import (
    FormulaError, RangeValueError, InvalidRangeError, InvalidRangeName
)
from .tokens.operator import Operator
from .tokens.function import Function
from .tokens.operand import Operand
from .functions import wrap_ranges_func, COMPILING
from .ranges import Ranges
from schedula.utils.alg import get_unused_node_id


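# Descriptive note (added for clarity): this cached helper builds the default
# output filter, a 1-tuple wrapping ``sh.bypass`` through ``wrap_ranges_func``
# so that a ``Ranges`` result is resolved to its value(s) when read.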
@functools.lru_cache(None)
def _default_filter():
    return wrap_ranges_func(sh.bypass),


class AstBuilder:
    compile_class = sh.DispatchPipe

    def __init__(self, dsp=None, nodes=None, match=None):
        self._deque = collections.deque()
        self.match = match
        self.dsp = dsp or sh.Dispatcher(
            raises=lambda e: not isinstance(e, (
                NotImplementedError, RangeValueError, InvalidRangeError
            ))
        )
        self.nodes = nodes or {}
        self.missing_operands = set()

    def __len__(self):
        return len(self._deque)

    def __getitem__(self, index):
        return self._deque[index]

    def pop(self):
        return self._deque.pop()

    def append(self, token):
        if isinstance(token, (Operator, Function)):
            # Pop the argument tokens off the stack (reversed back into
            # source order); tokens arrive in postfix order.
            try:
                tokens = [self.pop() for _ in range(token.get_n_args)][::-1]
            except IndexError:
                raise FormulaError()
            token.update_input_tokens(*tokens)
            inputs = [self.get_node_id(i) for i in tokens]
            token.set_expr(*tokens)
            out, dmap, get_id = token.node_id, self.dsp.dmap, get_unused_node_id
            if out not in self.dsp.nodes:
                func = token.compile()
                kw = {
                    'function_id': get_id(dmap, token.name),
                    'function': func,
                    'inputs': inputs or None,
                    'outputs': [out]
                }
                if isinstance(func, dict):
                    # A compiled dict carries extra inputs and additional
                    # attributes for the function node.
                    _inputs = func.get('extra_inputs', {})
                    for k, v in _inputs.items():
                        if v is not sh.NONE:
                            self.dsp.add_data(k, v)
                    kw['inputs'] = (list(_inputs) + inputs) or None
                    kw.update(func)
                self.dsp.add_function(**kw)
            else:
                # The output node already exists: bypass it into a new
                # unique node id.
                self.nodes[token] = n_id = get_id(dmap, out, 'c%d>{}')
                self.dsp.add_function(None, sh.bypass, [out], [n_id])
        elif isinstance(token, Operand):
            self.missing_operands.add(token)

        self._deque.append(token)

    def get_node_id(self, token):
        if token in self.nodes:
            return self.nodes[token]
        if isinstance(token, Operand):
            self.missing_operands.remove(token)
            token.set_expr()
            kw = {}
            if not token.attr.get('is_reference', False):
                kw['default_value'] = token.compile()
            node_id = self.dsp.add_data(data_id=token.node_id, **kw)
        else:
            node_id = token.node_id
        self.nodes[token] = node_id
        return node_id

    def finish(self):
        for token in list(self.missing_operands):
            self.get_node_id(token)

    def compile(self, references=None, context=None, **inputs):
        dsp, inp = self.dsp, inputs.copy()
        # Resolve external references into Ranges inputs.
        for k, ref in (references or {}).items():
            if k in dsp.data_nodes:
                if isinstance(ref, Ranges):
                    inp[k] = ref
                elif ref is not None:
                    inp[k] = Ranges().push(ref)
        inp[COMPILING] = True
        res, o = dsp(inp), self.get_node_id(self[-1])
        # Extract the sub-dispatcher that computes the formula output `o`.
        dsp = dsp.get_sub_dsp_from_workflow(
            [o], graph=dsp.dmap, reverse=True, blockers=res, wildcard=False
        )
        res[COMPILING] = False
        dsp.nodes.update({k: v.copy() for k, v in dsp.nodes.items()})
        i = collections.OrderedDict()
        for k in sorted(dsp.data_nodes):
            if not dsp.dmap.pred[k]:  # Data nodes without predecessors are inputs.
                if k in res:
                    v = res[k]
                    if k not in inputs and isinstance(v, Ranges) and v.ranges:
                        i[k] = v
                    else:
                        dsp.add_data(data_id=k, default_value=v)
                else:
                    try:
                        i[k] = Ranges().push(k, context=context)
                    except InvalidRangeName:
                        i[k] = None
        dsp.raises = True
        dsp.nodes[o]['filters'] = _default_filter()
        return self.compile_class(
            dsp, '=%s' % o, i, [o], wildcard=False, shrink=False
        )
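

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): AstBuilder
# instances are normally produced by the parser, which appends the formula
# tokens in postfix order and returns the builder alongside the token list;
# ``compile`` then yields a ``schedula`` pipe that evaluates the formula.
# The formula and cell names below are arbitrary examples.
#
#     import formulas
#
#     tokens, builder = formulas.Parser().ast('=(1 + 1) + B3 / A2')
#     func = builder.compile()   # DispatchPipe with inputs ['A2', 'B3']
#     value = func(2, 6)         # evaluates (1 + 1) + 6 / 2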