diff --git a/ema_workbench/em_framework/evaluators.py b/ema_workbench/em_framework/evaluators.py index f0035b1bf..fc64b86fd 100644 --- a/ema_workbench/em_framework/evaluators.py +++ b/ema_workbench/em_framework/evaluators.py @@ -14,6 +14,8 @@ import threading import warnings +from mpi4py.futures import MPIPoolExecutor + from ema_workbench.em_framework.samplers import AbstractSampler from .callbacks import DefaultCallback from .points import experiment_generator, Scenario, Policy @@ -415,17 +417,29 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial add_tasks(self.n_processes, self._pool, ex_gen, callback) +# Create a global ExperimentRunner that will be used by all the worker processes +experiment_runner = None + + +def mpi_initializer(models): + global experiment_runner + experiment_runner = ExperimentRunner(models) + + class MPIEvaluator(BaseEvaluator): """Evaluator for experiments using MPI Pool Executor from mpi4py""" def __init__(self, msis, **kwargs): - from mpi4py.futures import MPIPoolExecutor - super().__init__(msis, **kwargs) self._pool = None def initialize(self): - self._pool = MPIPoolExecutor() + # Instead of instantiating the ExperimentRunner for each experiment, instantiate it once here + models = NamedObjectMap(AbstractModel) + models.extend(self._msis) + + # Use the initializer function to set up the ExperimentRunner for all the worker processes + self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models,)) _logger.info(f"MPI pool started with {self._pool._max_workers} workers") return self @@ -437,12 +451,8 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine) experiments = list(ex_gen) # Convert generator to list - # Create the model map just like in SequentialEvaluator - models = NamedObjectMap(AbstractModel) - models.extend(self._msis) - - # Pack models with each experiment - packed = [(experiment, models) for experiment in experiments] + # Instead of sending all models for each experiment, send only the model_name + packed = [(experiment, experiment.model_name) for experiment in experiments] # Use the pool to execute in parallel results = self._pool.map(run_experiment_mpi, packed) @@ -452,11 +462,10 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial def run_experiment_mpi(packed_data): - experiment, all_models = packed_data - model = all_models[experiment.model_name] + experiment, model_name = packed_data - runner = ExperimentRunner({experiment.model_name: model}) - outcomes = runner.run_experiment(experiment) + # Use the global ExperimentRunner created by the initializer + outcomes = experiment_runner.run_experiment(experiment) return experiment, outcomes