Source code for formulas.builder

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2018 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides AstBuilder class.
"""

import collections
import schedula as sh
from .errors import FormulaError
from .tokens.operator import Operator
from .tokens.function import Function
from .tokens.operand import Operand
from .functions import wrap_ranges_func
from .ranges import Ranges
from schedula.utils.alg import get_unused_node_id


[docs]class AstBuilder:
[docs] def __init__(self, dsp=None, nodes=None, match=None): self._deque = collections.deque() self.match = match self.dsp = dsp or sh.Dispatcher( raises=lambda ex: not isinstance(ex, FormulaError) ) self.nodes = nodes or {} self.missing_operands = set()
def __len__(self): return len(self._deque) def __getitem__(self, index): return self._deque[index]
[docs] def pop(self): return self._deque.pop()
[docs] def append(self, token): if isinstance(token, (Operator, Function)): try: tokens = [self.pop() for _ in range(token.get_n_args)][::-1] except IndexError: raise FormulaError() token.update_input_tokens(*tokens) inputs = [self.get_node_id(i) for i in tokens] token.set_expr(*tokens) out, dmap, get_id = token.node_id, self.dsp.dmap, get_unused_node_id if out not in self.dsp.nodes: func = token.compile() kw = dict( function_id=get_id(dmap, token.name), function=func, inputs=inputs or None, outputs=[out], ) if isinstance(func, dict): _inputs = func.get('extra_inputs', {}) for k, v in _inputs.items(): if v is not sh.NONE: self.dsp.add_data(k, v) kw = sh.combine_dicts( {'inputs': (list(_inputs) + inputs) or None}, func, base=kw ) self.dsp.add_function(**kw) else: self.nodes[token] = n_id = get_id(dmap, out, 'c%d>{}') self.dsp.add_function(None, sh.bypass, [out], [n_id]) elif isinstance(token, Operand): self.missing_operands.add(token) self._deque.append(token)
[docs] def get_node_id(self, token): if token in self.nodes: return self.nodes[token] if isinstance(token, Operand): self.missing_operands.remove(token) token.set_expr() kw = {} if not token.attr.get('is_reference', False): kw['default_value'] = token.compile() node_id = self.dsp.add_data(data_id=token.node_id, **kw) else: node_id = token.node_id self.nodes[token] = node_id return node_id
[docs] def finish(self): for token in list(self.missing_operands): self.get_node_id(token)
[docs] def compile(self, references=None, **inputs): dsp, inp = self.dsp, inputs.copy() for k in set(dsp.data_nodes).intersection(references or {}): inp[k] = Ranges().push(references[k]) res, o = dsp(inp), self.get_node_id(self[-1]) dsp = dsp.get_sub_dsp_from_workflow( [o], graph=dsp.dmap, reverse=True, blockers=res, wildcard=False ) dsp.nodes.update({k: v.copy() for k, v in dsp.nodes.items()}) i = collections.OrderedDict() for k in sorted(dsp.data_nodes): if not dsp.dmap.pred[k]: if k in res: v = res[k] if k not in inputs and isinstance(v, Ranges) and v.ranges: i[k] = v else: dsp.add_data(data_id=k, default_value=v) else: try: i[k] = Ranges().push(k) except ValueError: i[k] = None dsp.raises = True dsp.nodes[o]['filters'] = wrap_ranges_func(sh.bypass), return sh.DispatchPipe(dsp, '=%s' % o, i, [o], wildcard=False)