Source code for macrostat.sample.sampler

"""
Class designed to facilitate the sampling of the model's
parameter space
"""

__author__ = ["Karl Naumann-Woleske"]
__credits__ = ["Karl Naumann-Woleske"]
__license__ = "MIT"
__maintainer__ = ["Karl Naumann-Woleske"]

# Default libraries
import copy
import logging
import multiprocessing as mp
import os
from pathlib import Path

# Third-party libraries
import pandas as pd

import macrostat.util.batchprocessing as msbatchprocessing
from macrostat.core import Model

logger = logging.getLogger(__name__)


class BaseSampler:
    """Base class for sampling a model's parameter space in parallel.

    Subclasses implement :meth:`generate_parameters`. This class turns the
    resulting parameter sets into tasks, runs them in batches through
    ``batchprocessing.parallel_processor`` and writes the sampled parameters
    and the outputs to ``output_folder``.
    """

    def __init__(
        self,
        model: Model,
        bounds: dict | None = None,
        logspace: bool = False,
        worker_function: callable = msbatchprocessing.timeseries_worker,
        simulation_args: tuple = (),
        output_folder: str = "samples",
        cpu_count: int = 1,
        batchsize: int | None = None,
        output_filetype: str = "csv",
        output_compression: str | None = None,
    ):
        """Generalized class to facilitate the sampling of the model's
        parameter space using python's multiprocessing library.

        Parameters
        ----------
        model: Model
            Model to be sampled. A deep copy of its parameters is kept as the
            base parameter set that every sampled task starts from.
        bounds: dict[str, tuple] | None (default None)
            Lower and upper bound for each parameter to be sampled. If None,
            ``model.parameters.get_bounds()`` is used.
        logspace: bool (default False)
            Whether to sample the parameters in logspace. If True, each pair
            of bounds must be non-zero and of the same sign.
        worker_function: callable (default batchprocessing.timeseries_worker)
            Function to be used for the parallel processing
        simulation_args: tuple (default ())
            Extra positional arguments appended to every task, after the task
            id and the model
        output_folder: str (default "samples")
            Folder to save the output files. Created if it does not exist.
        cpu_count: int (default 1)
            Number of CPUs to use for the parallel processing. Capped at the
            number of CPUs reported by ``multiprocessing.cpu_count()``.
        batchsize: int | None (default None)
            Number of tasks processed (and written to disk) per batch. If
            None, all tasks are processed in a single batch.
        output_filetype: str (default "csv")
            Filetype to use for the output files. Options are "csv", "parquet"
        output_compression: str | None (default None)
            Compression method to use for the output files. Options are None
            (default), "gzip" or "zstd"

        Raises
        ------
        ValueError
            If the bounds do not pass :meth:`verify_bounds`
        """
        # Model parameters
        self.model = model
        self.modelclass = type(model)
        self.base_parameters = copy.deepcopy(model.parameters)

        # Boundaries for the parameters. logspace must be set before
        # verify_bounds, which reads it.
        self.logspace = logspace
        self.bounds = (
            bounds if bounds is not None else self.model.parameters.get_bounds()
        )
        self.verify_bounds(self.bounds)

        # Computation parameters
        self.worker_function = worker_function
        self.cpu_count = min(mp.cpu_count(), cpu_count)
        self.batchsize = batchsize
        self.simulation_args = simulation_args

        # Set up the output folder
        self.output_folder = Path(output_folder)
        self.output_filetype = output_filetype
        self.output_compression = output_compression
        self.output_folder.mkdir(parents=True, exist_ok=True)

    def generate_parameters(self):
        """Generate the parameter sets to be simulated.

        Must be implemented by subclasses, e.g. by drawing points from a
        sampling sequence over the parameter space delimited by
        ``self.bounds``.

        Returns
        -------
        pd.DataFrame
            One row per task. The index is used as the task id and every
            column names a model parameter whose value is overridden.

        Raises
        ------
        NotImplementedError
            Always, in the base class
        """
        raise NotImplementedError("This method should be implemented in a subclass")

    def generate_tasks(self):
        """Build one task per parameter set returned by `generate_parameters`.

        Returns
        -------
        list[tuple]
            Tuples of ``(task_id, model, *simulation_args)``, where ``model``
            is a deep copy of ``self.model`` carrying the sampled parameters.
        """
        points = self.generate_parameters()

        tasks = []
        for i in points.index:
            # Start from the full base parameter set so that parameters which
            # are not sampled keep their original values
            newparams = copy.deepcopy(self.base_parameters)
            for key in points.columns:
                newparams[key] = points.loc[i, key]

            # Every task gets its own model copy so tasks share no mutable
            # state
            newmodel = copy.deepcopy(self.model)
            newmodel.parameters = newparams
            tasks.append((i, newmodel, *self.simulation_args))

        return tasks

    def sample(self, tqdm_info: str = "Sampling"):
        """Run in parallel the sampling of the model's parameter space.

        Generates the tasks, writes every task's parameter values to
        ``output_folder / "parameters.csv"`` (index label "id"), then runs the
        tasks batch by batch, saving each batch via :meth:`save_outputs`
        before starting the next one.

        Parameters
        ----------
        tqdm_info: str (default "Sampling")
            Information to be displayed in the tqdm progress bar

        Raises
        ------
        ValueError
            If ``self.batchsize`` is neither None nor a positive integer
        """
        # Validate before doing any (potentially expensive) work
        if self.batchsize is not None and self.batchsize < 1:
            raise ValueError(
                f"batchsize must be a positive integer or None, got {self.batchsize}"
            )

        # Generate the tasks to run
        self.tasks = self.generate_tasks()

        # Save the parameters of every task, keyed by task id
        parameters = {
            task[0]: task[1].parameters.get_values() for task in self.tasks
        }
        pd.DataFrame(parameters).T.to_csv(
            self.output_folder / "parameters.csv", index_label="id"
        )

        if not self.tasks:
            logger.warning("No tasks were generated; nothing to sample.")
            return

        # Use a local batch size rather than overwriting self.batchsize, so
        # that a None batch size keeps meaning "one batch" on later calls
        batchsize = (
            self.batchsize if self.batchsize is not None else len(self.tasks)
        )

        # Run the parallel processing in batches to conserve memory: each
        # batch is written to disk before the next one is executed
        for batch, start in enumerate(range(0, len(self.tasks), batchsize)):
            batch_tasks = self.tasks[start : start + batchsize]

            raw_outputs = msbatchprocessing.parallel_processor(
                tasks=batch_tasks,
                worker=self.worker_function,
                cpu_count=self.cpu_count,
                tqdm_info=tqdm_info,
            )

            self.save_outputs(raw_outputs, batch=batch)

    def save_outputs(self, raw_outputs: list, batch: int):
        """Save the raw outputs of one batch to disk.

        The model's outputs are pandas DataFrames. They are concatenated
        into one DataFrame whose outermost index level "ID" is the task id,
        and written to ``output_folder / f"outputs_{batch}.{output_filetype}"``.
        Subclasses can override this method to save in a different format.

        Parameters
        ----------
        raw_outputs: list
            List of outputs from the parallel processing. By default,
            batchprocessing.timeseries_worker returns a tuple of
            (*task_arguments, output), so the first element is the task id
            and the last element is the output DataFrame.
        batch: int
            Batch number, used to name the output file

        Raises
        ------
        ValueError
            If ``self.output_filetype`` is not "csv" or "parquet"
        """
        # Check the configuration first so that it fails before any work
        if self.output_filetype not in ("csv", "parquet"):
            raise ValueError(f"Invalid output filetype: {self.output_filetype}")

        if not raw_outputs:
            logger.warning("Batch %s returned no outputs; nothing saved.", batch)
            return

        # Stack the per-task DataFrames, adding the task id as the outermost
        # index level
        index_names = list(raw_outputs[0][-1].index.names)
        frames = {output[0]: output[-1] for output in raw_outputs}
        data = pd.concat(
            frames.values(), keys=frames.keys(), names=["ID", *index_names], axis=0
        )

        # Save the outputs to a batch-specific file
        filepath = self.output_folder / f"outputs_{batch}.{self.output_filetype}"
        if self.output_filetype == "csv":
            data.to_csv(filepath, compression=self.output_compression)
        else:
            data.to_parquet(filepath, compression=self.output_compression)

    def verify_bounds(self, bounds: dict) -> None:
        """Verify that the bounds are correctly set, in particular

        0. That each parameter exists in the model's parameters
        1. That there is a lower and an upper bound for each parameter
        2. If ``self.logspace`` is True, that the bounds are either both
           positive or both negative
        3. If ``self.logspace`` is True, that neither bound is zero
        4. That the lower bound is strictly smaller than the upper bound

        Parameters
        ----------
        bounds: dict[str, tuple]
            Dictionary containing the (lower, upper) bounds for each
            parameter to be sampled

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If the bounds are not correctly set
        """
        for param, bound in bounds.items():
            if param not in self.model.parameters:
                raise ValueError(f"Parameter {param} not in the model's parameters")

            if len(bound) != 2:
                raise ValueError(
                    f"Bounds should be a list-like of length 2. {param}: {bound}"
                )

            # A log transform requires both bounds on the same side of zero
            if self.logspace and (bound[0] < 0) != (bound[1] < 0):
                msg = "Bounds should be either both positive or both negative"
                raise ValueError(f"{msg}. {param}: {bound}")

            if self.logspace and (bound[0] == 0 or bound[1] == 0):
                raise ValueError(
                    f"Bounds cannot be zero when using logspace. {param}: {bound}"
                )

            if bound[0] >= bound[1]:
                msg = "Lower bound should be smaller than the upper bound"
                raise ValueError(f"{msg}. {param}: {bound}")