Source code for macrostat.util.batchprocessing
"""
Batch processing functionionality
"""
__author__ = ["Karl Naumann-Woleske"]
__credits__ = ["Karl Naumann-Woleske"]
__license__ = "MIT"
__version__ = "0.1.0"
__maintainer__ = ["Karl Naumann-Woleske"]
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
[docs]
def timeseries_worker(task: tuple):
"""Worker function for parallel_processor, which will execute a
simulation with the given parameters and return the output.
Parameters
----------
task : tuple
Tuple of (name, model, *args) where name is the name of the
simulation, model is the model to be simulated and *args are
the arguments to be passed to the model's simulate method.
Returns
-------
tuple
Tuple of (name, *args, output) where name is the name of the
simulation, *args are the arguments passed to the model's
simulate method and output is the output of the simulation.
"""
model = task[1]
_ = model.simulate(*task[2:])
return (task[0], *task[2:], model.output)
[docs]
def parallel_processor(
tasks: list = [],
worker: callable = timeseries_worker,
cpu_count: int = 1,
tqdm_info: str = "",
):
"""Run all of the tasks in parallel using the
ProcessPoolExecutor.
Parameters
----------
tasks : list[tuple]
List of tasks to be processed in parallel.
Each task should be a tuple
worker : callable
Worker function to be used for the parallel processing.
Each task will be passed to the worker function as a tuple
cpu_count : int (default=1)
Number of CPUs to be used for the parallel processing.
tqdm_info : str (default="")
Information to be displayed in the tqdm progress bar.
Returns
-------
list
List of tuple results from the worker function
"""
if len(tasks) == 0:
raise ValueError("No tasks to process.")
results = []
process_count = min(cpu_count, len(tasks))
tqdmargs = dict(total=len(tasks), desc=tqdm_info)
with ProcessPoolExecutor(max_workers=process_count) as executor:
for i in tqdm(executor.map(worker, tasks), **tqdmargs):
results.append(i)
return results