from __future__ import annotations
from collections.abc import Sequence
from typing import Any
import os
import warnings
import numpy
import psutil
from openfisca_core import (
commons,
data_storage as storage,
errors,
indexed_enums as enums,
periods,
types,
)
from .memory_usage import MemoryUsage
[docs]
class Holder:
"""A holder keeps tracks of a variable values after they have been calculated, or set as an input."""
def __init__(self, variable, population) -> None:
self.population = population
self.variable = variable
self.simulation = population.simulation
self._eternal = self.variable.definition_period == periods.DateUnit.ETERNITY
self._memory_storage = storage.InMemoryStorage(is_eternal=self._eternal)
# By default, do not activate on-disk storage, or variable dropping
self._disk_storage = None
self._on_disk_storable = False
self._do_not_store = False
if self.simulation and self.simulation.memory_config:
if (
self.variable.name
not in self.simulation.memory_config.priority_variables
):
self._disk_storage = self.create_disk_storage()
self._on_disk_storable = True
if self.variable.name in self.simulation.memory_config.variables_to_drop:
self._do_not_store = True
[docs]
def clone(self, population):
"""Copy the holder just enough to be able to run a new simulation without modifying the original simulation."""
new = commons.empty_clone(self)
new_dict = new.__dict__
for key, value in self.__dict__.items():
if key not in ("population", "formula", "simulation"):
new_dict[key] = value
new_dict["population"] = population
new_dict["simulation"] = population.simulation
return new
def create_disk_storage(self, directory=None, preserve=False):
if directory is None:
directory = self.simulation.data_storage_dir
storage_dir = os.path.join(directory, self.variable.name)
if not os.path.isdir(storage_dir):
os.mkdir(storage_dir)
return storage.OnDiskStorage(
storage_dir,
self._eternal,
preserve_storage_dir=preserve,
)
[docs]
def delete_arrays(self, period=None) -> None:
"""If ``period`` is ``None``, remove all known values of the variable.
If ``period`` is not ``None``, only remove all values for any period included in period (e.g. if period is "2017", values for "2017-01", "2017-07", etc. would be removed)
"""
self._memory_storage.delete(period)
if self._disk_storage:
self._disk_storage.delete(period)
[docs]
def get_array(self, period):
"""Get the value of the variable for the given period.
If the value is not known, return ``None``.
"""
if self.variable.is_neutralized:
return self.default_array()
value = self._memory_storage.get(period)
if value is not None:
return value
if self._disk_storage:
return self._disk_storage.get(period)
return None
[docs]
def get_memory_usage(self) -> MemoryUsage:
"""Get data about the virtual memory usage of the Holder.
Returns:
Memory usage data.
Examples:
>>> from pprint import pprint
>>> from openfisca_core import (
... entities,
... populations,
... simulations,
... taxbenefitsystems,
... variables,
... )
>>> entity = entities.Entity("", "", "", "")
>>> class MyVariable(variables.Variable):
... definition_period = periods.DateUnit.YEAR
... entity = entity
... value_type = int
>>> population = populations.Population(entity)
>>> variable = MyVariable()
>>> holder = Holder(variable, population)
>>> tbs = taxbenefitsystems.TaxBenefitSystem([entity])
>>> entities = {entity.key: population}
>>> simulation = simulations.Simulation(tbs, entities)
>>> holder.simulation = simulation
>>> pprint(holder.get_memory_usage(), indent=3)
{ 'cell_size': nan,
'dtype': <class 'numpy.int32'>,
'nb_arrays': 0,
'nb_cells_by_array': 0,
'total_nb_bytes': 0...
"""
usage = MemoryUsage(
nb_cells_by_array=self.population.count,
dtype=self.variable.dtype,
)
usage.update(self._memory_storage.get_memory_usage())
if self.simulation.trace:
nb_requests = self.simulation.tracer.get_nb_requests(self.variable.name)
usage.update(
{
"nb_requests": nb_requests,
"nb_requests_by_array": (
nb_requests / float(usage["nb_arrays"])
if usage["nb_arrays"] > 0
else numpy.nan
),
},
)
return usage
[docs]
def get_known_periods(self):
"""Get the list of periods the variable value is known for."""
return list(self._memory_storage.get_known_periods()) + list(
self._disk_storage.get_known_periods() if self._disk_storage else [],
)
def _to_array(self, value):
if not isinstance(value, numpy.ndarray):
value = numpy.asarray(value)
if value.ndim == 0:
# 0-dim arrays are casted to scalar when they interact with float. We don't want that.
value = value.reshape(1)
if len(value) != self.population.count:
msg = f'Unable to set value "{value}" for variable "{self.variable.name}", as its length is {len(value)} while there are {self.population.count} {self.population.entity.plural} in the simulation.'
raise ValueError(
msg,
)
if self.variable.value_type == enums.Enum:
value = self.variable.possible_values.encode(value)
if value.dtype != self.variable.dtype:
try:
value = value.astype(self.variable.dtype)
except ValueError:
msg = f'Unable to set value "{value}" for variable "{self.variable.name}", as the variable dtype "{self.variable.dtype}" does not match the value dtype "{value.dtype}".'
raise ValueError(
msg,
)
return value
def _set(self, period, value) -> None:
value = self._to_array(value)
if not self._eternal:
if period is None:
msg = (
f"A period must be specified to set values, except for variables with "
f"{periods.DateUnit.ETERNITY.upper()} as as period_definition."
)
raise ValueError(
msg,
)
if self.variable.definition_period != period.unit or period.size > 1:
name = self.variable.name
period_size_adj = (
f"{period.unit}"
if (period.size == 1)
else f"{period.size}-{period.unit}s"
)
error_message = os.linesep.join(
[
f'Unable to set a value for variable "{name}" for {period_size_adj}-long period "{period}".',
f'"{name}" can only be set for one {self.variable.definition_period} at a time. Please adapt your input.',
f'If you are the maintainer of "{name}", you can consider adding it a set_input attribute to enable automatic period casting.',
],
)
raise errors.PeriodMismatchError(
self.variable.name,
period,
self.variable.definition_period,
error_message,
)
should_store_on_disk = (
self._on_disk_storable
and self._memory_storage.get(period) is None
and psutil.virtual_memory().percent # If there is already a value in memory, replace it and don't put a new value in the disk storage
>= self.simulation.memory_config.max_memory_occupation_pc
)
if should_store_on_disk:
self._disk_storage.put(value, period)
else:
self._memory_storage.put(value, period)
def put_in_cache(self, value, period) -> None:
if self._do_not_store:
return
if (
self.simulation.opt_out_cache
and self.simulation.tax_benefit_system.cache_blacklist
and self.variable.name in self.simulation.tax_benefit_system.cache_blacklist
):
return
self._set(period, value)
[docs]
def default_array(self):
"""Return a new array of the appropriate length for the entity, filled with the variable default values."""
return self.variable.default_array(self.population.count)