#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2020 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It provides AstBuilder class.
"""
import functools
import collections
import schedula as sh
from .errors import FormulaError, RangeValueError
from .tokens.operator import Operator
from .tokens.function import Function
from .tokens.operand import Operand
from .functions import wrap_ranges_func
from .ranges import Ranges
from schedula.utils.alg import get_unused_node_id
@functools.lru_cache(None)
def _default_filter():
return wrap_ranges_func(sh.bypass),
[docs]class AstBuilder:
[docs] def __init__(self, dsp=None, nodes=None, match=None):
self._deque = collections.deque()
self.match = match
self.dsp = dsp or sh.Dispatcher(
raises=lambda e: not isinstance(e, (NotImplementedError, RangeValueError))
)
self.nodes = nodes or {}
self.missing_operands = set()
def __len__(self):
return len(self._deque)
def __getitem__(self, index):
return self._deque[index]
[docs] def pop(self):
return self._deque.pop()
[docs] def append(self, token):
if isinstance(token, (Operator, Function)):
try:
tokens = [self.pop() for _ in range(token.get_n_args)][::-1]
except IndexError:
raise FormulaError()
token.update_input_tokens(*tokens)
inputs = [self.get_node_id(i) for i in tokens]
token.set_expr(*tokens)
out, dmap, get_id = token.node_id, self.dsp.dmap, get_unused_node_id
if out not in self.dsp.nodes:
func = token.compile()
kw = dict(
function_id=get_id(dmap, token.name),
function=func,
inputs=inputs or None,
outputs=[out],
)
if isinstance(func, dict):
_inputs = func.get('extra_inputs', {})
for k, v in _inputs.items():
if v is not sh.NONE:
self.dsp.add_data(k, v)
kw['inputs'] = (list(_inputs) + inputs) or None
kw.update(func)
self.dsp.add_function(**kw)
else:
self.nodes[token] = n_id = get_id(dmap, out, 'c%d>{}')
self.dsp.add_function(None, sh.bypass, [out], [n_id])
elif isinstance(token, Operand):
self.missing_operands.add(token)
self._deque.append(token)
[docs] def get_node_id(self, token):
if token in self.nodes:
return self.nodes[token]
if isinstance(token, Operand):
self.missing_operands.remove(token)
token.set_expr()
kw = {}
if not token.attr.get('is_reference', False):
kw['default_value'] = token.compile()
node_id = self.dsp.add_data(data_id=token.node_id, **kw)
else:
node_id = token.node_id
self.nodes[token] = node_id
return node_id
[docs] def finish(self):
for token in list(self.missing_operands):
self.get_node_id(token)
[docs] def compile(self, references=None, **inputs):
dsp, inp = self.dsp, inputs.copy()
for k in set(dsp.data_nodes).intersection(references or {}):
ref = references[k]
if ref is not None:
inp[k] = ref if isinstance(ref, Ranges) else Ranges().push(ref)
res, o = dsp(inp), self.get_node_id(self[-1])
dsp = dsp.get_sub_dsp_from_workflow(
[o], graph=dsp.dmap, reverse=True, blockers=res,
wildcard=False
)
dsp.nodes.update({k: v.copy() for k, v in dsp.nodes.items()})
i = collections.OrderedDict()
for k in sorted(dsp.data_nodes):
if not dsp.dmap.pred[k]:
if k in res:
v = res[k]
if k not in inputs and isinstance(v, Ranges) and v.ranges:
i[k] = v
else:
dsp.add_data(data_id=k, default_value=v)
else:
try:
i[k] = Ranges().push(k)
except ValueError:
i[k] = None
dsp.raises = True
dsp.nodes[o]['filters'] = _default_filter()
return sh.DispatchPipe(dsp, '=%s' % o, i, [o], wildcard=False)