from __future__ import annotations
from typing import Any, Optional, Sequence, Union
import os
import warnings
import numpy
import psutil
from openfisca_core import (
errors,
commons,
data_storage as storage,
indexed_enums as enums,
periods,
tools,
types,
)
from .memory_usage import MemoryUsage
[docs]class Holder:
"""
A holder keeps track of a variable's values after they have been calculated, or set as an input.
"""
def __init__(self, variable, population):
    """
    Bind the holder to ``variable`` and ``population`` and set up its storage.

    Values always get an in-memory storage. When the simulation has a memory configuration, an on-disk storage is also created unless the variable is listed in its ``priority_variables``, and the holder is flagged as "do not store" when the variable is listed in its ``variables_to_drop``.
    """
    self.population = population
    self.variable = variable
    self.simulation = population.simulation

    is_eternal = self.variable.definition_period == periods.ETERNITY
    self._memory_storage = storage.InMemoryStorage(is_eternal = is_eternal)

    # Defaults: no on-disk storage and no variable dropping
    self._disk_storage = None
    self._on_disk_storable = False
    self._do_not_store = False

    # Without a simulation or a memory configuration, the defaults above stand
    if not (self.simulation and self.simulation.memory_config):
        return

    memory_config = self.simulation.memory_config
    if self.variable.name not in memory_config.priority_variables:
        self._disk_storage = self.create_disk_storage()
        self._on_disk_storable = True
    if self.variable.name in memory_config.variables_to_drop:
        self._do_not_store = True
def clone(self, population):
    """
    Build a copy of this holder attached to ``population``.

    Every attribute except ``population``, ``formula`` and ``simulation`` is carried over by plain assignment (no copy is made of the attribute values). The clone's ``population`` and ``simulation`` are then taken from the ``population`` argument, so that a new simulation can run without modifying the original one.
    """
    not_copied = ('population', 'formula', 'simulation')
    duplicate = commons.empty_clone(self)
    attributes = duplicate.__dict__
    attributes.update(
        (name, attribute)
        for name, attribute in self.__dict__.items()
        if name not in not_copied
        )
    attributes['population'] = population
    attributes['simulation'] = population.simulation
    return duplicate
def create_disk_storage(self, directory = None, preserve = False):
    """
    Create an on-disk storage for the variable, in a sub-directory named after it.

    ``directory`` defaults to the simulation's ``data_storage_dir``. The sub-directory is created when it does not exist yet. ``preserve`` is forwarded to the storage as ``preserve_storage_dir``.
    """
    base_dir = self.simulation.data_storage_dir if directory is None else directory
    variable_dir = os.path.join(base_dir, self.variable.name)
    if not os.path.isdir(variable_dir):
        os.mkdir(variable_dir)

    is_eternal = self.variable.definition_period == periods.ETERNITY
    return storage.OnDiskStorage(
        variable_dir,
        is_eternal = is_eternal,
        preserve_storage_dir = preserve,
        )
def delete_arrays(self, period = None):
    """
    Remove stored values of the variable from the memory storage and, if there is one, from the disk storage.

    With ``period`` left to ``None``, all known values are removed. Otherwise only the values for periods included in ``period`` are removed (e.g. if period is "2017", values for "2017-01", "2017-07", etc. would be removed).
    """
    backends = [self._memory_storage]
    if self._disk_storage:
        backends.append(self._disk_storage)
    for backend in backends:
        backend.delete(period)
def get_array(self, period):
    """
    Look up the value of the variable for ``period``.

    A neutralized variable always yields its default array. Otherwise the memory storage is consulted first, then the disk storage if there is one. ``None`` is returned when no value is known.
    """
    if self.variable.is_neutralized:
        return self.default_array()

    in_memory = self._memory_storage.get(period)
    if in_memory is None and self._disk_storage:
        # Nothing in memory: fall back to the on-disk storage
        return self._disk_storage.get(period)
    return in_memory
def get_memory_usage(self) -> MemoryUsage:
    """Report the memory usage of the Holder's in-memory storage.

    The report starts from the population count and the variable dtype, and is
    completed with the figures given by the in-memory storage. When the
    simulation is traced, the number of requests for the variable is added,
    both in total and per stored array.

    Returns:
        Memory usage data.

    Examples:
        >>> from pprint import pprint

        >>> from openfisca_core import (
        ...     entities,
        ...     populations,
        ...     simulations,
        ...     taxbenefitsystems,
        ...     variables,
        ...     )

        >>> entity = entities.Entity("", "", "", "")

        >>> class MyVariable(variables.Variable):
        ...     definition_period = "year"
        ...     entity = entity
        ...     value_type = int

        >>> population = populations.Population(entity)
        >>> variable = MyVariable()
        >>> holder = Holder(variable, population)

        >>> tbs = taxbenefitsystems.TaxBenefitSystem([entity])
        >>> entities = {entity.key: population}
        >>> simulation = simulations.Simulation(tbs, entities)
        >>> holder.simulation = simulation

        >>> pprint(holder.get_memory_usage(), indent = 3)
        {  'cell_size': nan,
           'dtype': <class 'numpy.int32'>,
           'nb_arrays': 0,
           'nb_cells_by_array': 0,
           'total_nb_bytes': 0...

    """
    usage = MemoryUsage(
        nb_cells_by_array = self.population.count,
        dtype = self.variable.dtype,
        )
    usage.update(self._memory_storage.get_memory_usage())

    if not self.simulation.trace:
        return usage

    nb_requests = self.simulation.tracer.get_nb_requests(self.variable.name)
    if usage['nb_arrays'] > 0:
        nb_requests_by_array = nb_requests / float(usage['nb_arrays'])
    else:
        # No stored array: the ratio is undefined
        nb_requests_by_array = numpy.nan
    usage.update(dict(
        nb_requests = nb_requests,
        nb_requests_by_array = nb_requests_by_array,
        ))
    return usage
