diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 5d3143e2b..f74638ac2 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -29,7 +29,9 @@ def find_port(): return sock.getsockname()[1] -def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_workers: int): +def manager( # noqa: C901 + url: str, port: int, builders: List[Builder], num_chunks: int, num_workers: int +): """ Really simple manager for distributed processing that uses a builder prechunk to modify the builder and send out modified builders for each worker to run. @@ -43,15 +45,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w raise ValueError("Both num_chunks and num_workers must be non-zero") logger.info(f"Binding to Manager URL {url}:{port}") - context = zmq.Context() - context.setsockopt(opt=zmq.SocketOption.ROUTER_MANDATORY, value=1) - context.setsockopt(opt=zmq.SNDHWM, value=0) - context.setsockopt(opt=zmq.RCVHWM, value=0) - socket = context.socket(zmq.ROUTER) - socket.bind(f"{url}:{port}") - poll = zmq.Poller() - poll.register(socket, zmq.POLLIN) + # Setup socket and polling + socket, poll = setup(url, port) workers = {} # type: ignore @@ -63,7 +59,10 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w try: builder.connect() - chunk_dicts = [{"chunk": d, "distributed": False, "completed": False} for d in builder.prechunk(num_chunks)] + chunk_dicts = [ + {"chunk": d, "distributed": False, "completed": False} + for d in builder.prechunk(num_chunks) + ] pbar_distributed = tqdm( total=len(chunk_dicts), desc="Distributed chunks for {}".format(builder.__class__.__name__), @@ -78,7 +77,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w except NotImplementedError: attempt_graceful_shutdown(workers, socket) - raise RuntimeError(f"Can't distribute process {builder.__class__.__name__} as no prechunk method exists.") + raise RuntimeError( + f"Can't distribute process {builder.__class__.__name__} as no prechunk method exists." + ) completed = False @@ -117,7 +118,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w # If everything is distributed, send EXIT to the worker if all(chunk["distributed"] for chunk in chunk_dicts): - logger.debug(f"Sending exit signal to worker: {msg.split('_')[1]}") + logger.debug( + f"Sending exit signal to worker: {msg.split('_')[1]}" + ) socket.send_multipart([identity, b"", b"EXIT"]) workers.pop(identity) @@ -125,7 +128,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w # Remove worker and requeue work sent to it attempt_graceful_shutdown(workers, socket) raise RuntimeError( - "At least one worker has stopped with error message: {}".format(msg.split("_")[1]) + "At least one worker has stopped with error message: {}".format( + msg.split("_")[1] + ) ) elif msg == "PING": @@ -135,7 +140,8 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w workers[identity]["heartbeats"] += 1 # Decide if any workers are dead and need to be removed - handle_dead_workers(workers, socket) + if settings.WORKER_TIMEOUT is not None: + handle_dead_workers(workers, socket) for work_index, chunk_dict in enumerate(chunk_dicts): if not chunk_dict["distributed"]: @@ -165,6 +171,19 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w attempt_graceful_shutdown(workers, socket) +def setup(url, port): + context = zmq.Context() + context.setsockopt(opt=zmq.SocketOption.ROUTER_MANDATORY, value=1) + context.setsockopt(opt=zmq.SNDHWM, value=0) + context.setsockopt(opt=zmq.RCVHWM, value=0) + socket = context.socket(zmq.ROUTER) + socket.bind(f"{url}:{port}") + + poll = zmq.Poller() + poll.register(socket, zmq.POLLIN) + return socket, poll + + def attempt_graceful_shutdown(workers, socket): for identity in workers: socket.send_multipart([identity, b"", b"EXIT"]) @@ -199,7 +218,9 @@ def handle_dead_workers(workers, socket): z_score = 0.6745 * (workers[identity]["heartbeats"] - median) / mad if z_score <= -3.5: attempt_graceful_shutdown(workers, socket) - raise RuntimeError("At least one worker has timed out. Stopping distributed build.") + raise RuntimeError( + "At least one worker has timed out. Stopping distributed build." + ) def worker(url: str, port: int, num_processes: int, no_bars: bool): @@ -210,7 +231,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool): identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000)) logger = getLogger(f"Worker {identity}") - logger.info(f"Connnecting to Manager at {url}:{port}") + logger.info(f"Connecting to Manager at {url}:{port}") context = zmq.Context() socket: zmq.Socket = context.socket(zmq.REQ) @@ -275,4 +296,6 @@ def ping_manager(socket, poller): message: bytes = socket.recv() if message.decode("utf-8") != "PONG": socket.close() - raise RuntimeError("Stopping work as manager did not respond to heartbeat from worker.") + raise RuntimeError( + "Stopping work as manager did not respond to heartbeat from worker." + ) diff --git a/src/maggma/cli/settings.py b/src/maggma/cli/settings.py index 674b6f3a0..98e745e60 100644 --- a/src/maggma/cli/settings.py +++ b/src/maggma/cli/settings.py @@ -2,14 +2,13 @@ class CLISettings(BaseSettings): - WORKER_TIMEOUT: int = Field( - 3600, + None, description="Timeout in seconds for a distributed worker", ) MANAGER_TIMEOUT: int = Field( - 900, + 3600, description="Timeout in seconds for the worker manager", )