Skip to content

Commit

Permalink
Add detailed logging for MPIEvaluator in EMAworkbench
Browse files Browse the repository at this point in the history
Introduced detailed logging capabilities for the MPIEvaluator to facilitate debugging and performance tracking in distributed environments.

Key changes include:
- Configured a logger specifically for the MPIEvaluator.
- Passed logger's level to each worker process to ensure consistent logging verbosity across all nodes.
- Added specific log messages to track the progress of experiments on individual MPI ranks.
- Improved the log format to display the MPI process name alongside the log level, making it easier to identify logs from different nodes.
- Modified `log_to_stderr` in `ema_logging` to optionally apply the chosen log level to all module loggers, controlled by a new `set_root_logger_levels` flag.

With this enhancement, users can now get a clearer insight into the functioning and performance of the MPIEvaluator in HPC systems, helping in both development and operational phases.
  • Loading branch information
EwoutH committed Oct 29, 2023
1 parent 0bc9e15 commit 15845d4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
24 changes: 22 additions & 2 deletions ema_workbench/em_framework/evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import sys
import threading
import warnings
import logging

from ema_workbench.em_framework.samplers import AbstractSampler
from .callbacks import DefaultCallback
Expand Down Expand Up @@ -419,10 +420,13 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
experiment_runner = None


def mpi_initializer(models):
def mpi_initializer(models, logger_level):
    """Initializer run once in every MPI worker process.

    Sets up the module-global ExperimentRunner used by run_experiment_mpi
    and configures the worker's logging to match the master process.

    Parameters
    ----------
    models : list
        the models the ExperimentRunner should be able to run
    logger_level : int
        logging level taken from the master's logger, so all ranks
        log at the same verbosity
    """
    global experiment_runner

    # Prefix each record with the MPI process name so logs from
    # different ranks can be told apart
    logging.basicConfig(
        level=logger_level,
        format="[%(processName)s/%(levelname)s] %(message)s",
    )

    experiment_runner = ExperimentRunner(models)


class MPIEvaluator(BaseEvaluator):
"""Evaluator for experiments using MPI Pool Executor from mpi4py"""
Expand All @@ -440,8 +444,12 @@ def initialize(self):
models.extend(self._msis)

# Use the initializer function to set up the ExperimentRunner for all the worker processes
self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models,))
self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models, _logger.level))
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
if self._pool._max_workers <= 10:
_logger.warning(
f"With only a few workers ({self._pool._max_workers}), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator"
)
return self

def finalize(self):
Expand All @@ -456,18 +464,30 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
packed = [(experiment, experiment.model_name) for experiment in experiments]

# Use the pool to execute in parallel
_logger.info(
f"MPIEvaluator: Starting {len(packed)} experiments using MPI pool with {self._pool._max_workers} workers"
)
results = self._pool.map(run_experiment_mpi, packed)

_logger.info(f"MPIEvaluator: Completed all {len(packed)} experiments")
for experiment, outcomes in results:
callback(experiment, outcomes)
_logger.info(f"MPIEvaluator: Callback completed for all {len(packed)} experiments")


def run_experiment_mpi(packed_data):
    """Execute a single experiment on an MPI worker and return its outcomes.

    Parameters
    ----------
    packed_data : tuple
        an (experiment, model_name) pair as packed by
        MPIEvaluator.evaluate_experiments; model_name is carried along
        but not used here

    Returns
    -------
    tuple
        the experiment together with the outcomes produced for it
    """
    # Imported lazily so this module stays importable without mpi4py
    from mpi4py.MPI import COMM_WORLD

    experiment, model_name = packed_data
    rank = COMM_WORLD.Get_rank()
    _logger.info(f"MPI Rank {rank}: starting {repr(experiment)}")

    # experiment_runner is the module-global instance created by
    # mpi_initializer when the worker process started
    outcomes = experiment_runner.run_experiment(experiment)
    _logger.info(f"MPI Rank {rank}: completed {experiment}")

    return experiment, outcomes


Expand Down
6 changes: 5 additions & 1 deletion ema_workbench/util/ema_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def get_rootlogger():
return _rootlogger


def log_to_stderr(level=None):
def log_to_stderr(level=None, set_root_logger_levels=False):
"""
Turn on logging and add a handler which prints to stderr
Expand Down Expand Up @@ -206,4 +206,8 @@ def log_to_stderr(level=None):
logger.addHandler(handler)
logger.propagate = False

if set_root_logger_levels:
for _, mod_logger in _module_loggers.items():
mod_logger.setLevel(level)

return logger

0 comments on commit 15845d4

Please sign in to comment.