Source code for macrostat.sample.sampler

"""
Class designed to facilitate the sampling of the model's
parameter space
"""

__author__ = ["Karl Naumann-Woleske"]
__credits__ = ["Karl Naumann-Woleske"]
__license__ = "MIT"
__maintainer__ = ["Karl Naumann-Woleske"]

# Default libraries
import copy
import gc
import logging
import multiprocessing as mp
import os
from datetime import datetime as dt
from pathlib import Path

# Third-party libraries
import pandas as pd

import macrostat.util.batchprocessing as msbatchprocessing
from macrostat.core import Model

logger = logging.getLogger(__name__)


class BaseSampler:
    def __init__(
        self,
        model: Model,
        bounds: dict | None = None,
        logspace: bool = False,
        worker_function: callable = msbatchprocessing.timeseries_worker,
        simulation_args: tuple = (),
        output_folder: str = "samples",
        cpu_count: int = 1,
        batchsize: int | None = None,
        save_to_disk: bool = True,
        output_filetype: str = "csv",
        output_compression: str | None = None,
    ):
        """Generalized class to facilitate the sampling of the model's
        parameter space using python's multiprocessing library.

        Parameters
        ----------
        model: Model
            Model to be sampled
        bounds: dict[str, tuple] | None (default None)
            Mapping from parameter name to its (lower, upper) sampling bounds.
            If None, the bounds returned by the model parameters'
            ``get_bounds()`` are used, restricted to the parameters listed by
            ``get_free_param_names()``.
        logspace: bool (default False)
            Whether the parameters are sampled in log space. When True,
            ``verify_bounds`` additionally requires both bounds of each
            parameter to be non-zero and of the same sign.
        worker_function: callable (default batchprocessing.timeseries_worker)
            Function to be used for the parallel processing
        simulation_args: tuple (default ())
            Extra positional arguments appended to every task tuple, after
            the task id and the model instance
        output_folder: str (default "samples")
            Folder to save the output files. It is created if missing.
        cpu_count: int (default 1)
            Number of CPUs to use for the parallel processing, capped at the
            number of CPUs available on the machine
        batchsize: int (default None)
            Size of each batch to be processed in parallel. None processes
            all points in a single batch.
        save_to_disk: bool (default True)
            Save each of the batches to disk individually
        output_filetype: str (default "csv")
            Filetype to use for the output files. Options are "csv", "parquet"
        output_compression: str (default None)
            Compression method to use for the output files.
            Options are None (default), "gzip" or "zstd"

        Raises
        ------
        ValueError
            If the bounds fail the checks in ``verify_bounds``
        """
        # Model parameters. The deep copy keeps the initial parameterisation
        # unaffected by any later changes to model.parameters
        self.model = model
        self.modelclass = type(model)
        self.base_parameters = copy.deepcopy(model.parameters)

        # Boundaries for the parameters. logspace must be set before
        # verify_bounds, which reads it
        self.logspace = logspace
        if bounds is None:
            all_bounds = self.model.parameters.get_bounds()
            free_names = set(self.model.parameters.get_free_param_names())
            bounds = {k: v for k, v in all_bounds.items() if k in free_names}
        self.bounds = bounds
        self.verify_bounds(self.bounds)

        # Computation parameters (never request more processes than CPUs)
        self.worker_function = worker_function
        self.cpu_count = min(mp.cpu_count(), cpu_count)
        self.batchsize = batchsize
        self.simulation_args = simulation_args

        # Output settings. The folder is always created because sample()
        # writes the parameters of each batch there, even when save_to_disk
        # is False
        self.save_to_disk = save_to_disk
        self.output_folder = Path(output_folder)
        self.output_filetype = output_filetype
        self.output_compression = output_compression
        self.output_folder.mkdir(parents=True, exist_ok=True)

    def generate_parameters(self):
        """Generate the points at which the model should be evaluated.

        Must be implemented by subclasses. ``sample`` expects a
        pd.DataFrame with one row per sample; ``generate_tasks`` uses the
        row index as task ids and the column names as parameter names.

        Raises
        ------
        NotImplementedError
            Always, in this base class
        """
        raise NotImplementedError("This method should be implemented in a subclass")

    def generate_tasks(self, points: pd.DataFrame):
        """Generate tasks for the parallel processor based on the parameters
        generated by the `generate_parameters` method.

        For every row of ``points`` a new parameter object is built from the
        model's default parameter specification. The sampled values replace
        the defaults, and for parameters present in ``self.bounds`` the lower
        and upper bounds are replaced by the sampling bounds. A fresh model
        instance is then created with those parameters.

        Parameters
        ----------
        points: pd.DataFrame
            DataFrame containing the points to be processed. Each row is one
            task (its index label becomes the task id); each column is a
            parameter name.

        Returns
        -------
        list[tuple]
            List of ``(task_id, model, *self.simulation_args)`` tuples
        """
        parameter_class = type(self.model.parameters)
        model_class = type(self.model)

        tasks = []
        # iterrows yields one Series per row even if index labels repeat,
        # unlike points.loc[label]
        for task_id, row in points.iterrows():
            # NOTE(review): assumes get_default_parameters() returns a new
            # dict on every call, since it is mutated below — confirm
            values = self.model.parameters.get_default_parameters()
            for name, value in row.to_dict().items():
                values[name]["value"] = value
                # User-supplied points may include parameters that are not
                # being sampled; those keep their default bounds
                if name in self.bounds:
                    lower, upper = self.bounds[name]
                    values[name]["lower bound"] = lower
                    values[name]["upper bound"] = upper

            newparams = parameter_class(
                parameters=values,
                hyperparameters=self.model.parameters.hyper,
            )
            # Create new model instance with new parameters
            newmodel = model_class(
                parameters=newparams,
                scenarios=self.model.scenarios,
                variables=self.model.variables,
                log_level=logging.CRITICAL,  # Suppress logging
            )
            tasks.append((task_id, newmodel, *self.simulation_args))
        return tasks

    def sample(self, verbose: bool = False, points: pd.DataFrame | None = None):
        """Run in parallel the sampling of the model's parameter space.

        The points are split into batches of ``self.batchsize`` rows (a single
        batch if ``batchsize`` is None). For each batch:

        1. the tasks are built with ``generate_tasks``;
        2. the full parameter values of every task are written to
           ``parameters_{batch}.csv`` in the output folder;
        3. the tasks are executed with ``msbatchprocessing.parallel_processor``;
        4. the outputs are converted with ``transform_outputs``, and then
           either saved with ``save_outputs`` or kept in memory.

        Parameters
        ----------
        verbose: bool (default False)
            Whether to log progress information for each batch
        points: pd.DataFrame (default None)
            Points to evaluate (rows are samples, columns are parameters).
            If None, they are drawn with ``generate_parameters``.

        Returns
        -------
        pd.DataFrame or None
            If ``save_to_disk`` is False, the outputs of all batches are
            concatenated with an additional outer "batch" index level (an
            empty DataFrame if there were no points). Otherwise None.

        Raises
        ------
        Exception
            Any error raised while generating, running or storing a batch is
            logged and re-raised unchanged.
        """
        all_outputs = {}
        try:
            if points is None:
                self.points = self.generate_parameters()
            else:
                self.points = points
            n_points = self.points.shape[0]

            # Run the parallel processing in batches to conserve memory. A
            # local batch size is used so that batchsize=None keeps meaning
            # "one batch" on later calls, and an empty point set does not
            # lead to a division by zero
            if self.batchsize is None:
                batchsize = max(n_points, 1)
            else:
                batchsize = self.batchsize
            batchcount = -(-n_points // batchsize)  # ceiling division

            start_time = dt.now()
            logger.info(f"Processing {n_points} tasks starting at {start_time}")
            logger.info(f"Expecting to use {batchcount} batches")

            for batch in range(batchcount):
                try:
                    if verbose and batch != 0:
                        elapsed = dt.now() - start_time
                        logger.info(
                            f"Processing batch {batch+1:05d} of {batchcount:05d}. Elapsed {elapsed} ({elapsed/batch} per batch)"
                        )

                    start = batch * batchsize
                    end = min(start + batchsize, n_points)
                    batch_tasks = self.generate_tasks(
                        points=self.points.iloc[start:end]
                    )

                    # Record the full parameterisation of each task in the batch
                    batch_parameters = {
                        task[0]: task[1].parameters.get_values()
                        for task in batch_tasks
                    }
                    pd.DataFrame(batch_parameters).T.to_csv(
                        self.output_folder / f"parameters_{batch}.csv",
                        index_label="id",
                    )

                    # Execute those tasks
                    raw_outputs = msbatchprocessing.parallel_processor(
                        tasks=batch_tasks,
                        worker=self.worker_function,
                        cpu_count=self.cpu_count,
                    )

                    # Save the outputs to disk or keep them in memory
                    pd_outputs = self.transform_outputs(raw_outputs, batch=batch)
                    if self.save_to_disk:
                        self.save_outputs(pd_outputs, batch=batch)
                    else:
                        all_outputs[batch] = pd_outputs

                    # Clean up batch resources
                    del raw_outputs, batch_tasks
                    gc.collect()
                except Exception as e:
                    logger.error(f"Error processing batch {batch}: {str(e)}")
                    raise
        except Exception as e:
            logger.error(f"Error in sampling process: {str(e)}")
            raise
        finally:
            # Only cleanup here: a `return` inside `finally` would discard an
            # exception that is propagating from the try block
            logger.info("Performing final cleanup")
            # No method in this class sets self.tasks; presumably kept for
            # subclasses that do — verify before removing
            if hasattr(self, "tasks"):
                del self.tasks
            gc.collect()

        if self.save_to_disk:
            return None
        if not all_outputs:
            return pd.DataFrame()
        names = ["batch", *all_outputs[0].index.names]
        return pd.concat(all_outputs, axis=0, names=names)

    def transform_outputs(self, raw_outputs: list, batch: int):
        """Concatenate the raw outputs into a single pandas dataframe.

        Parameters
        ----------
        raw_outputs: list
            List of outputs from the parallel processing. By default,
            batchprocessing.timeseries_worker returns a tuple of
            (*task_arguments, output). The first element (task id) is used
            as key and the last element (a pandas object) as data.
        batch: int
            Batch number of the outputs. Not used by this base implementation;
            available to subclasses. Assumes that the batchsize is constant.

        Returns
        -------
        output: pd.DataFrame
            Outputs stacked along the rows, with an outer "ID" index level
            holding the task ids. If none of the output's index levels is
            named, they are named "index1", "index2", ...
        """
        # Index level names are taken from the first output only
        index_names = list(raw_outputs[0][-1].index.names)
        if all(name is None for name in index_names):
            index_names = [f"index{i+1}" for i in range(len(index_names))]

        data = {result[0]: result[-1] for result in raw_outputs}
        return pd.concat(
            data.values(), keys=data.keys(), names=["ID", *index_names], axis=0
        )

    def save_outputs(self, data: pd.DataFrame, batch: int):
        """Save the outputs of one batch to disk.

        The outputs are written to ``outputs_{batch}.csv`` or
        ``outputs_{batch}.parquet`` in the output folder, depending on
        ``self.output_filetype``, using ``self.output_compression``. The
        index (a MultiIndex for the default ``transform_outputs``) is written
        along with the data. Subclasses may override this to use a different
        format.

        Parameters
        ----------
        data: pd.DataFrame
            The samples run in this dataset
        batch: int
            Batch number to save the outputs. Assumes that the batchsize is
            constant.

        Raises
        ------
        ValueError
            If ``self.output_filetype`` is neither "csv" nor "parquet"
        """
        if self.output_filetype == "csv":
            data.to_csv(
                self.output_folder / f"outputs_{batch}.csv",
                compression=self.output_compression,
            )
        elif self.output_filetype == "parquet":
            data.to_parquet(
                self.output_folder / f"outputs_{batch}.parquet",
                compression=self.output_compression,
            )
        else:
            raise ValueError(f"Invalid output filetype: {self.output_filetype}")

    def verify_bounds(self, bounds: dict) -> None:
        """Verify that the sampling bounds are correctly set.

        For each parameter, the checks are applied in this order:

        0. The parameter is in the model's parameters
        1. Exactly two values (lower and upper bound) are given
        2. If ``self.logspace`` is True, the bounds are either both positive
           or both negative
        3. If ``self.logspace`` is True, neither bound is zero
        4. The lower bound is strictly smaller than the upper bound

        Parameters
        ----------
        bounds: dict[str, tuple]
            Dictionary containing the bounds for each parameter to be sampled

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If any of the checks above fails
        """
        for param, bound in bounds.items():
            if param not in self.model.parameters:
                raise ValueError(f"Parameter {param} not in the model's parameters")
            if len(bound) != 2:
                raise ValueError(
                    f"Bounds should be a list-like of length 2. {param}: {bound}"
                )
            lower, upper = bound[0], bound[1]
            if self.logspace and (lower < 0) != (upper < 0):
                msg = "Bounds should be either both positive or both negative"
                raise ValueError(f"{msg}. {param}: {bound}")
            if self.logspace and (lower == 0 or upper == 0):
                raise ValueError(
                    f"Bounds cannot be zero when using logspace. {param}: {bound}"
                )
            if lower >= upper:
                msg = "Lower bound should be smaller than the upper bound"
                raise ValueError(f"{msg}. {param}: {bound}")