From 276f67a8e8c80dcb8195c317905b13056e171a56 Mon Sep 17 00:00:00 2001 From: Jason Munro Date: Thu, 16 Feb 2023 21:20:18 -0800 Subject: [PATCH 1/4] Add CLI pydantic settings class --- src/maggma/cli/__init__.py | 20 ++++++++++++++++++++ src/maggma/cli/distributed.py | 13 ++++++------- src/maggma/cli/multiprocessing.py | 3 +-- src/maggma/cli/rabbitmq.py | 10 +++++----- 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index c73a0c497..bbb4feb13 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -16,9 +16,29 @@ from maggma.cli.source_loader import ScriptFinder, load_builder_from_source from maggma.utils import ReportingHandler, TqdmLoggingHandler + +from pydantic import BaseSettings, Field + sys.meta_path.append(ScriptFinder()) +class CLISettings(BaseSettings): + + WORKER_TIMEOUT: int = Field( + 3600, + description="Timeout in seconds for a distributed worker", + ) + + MANAGER_TIMEOUT: int = Field( + 900, + description="Timeout in seconds for the worker manager", + ) + + class Config: + env_prefix = "MAGGMA_" + extra = "ignore" + + @click.command() @click.argument("builders", nargs=-1, type=click.Path(exists=True), required=True) @click.option( diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 10dbaf458..7ea1c2169 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -13,15 +13,14 @@ from monty.json import jsanitize from monty.serialization import MontyDecoder -from maggma.cli.multiprocessing import multi, MANAGER_TIMEOUT +from maggma.cli.multiprocessing import multi from maggma.core import Builder from maggma.utils import tqdm +from maggma.cli import CLISettings import zmq -import zmq.asyncio as azmq - -WORKER_TIMEOUT = 5400 # max timeout in seconds for a worker +settings = CLISettings() def find_port(): sock = pysocket.socket() @@ -175,7 +174,7 @@ def handle_dead_workers(workers, socket): if len(workers) == 1: # Use global timeout identity = list(workers.keys())[0] - if (perf_counter() - workers[identity]["last_ping"]) >= WORKER_TIMEOUT: + if (perf_counter() - workers[identity]["last_ping"]) >= settings.WORKER_TIMEOUT: attempt_graceful_shutdown(workers, socket) raise RuntimeError("Worker has timed out. Stopping distributed build.") @@ -229,7 +228,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool): socket.send("READY_{}".format(hostname).encode("utf-8")) # Poll for MANAGER_TIMEOUT seconds, if nothing is given then assume manager is dead and timeout - connections = dict(poller.poll(MANAGER_TIMEOUT * 1000)) + connections = dict(poller.poll(settings.MANAGER_TIMEOUT * 1000)) if not connections: socket.close() raise RuntimeError("Stopping work as manager timed out.") @@ -267,7 +266,7 @@ def ping_manager(socket, poller): socket.send_string("PING") # Poll for MANAGER_TIMEOUT seconds, if nothing is given then assume manager is dead and timeout - connections = dict(poller.poll(MANAGER_TIMEOUT * 1000)) + connections = dict(poller.poll(settings.MANAGER_TIMEOUT * 1000)) if not connections: socket.close() raise RuntimeError("Stopping work as manager timed out.") diff --git a/src/maggma/cli/multiprocessing.py b/src/maggma/cli/multiprocessing.py index ddc0b583a..912a99cb5 100644 --- a/src/maggma/cli/multiprocessing.py +++ b/src/maggma/cli/multiprocessing.py @@ -16,8 +16,7 @@ from tqdm import tqdm from maggma.utils import primed - -MANAGER_TIMEOUT = 300 # max timeout in seconds for manager +from maggma.cli import CLISettings logger = getLogger("MultiProcessor") diff --git a/src/maggma/cli/rabbitmq.py b/src/maggma/cli/rabbitmq.py index 9623574dc..c61d26da5 100644 --- a/src/maggma/cli/rabbitmq.py +++ b/src/maggma/cli/rabbitmq.py @@ -13,17 +13,17 @@ from monty.json import jsanitize from monty.serialization import MontyDecoder -from maggma.cli.multiprocessing import multi, MANAGER_TIMEOUT +from maggma.cli.multiprocessing import multi from maggma.core import Builder from maggma.utils import tqdm, Timeout +from maggma.cli import CLISettings try: import pika except ImportError: raise ImportError("Both pika and aio-pika are required to use RabbitMQ as a broker") -WORKER_TIMEOUT = 3600 # max timeout in seconds for a worker - +settings = CLISettings() def find_port(): sock = pysocket.socket() @@ -192,7 +192,7 @@ def handle_dead_workers(connection, workers, channel, worker_queue): if len(workers) == 1: # Use global timeout identity = list(workers.keys())[0] - if (perf_counter() - workers[identity]["last_ping"]) >= WORKER_TIMEOUT: + if (perf_counter() - workers[identity]["last_ping"]) >= settings.WORKER_TIMEOUT: attempt_graceful_shutdown(connection, workers, channel, worker_queue) raise RuntimeError("Worker has timed out. Stopping distributed build.") @@ -241,7 +241,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool, queue_prefix: running = True while running: # Wait for work from manager - with Timeout(seconds=MANAGER_TIMEOUT): + with Timeout(seconds=settings.MANAGER_TIMEOUT): _, _, body = channel.basic_get(queue=worker_queue, auto_ack=True) if body is not None: From f5f60851ad73d38cd0d79ec085f037d1002d9784 Mon Sep 17 00:00:00 2001 From: Jason Munro Date: Thu, 16 Feb 2023 21:21:08 -0800 Subject: [PATCH 2/4] Add pydantic to requirements --- requirements.txt | 1 + setup.py | 1 + 2 files changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index c86881cbd..938e2affa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ msgpack-python==0.5.6 orjson==3.8.0 boto3==1.24.42 python-dateutil==2.8.2 +pydantic diff --git a/setup.py b/setup.py index 6096af4ec..3de499302 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ include_package_data=True, install_requires=[ "setuptools", + "pydantic", "pymongo>=4.2.0", "monty>=1.0.2", "mongomock>=3.10.0", From 0a143c3145f9d98c482f6bc0ddcbcf07e21dd7cb Mon Sep 17 00:00:00 2001 From: Jason Munro Date: Thu, 16 Feb 2023 21:24:20 -0800 Subject: [PATCH 3/4] Linting --- src/maggma/cli/distributed.py | 1 + src/maggma/cli/rabbitmq.py | 1 + 2 files changed, 2 insertions(+) diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 7ea1c2169..2abb49e17 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -22,6 +22,7 @@ settings = CLISettings() + def find_port(): sock = pysocket.socket() sock.bind(("", 0)) diff --git a/src/maggma/cli/rabbitmq.py b/src/maggma/cli/rabbitmq.py index c61d26da5..9e35d26f3 100644 --- a/src/maggma/cli/rabbitmq.py +++ b/src/maggma/cli/rabbitmq.py @@ -25,6 +25,7 @@ settings = CLISettings() + def find_port(): sock = pysocket.socket() sock.bind(("", 0)) From 2d132feb5099a1e0300ca65da2ee5d9f4d88a08e Mon Sep 17 00:00:00 2001 From: Jason Munro Date: Thu, 16 Feb 2023 21:28:32 -0800 Subject: [PATCH 4/4] Move settings to separate module --- src/maggma/cli/__init__.py | 22 +--------------------- src/maggma/cli/distributed.py | 2 +- src/maggma/cli/multiprocessing.py | 1 - src/maggma/cli/rabbitmq.py | 2 +- src/maggma/cli/settings.py | 18 ++++++++++++++++++ 5 files changed, 21 insertions(+), 24 deletions(-) create mode 100644 src/maggma/cli/settings.py diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index bbb4feb13..899ce1a34 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -16,28 +16,8 @@ from maggma.cli.source_loader import ScriptFinder, load_builder_from_source from maggma.utils import ReportingHandler, TqdmLoggingHandler - -from pydantic import BaseSettings, Field - sys.meta_path.append(ScriptFinder()) - - -class CLISettings(BaseSettings): - - WORKER_TIMEOUT: int = Field( - 3600, - description="Timeout in seconds for a distributed worker", - ) - - MANAGER_TIMEOUT: int = Field( - 900, - description="Timeout in seconds for the worker manager", - ) - - class Config: - env_prefix = "MAGGMA_" - extra = "ignore" - + @click.command() @click.argument("builders", nargs=-1, type=click.Path(exists=True), required=True) diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 2abb49e17..5d3143e2b 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -14,9 +14,9 @@ from monty.serialization import MontyDecoder from maggma.cli.multiprocessing import multi +from maggma.cli.settings import CLISettings from maggma.core import Builder from maggma.utils import tqdm -from maggma.cli import CLISettings import zmq diff --git a/src/maggma/cli/multiprocessing.py b/src/maggma/cli/multiprocessing.py index 912a99cb5..8666a3d39 100644 --- a/src/maggma/cli/multiprocessing.py +++ b/src/maggma/cli/multiprocessing.py @@ -16,7 +16,6 @@ from tqdm import tqdm from maggma.utils import primed -from maggma.cli import CLISettings logger = getLogger("MultiProcessor") diff --git a/src/maggma/cli/rabbitmq.py b/src/maggma/cli/rabbitmq.py index 9e35d26f3..17ab8af22 100644 --- a/src/maggma/cli/rabbitmq.py +++ b/src/maggma/cli/rabbitmq.py @@ -14,9 +14,9 @@ from monty.serialization import MontyDecoder from maggma.cli.multiprocessing import multi +from maggma.cli.settings import CLISettings from maggma.core import Builder from maggma.utils import tqdm, Timeout -from maggma.cli import CLISettings try: import pika diff --git a/src/maggma/cli/settings.py b/src/maggma/cli/settings.py new file mode 100644 index 000000000..674b6f3a0 --- /dev/null +++ b/src/maggma/cli/settings.py @@ -0,0 +1,18 @@ +from pydantic import BaseSettings, Field + + +class CLISettings(BaseSettings): + + WORKER_TIMEOUT: int = Field( + 3600, + description="Timeout in seconds for a distributed worker", + ) + + MANAGER_TIMEOUT: int = Field( + 900, + description="Timeout in seconds for the worker manager", + ) + + class Config: + env_prefix = "MAGGMA_" + extra = "ignore"