"""
Class designed to facilitate the sampling of the model's
parameter space
"""
__author__ = ["Karl Naumann-Woleske"]
__credits__ = ["Karl Naumann-Woleske"]
__license__ = "MIT"
__maintainer__ = ["Karl Naumann-Woleske"]
# Default libraries
import copy
import gc
import logging
import multiprocessing as mp
import os
from datetime import datetime as dt
from pathlib import Path
# Third-party libraries
import pandas as pd
import macrostat.util.batchprocessing as msbatchprocessing
from macrostat.core import Model
# Logger for this module, named after the module's import path.
logger = logging.getLogger(name=__name__)
class BaseSampler:
    """Base class for sampling a model's parameter space in parallel batches.

    Subclasses implement :meth:`generate_parameters`, which returns the points
    to evaluate. :meth:`sample` builds one model instance per point, runs the
    instances through ``msbatchprocessing.parallel_processor`` batch by batch,
    and writes each batch's parameters and outputs to ``output_folder``.
    """

    def __init__(
        self,
        model: Model,
        bounds: dict | None = None,
        logspace: bool = False,
        worker_function: callable = msbatchprocessing.timeseries_worker,
        simulation_args: tuple = (),
        output_folder: str = "samples",
        cpu_count: int = 1,
        batchsize: int | None = None,
        output_filetype: str = "csv",
        output_compression: str | None = None,
    ):
        """Generalized class to facilitate the sampling of the model's
        parameter space using Python's multiprocessing library.

        Parameters
        ----------
        model: Model
            Model to be sampled. Its class, scenarios, variables and behavior
            are reused for every sampled parameter set.
        bounds: dict | None (default None)
            Mapping of parameter name to a (lower, upper) pair. If None,
            ``model.parameters.get_bounds()`` is used.
        logspace: bool (default False)
            Whether the parameters are sampled in log space (presumably
            consumed by a subclass's ``generate_parameters``). When True,
            ``verify_bounds`` additionally requires both bounds of each
            parameter to have the same sign and be non-zero.
        worker_function: callable (default batchprocessing.timeseries_worker)
            Function passed as ``worker`` to ``parallel_processor`` to process
            each task
        simulation_args: tuple (default ())
            Extra positional arguments appended to every task, which is built
            as ``(id, model, *simulation_args)``
        output_folder: str (default "samples")
            Folder to save the output files; created if it does not exist
        cpu_count: int (default 1)
            Number of CPUs to use, capped at ``multiprocessing.cpu_count()``
        batchsize: int | None (default None)
            Number of points processed per batch. None processes all points
            in a single batch.
        output_filetype: str (default "csv")
            Filetype to use for the output files. Options are
            "csv", "parquet"
        output_compression: str | None (default None)
            Compression passed to pandas when writing the outputs, e.g. None
            (default), "gzip" or "zstd". The file extension is not changed,
            so pass the same compression explicitly when reading them back.

        Raises
        ------
        ValueError
            If the bounds are invalid (see ``verify_bounds``) or ``batchsize``
            is given but smaller than 1.
        """
        # Model parameters
        self.model = model
        self.modelclass = type(model)
        self.base_parameters = copy.deepcopy(model.parameters)

        # Boundaries for the parameters. logspace must be set first because
        # verify_bounds reads it.
        self.logspace = logspace
        self.bounds = (
            bounds if bounds is not None else self.model.parameters.get_bounds()
        )
        self.verify_bounds(self.bounds)

        # Computation parameters. A non-positive batch size would either
        # divide by zero or silently run zero batches in `sample`.
        if batchsize is not None and batchsize < 1:
            raise ValueError(f"batchsize must be at least 1, got {batchsize}")
        self.worker_function = worker_function
        self.cpu_count = min([mp.cpu_count(), cpu_count])
        self.batchsize = batchsize
        self.simulation_args = simulation_args

        # Output settings; the folder is created eagerly
        self.output_folder = Path(output_folder)
        self.output_filetype = output_filetype
        self.output_compression = output_compression
        os.makedirs(output_folder, exist_ok=True)

    def generate_parameters(self):
        """Generate the points to be evaluated by the parallel processor.

        Must be implemented by subclasses. The returned DataFrame is consumed
        by :meth:`generate_tasks`: one row per point (the index label becomes
        the task ID) and one column per parameter name.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError("This method should be implemented in a subclass")

    def generate_tasks(self, points: pd.DataFrame):
        """Generate tasks for the parallel processor based on the parameters
        generated by the `generate_parameters` method.

        Parameters
        ----------
        points: pd.DataFrame
            DataFrame containing the points to be processed. Each column must
            be a parameter name present in ``self.bounds``.

        Returns
        -------
        list[tuple]
            One ``(id, model, *simulation_args)`` tuple per row, where ``id``
            is the row's index label and ``model`` is a new instance of the
            model's class carrying that row's parameter values.
        """
        parameters = self.model.parameters
        parameter_class = parameters.__class__
        tasks = []
        for task_id, row in points.iterrows():
            # Keep all the information, just change the values.
            # NOTE(review): assumes get_default_parameters returns a fresh
            # dict on each call, since it is mutated below.
            values = parameters.get_default_parameters()
            for name, value in row.to_dict().items():
                lower, upper = self.bounds[name][0], self.bounds[name][1]
                values[name]["value"] = value
                values[name]["lower bound"] = lower
                values[name]["upper bound"] = upper
            newparams = parameter_class(
                parameters=values,
                hyperparameters=parameters.hyper,
            )
            # Create new model instance with new parameters
            newmodel = self.model.__class__(
                parameters=newparams,
                scenarios=self.model.scenarios,
                variables=self.model.variables,
                behavior=self.model.behavior,
                log_level=logging.CRITICAL,  # Suppress logging
            )
            tasks.append((task_id, newmodel, *self.simulation_args))
        return tasks

    def sample(self, verbose: bool = False, points: pd.DataFrame | None = None):
        """Run in parallel the sampling of the model's parameter space
        by generating a set of tasks and executing them in parallel.

        The points are split into batches. For each batch, the tasks are
        built, their parameters are written to ``parameters_{batch}.csv``,
        the tasks are executed with ``msbatchprocessing.parallel_processor``
        and the outputs are written via :meth:`save_outputs`. Each batch's
        objects are released before the next batch starts.

        Parameters
        ----------
        verbose: bool (default False)
            Whether to log per-batch progress information
        points: pd.DataFrame | None (default None)
            Points to evaluate (one row per point, one column per parameter).
            If None, they are produced by :meth:`generate_parameters`. The
            points used are stored on ``self.points``.

        Notes
        -----
        If ``self.batchsize`` is None it is set to the number of points, and
        that value is kept for subsequent calls.
        """
        try:
            # Use the caller's points if given, otherwise generate them
            self.points = self.generate_parameters() if points is None else points
            n_points = self.points.shape[0]
            if n_points == 0:
                # Nothing to run; also avoids a zero batch size below
                logger.warning("No points to sample; skipping")
                return

            # Run the parallel processing in batches to conserve memory.
            # Each batch is written to disk and released before the next.
            if self.batchsize is None:
                self.batchsize = n_points
            batchcount = -(-n_points // self.batchsize)  # ceiling division

            start_time = dt.now()
            logger.info(f"Processing {n_points} tasks starting at {start_time}")
            logger.info(f"Expecting to use {batchcount} batches")

            for batch in range(batchcount):
                try:
                    if verbose and batch != 0:
                        elapsed = dt.now() - start_time
                        logger.info(
                            f"Processing batch {batch+1:05d} of {batchcount:05d}. Elapsed {elapsed} ({elapsed/batch} per batch)"
                        )

                    # iloc clamps the stop index, so the final (possibly
                    # shorter) batch needs no special handling
                    start = batch * self.batchsize
                    batch_tasks = self.generate_tasks(
                        points=self.points.iloc[start : start + self.batchsize]
                    )

                    # Save the parameters of every task, indexed by task ID
                    parameters = {
                        task_id: task_model.parameters.get_values()
                        for task_id, task_model, *_ in batch_tasks
                    }
                    pd.DataFrame(parameters).T.to_csv(
                        self.output_folder / f"parameters_{batch}.csv",
                        index_label="id",
                    )

                    # Execute the tasks and persist their outputs
                    raw_outputs = msbatchprocessing.parallel_processor(
                        tasks=batch_tasks,
                        worker=self.worker_function,
                        cpu_count=self.cpu_count,
                    )
                    self.save_outputs(raw_outputs, batch=batch)

                    # Release this batch's models and outputs before the next
                    # batch is generated, keeping peak memory to one batch
                    del raw_outputs, batch_tasks, parameters
                    gc.collect()
                except Exception as e:
                    logger.error(f"Error processing batch {batch}: {str(e)}")
                    raise
        except Exception as e:
            logger.error(f"Error in sampling process: {str(e)}")
            raise
        finally:
            # Clean up any remaining resources
            logger.info("Performing final cleanup")
            # self.tasks is never set in this class; presumably subclasses may
            # set it, so it is still released here
            if hasattr(self, "tasks"):
                del self.tasks
            gc.collect()

    def save_outputs(self, raw_outputs: list, batch: int):
        """Save the raw outputs to disk.

        The model's outputs are in the form of a pandas DataFrame.
        This method saves the outputs to disk in a format that can be
        easily read back in later. Generically, it writes a file with the
        outputs in a MultiIndex format whose outer level is the task ID.
        Subclasses can override this to save in a different format.

        Parameters
        ----------
        raw_outputs: list
            List of outputs from the parallel processing. By default,
            batchprocessing.timeseries_worker returns a tuple of
            (*task_arguments, output); the first element is used as the task
            ID and the last as the output DataFrame.
        batch: int
            Batch number used in the file name. Assumes that
            the batchsize is constant.

        Raises
        ------
        ValueError
            If ``raw_outputs`` is empty or ``self.output_filetype`` is not
            "csv" or "parquet".
        """
        if not raw_outputs:
            raise ValueError(f"No outputs to save for batch {batch}")

        # Give unnamed index levels generic names (index1, index2, ...)
        index_names = list(raw_outputs[0][-1].index.names)
        if all(name is None for name in index_names):
            index_names = [f"index{i+1}" for i in range(len(index_names))]

        # Stack the per-task outputs with the task ID as the outer level
        outputs = {result[0]: result[-1] for result in raw_outputs}
        data = pd.concat(
            outputs.values(), keys=outputs.keys(), names=["ID"] + index_names, axis=0
        )

        # Save the outputs to batch-specific files
        if self.output_filetype == "csv":
            data.to_csv(
                self.output_folder / f"outputs_{batch}.csv",
                compression=self.output_compression,
            )
        elif self.output_filetype == "parquet":
            data.to_parquet(
                self.output_folder / f"outputs_{batch}.parquet",
                compression=self.output_compression,
            )
        else:
            raise ValueError(f"Invalid output filetype: {self.output_filetype}")

    def verify_bounds(self, bounds: dict) -> None:
        """Verify that the bounds are correctly set. For each parameter:

        0. The parameter exists in the model's parameters
        1. There is exactly one lower and one upper bound
        2. If ``self.logspace`` is True, both bounds have the same sign
        3. If ``self.logspace`` is True, neither bound is zero
        4. The lower bound is strictly smaller than the upper bound

        Parameters
        ----------
        bounds: dict[str, tuple]
            Dictionary containing the (lower, upper) bounds for each
            parameter to be sampled

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the bounds are not correctly set
        """
        for param, bound in bounds.items():
            if param not in self.model.parameters:
                raise ValueError(f"Parameter {param} not in the model's parameters")
            if len(bound) != 2:
                raise ValueError(
                    f"Bounds should be a list-like of length 2. {param}: {bound}"
                )
            lower, upper = bound
            if self.logspace and (lower < 0) != (upper < 0):
                msg = "Bounds should be either both positive or both negative"
                raise ValueError(f"{msg}. {param}: {bound}")
            if self.logspace and (lower == 0 or upper == 0):
                raise ValueError(
                    f"Bounds cannot be zero when using logspace. {param}: {bound}"
                )
            if lower >= upper:
                msg = "Lower bound should be smaller than the upper bound"
                raise ValueError(f"{msg}. {param}: {bound}")