#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
It provides Excel model class.
"""
import numpy as np
import os.path as osp
import schedula as sh
from .ranges import Ranges
from .cell import Cell, RangesAssembler
from .tokens.operand import range2parts, XlError
from .functions import flatten
BOOK = sh.Token('Book')
SHEETS = sh.Token('Sheets')
CIRCULAR = sh.Token('CIRCULAR')
ERR_CIRCULAR = XlError('0')
def _get_name(name, names):
if name not in names:
name = name.upper()
for n in names:
if n.upper() == name:
return n
return name
[docs]class ExcelModel:
compile_class = sh.DispatchPipe
[docs] def __init__(self):
self.dsp = sh.Dispatcher(name='ExcelModel')
self.cells = {}
self.books = {}
[docs] def calculate(self, *args, **kwargs):
return self.dsp.dispatch(*args, **kwargs)
def __getstate__(self):
return {'dsp': self.dsp, 'cells': {}, 'books': {}}
@staticmethod
def _yield_refs(book, context=None):
ctx = context.copy()
for n in book.defined_names.definedName:
if n.value == '#REF!':
continue
ctx['ref'], i = n.name.upper(), n.localSheetId
rng = Ranges().push(n.value, context=context).ranges[0]['name']
sheet_names = book.sheetnames
if i is not None:
sheet_names = sheet_names[i:i + 1]
for sn in sheet_names:
name = range2parts(None, sheet=sn, **ctx)
yield name['name'], rng
[docs] def loads(self, *file_names):
for filename in file_names:
self.load(filename)
return self
[docs] def load(self, filename):
book, context = self.add_book(filename)
self.pushes(*book.worksheets, context=context)
return self
[docs] def pushes(self, *worksheets, context=None):
for ws in worksheets:
self.push(ws, context=context)
return self
[docs] def push(self, worksheet, context):
worksheet, context = self.add_sheet(worksheet, context)
get_in = sh.get_nested_dicts
references = get_in(self.books, context['excel'], 'references')
d = get_in(self.books, context['excel'], SHEETS, context['sheet'])
formula_references = d['formula_references']
formula_ranges = d['formula_ranges']
for row in worksheet.iter_rows():
for c in row:
if hasattr(c, 'value'):
self.add_cell(
c, context, references=references,
formula_references=formula_references,
formula_ranges=formula_ranges
)
return self
[docs] def add_book(self, book, context=None, data_only=False):
context = context or {}
are_in, get_in = sh.are_in_nested_dicts, sh.get_nested_dicts
if 'excel' in context:
context = context.copy()
context['excel'] = context['excel'].upper()
if are_in(self.books, context.get('excel'), BOOK):
book = get_in(self.books, context['excel'], BOOK)
else:
if isinstance(book, str):
context.update({'excel': osp.basename(book).upper(),
'directory': osp.dirname(osp.abspath(book))})
if not are_in(self.books, context['excel'], BOOK):
from openpyxl import load_workbook
book = load_workbook(book, data_only=data_only)
book = get_in(
self.books, context['excel'], BOOK, default=lambda: book
)
if not are_in(self.books, context['excel'], 'references'):
get_in(
self.books, context['excel'], 'references',
default=lambda: dict(self._yield_refs(book, context=context))
)
if not are_in(self.books, context['excel'], 'external_links'):
external_links = {str(l.file_link.idx_base + 1): l.file_link.Target
for l in book._external_links}
get_in(
self.books, context['excel'], 'external_links',
default=lambda: external_links
)
return book, context
[docs] def add_sheet(self, worksheet, context):
get_in = sh.get_nested_dicts
if isinstance(worksheet, str):
book = get_in(self.books, context['excel'], BOOK)
worksheet = book[_get_name(worksheet, book.sheetnames)]
ctx = {'sheet': worksheet.title.upper()}
ctx.update(context)
d = get_in(self.books, ctx['excel'], SHEETS, ctx['sheet'])
if 'formula_references' not in d:
d['formula_references'] = formula_references = {
k: v['ref'] for k, v in worksheet.formula_attributes.items()
if v.get('t') == 'array' and 'ref' in v
}
else:
formula_references = d['formula_references']
if 'formula_ranges' not in d:
d['formula_ranges'] = {
Ranges().push(ref, context=ctx)
for ref in formula_references.values()
}
return worksheet, ctx
[docs] def add_cell(self, cell, context, references=None, formula_references=None,
formula_ranges=None, external_links=None):
get_in = sh.get_nested_dicts
if formula_references is None:
formula_references = get_in(
self.books, context['excel'], SHEETS, context['sheet'],
'formula_references'
)
if formula_ranges is None:
formula_ranges = get_in(
self.books, context['excel'], SHEETS, context['sheet'],
'formula_ranges'
)
if references is None:
references = get_in(self.books, context['excel'], 'references')
if external_links is None:
external_links = get_in(
self.books, context['excel'], 'external_links'
)
ctx = {'external_links': external_links}
ctx.update(context)
crd = cell.coordinate
crd = formula_references.get(crd, crd)
cell = Cell(crd, cell.value, context=ctx).compile()
if cell.output in self.cells:
return
if cell.value is not sh.EMPTY:
if any(not (cell.range - rng).ranges for rng in formula_ranges):
return
cell.update_inputs(references=references)
if cell.add(self.dsp, context=ctx):
self.cells[cell.output] = cell
return cell
[docs] def complete(self):
nodes = self.dsp.nodes
stack = list(sorted(set(self.dsp.data_nodes) - set(self.cells)))
while stack:
n_id = stack.pop()
if isinstance(n_id, sh.Token):
continue
rng = Ranges().push(n_id).ranges[0]
book = osp.abspath(
osp.join(nodes[n_id].get('directory', '.'), rng['excel'])
)
context = self.add_book(book)[1]
worksheet, context = self.add_sheet(rng['sheet'], context)
rng = '{c1}{r1}:{c2}{r2}'.format(**rng)
for c in flatten(worksheet[rng], None):
if hasattr(c, 'value'):
cell = self.add_cell(c, context)
if cell:
stack.extend(cell.inputs or ())
[docs] def finish(self, complete=True, circular=False):
if complete:
self.complete()
for n_id in sorted(set(self.dsp.data_nodes) - set(self.cells)):
if isinstance(n_id, sh.Token):
continue
ra = RangesAssembler(n_id)
for k, c in sorted(self.cells.items()):
ra.push(c)
if not ra.missing.ranges:
break
self.dsp.add_function(None, ra, ra.inputs or None, [ra.output])
if circular:
self.solve_circular()
return self
[docs] def write(self, books=None, solution=None):
books = {} if books is None else books
solution = self.dsp.solution if solution is None else solution
are_in, get_in = sh.are_in_nested_dicts, sh.get_nested_dicts
for k, r in solution.items():
if isinstance(k, sh.Token):
continue
rng = r.ranges[0]
filename, sheet_name = _get_name(rng['excel'], books), rng['sheet']
if not are_in(books, filename, BOOK):
from openpyxl import Workbook
book = get_in(books, filename, BOOK, default=Workbook)
for ws in book.worksheets:
book.remove(ws)
else:
book = books[filename][BOOK]
sheet_names = book.sheetnames
sheet_name = _get_name(sheet_name, sheet_names)
if sheet_name not in sheet_names:
book.create_sheet(sheet_name)
sheet = book[sheet_name]
ref = '{c1}{r1}:{c2}{r2}'.format(**rng)
for c, v in zip(flatten(sheet[ref], None), flatten(r.value, None)):
if hasattr(c, 'value'):
if v is sh.EMPTY:
v = None
if isinstance(v, np.generic):
v = v.item()
elif isinstance(v, XlError):
v = str(v)
c.value = v
return books
[docs] def compile(self, inputs, outputs):
dsp = self.dsp.shrink_dsp(outputs=outputs)
dsp.default_values = sh.selector(
set(dsp.default_values) - set(inputs), dsp.default_values
)
res = dsp()
dsp = dsp.get_sub_dsp_from_workflow(
outputs, graph=dsp.dmap, reverse=True, blockers=res,
wildcard=False
)
keys = set(dsp.data_nodes) - set(dsp.default_values)
for k, v in sh.selector(keys, res, allow_miss=True).items():
dsp.set_default_value(k, v.value)
func = self.compile_class(
dsp=dsp,
function_id=self.dsp.name,
inputs=inputs,
outputs=outputs
)
return func
[docs] def solve_circular(self):
import networkx as nx
mod, dsp = {}, self.dsp
f_nodes, d_nodes, dmap = dsp.function_nodes, dsp.data_nodes, dsp.dmap
for cycle in sorted(map(set, nx.simple_cycles(dmap))):
for k in sorted(cycle.intersection(f_nodes)):
if _check_cycles(dmap, k, f_nodes, cycle, mod):
break
else:
dist = sh.inf(len(cycle) + 1, 0)
for k in sorted(cycle.intersection(d_nodes)):
dsp.set_default_value(k, ERR_CIRCULAR, dist)
if mod: # Update dsp.
dsp.add_data(CIRCULAR, ERR_CIRCULAR)
for k, v in mod.items():
d = f_nodes[k]
d['inputs'] = [CIRCULAR if i in v else i for i in d['inputs']]
dmap.remove_edges_from(((i, k) for i in v))
dmap.add_edge(CIRCULAR, k)
return self
def _check_cycles(dmap, node_id, nodes, cycle, mod=None):
node, mod = nodes[node_id], {} if mod is None else mod
_map = dict(zip(node['function'].inputs, node['inputs']))
pred, res = dmap.predecessors, ()
check = lambda j: isinstance(nodes[j]['function'], RangesAssembler)
if not any(any(map(check, pred(k))) for k in _map.values() if k in cycle):
cycle = [i for i, j in _map.items() if j in cycle]
try:
res = tuple(map(_map.get, node['function'].check_cycles(cycle)))
res and sh.get_nested_dicts(mod, node_id, default=set).update(res)
except AttributeError:
pass
return res