Source code for formulas.excel

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2016-2020 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl

"""
It provides Excel model class.

Sub-Modules:

.. currentmodule:: formulas.excel

.. autosummary::
    :nosignatures:
    :toctree: excel/

    ~cycle
    ~xlreader
"""
import os
import logging
import functools
import numpy as np
import os.path as osp
import schedula as sh
from ..ranges import Ranges
from ..functions import flatten
from ..cell import Cell, RangesAssembler, Ref
from ..tokens.operand import XlError

log = logging.getLogger(__name__)
BOOK = sh.Token('Book')
SHEETS = sh.Token('Sheets')
CIRCULAR = sh.Token('CIRCULAR')


[docs]class XlCircular(XlError): def __str__(self): return '0'
ERR_CIRCULAR = XlCircular('#CIRC!') def _get_name(name, names): if name not in names: name = name.upper() for n in names: if n.upper() == name: return n return name
[docs]class ExcelModel: compile_class = sh.DispatchPipe
[docs] def __init__(self): self.dsp = sh.Dispatcher(name='ExcelModel') self.cells = {} self.books = {}
[docs] def calculate(self, *args, **kwargs): return self.dsp.dispatch(*args, **kwargs)
def __getstate__(self): return {'dsp': self.dsp, 'cells': {}, 'books': {}}
[docs] def add_references(self, book, context=None): refs, nodes = {}, set() for n in book.defined_names.definedName: ref = Ref(n.name.upper(), '=%s' % n.value, context).compile() nodes.update(ref.add(self.dsp, context=context)) refs[ref.output] = None dsp = self.dsp.get_sub_dsp(nodes) dsp.raises = '' sol = dsp(dict.fromkeys(set(dsp.data_nodes) - set(refs), sh.EMPTY)) refs.update({ k: v for k, v in sol.items() if k in refs and isinstance(v, Ranges) }) return refs
[docs] def loads(self, *file_names): for filename in file_names: self.load(filename) return self
[docs] def load(self, filename): book, context = self.add_book(filename) self.pushes(*book.worksheets, context=context) return self
[docs] def pushes(self, *worksheets, context=None): for ws in worksheets: self.push(ws, context=context) return self
[docs] def push(self, worksheet, context): worksheet, context = self.add_sheet(worksheet, context) references = self.references formula_references = self.formula_references(context) formula_ranges = self.formula_ranges(context) external_links = self.external_links(context) for row in worksheet.iter_rows(): for c in row: if hasattr(c, 'value'): self.add_cell( c, context, references=references, formula_references=formula_references, formula_ranges=formula_ranges, external_links=external_links ) return self
[docs] def add_book(self, book=None, context=None, data_only=False): context = (context or {}).copy() are_in, get_in = sh.are_in_nested_dicts, sh.get_nested_dicts if isinstance(book, str): context['excel'] = book if 'directory' not in context: context['directory'] = osp.dirname(osp.abspath(context['excel'])) context['excel'] = osp.basename(context['excel']) fpath = osp.join(context['directory'], context['excel']) context['excel'] = context['excel'].upper() data = get_in(self.books, context['excel']) book = data.get(BOOK) if not book: from .xlreader import load_workbook data[BOOK] = book = load_workbook(fpath, data_only=data_only) if 'external_links' not in data: data['external_links'] = { str(el.file_link.idx_base + 1): el.file_link.Target for el in book._external_links } if 'references' not in data: data['references'] = self.add_references(book, context=context) return book, context
[docs] def add_sheet(self, worksheet, context): get_in = sh.get_nested_dicts if isinstance(worksheet, str): book = get_in(self.books, context['excel'], BOOK) worksheet = book[_get_name(worksheet, book.sheetnames)] ctx = {'sheet': worksheet.title.upper()} ctx.update(context) d = get_in(self.books, ctx['excel'], SHEETS, ctx['sheet']) if 'formula_references' not in d: d['formula_references'] = formula_references = { k: v['ref'] for k, v in worksheet.formula_attributes.items() if v.get('t') == 'array' and 'ref' in v } else: formula_references = d['formula_references'] if 'formula_ranges' not in d: d['formula_ranges'] = { Ranges().push(ref, context=ctx) for ref in formula_references.values() } return worksheet, ctx
@property def references(self): return sh.combine_dicts(*( d.get('references', {}) for d in self.books.values() ))
[docs] def formula_references(self, ctx): return sh.get_nested_dicts( self.books, ctx['excel'], SHEETS, ctx['sheet'], 'formula_references' )
[docs] def formula_ranges(self, ctx): return sh.get_nested_dicts( self.books, ctx['excel'], SHEETS, ctx['sheet'], 'formula_ranges' )
[docs] def add_cell(self, cell, context, references=None, formula_references=None, formula_ranges=None, external_links=None): if formula_references is None: formula_references = self.formula_references(context) if formula_ranges is None: formula_ranges = self.formula_ranges(context) if references is None: references = self.references if external_links is None: external_links = self.external_links(context) ctx = {'external_links': external_links} ctx.update(context) crd = cell.coordinate crd = formula_references.get(crd, crd) val = cell.value val = cell.data_type == 'f' and val[:2] == '==' and val[1:] or val cell = Cell(crd, val, context=ctx).compile(references=references) if cell.output in self.cells: return if cell.value is not sh.EMPTY: if any(not (cell.range - rng).ranges for rng in formula_ranges): return if cell.add(self.dsp, context=ctx): self.cells[cell.output] = cell return cell
[docs] def complete(self): nodes, data_nodes = self.dsp.nodes, self.dsp.data_nodes stack = sorted(set(data_nodes) - set(self.cells) - set(self.references)) while stack: n_id = stack.pop() if isinstance(n_id, sh.Token): continue rng = Ranges().push(n_id).ranges[0] book = osp.abspath( osp.join(nodes[n_id].get('directory', '.'), rng['excel']) ) context = self.add_book(book)[1] worksheet, context = self.add_sheet(rng['sheet'], context) rng = '{c1}{r1}:{c2}{r2}'.format(**rng) for c in flatten(worksheet[rng], None): if hasattr(c, 'value'): cell = self.add_cell(c, context) if cell: stack.extend(cell.inputs or ())
[docs] def finish(self, complete=True, circular=False): if complete: self.complete() cells, get = {}, sh.get_nested_dicts for c in self.cells.values(): rng = c.range.ranges[0] get(cells, rng['excel'], rng['sheet'], default=list).append(c) it = set(self.dsp.data_nodes) - set(self.cells) - set(self.references) for n_id in sorted(it): if isinstance(n_id, sh.Token): continue ra = RangesAssembler(n_id) rng = ra.range.ranges[0] for c in get(cells, rng['excel'], rng['sheet'], default=list): ra.push(c) if not ra.missing.ranges: break ra.add(self.dsp) if circular: self.solve_circular() return self
[docs] def write(self, books=None, solution=None, dirpath=None): books = {} if books is None else books solution = self.dsp.solution if solution is None else solution are_in, get_in = sh.are_in_nested_dicts, sh.get_nested_dicts for k, r in solution.items(): if isinstance(k, sh.Token): continue if isinstance(r, Ranges): rng = r.ranges[0] else: try: r = Ranges().push(k, r) rng = r.ranges[0] except ValueError: rng = {'excel': '', 'sheet': ''} filename, sheet_name = _get_name(rng['excel'], books), rng['sheet'] if not (filename and sheet_name): log.info('Node `%s` cannot be saved ' '(missing filename and/or sheet_name).' % k) continue if not are_in(books, filename, BOOK): from openpyxl import Workbook book = get_in(books, filename, BOOK, default=Workbook) for ws in book.worksheets: book.remove(ws) else: book = books[filename][BOOK] sheet_names = book.sheetnames sheet_name = _get_name(sheet_name, sheet_names) if sheet_name not in sheet_names: book.create_sheet(sheet_name) sheet = book[sheet_name] ref = '{c1}{r1}:{c2}{r2}'.format(**rng) for c, v in zip(flatten(sheet[ref], None), flatten(r.value, None)): try: if v is sh.EMPTY: v = None elif isinstance(v, np.generic): v = v.item() elif isinstance(v, XlError): v = str(v) c.value = v except AttributeError: pass if dirpath: os.makedirs(dirpath, exist_ok=True) for fname, d in books.items(): d[BOOK].save(osp.join(dirpath, fname)) return books
[docs] def compile(self, inputs, outputs): dsp = self.dsp.shrink_dsp(outputs=outputs) dsp.default_values = sh.selector( set(dsp.default_values) - set(inputs), dsp.default_values ) res = dsp() dsp = dsp.get_sub_dsp_from_workflow( outputs, graph=dsp.dmap, reverse=True, blockers=res, wildcard=False ) keys = set(dsp.data_nodes) - set(dsp.default_values) for k, v in sh.selector(keys, res, allow_miss=True).items(): dsp.set_default_value(k, v.value) func = self.compile_class( dsp=dsp, function_id=self.dsp.name, inputs=inputs, outputs=outputs ) return func
[docs] def solve_circular(self): from .cycle import simple_cycles from collections import Counter mod, dsp = {}, self.dsp f_nodes, d_nodes, dmap = dsp.function_nodes, dsp.data_nodes, dsp.dmap cycles = list(simple_cycles(dmap.succ)) cycles_nodes = Counter(sum(cycles, [])) for cycle in sorted(map(set, cycles)): cycles_nodes.subtract(cycle) active_nodes = {k for k, v in cycles_nodes.items() if v} for k in sorted(cycle.intersection(f_nodes)): if _check_cycles(dmap, k, f_nodes, cycle, active_nodes, mod): break else: cycles_nodes.update(cycle) dist = sh.inf(len(cycle) + 1, 0) for k in sorted(cycle.intersection(d_nodes)): dsp.set_default_value(k, ERR_CIRCULAR, dist) if mod: # Update dsp. dsp.add_data(CIRCULAR, ERR_CIRCULAR) for k, v in mod.items(): d = f_nodes[k] d['inputs'] = [CIRCULAR if i in v else i for i in d['inputs']] dmap.remove_edges_from(((i, k) for i in v)) dmap.add_edge(CIRCULAR, k) return self
def _check_range_all_cycles(nodes, active_nodes, j): if isinstance(nodes[j]['function'], RangesAssembler): return active_nodes.intersection(nodes[j]['inputs']) return False def _check_cycles(dmap, node_id, nodes, cycle, active_nodes, mod=None): node, mod = nodes[node_id], {} if mod is None else mod _map = dict(zip(node['function'].inputs, node['inputs'])) pred, res = dmap.pred, () check = functools.partial(_check_range_all_cycles, nodes, active_nodes) if not any(any(map(check, pred[k])) for k in _map.values() if k in cycle): cycle = [i for i, j in _map.items() if j in cycle] try: res = tuple(map(_map.get, node['function'].check_cycles(cycle))) res and sh.get_nested_dicts(mod, node_id, default=set).update(res) except AttributeError: pass return res