"""Class designed to facilitate the sampling of the model'sparameterspace"""__author__=["Karl Naumann-Woleske"]__credits__=["Karl Naumann-Woleske"]__license__="MIT"__version__="0.1.0"__maintainer__=["Karl Naumann-Woleske"]# Default librariesimportcopyimportinspectimportloggingimportmultiprocessingasmpimportosfrompathlibimportPathimportpicklelogger=logging.getLogger(__name__)# Third-party librariesimportnumpyasnpimportpandasaspdfromtqdmimporttqdm# Custom importsimportmacrostat.models.modelasmsmodelimportmacrostat.util.batchprocessingasmsbatchprocessing
class Sampler:
    """Base class for sampling a model's parameter space in parallel.

    Subclasses provide the work to be done by implementing
    :meth:`generate_tasks`. :meth:`sample` then executes those tasks in
    batches via ``msbatchprocessing.parallel_processor`` and writes
    ``parameters.csv`` and ``outputs.csv`` into ``output_folder``.
    :meth:`extract` reads (a subset of) ``outputs.csv`` back in chunks.
    """

    def __init__(
        self,
        model: msmodel.Model,
        worker_function: callable = msbatchprocessing.timeseries_worker,
        output_folder: str = "samples",
        cpu_count: int = 1,
        batchsize: int = None,
    ):
        """Generalized class to facilitate the sampling of the model's
        parameterspace using python's multiprocessing library.

        Parameters
        ----------
        model: msmodel.Model
            Model to be sampled
        worker_function: callable (default batchprocessing.timeseries_worker)
            Function to be used for the parallel processing
        output_folder: str (default "samples")
            Folder to save the output files. Created if it does not exist.
        cpu_count: int (default 1)
            Number of CPUs to use for the parallel processing. Capped at the
            number of CPUs reported by ``multiprocessing.cpu_count()``.
        batchsize: int (default None)
            Size of each batch to be processed in parallel. If None, all
            tasks are processed in a single batch.
        """
        # Model parameters
        self.model = model
        self.modelclass = type(model)
        # Deep copy so that later changes to the model's parameters do not
        # alter the stored baseline
        self.base_parameters = copy.deepcopy(model.parameters)
        self.bounds = None

        # Store all possible attributes set in the model: every argument of
        # the model's __init__ (other than the parameters themselves) is
        # looked up as an attribute of the same name on the model instance
        initargs = [
            name
            for name in inspect.signature(model.__init__).parameters
            if name != "self"
        ]
        self.model_kwargs = {
            name: getattr(self.model, name)
            for name in initargs
            if name != "parameters"
        }

        # Computation parameters
        self.worker_function = worker_function
        self.cpu_count = min(mp.cpu_count(), cpu_count)
        self.batchsize = batchsize
        self.output_folder = Path(output_folder)
        os.makedirs(output_folder, exist_ok=True)

    def generate_tasks(self, *args, **kwargs) -> list[tuple]:
        """Generate tasks for the parallel processor.

        This method should return a list of tuples that will be passed to the
        worker function. By default, the first item in the tuple is the task
        ID, the second is the model object, and all remaining items are the
        arguments that will be passed to the model.simulate() function.
        (:meth:`sample` reads ``task[0]`` as the ID and
        ``task[1].parameters`` as the parameter set.)

        Raises
        ------
        NotImplementedError
            Always; subclasses must override this method.
        """
        raise NotImplementedError("This method should be implemented in a subclass")

    def sample(self, tqdm_info: str = "Sampling"):
        """Run in parallel the sampling of the model's parameterspace by
        generating a set of tasks and executing them in parallel.

        The parameters of every task are written to ``parameters.csv`` and
        the outputs of every batch are handed to :meth:`save_outputs`.

        Parameters
        ----------
        tqdm_info: str (default "Sampling")
            Information to be displayed in the tqdm progress bar

        Raises
        ------
        ValueError
            If ``batchsize`` is smaller than 1.
        """
        # Generate the tasks to run
        self.tasks = self.generate_tasks()

        # Save the parameters (one row per task ID)
        parameters = {task[0]: task[1].parameters for task in self.tasks}
        pd.DataFrame(parameters).T.to_csv(
            self.output_folder / "parameters.csv", index_label="id"
        )

        # Nothing to run: avoid deriving a batchsize of zero from an empty
        # task list, which would otherwise cause a division by zero below
        task_count = len(self.tasks)
        if task_count == 0:
            return

        # Run the parallel processing in batches to conserve memory
        # This will write results to disk, clear memory, and proceed
        if self.batchsize is None:
            self.batchsize = task_count
        if self.batchsize < 1:
            raise ValueError(
                f"batchsize must be a positive integer, got {self.batchsize!r}"
            )

        # Integer ceiling division: number of batches needed for all tasks
        batchcount = -(-task_count // self.batchsize)

        for batch in range(batchcount):
            # Set tasks to run now (slicing past the end is safe)
            start = batch * self.batchsize
            batch_tasks = self.tasks[start : start + self.batchsize]

            # Execute those tasks
            raw_outputs = msbatchprocessing.parallel_processor(
                tasks=batch_tasks,
                worker=self.worker_function,
                cpu_count=self.cpu_count,
                tqdm_info=tqdm_info,
            )

            # Save the outputs to disk
            self.save_outputs(raw_outputs, batch=batch)

    def save_outputs(self, raw_outputs: list, batch: int):
        """Save the raw outputs to disk.

        The model's outputs are in the form of a pandas DataFrame. This method
        should save the outputs to disk in a format that can be easily read
        back in later. Generically, it writes a CSV file with the outputs in a
        MultiIndex format (an "ID" level is prepended to the index of each
        output). However, this can be overwritten to save in a different
        format.

        The number of index and header levels written is recorded in
        ``self.index_count`` and ``self.header_count``.

        Parameters
        ----------
        raw_outputs: list
            List of outputs from the parallel processing. By default,
            batchprocessing.timeseries_worker returns a tuple of
            (*task_arguments, output). The first item of each tuple is used
            as the ID and the last item as the output DataFrame.
        batch: int
            Batch number of the outputs. Batch 0 starts a new ``outputs.csv``
            (replacing any file left over from an earlier run, so that it
            stays consistent with the freshly written ``parameters.csv``);
            later batches are appended.
        """
        # An empty batch has nothing to write (and no first element to
        # read the index names from)
        if not raw_outputs:
            return

        # Concatenate the outputs, keyed by the task ID
        index_names = list(raw_outputs[0][-1].index.names)
        data = {result[0]: result[-1] for result in raw_outputs}
        data = pd.concat(
            list(data.values()),
            keys=list(data.keys()),
            names=["ID"] + index_names,
            axis=0,
        )
        self.index_count = data.index.nlevels
        self.header_count = data.columns.nlevels

        # Write the header only when starting a file; otherwise append rows
        outputfile = self.output_folder / "outputs.csv"
        start_new_file = batch == 0 or not outputfile.exists()
        if start_new_file:
            data.to_csv(outputfile, mode="w", header=True)
        else:
            data.to_csv(outputfile, mode="a", header=False)

    def extract(
        self, columns: list = None, indices: list = None, chunksize: int = 100000
    ):
        """Extract the results from the output file.

        The function uses a pandas chunkreader to extract the data from
        ``outputs.csv``. It is possible to extract only a subset of the
        columns or indices. This reduces the memory footprint when dealing
        with a large number of parameterizations.

        The number of header rows is taken from the length of the tuples in
        ``columns`` (1 for non-tuple entries), and the number of index
        columns is the length of the tuples in ``indices`` plus one (2 for
        non-tuple entries). When ``columns`` / ``indices`` is None, the level
        counts recorded by :meth:`save_outputs` are used if available,
        falling back to 1 header row and 2 index columns.

        Parameters
        ----------
        columns: list (default None)
            List of columns to extract. Use tuples for multi-level columns.
            If None, all columns are returned.
        indices: list (default None)
            List of indices to extract. If None, all rows are returned.
        chunksize: int (default 100000)
            Chunksize to read in the data

        Returns
        -------
        pd.DataFrame
            The selected rows and columns of the output file.
        """
        outputfile = self.output_folder / "outputs.csv"

        # Number of header rows in the file
        if columns is None:
            header_count = getattr(self, "header_count", 1)
        elif len(columns) > 0 and isinstance(columns[0], tuple):
            header_count = len(columns[0])
        else:
            header_count = 1

        # Number of index levels described by `indices`, and the number of
        # index columns in the file (one more, for the leading "ID" level)
        index_count = 1
        if indices is None:
            index_col_count = getattr(self, "index_count", index_count + 1)
        else:
            if len(indices) > 0 and isinstance(indices[0], tuple):
                index_count = len(indices[0])
            index_col_count = index_count + 1

        csv_kwargs = dict(
            header=np.arange(header_count),
            index_col=np.arange(index_col_count),
        )

        # The indices may be a list of tuples or of non-tuple objects.
        # For tuples, gather the targets per level once, outside the loop.
        # NOTE(review): element i of each tuple is compared against index
        # level i, where level 0 is the "ID" level written by save_outputs,
        # and every level is matched independently of the others. Confirm
        # that this is the intended meaning of `indices`.
        level_targets = None
        if indices is not None and index_count > 1:
            level_targets = [
                [target[level] for target in indices]
                for level in range(index_count)
            ]

        # Read in chunks and keep only the requested part of each chunk
        frames = []
        with pd.read_csv(
            outputfile, chunksize=chunksize, iterator=True, **csv_kwargs
        ) as reader:
            for chunk in tqdm(reader, desc="Chunk Reading"):
                # Match the columns
                if columns is not None:
                    chunk = chunk.loc[:, chunk.columns.isin(columns)]

                # Match the index
                if indices is not None:
                    if level_targets is None:
                        chunk = chunk.loc[chunk.index.isin(indices)]
                    else:
                        mask = np.ones(len(chunk), dtype=bool)
                        for level, targets in enumerate(level_targets):
                            mask &= chunk.index.isin(targets, level=level)
                        chunk = chunk.loc[mask, :]

                frames.append(chunk)

        return pd.concat(frames, axis=0)

    def save(self, name: str = "sampler"):
        """Save the Sampler object as a PKL for later use.

        Parameters
        ----------
        name: str (default "sampler")
            File name (without extension); the object is written to
            ``<output_folder>/<name>.pkl``.
        """
        filename = self.output_folder / f"{name}.pkl"
        with open(filename, "wb") as f:
            pickle.dump(self, f)

    @classmethod
    def load(cls, filename):
        """Class method to load an instance of Sampler.

        Usage:
            sampler = Sampler.load(filename)

        Parameters
        ----------
        filename: str or Path
            path to the targeted Sampler

        Returns
        -------
        The unpickled object stored in ``filename``.

        Notes
        -----
        Unpickling can execute arbitrary code. Only load files that come
        from a trusted source.
        """
        # SECURITY: pickle.load must never be used on untrusted files
        with open(filename, "rb") as f:
            return pickle.load(f)