Source code for openfisca_core.parameters.helpers

import os
import traceback

import numpy

from openfisca_core import parameters, periods
from openfisca_core.errors import ParameterParsingError
from openfisca_core.parameters import config


def contains_nan(vector):
    if numpy.issubdtype(vector.dtype, numpy.record) or numpy.issubdtype(
        vector.dtype,
        numpy.void,
    ):
        return any(contains_nan(vector[name]) for name in vector.dtype.names)
    return numpy.isnan(vector).any()


[docs] def load_parameter_file(file_path, name=""): """Load parameters from a YAML file (or a directory containing YAML files). :returns: An instance of :class:`.ParameterNode` or :class:`.ParameterScale` or :class:`.Parameter`. """ if not os.path.exists(file_path): msg = f"{file_path} does not exist" raise ValueError(msg) if os.path.isdir(file_path): return parameters.ParameterNode(name, directory_path=file_path) data = _load_yaml_file(file_path) return _parse_child(name, data, file_path)
def _compose_name(path, child_name=None, item_name=None): if not path: return child_name if child_name is not None: return f"{path}.{child_name}" if item_name is not None: return f"{path}[{item_name}]" return None def _load_yaml_file(file_path): with open(file_path) as f: try: return config.yaml.load(f, Loader=config.Loader) except (config.yaml.scanner.ScannerError, config.yaml.parser.ParserError): stack_trace = traceback.format_exc() msg = "Invalid YAML. Check the traceback above for more details." raise ParameterParsingError( msg, file_path, stack_trace, ) except Exception: stack_trace = traceback.format_exc() msg = "Invalid parameter file content. Check the traceback above for more details." raise ParameterParsingError( msg, file_path, stack_trace, ) def _parse_child(child_name, child, child_path): if "values" in child: return parameters.Parameter(child_name, child, child_path) if "brackets" in child: return parameters.ParameterScale(child_name, child, child_path) if isinstance(child, dict) and all( periods.INSTANT_PATTERN.match(str(key)) for key in child ): return parameters.Parameter(child_name, child, child_path) return parameters.ParameterNode(child_name, data=child, file_path=child_path) def _set_backward_compatibility_metadata(parameter, data) -> None: if data.get("unit") is not None: parameter.metadata["unit"] = data["unit"] if data.get("reference") is not None: parameter.metadata["reference"] = data["reference"] def _validate_parameter(parameter, data, data_type=None, allowed_keys=None) -> None: type_map = { dict: "object", list: "array", } if data_type is not None and not isinstance(data, data_type): msg = f"'{parameter.name}' must be of type {type_map[data_type]}." raise ParameterParsingError( msg, parameter.file_path, ) if allowed_keys is not None: keys = data.keys() for key in keys: if key not in allowed_keys: msg = f"Unexpected property '{key}' in '{parameter.name}'. Allowed properties are {list(allowed_keys)}." raise ParameterParsingError( msg, parameter.file_path, )