Skip to content

Commit

Permalink
[WIP] MPI update (#328)
Browse files Browse the repository at this point in the history
* adds a simple example using the MPI evaluator
* Make MPI work with WorkingDirectoryModels
* Code reorganization, moving mpi-specific code into a separate module
* Adds an initializer for mpi worker processes
* update to tests to cover new code
* update to docs and mpi tutorial to reflect updated code

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
quaquel and pre-commit-ci[bot] authored Dec 20, 2023
1 parent f132400 commit a968cc8
Show file tree
Hide file tree
Showing 10 changed files with 526 additions and 215 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
run:
pytest --ignore=./test/test_connectors -v --cov=ema_workbench/em_framework --cov=ema_workbench/util --cov=ema_workbench/analysis
- name: Coveralls
if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.10'
if: matrix.os == 'ubuntu-latest' && matrix.python-version == '3.11'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: coveralls --service=github
Expand Down
295 changes: 158 additions & 137 deletions docs/source/indepth_tutorial/mpi-evaluator.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ema_workbench/em_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
SequentialEvaluator,
Samplers,
)

from .optimization import (
Convergence,
EpsilonProgress,
Expand Down
9 changes: 7 additions & 2 deletions ema_workbench/em_framework/evaluators.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def finalize(self):
"""finalize the evaluator"""
raise NotImplementedError

def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
"""used by ema_workbench"""
raise NotImplementedError

Expand Down Expand Up @@ -181,6 +181,7 @@ def perform_experiments(
lever_sampling=Samplers.LHS,
callback=None,
combine="factorial",
**kwargs,
):
"""convenience method for performing experiments.
Expand All @@ -202,6 +203,7 @@ def perform_experiments(
lever_sampling=lever_sampling,
callback=callback,
combine=combine,
**kwargs,
)

def optimize(
Expand Down Expand Up @@ -307,6 +309,7 @@ def perform_experiments(
return_callback=False,
combine="factorial",
log_progress=False,
**kwargs,
):
"""sample uncertainties and levers, and perform the resulting experiments
on each of the models
Expand Down Expand Up @@ -336,6 +339,8 @@ def perform_experiments(
then combined by cycling over the shorter of the two sets
of designs until the longer set of designs is exhausted.
Additional keyword arguments are passed on to evaluate_experiments of the evaluator
Returns
-------
tuple
Expand Down Expand Up @@ -397,7 +402,7 @@ def perform_experiments(
if not evaluator:
evaluator = SequentialEvaluator(models)

evaluator.evaluate_experiments(scenarios, policies, callback, combine=combine)
evaluator.evaluate_experiments(scenarios, policies, callback, combine=combine, **kwargs)

if callback.i != nr_of_exp:
raise EMAError(
Expand Down
4 changes: 2 additions & 2 deletions ema_workbench/em_framework/futures_ipyparallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,11 @@ def finalize(self):
self.logwatcher.stop()
cleanup(self.client)

def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)

lb_view = self.client.load_balanced_view()
results = lb_view.map(_run_experiment, ex_gen, ordered=False, block=False)
results = lb_view.map(_run_experiment, ex_gen, ordered=False, block=False, **kwargs)

for entry in results:
callback(*entry)
163 changes: 143 additions & 20 deletions ema_workbench/em_framework/futures_mpi.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,144 @@
import atexit
import copy
import logging
import os
import shutil
import threading
import time
import warnings

from logging.handlers import QueueHandler

from .evaluators import BaseEvaluator, experiment_generator
from .futures_util import setup_working_directories, finalizer, determine_rootdir
from .util import NamedObjectMap
from .model import AbstractModel
from .experiment_runner import ExperimentRunner
from ..util import get_module_logger
from ..util import get_module_logger, get_rootlogger

from ..util import ema_logging

__all__ = ["MPIEvaluator"]

_logger = get_module_logger(__name__)

experiment_runner = None

def run_experiment_mpi(packed_data):
from mpi4py.MPI import COMM_WORLD

rank = COMM_WORLD.Get_rank()
class RankFilter(logging.Filter):
    """Logging filter that attaches the MPI rank to every log record.

    The rank is stored on the record as ``record.rank`` so that formatters
    can reference ``%(rank)s`` (see the formatter installed in
    ``mpi_initializer``).
    """

    def __init__(self, rank):
        """
        Parameters
        ----------
        rank : int
               the MPI rank of this worker process

        """
        super().__init__()
        self.rank = rank

    def filter(self, record):
        # annotate the record in place and always let it through
        record.rank = self.rank
        return True


def mpi_initializer(models, log_level, root_dir):
    """Initializer run once in every MPI worker process.

    Sets up the module-level ``experiment_runner`` used by
    ``run_experiment_mpi``, connects a log handler back to the master's
    logwatcher, and prepares per-worker working directories.

    Parameters
    ----------
    models : collection of AbstractModel instances
    log_level : int
                logging level for the worker-side handler
    root_dir : str or None
               root directory under which per-worker working directory
               copies are created
    """
    global experiment_runner
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.Get_rank()

    # setup the experiment runner
    msis = NamedObjectMap(AbstractModel)
    msis.extend(models)
    experiment_runner = ExperimentRunner(msis)

    # setup the logging: look up the port published by the master's
    # logwatcher thread under the "logwatcher" service name and connect to it
    info = MPI.INFO_NULL
    service = "logwatcher"
    port = MPI.Lookup_name(service)
    logcomm = MPI.COMM_WORLD.Connect(port, info, 0)

    root_logger = get_rootlogger()

    # forward worker log records (at log_level and above) to the master,
    # tagging each record with this worker's rank for the formatter below
    handler = MPIHandler(logcomm)
    handler.addFilter(RankFilter(rank))
    handler.setLevel(log_level)
    handler.setFormatter(logging.Formatter("[worker %(rank)s/%(levelname)s] %(message)s"))
    root_logger.addHandler(handler)

    # setup the working directories; register cleanup for worker exit
    tmpdir = setup_working_directories(models, root_dir)
    if tmpdir:
        atexit.register(finalizer, os.path.abspath(tmpdir))

    # _logger.info(f"worker {rank} initialized")
    root_logger.info(f"worker {rank} initialized")


def logwatcher(stop_event):
    """Receive log records from MPI worker processes and re-emit them locally.

    Runs in a background thread of the master process. It opens an MPI port,
    publishes it under the "logwatcher" service name (workers look this name
    up in ``mpi_initializer``), accepts the workers' connection, and then
    forwards every received LogRecord to the locally configured logger.

    Parameters
    ----------
    stop_event : threading.Event
                 signals the watcher to stop. NOTE(review): ``comm.recv``
                 blocks, so the event is only re-checked after the next
                 record arrives — confirm shutdown ordering with the caller.
    """
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.Get_rank()

    info = MPI.INFO_NULL
    port = MPI.Open_port(info)
    _logger.debug(f"opened port: {port}")

    service = "logwatcher"
    MPI.Publish_name(service, info, port)
    _logger.debug(f"published service: {service}")

    root = 0
    _logger.debug("waiting for client connection...")
    comm = MPI.COMM_WORLD.Accept(port, info, root)
    _logger.debug("client connected...")

    while not stop_event.is_set():
        if rank == root:
            record = comm.recv(None, MPI.ANY_SOURCE, tag=0)
            # may raise AttributeError (record without a name attribute) or
            # TypeError (record.name not a string); either propagates, which
            # matches the previous try/except that only re-raised
            logger = logging.getLogger(record.name)
            logger.callHandlers(record)


def run_experiment_mpi(experiment):
    """Run a single experiment on an MPI worker and return its outcomes.

    Relies on the module-level ``experiment_runner`` created in each worker
    by ``mpi_initializer``.

    Parameters
    ----------
    experiment : Experiment instance

    Returns
    -------
    tuple
        the experiment and the outcomes produced by running it
    """
    _logger.debug(f"starting {experiment.experiment_id}")

    outcomes = experiment_runner.run_experiment(experiment)

    _logger.debug(f"completed {experiment.experiment_id}")

    return experiment, outcomes


class MPIHandler(QueueHandler):
    """Logging handler that ships LogRecords from an MPI worker to the master.

    Reuses ``QueueHandler.prepare`` to make records safe for pickling, but
    transports them over an MPI communicator instead of a queue.
    """

    def __init__(self, communicator):
        """
        Parameters
        ----------
        communicator : MPI communicator connected to the master's logwatcher

        """
        # deliberately skip QueueHandler.__init__, which requires a queue;
        # only the base Handler initialization is needed here
        logging.Handler.__init__(self)
        self.communicator = communicator

    def emit(self, record):
        """Prepare *record* for pickling and send it to rank 0 with tag 0."""
        prepared = self.prepare(record)
        try:
            self.communicator.send(prepared, 0, 0)
        except Exception:
            self.handleError(prepared)


class MPIEvaluator(BaseEvaluator):
"""Evaluator for experiments using MPI Pool Executor from mpi4py"""

Expand All @@ -43,12 +150,27 @@ def __init__(self, msis, n_processes=None, **kwargs):
FutureWarning,
)
self._pool = None
self.root_dir = None
self.stop_event = None
self.n_processes = n_processes

def initialize(self):
# Only import mpi4py if the MPIEvaluator is used, to avoid unnecessary dependencies.
from mpi4py.futures import MPIPoolExecutor

self.stop_event = threading.Event()
self.logwatcher_thread = threading.Thread(
name="logwatcher", target=logwatcher, daemon=True, args=(self.stop_event,)
)
self.logwatcher_thread.start()

self.root_dir = determine_rootdir(self._msis)
self._pool = MPIPoolExecutor(
max_workers=self.n_processes,
initializer=mpi_initializer,
initargs=(self._msis, _logger.level, self.root_dir),
) # Removed initializer arguments

self._pool = MPIPoolExecutor(max_workers=self.n_processes) # Removed initializer arguments
_logger.info(f"MPI pool started with {self._pool._max_workers} workers")
if self._pool._max_workers <= 10:
Expand All @@ -59,20 +181,21 @@ def initialize(self):

def finalize(self):
self._pool.shutdown()
self.stop_event.set()
_logger.info("MPI pool has been shut down")

def evaluate_experiments(self, scenarios, policies, callback, combine="factorial"):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
experiments = list(ex_gen)
if self.root_dir:
shutil.rmtree(self.root_dir)

packed = [(experiment, experiment.model_name, self._msis) for experiment in experiments]
time.sleep(0.1)
_logger.info("MPI pool has been shut down")

_logger.info(
f"MPIEvaluator: Starting {len(packed)} experiments using MPI pool with {self._pool._max_workers} workers"
)
results = self._pool.map(run_experiment_mpi, packed)
def evaluate_experiments(self, scenarios, policies, callback, combine="factorial", **kwargs):
ex_gen = experiment_generator(scenarios, self._msis, policies, combine=combine)
experiments = list(ex_gen)

_logger.info(f"MPIEvaluator: Completed all {len(packed)} experiments")
results = self._pool.map(run_experiment_mpi, experiments, **kwargs)
for experiment, outcomes in results:
callback(experiment, outcomes)
_logger.info(f"MPIEvaluator: Callback completed for all {len(packed)} experiments")

_logger.info(f"MPIEvaluator: Completed all {len(experiments)} experiments")
19 changes: 11 additions & 8 deletions ema_workbench/examples/example_mpi_lake_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
"""
import math
import sys
import time

# FIXME
import sys

sys.path.insert(0, "/Users/jhkwakkel/Documents/GitHub/EMAworkbench")

import numpy as np
Expand All @@ -22,8 +23,8 @@
Constant,
ema_logging,
MPIEvaluator,
save_results,
)
from ema_workbench.em_framework.evaluators import Samplers


def lake_problem(
Expand Down Expand Up @@ -86,10 +87,10 @@ def lake_problem(


if __name__ == "__main__":
# run with mpiexec -n 4 python -m mpi4py.futures example_mpi_lake_model.py
starttime = time.time()
# run with mpiexec -n 1 -usize {ntasks} python example_mpi_lake_model.py
starttime = time.perf_counter()

ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=False)
ema_logging.log_to_stderr(ema_logging.INFO, pass_root_logger_level=True)

# instantiate the model
lake_model = Model("lakeproblem", function=lake_problem)
Expand Down Expand Up @@ -119,10 +120,12 @@ def lake_problem(
lake_model.constants = [Constant("alpha", 0.41), Constant("nsamples", 150)]

# generate some random policies by sampling over levers
n_scenarios = 1000
n_scenarios = 10000
n_policies = 4

with MPIEvaluator(lake_model) as evaluator:
res = evaluator.perform_experiments(n_scenarios, n_policies)
res = evaluator.perform_experiments(n_scenarios, n_policies, chunksize=250)

save_results(res, "test.tar.gz")

print(time.time() - starttime)
print(time.perf_counter() - starttime)
22 changes: 22 additions & 0 deletions ema_workbench/examples/slurm_script.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/bin/bash

# SLURM batch script for running the MPI lake model example on a cluster.
# Submit with: sbatch slurm_script.sh

#SBATCH --job-name="Python_test"
#SBATCH --time=00:02:00
#SBATCH --ntasks=10
#SBATCH --cpus-per-task=1
#SBATCH --partition=compute
#SBATCH --mem-per-cpu=4GB
#SBATCH --account=research-tpm-mas

# load the cluster software stack; module names are site-specific —
# TODO confirm these match your cluster's environment
module load 2023r1
module load openmpi
module load python
module load py-numpy
module load py-scipy
module load py-mpi4py
module load py-pip

# dependencies not provided as modules
pip install ipyparallel
pip install --user -e git+https://github.com/quaquel/EMAworkbench@mpi_update#egg=ema-workbench

# launch a single MPI task; NOTE(review): the example's own comment says to
# run with "mpiexec -n 1 -usize {ntasks}", so presumably additional workers
# are spawned dynamically up to the allocation size — confirm on your system
mpiexec -n 1 python3 example_mpi_lake_model.py
Binary file added ema_workbench/examples/test.tar.gz
Binary file not shown.
Loading

0 comments on commit a968cc8

Please sign in to comment.