Skip to content

Commit

Permalink
Refactor MPIEvaluator to optimize model handling and runner instantia…
Browse files Browse the repository at this point in the history
…tion

This commit introduces two primary architectural modifications to the MPIEvaluator class, aimed at improving efficiency when running experiments on HPC systems:

1. **Singleton ExperimentRunner**:
    Previously, the `ExperimentRunner` was instantiated for every individual experiment. This approach could introduce unnecessary overhead, especially when dealing with a large number of experiments. Instead, we've adopted a pattern where the `ExperimentRunner` is instantiated once and shared among all the worker processes in the MPI pool. This is achieved using an initializer function `mpi_initializer` which sets up a global `ExperimentRunner` for all the worker processes.

2. **Optimized Model Packing**:
    Before this change, all models were packed and sent with each experiment. This was potentially inefficient, especially when the size of the model objects was large. Now, we've altered the architecture to send only the model name with each experiment. Since the `ExperimentRunner` already has access to all models (being initialized once with all of them), it can easily fetch the necessary model using the provided model name.

The primary motivation behind these changes is to reduce the overhead related to object instantiation and data transfer, especially when running experiments on large-scale HPC systems with SLURM.
  • Loading branch information
EwoutH committed Sep 18, 2023
1 parent 247c576 commit b8bf0e7
Showing 1 changed file with 22 additions and 13 deletions.
35 changes: 22 additions & 13 deletions ema_workbench/em_framework/evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import threading
import warnings

from mpi4py.futures import MPIPoolExecutor

from ema_workbench.em_framework.samplers import AbstractSampler
from .callbacks import DefaultCallback
from .points import experiment_generator, Scenario, Policy
Expand Down Expand Up @@ -415,17 +417,29 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
add_tasks(self.n_processes, self._pool, ex_gen, callback)


# Create a global ExperimentRunner that will be used by all the worker processes
experiment_runner = None


def mpi_initializer(models):
global experiment_runner
experiment_runner = ExperimentRunner(models)


class MPIEvaluator(BaseEvaluator):
"""Evaluator for experiments using MPI Pool Executor from mpi4py"""

def __init__(self, msis, **kwargs):
from mpi4py.futures import MPIPoolExecutor

super().__init__(msis, **kwargs)
self._pool = None

def initialize(self):
self._pool = MPIPoolExecutor()
# Instead of instantiating the ExperimentRunner for each experiment, instantiate it once here
models = NamedObjectMap(AbstractModel)
models.extend(self._msis)

# Use the initializer function to set up the ExperimentRunner for all the worker processes
self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models,))
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
return self

Expand All @@ -437,12 +451,8 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
experiments = list(ex_gen) # Convert generator to list

# Create the model map just like in SequentialEvaluator
models = NamedObjectMap(AbstractModel)
models.extend(self._msis)

# Pack models with each experiment
packed = [(experiment, models) for experiment in experiments]
# Instead of sending all models for each experiment, send only the model_name
packed = [(experiment, experiment.model_name) for experiment in experiments]

# Use the pool to execute in parallel
results = self._pool.map(run_experiment_mpi, packed)
Expand All @@ -452,11 +462,10 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial


def run_experiment_mpi(packed_data):
experiment, all_models = packed_data
model = all_models[experiment.model_name]
experiment, model_name = packed_data

runner = ExperimentRunner({experiment.model_name: model})
outcomes = runner.run_experiment(experiment)
# Use the global ExperimentRunner created by the initializer
outcomes = experiment_runner.run_experiment(experiment)

return experiment, outcomes

Expand Down

0 comments on commit b8bf0e7

Please sign in to comment.