diff --git a/requirements.txt b/requirements.txt index c86881cbd..938e2affa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ msgpack-python==0.5.6 orjson==3.8.0 boto3==1.24.42 python-dateutil==2.8.2 +pydantic diff --git a/setup.py b/setup.py index 6096af4ec..3de499302 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,7 @@ include_package_data=True, install_requires=[ "setuptools", + "pydantic", "pymongo>=4.2.0", "monty>=1.0.2", "mongomock>=3.10.0", diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index c73a0c497..899ce1a34 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -17,7 +17,7 @@ from maggma.utils import ReportingHandler, TqdmLoggingHandler sys.meta_path.append(ScriptFinder()) - + @click.command() @click.argument("builders", nargs=-1, type=click.Path(exists=True), required=True) diff --git a/src/maggma/cli/distributed.py b/src/maggma/cli/distributed.py index 10dbaf458..5d3143e2b 100644 --- a/src/maggma/cli/distributed.py +++ b/src/maggma/cli/distributed.py @@ -13,14 +13,14 @@ from monty.json import jsanitize from monty.serialization import MontyDecoder -from maggma.cli.multiprocessing import multi, MANAGER_TIMEOUT +from maggma.cli.multiprocessing import multi +from maggma.cli.settings import CLISettings from maggma.core import Builder from maggma.utils import tqdm import zmq -import zmq.asyncio as azmq -WORKER_TIMEOUT = 5400 # max timeout in seconds for a worker +settings = CLISettings() def find_port(): @@ -175,7 +175,7 @@ def handle_dead_workers(workers, socket): if len(workers) == 1: # Use global timeout identity = list(workers.keys())[0] - if (perf_counter() - workers[identity]["last_ping"]) >= WORKER_TIMEOUT: + if (perf_counter() - workers[identity]["last_ping"]) >= settings.WORKER_TIMEOUT: attempt_graceful_shutdown(workers, socket) raise RuntimeError("Worker has timed out. Stopping distributed build.") @@ -229,7 +229,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool): socket.send("READY_{}".format(hostname).encode("utf-8")) # Poll for MANAGER_TIMEOUT seconds, if nothing is given then assume manager is dead and timeout - connections = dict(poller.poll(MANAGER_TIMEOUT * 1000)) + connections = dict(poller.poll(settings.MANAGER_TIMEOUT * 1000)) if not connections: socket.close() raise RuntimeError("Stopping work as manager timed out.") @@ -267,7 +267,7 @@ def ping_manager(socket, poller): socket.send_string("PING") # Poll for MANAGER_TIMEOUT seconds, if nothing is given then assume manager is dead and timeout - connections = dict(poller.poll(MANAGER_TIMEOUT * 1000)) + connections = dict(poller.poll(settings.MANAGER_TIMEOUT * 1000)) if not connections: socket.close() raise RuntimeError("Stopping work as manager timed out.") diff --git a/src/maggma/cli/multiprocessing.py b/src/maggma/cli/multiprocessing.py index ddc0b583a..8666a3d39 100644 --- a/src/maggma/cli/multiprocessing.py +++ b/src/maggma/cli/multiprocessing.py @@ -17,8 +17,6 @@ from maggma.utils import primed -MANAGER_TIMEOUT = 300 # max timeout in seconds for manager - logger = getLogger("MultiProcessor") diff --git a/src/maggma/cli/rabbitmq.py b/src/maggma/cli/rabbitmq.py index 9623574dc..17ab8af22 100644 --- a/src/maggma/cli/rabbitmq.py +++ b/src/maggma/cli/rabbitmq.py @@ -13,7 +13,8 @@ from monty.json import jsanitize from monty.serialization import MontyDecoder -from maggma.cli.multiprocessing import multi, MANAGER_TIMEOUT +from maggma.cli.multiprocessing import multi +from maggma.cli.settings import CLISettings from maggma.core import Builder from maggma.utils import tqdm, Timeout @@ -22,7 +23,7 @@ except ImportError: raise ImportError("Both pika and aio-pika are required to use RabbitMQ as a broker") -WORKER_TIMEOUT = 3600 # max timeout in seconds for a worker +settings = CLISettings() def find_port(): @@ -192,7 +193,7 @@ def handle_dead_workers(connection, workers, channel, worker_queue): if len(workers) == 1: # Use global timeout identity = list(workers.keys())[0] - if (perf_counter() - workers[identity]["last_ping"]) >= WORKER_TIMEOUT: + if (perf_counter() - workers[identity]["last_ping"]) >= settings.WORKER_TIMEOUT: attempt_graceful_shutdown(connection, workers, channel, worker_queue) raise RuntimeError("Worker has timed out. Stopping distributed build.") @@ -241,7 +242,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool, queue_prefix: running = True while running: # Wait for work from manager - with Timeout(seconds=MANAGER_TIMEOUT): + with Timeout(seconds=settings.MANAGER_TIMEOUT): _, _, body = channel.basic_get(queue=worker_queue, auto_ack=True) if body is not None: diff --git a/src/maggma/cli/settings.py b/src/maggma/cli/settings.py new file mode 100644 index 000000000..674b6f3a0 --- /dev/null +++ b/src/maggma/cli/settings.py @@ -0,0 +1,18 @@ +from pydantic import BaseSettings, Field + + +class CLISettings(BaseSettings): + + WORKER_TIMEOUT: int = Field( + 3600, + description="Timeout in seconds for a distributed worker", + ) + + MANAGER_TIMEOUT: int = Field( + 900, + description="Timeout in seconds for the worker manager", + ) + + class Config: + env_prefix = "MAGGMA_" + extra = "ignore"