diff --git a/ema_workbench/em_framework/evaluators.py b/ema_workbench/em_framework/evaluators.py
index 18d208c8b..b8c8ec063 100644
--- a/ema_workbench/em_framework/evaluators.py
+++ b/ema_workbench/em_framework/evaluators.py
@@ -13,6 +13,7 @@
 import sys
 import threading
 import warnings
+import logging
 
 from ema_workbench.em_framework.samplers import AbstractSampler
 from .callbacks import DefaultCallback
@@ -419,10 +420,13 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
 experiment_runner = None
 
 
-def mpi_initializer(models):
+def mpi_initializer(models, logger_level):
     global experiment_runner
     experiment_runner = ExperimentRunner(models)
 
+    # Configure logger based on the passed level and adjusted format
+    logging.basicConfig(level=logger_level, format="[%(processName)s/%(levelname)s] %(message)s")
+
 
 class MPIEvaluator(BaseEvaluator):
     """Evaluator for experiments using MPI Pool Executor from mpi4py"""
@@ -440,8 +444,12 @@ def initialize(self):
         models.extend(self._msis)
 
         # Use the initializer function to set up the ExperimentRunner for all the worker processes
-        self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models,))
+        self._pool = MPIPoolExecutor(initializer=mpi_initializer, initargs=(models, _logger.level))
         _logger.info(f"MPI pool started with {self._pool._max_workers} workers")
+        if self._pool._max_workers <= 10:
+            _logger.warning(
+                f"With only a few workers ({self._pool._max_workers}), the MPIEvaluator may be slower than the Sequential- or MultiprocessingEvaluator"
+            )
         return self
 
     def finalize(self):
@@ -456,18 +464,30 @@ def evaluate_experiments(self, scenarios, policies, callback, combine="factorial
         packed = [(experiment, experiment.model_name) for experiment in experiments]
 
         # Use the pool to execute in parallel
+        _logger.info(
+            f"MPIEvaluator: Starting {len(packed)} experiments using MPI pool with {self._pool._max_workers} workers"
+        )
         results = self._pool.map(run_experiment_mpi, packed)
+        _logger.info(f"MPIEvaluator: Completed all {len(packed)} experiments")
 
         for experiment, outcomes in results:
             callback(experiment, outcomes)
+        _logger.info(f"MPIEvaluator: Callback completed for all {len(packed)} experiments")
 
 
 def run_experiment_mpi(packed_data):
+    from mpi4py.MPI import COMM_WORLD
+
+    rank = COMM_WORLD.Get_rank()
+
     experiment, model_name = packed_data
+    _logger.info(f"MPI Rank {rank}: starting {repr(experiment)}")
 
     # Use the global ExperimentRunner created by the initializer
     outcomes = experiment_runner.run_experiment(experiment)
 
+    _logger.info(f"MPI Rank {rank}: completed {experiment}")
+
     return experiment, outcomes
diff --git a/ema_workbench/util/ema_logging.py b/ema_workbench/util/ema_logging.py
index 47538eb9e..678746ea9 100644
--- a/ema_workbench/util/ema_logging.py
+++ b/ema_workbench/util/ema_logging.py
@@ -178,7 +178,7 @@ def get_rootlogger():
     return _rootlogger
 
 
-def log_to_stderr(level=None):
+def log_to_stderr(level=None, set_root_logger_levels=False):
     """
     Turn on logging and add a handler which prints to stderr
 
@@ -206,4 +206,8 @@
     logger.addHandler(handler)
     logger.propagate = False
 
+    if set_root_logger_levels:
+        for _, mod_logger in _module_loggers.items():
+            mod_logger.setLevel(level)
+
     return logger
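
Taken together, the two changes are meant to be driven from the client side roughly as follows. This is a minimal usage sketch, not part of the diff: some_model is a placeholder for any existing ema_workbench model instance, and the script is assumed to be launched under MPI (e.g. mpiexec -n 16 python -m mpi4py.futures script.py).

    # Usage sketch (assumptions: `some_model` is an existing ema_workbench
    # model; the script is launched under MPI via mpi4py.futures).
    from ema_workbench import ema_logging
    from ema_workbench.em_framework.evaluators import MPIEvaluator

    # The new set_root_logger_levels flag pushes the chosen level onto every
    # module logger, so the _logger.level that MPIEvaluator.initialize() passes
    # to mpi_initializer reflects the requested verbosity in each worker.
    ema_logging.log_to_stderr(ema_logging.INFO, set_root_logger_levels=True)

    with MPIEvaluator(some_model) as evaluator:
        results = evaluator.perform_experiments(scenarios=100)

With INFO enabled, the pool start, the per-rank start/completion messages from run_experiment_mpi, and the batch-level progress messages from evaluate_experiments should all appear on stderr.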