From b8bf0e7f151cb5239da125dbf6b316a7a1123583 Mon Sep 17 00:00:00 2001 From: Ewout ter Hoeven Date: Mon, 18 Sep 2023 09:48:16 +0200 Subject: [PATCH] Refactor MPIEvaluator to optimize model handling and runner instantiation This commit introduces two primary architectural modifications to the MPIEvaluator class, aimed at improving efficiency when running experiments on HPC systems: 1. **Singleton ExperimentRunner**: Previously, the `ExperimentRunner` was instantiated for every individual experiment. This approach could introduce unnecessary overhead, especially when dealing with a large number of experiments. Instead, we've adopted a pattern where the `ExperimentRunner` is instantiated once and shared among all the worker processes in the MPI pool. This is achieved using an initializer function `mpi_initializer` which sets up a global `ExperimentRunner` for all the worker processes. 2. **Optimized Model Packing**: Before this change, all models were packed and sent with each experiment. This was potentially inefficient, especially when the size of the model objects was large. Now, we've altered the architecture to send only the model name with each experiment. Since the `ExperimentRunner` already has access to all models (being initialized once with all of them), it can easily fetch the necessary model using the provided model name. The primary motivation behind these changes is to reduce the overhead related to object instantiation and data transfer, especially when running experiments on large-scale HPC systems with SLURM. --- ema_workbench/em_framework/evaluators.py | 35 +++++++++++++++--------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/ema_workbench/em_framework/evaluators.py b/ema_workbench/em_framework/evaluators.py index f0035b1bf..fc64b86fd 100644 --- a/ema_workbench/em_framework/evaluators.py +++ b/ema_workbench/em_framework/evaluators.py @@ -14,6 +14,8 @@ import threading import warnings +from mpi4py.futures import MPIPoolExecutor + from ema_workbench.em_framework.samplers import AbstractSampler from .callbacks import DefaultCallback from .points import experiment_generator, Scenario, Policy @@ -415,17 +417,29 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial add_tasks(self.n_processes, self._pool, ex_gen, callback) +# Create a global ExperimentRunner that will be used by all the worker processes +experiment_runner = None + + +def mpi_initializer(models): + global experiment_runner + experiment_runner = ExperimentRunner(models) + + class MPIEvaluator(BaseEvaluator): """Evaluator for experiments using MPI Pool Executor from mpi4py""" def __init__(self, msis, **kwargs): - from mpi4py.futures import MPIPoolExecutor - super().__init__(msis, **kwargs) self._pool = None def initialize(self): - self._pool = MPIPoolExecutor() + # Instead of instantiating the ExperimentRunner for each experiment, instantiate it once here + models = NamedObjectMap(AbstractModel) + models.extend(self._msis) + + # Use the initializer function to set up the ExperimentRunner for all the worker processes + self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models,)) _logger.info(f"MPI pool started with {self._pool._max_workers} workers") return self @@ -437,12 +451,8 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine) experiments = list(ex_gen) # Convert generator to list - # Create the model map just like in SequentialEvaluator - models = NamedObjectMap(AbstractModel) - models.extend(self._msis) - - # Pack models with each experiment - packed = [(experiment, models) for experiment in experiments] + # Instead of sending all models for each experiment, send only the model_name + packed = [(experiment, experiment.model_name) for experiment in experiments] # Use the pool to execute in parallel results = self._pool.map(run_experiment_mpi, packed) @@ -452,11 +462,10 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial def run_experiment_mpi(packed_data): - experiment, all_models = packed_data - model = all_models[experiment.model_name] + experiment, model_name = packed_data - runner = ExperimentRunner({experiment.model_name: model}) - outcomes = runner.run_experiment(experiment) + # Use the global ExperimentRunner created by the initializer + outcomes = experiment_runner.run_experiment(experiment) return experiment, outcomes