from __future__ import annotations

import copy
import os
import typing

from openfisca_core import commons, parameters, tools
from . import config, helpers, AtInstantLike, Parameter, ParameterNodeAtInstant

[docs]class ParameterNode(AtInstantLike): """ A node in the legislation `parameter tree <>`_. """ _allowed_keys: typing.Optional[typing.Iterable[str]] = None # By default, no restriction on the keys def __init__(self, name = "", directory_path = None, data = None, file_path = None): """ Instantiate a ParameterNode either from a dict, (using `data`), or from a directory containing YAML files (using `directory_path`). :param str name: Name of the node, eg "taxes.some_tax". :param str directory_path: Directory containing YAML files describing the node. :param dict data: Object representing the parameter node. It usually has been extracted from a YAML file. :param str file_path: YAML file from which the `data` has been extracted from. Instantiate a ParameterNode from a dict: >>> node = ParameterNode('basic_income', data = { 'amount': { 'values': { "2015-01-01": {'value': 550}, "2016-01-01": {'value': 600} } }, 'min_age': { 'values': { "2015-01-01": {'value': 25}, "2016-01-01": {'value': 18} } }, }) Instantiate a ParameterNode from a directory containing YAML parameter files: >>> node = ParameterNode('benefits', directory_path = '/path/to/country_package/parameters/benefits') """ str = name self.children: typing.Dict[str, typing.Union[ParameterNode, Parameter, parameters.ParameterScale]] = {} self.description: str = None self.documentation: str = None self.file_path: str = None self.metadata: typing.Dict = {} if directory_path: self.file_path = directory_path for child_name in os.listdir(directory_path): child_path = os.path.join(directory_path, child_name) if os.path.isfile(child_path): child_name, ext = os.path.splitext(child_name) # We ignore non-YAML files if ext not in config.FILE_EXTENSIONS: continue if child_name == 'index': data = helpers._load_yaml_file(child_path) or {} helpers._validate_parameter(self, data, allowed_keys = config.COMMON_KEYS) self.description = data.get('description') self.documentation = data.get('documentation') helpers._set_backward_compatibility_metadata(self, data) self.metadata.update(data.get('metadata', {})) else: child_name_expanded = helpers._compose_name(name, child_name) child = helpers.load_parameter_file(child_path, child_name_expanded) self.add_child(child_name, child) elif os.path.isdir(child_path): child_name = os.path.basename(child_path) child_name_expanded = helpers._compose_name(name, child_name) child = ParameterNode(child_name_expanded, directory_path = child_path) self.add_child(child_name, child) else: self.file_path = file_path helpers._validate_parameter(self, data, data_type = dict, allowed_keys = self._allowed_keys) self.description = data.get('description') self.documentation = data.get('documentation') helpers._set_backward_compatibility_metadata(self, data) self.metadata.update(data.get('metadata', {})) for child_name, child in data.items(): if child_name in config.COMMON_KEYS: continue # do not treat reserved keys as subparameters. child_name = str(child_name) child_name_expanded = helpers._compose_name(name, child_name) child = helpers._parse_child(child_name_expanded, child, file_path) self.add_child(child_name, child)
[docs] def merge(self, other): """ Merges another ParameterNode into the current node. In case of child name conflict, the other node child will replace the current node child. """ for child_name, child in other.children.items(): self.add_child(child_name, child)
[docs] def add_child(self, name, child): """ Add a new child to the node. :param name: Name of the child that must be used to access that child. Should not contain anything that could interfere with the operator `.` (dot). :param child: The new child, an instance of :class:`.ParameterScale` or :class:`.Parameter` or :class:`.ParameterNode`. """ if name in self.children: raise ValueError("{} has already a child named {}".format(, name)) if not (isinstance(child, ParameterNode) or isinstance(child, Parameter) or isinstance(child, parameters.ParameterScale)): raise TypeError("child must be of type ParameterNode, Parameter, or Scale. Instead got {}".format(type(child))) self.children[name] = child setattr(self, name, child)
def __repr__(self): result = os.linesep.join( [os.linesep.join( ["{}:", "{}"]).format(name, tools.indent(repr(value))) for name, value in sorted(self.children.items())] ) return result
[docs] def get_descendants(self): """ Return a generator containing all the parameters and nodes recursively contained in this `ParameterNode` """ for child in self.children.values(): yield child yield from child.get_descendants()
def clone(self): clone = commons.empty_clone(self) clone.__dict__ = self.__dict__.copy() clone.metadata = copy.deepcopy(self.metadata) clone.children = { key: child.clone() for key, child in self.children.items() } for child_key, child in clone.children.items(): setattr(clone, child_key, child) return clone def _get_at_instant(self, instant): return ParameterNodeAtInstant(, self, instant)