From 4b3ee47822d4423e02ee13f7b584871c9650bb3b Mon Sep 17 00:00:00 2001 From: Jason Date: Tue, 6 Jun 2023 13:22:58 -0700 Subject: [PATCH 1/5] Disable worker timeoutes by default --- src/maggma/cli/distributed.py | 32 ++++++++++++++++++++++++-------- src/maggma/cli/settings.py | 3 +-- 2 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 5d3143e2b..7211b060e 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -29,7 +29,9 @@ def find_port(): return sock.getsockname()[1] -def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_workers: int): +def manager( + url: str, port: int, builders: List[Builder], num_chunks: int, num_workers: int +): """ Really simple manager for distributed processing that uses a builder prechunk to modify the builder and send out modified builders for each worker to run. @@ -63,7 +65,10 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w try: builder.connect() - chunk_dicts = [{"chunk": d, "distributed": False, "completed": False} for d in builder.prechunk(num_chunks)] + chunk_dicts = [ + {"chunk": d, "distributed": False, "completed": False} + for d in builder.prechunk(num_chunks) + ] pbar_distributed = tqdm( total=len(chunk_dicts), desc="Distributed chunks for {}".format(builder.__class__.__name__), @@ -78,7 +83,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w except NotImplementedError: attempt_graceful_shutdown(workers, socket) - raise RuntimeError(f"Can't distribute process {builder.__class__.__name__} as no prechunk method exists.") + raise RuntimeError( + f"Can't distribute process {builder.__class__.__name__} as no prechunk method exists." + ) completed = False @@ -117,7 +124,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w # If everything is distributed, send EXIT to the worker if all(chunk["distributed"] for chunk in chunk_dicts): - logger.debug(f"Sending exit signal to worker: {msg.split('_')[1]}") + logger.debug( + f"Sending exit signal to worker: {msg.split('_')[1]}" + ) socket.send_multipart([identity, b"", b"EXIT"]) workers.pop(identity) @@ -125,7 +134,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w # Remove worker and requeue work sent to it attempt_graceful_shutdown(workers, socket) raise RuntimeError( - "At least one worker has stopped with error message: {}".format(msg.split("_")[1]) + "At least one worker has stopped with error message: {}".format( + msg.split("_")[1] + ) ) elif msg == "PING": @@ -135,7 +146,8 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w workers[identity]["heartbeats"] += 1 # Decide if any workers are dead and need to be removed - handle_dead_workers(workers, socket) + if settings.WORKER_TIMEOUT is not None: + handle_dead_workers(workers, socket) for work_index, chunk_dict in enumerate(chunk_dicts): if not chunk_dict["distributed"]: @@ -199,7 +211,9 @@ def handle_dead_workers(workers, socket): z_score = 0.6745 * (workers[identity]["heartbeats"] - median) / mad if z_score <= -3.5: attempt_graceful_shutdown(workers, socket) - raise RuntimeError("At least one worker has timed out. Stopping distributed build.") + raise RuntimeError( + "At least one worker has timed out. Stopping distributed build." + ) def worker(url: str, port: int, num_processes: int, no_bars: bool): @@ -275,4 +289,6 @@ def ping_manager(socket, poller): message: bytes = socket.recv() if message.decode("utf-8") != "PONG": socket.close() - raise RuntimeError("Stopping work as manager did not respond to heartbeat from worker.") + raise RuntimeError( + "Stopping work as manager did not respond to heartbeat from worker." + ) diff --git a/src/maggma/cli/settings.py b/src/maggma/cli/settings.py index 674b6f3a0..641459ebb 100644 --- a/src/maggma/cli/settings.py +++ b/src/maggma/cli/settings.py @@ -2,9 +2,8 @@ class CLISettings(BaseSettings): - WORKER_TIMEOUT: int = Field( - 3600, + None, description="Timeout in seconds for a distributed worker", ) From 116bd45db277603064344217199ca95766c6c7ee Mon Sep 17 00:00:00 2001 From: Jason Date: Tue, 6 Jun 2023 13:30:44 -0700 Subject: [PATCH 2/5] Reduce manager complexity --- src/maggma/cli/distributed.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 7211b060e..542eb71c5 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -45,15 +45,9 @@ def manager( raise ValueError("Both num_chunks and num_workers must be non-zero") logger.info(f"Binding to Manager URL {url}:{port}") - context = zmq.Context() - context.setsockopt(opt=zmq.SocketOption.ROUTER_MANDATORY, value=1) - context.setsockopt(opt=zmq.SNDHWM, value=0) - context.setsockopt(opt=zmq.RCVHWM, value=0) - socket = context.socket(zmq.ROUTER) - socket.bind(f"{url}:{port}") - poll = zmq.Poller() - poll.register(socket, zmq.POLLIN) + # Setup socket and polling + socket, poll = setup(url, port) workers = {} # type: ignore @@ -177,6 +171,19 @@ def manager( attempt_graceful_shutdown(workers, socket) +def setup(url, port): + context = zmq.Context() + context.setsockopt(opt=zmq.SocketOption.ROUTER_MANDATORY, value=1) + context.setsockopt(opt=zmq.SNDHWM, value=0) + context.setsockopt(opt=zmq.RCVHWM, value=0) + socket = context.socket(zmq.ROUTER) + socket.bind(f"{url}:{port}") + + poll = zmq.Poller() + poll.register(socket, zmq.POLLIN) + return socket, poll + + def attempt_graceful_shutdown(workers, socket): for identity in workers: socket.send_multipart([identity, b"", b"EXIT"]) From c4f7d6e7c7ce5b2758605976d57ec335555a334f Mon Sep 17 00:00:00 2001 From: Jason Date: Tue, 6 Jun 2023 13:31:12 -0700 Subject: [PATCH 3/5] Fix typo --- src/maggma/cli/distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 542eb71c5..984d8b1d0 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -231,7 +231,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool): identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000)) logger = getLogger(f"Worker {identity}") - logger.info(f"Connnecting to Manager at {url}:{port}") + logger.info(f"Connecting to Manager at {url}:{port}") context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REQ) From d7b4c65ff81fbded2915c54b98bcfa70bd270fbb Mon Sep 17 00:00:00 2001 From: Jason Date: Tue, 6 Jun 2023 13:35:28 -0700 Subject: [PATCH 4/5] Flake8 override --- src/maggma/cli/distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 984d8b1d0..f74638ac2 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -29,7 +29,7 @@ def find_port(): return sock.getsockname()[1] -def manager( +def manager( # noqa: C901 url: str, port: int, builders: List[Builder], num_chunks: int, num_workers: int ): """ From 46bc4b82242ea3ceccd6f6a1092a0fa7e3f8984e Mon Sep 17 00:00:00 2001 From: Jason Date: Tue, 6 Jun 2023 13:38:20 -0700 Subject: [PATCH 5/5] Increase manager timeout --- src/maggma/cli/settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maggma/cli/settings.py b/src/maggma/cli/settings.py index 641459ebb..98e745e60 100644 --- a/src/maggma/cli/settings.py +++ b/src/maggma/cli/settings.py @@ -8,7 +8,7 @@ class CLISettings(BaseSettings): ) MANAGER_TIMEOUT: int = Field( - 900, + 3600, description="Timeout in seconds for the worker manager", )