def get_known_periods(self):
    """
    List the periods the variable value is known for: those of the memory storage first, followed by those of the disk storage if there is one.
    """
    known_periods = list(self._memory_storage.get_known_periods())
    if self._disk_storage:
        known_periods.extend(self._disk_storage.get_known_periods())
    return known_periods
def _to_array(self, value):
    """
    Turn ``value`` into a numpy array matching the population size and the variable dtype.

    Non-array values go through ``numpy.asarray``, and 0-dimensional arrays are reshaped to one element. Values of ``Enum`` variables are encoded with the variable's ``possible_values``. The array is cast to the variable dtype when the dtypes differ.

    Raises:
        ValueError: If the length of the array differs from the population count, or if the cast to the variable dtype fails with a ``ValueError``.
    """
    array = value if isinstance(value, numpy.ndarray) else numpy.asarray(value)

    if array.ndim == 0:
        # 0-dim arrays are casted to scalar when they interact with float. We don't want that.
        array = array.reshape(1)

    if len(array) != self.population.count:
        raise ValueError(
            'Unable to set value "{}" for variable "{}", as its length is {} while there are {} {} in the simulation.'
            .format(array, self.variable.name, len(array), self.population.count, self.population.entity.plural))

    if self.variable.value_type == enums.Enum:
        array = self.variable.possible_values.encode(array)

    if array.dtype != self.variable.dtype:
        try:
            array = array.astype(self.variable.dtype)
        except ValueError:
            raise ValueError(
                'Unable to set value "{}" for variable "{}", as the variable dtype "{}" does not match the value dtype "{}".'
                .format(array, self.variable.name, self.variable.dtype, array.dtype))

    return array
def _set(self, period, value):
    """
    Convert ``value`` to an array and store it for ``period``.

    The value goes to the on-disk storage only when on-disk storage is enabled for this holder, no value is held in memory for ``period`` yet, and the virtual memory usage reported by ``psutil`` has reached the ``max_memory_occupation_pc`` of the simulation's memory configuration. In every other case it goes to the in-memory storage.

    Raises:
        ValueError: If ``period`` is ``None`` while the variable's definition period is not ``periods.ETERNITY``. ``_to_array`` may also raise it for an invalid value.
        errors.PeriodMismatchError: If the unit of ``period`` differs from the variable's definition period, or if ``period`` spans more than one unit.
    """
    value = self._to_array(value)

    if self.variable.definition_period != periods.ETERNITY:
        if period is None:
            raise ValueError('A period must be specified to set values, except for variables with periods.ETERNITY as definition_period.')
        if (self.variable.definition_period != period.unit or period.size > 1):
            name = self.variable.name
            # Human-readable length of the offending period, e.g. "month" or "3-months"
            period_size_adj = f'{period.unit}' if (period.size == 1) else f'{period.size}-{period.unit}s'
            error_message = os.linesep.join([
                f'Unable to set a value for variable "{name}" for {period_size_adj}-long period "{period}".',
                f'"{name}" can only be set for one {self.variable.definition_period} at a time. Please adapt your input.',
                f'If you are the maintainer of "{name}", you can consider adding it a set_input attribute to enable automatic period casting.'
                ])
            raise errors.PeriodMismatchError(
                self.variable.name,
                period,
                self.variable.definition_period,
                error_message
                )

    should_store_on_disk = (
        self._on_disk_storable and
        self._memory_storage.get(period) is None and  # If there is already a value in memory, replace it and don't put a new value in the disk storage
        psutil.virtual_memory().percent >= self.simulation.memory_config.max_memory_occupation_pc
        )

    if should_store_on_disk:
        self._disk_storage.put(value, period)
    else:
        self._memory_storage.put(value, period)
def put_in_cache(self, value, period):
    """
    Store ``value`` for ``period``, unless caching is disabled for the variable.

    Nothing is stored when the holder is flagged as "do not store", or when the simulation opts out of the cache and the variable name is in the tax and benefit system's ``cache_blacklist``.
    """
    if self._do_not_store:
        return

    simulation = self.simulation
    if simulation.opt_out_cache:
        blacklist = simulation.tax_benefit_system.cache_blacklist
        if blacklist and self.variable.name in blacklist:
            return

    self._set(period, value)
def default_array(self):
    """
    Build a new array of default values by calling the variable's ``default_array`` with the population count.
    """
    build_default = self.variable.default_array
    return build_default(self.population.count)