Source code for openfisca_core.parameters.vectorial_parameter_node_at_instant

from typing import NoReturn

import numpy

from openfisca_core import parameters
from openfisca_core.errors import ParameterNotFoundError
from openfisca_core.indexed_enums import Enum, EnumArray
from openfisca_core.parameters import helpers


class VectorialParameterNodeAtInstant:
    """Parameter node of the legislation at a given instant which has been vectorized.

    Vectorized parameters allow requests such as
    ``parameters.housing_benefit[zipcode]``, where ``zipcode`` is a vector.

    The node is a thin wrapper around a ``numpy.recarray`` (``self.vector``)
    whose fields are the children of the original node. Attribute access that
    is not resolved on the wrapper itself is delegated to that recarray.
    """

    @staticmethod
    def build_from_node(node):
        """Build a vectorial node from a ``ParameterNodeAtInstant``.

        The node is first validated with :meth:`check_node_vectorisable`, then
        its children (taken in sorted name order) are recursively vectorized
        and packed into a one-element recarray with one field per child.

        Raises:
            ValueError: If the children of the node are not homogeneous.
            NotImplementedError: If a child is neither a node nor a number.
        """
        VectorialParameterNodeAtInstant.check_node_vectorisable(node)
        subnodes_name = sorted(node._children.keys())

        # Recursively vectorize the children of the node.
        vectorial_subnodes = tuple(
            (
                VectorialParameterNodeAtInstant.build_from_node(
                    node[subnode_name],
                ).vector
                if isinstance(node[subnode_name], parameters.ParameterNodeAtInstant)
                else node[subnode_name]
            )
            for subnode_name in subnodes_name
        )

        # A vectorial node is a wrapper around a numpy recarray.
        # We first build the recarray: nested nodes keep their own structured
        # dtype, leaves are stored as floats.
        recarray = numpy.array(
            [vectorial_subnodes],
            dtype=[
                (
                    subnode_name,
                    subnode.dtype if isinstance(subnode, numpy.recarray) else "float",
                )
                for (subnode_name, subnode) in zip(subnodes_name, vectorial_subnodes)
            ],
        )

        return VectorialParameterNodeAtInstant(
            node._name,
            recarray.view(numpy.recarray),
            node._instant_str,
        )

    @staticmethod
    def check_node_vectorisable(node) -> None:
        """Check that a node can be casted to a vectorial node, in order to be able to use fancy indexing.

        Raises:
            ValueError: If sibling children do not share the same keys, or if
                nodes and non-nodes are mixed at the same level.
            NotImplementedError: If a child is neither a
                ``ParameterNodeAtInstant`` nor an ``int``/``float``.
        """
        MESSAGE_PART_1 = "Cannot use fancy indexing on parameter node '{}', as"
        MESSAGE_PART_3 = (
            "To use fancy indexing on parameter node, its children must be homogeneous."
        )
        MESSAGE_PART_4 = "See more at <https://openfisca.org/doc/coding-the-legislation/legislation_parameters#computing-a-parameter-that-depends-on-a-variable-fancy-indexing>."

        # NOTE: the helpers below close over ``node`` (the root being checked)
        # so that every message names the node the user tried to index.

        def raise_key_inhomogeneity_error(
            node_with_key, node_without_key, missing_key
        ) -> NoReturn:
            message = f"{MESSAGE_PART_1} '{{}}' exists, but '{{}}' doesn't. {MESSAGE_PART_3} {MESSAGE_PART_4}".format(
                node._name,
                f"{node_with_key}.{missing_key}",
                f"{node_without_key}.{missing_key}",
            )
            raise ValueError(message)

        def raise_type_inhomogeneity_error(node_name, non_node_name) -> NoReturn:
            message = f"{MESSAGE_PART_1} '{{}}' is a node, but '{{}}' is not. {MESSAGE_PART_3} {MESSAGE_PART_4}".format(
                node._name,
                node_name,
                non_node_name,
            )
            raise ValueError(message)

        def raise_not_implemented(node_name, node_type) -> NoReturn:
            message = f"{MESSAGE_PART_1} '{{}}' is a '{{}}', and fancy indexing has not been implemented yet on this kind of parameters. {MESSAGE_PART_4}".format(
                node._name,
                node_name,
                node_type,
            )
            raise NotImplementedError(message)

        def extract_named_children(parent):
            """Map the fully qualified name of each child of ``parent`` to the child."""
            return {
                f"{parent._name}.{key}": value
                for key, value in parent._children.items()
            }

        def check_nodes_homogeneous(named_nodes) -> None:
            """Check that several nodes (or parameters, or baremes) have the same structure."""
            names = list(named_nodes.keys())
            nodes = list(named_nodes.values())

            # Every sibling is compared against the first one.
            first_node = nodes[0]
            first_name = names[0]
            siblings = list(zip(nodes, names))[1:]

            if isinstance(first_node, parameters.ParameterNodeAtInstant):
                children = extract_named_children(first_node)
                for sibling, name in siblings:
                    if not isinstance(sibling, parameters.ParameterNodeAtInstant):
                        raise_type_inhomogeneity_error(first_name, name)
                    first_node_keys = first_node._children.keys()
                    sibling_keys = sibling._children.keys()
                    if first_node_keys != sibling_keys:
                        missing_keys = set(first_node_keys).difference(sibling_keys)
                        if missing_keys:
                            # The first node has a key that the sibling hasn't.
                            raise_key_inhomogeneity_error(
                                first_name,
                                name,
                                missing_keys.pop(),
                            )
                        else:
                            # The sibling has a key that the first node doesn't have.
                            missing_key = (
                                set(sibling_keys).difference(first_node_keys).pop()
                            )
                            raise_key_inhomogeneity_error(name, first_name, missing_key)
                    children.update(extract_named_children(sibling))
                # All grandchildren, across every sibling, must in turn be homogeneous.
                check_nodes_homogeneous(children)
            elif isinstance(first_node, (float, int)):
                for sibling, name in siblings:
                    if isinstance(sibling, (int, float)):
                        pass
                    elif isinstance(sibling, parameters.ParameterNodeAtInstant):
                        raise_type_inhomogeneity_error(name, first_name)
                    else:
                        raise_not_implemented(name, type(sibling).__name__)
            else:
                raise_not_implemented(first_name, type(first_node).__name__)

        check_nodes_homogeneous(extract_named_children(node))

    def __init__(self, name, vector, instant_str) -> None:
        """Wrap ``vector`` (a recarray) under the given node name and instant."""
        self.vector = vector
        self._name = name
        self._instant_str = instant_str

    def __getattr__(self, attribute):
        """Delegate unknown attributes to the underlying recarray.

        Structured results (sub-nodes) are wrapped again in a
        ``VectorialParameterNodeAtInstant``; anything else is returned as is.

        Raises:
            AttributeError: If the instance has no ``vector`` yet, or if the
                recarray has no such attribute.
        """
        # Read ``vector`` straight from the instance dict: going through normal
        # attribute lookup would re-enter ``__getattr__`` and recurse forever on
        # an instance whose ``__init__`` has not run (e.g. during copy/unpickle).
        try:
            vector = self.__dict__["vector"]
        except KeyError:
            raise AttributeError(attribute) from None

        result = getattr(vector, attribute)
        if isinstance(result, numpy.recarray):
            # The constructor needs the name and instant too; the child is
            # named after its parent, like the children of a regular node.
            return VectorialParameterNodeAtInstant(
                f"{self._name}.{attribute}",
                result,
                self._instant_str,
            )
        return result

    def __getitem__(self, key):
        """Get a child by name, or select children element-wise with a vector.

        Args:
            key: Either a ``str`` (name of a child), or a ``numpy.ndarray`` of
                child names. Arrays of ``Enum`` members and ``EnumArray`` are
                converted to the corresponding member names; any other
                non-string array is stringified.

        Returns:
            The child for a string key. For an array key, an array with one
            value per element of ``key``, wrapped in a vectorial node when the
            result is not a leaf. ``None`` for any other type of key.

        Raises:
            ParameterNotFoundError: If the selection contains NaN, which is how
                keys that match no child are detected.
        """
        # If the key is a string, just get the subnode
        if isinstance(key, str):
            return self.__getattr__(key)

        # If the key is a vector, e.g. ['zone_1', 'zone_2', 'zone_1']
        if isinstance(key, numpy.ndarray):
            if not numpy.issubdtype(key.dtype, numpy.str_):
                # In case the key is not a string vector, stringify it
                if key.dtype == object and issubclass(type(key[0]), Enum):
                    enum = type(key[0])
                    key = numpy.select(
                        [key == item for item in enum],
                        [item.name for item in enum],
                    )
                elif isinstance(key, EnumArray):
                    enum = key.possible_values
                    key = numpy.select(
                        [key == item.index for item in enum],
                        [item.name for item in enum],
                    )
                else:
                    key = key.astype("str")

            # Get all the names of the subnodes, e.g. ['zone_1', 'zone_2']
            names = list(self.vector.dtype.names)

            # In case of unexpected key, we will set the corresponding value to
            # NaN. The template is the first *known* child rather than
            # ``key[0]``: children are homogeneous so the dtype is the same,
            # and an unknown or missing first key no longer fails here before
            # the proper error below can be raised.
            default = numpy.full_like(self.vector[names[0]], numpy.nan)

            conditions = [key == name for name in names]
            values = [self.vector[name] for name in names]
            result = numpy.select(conditions, values, default)

            if helpers.contains_nan(result):
                unexpected_key = set(key).difference(self.vector.dtype.names).pop()
                msg = f"{self._name}.{unexpected_key}"
                raise ParameterNotFoundError(
                    msg,
                    self._instant_str,
                )

            # If the result is not a leaf, wrap the result in a vectorial node.
            if numpy.issubdtype(result.dtype, numpy.record) or numpy.issubdtype(
                result.dtype,
                numpy.void,
            ):
                return VectorialParameterNodeAtInstant(
                    self._name,
                    result.view(numpy.recarray),
                    self._instant_str,
                )

            return result

        # Other key types are not supported; the historical behaviour of
        # returning None is preserved.
        return None