Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pydantic CLI settings #778

Merged
merged 4 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ msgpack-python==0.5.6
orjson==3.8.0
boto3==1.24.42
python-dateutil==2.8.2
pydantic
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
include_package_data=True,
install_requires=[
"setuptools",
"pydantic",
"pymongo>=4.2.0",
"monty>=1.0.2",
"mongomock>=3.10.0",
Expand Down
2 changes: 1 addition & 1 deletion src/maggma/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from maggma.utils import ReportingHandler, TqdmLoggingHandler

sys.meta_path.append(ScriptFinder())


@click.command()
@click.argument("builders", nargs=-1, type=click.Path(exists=True), required=True)
Expand Down
12 changes: 6 additions & 6 deletions src/maggma/cli/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
from monty.json import jsanitize
from monty.serialization import MontyDecoder

from maggma.cli.multiprocessing import multi, MANAGER_TIMEOUT
from maggma.cli.multiprocessing import multi
from maggma.cli.settings import CLISettings
from maggma.core import Builder
from maggma.utils import tqdm

import zmq
import zmq.asyncio as azmq

WORKER_TIMEOUT = 5400 # max timeout in seconds for a worker
settings = CLISettings()


def find_port():
Expand Down Expand Up @@ -175,7 +175,7 @@ def handle_dead_workers(workers, socket):
if len(workers) == 1:
# Use global timeout
identity = list(workers.keys())[0]
if (perf_counter() - workers[identity]["last_ping"]) >= WORKER_TIMEOUT:
if (perf_counter() - workers[identity]["last_ping"]) >= settings.WORKER_TIMEOUT:
attempt_graceful_shutdown(workers, socket)
raise RuntimeError("Worker has timed out. Stopping distributed build.")

Expand Down Expand Up @@ -229,7 +229,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool):
socket.send("READY_{}".format(hostname).encode("utf-8"))

# Poll for MANAGER_TIMEOUT seconds, if nothing is given then assume manager is dead and timeout
connections = dict(poller.poll(MANAGER_TIMEOUT * 1000))
connections = dict(poller.poll(settings.MANAGER_TIMEOUT * 1000))
if not connections:
socket.close()
raise RuntimeError("Stopping work as manager timed out.")
Expand Down Expand Up @@ -267,7 +267,7 @@ def ping_manager(socket, poller):
socket.send_string("PING")

# Poll for MANAGER_TIMEOUT seconds, if nothing is given then assume manager is dead and timeout
connections = dict(poller.poll(MANAGER_TIMEOUT * 1000))
connections = dict(poller.poll(settings.MANAGER_TIMEOUT * 1000))
if not connections:
socket.close()
raise RuntimeError("Stopping work as manager timed out.")
Expand Down
2 changes: 0 additions & 2 deletions src/maggma/cli/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

from maggma.utils import primed

MANAGER_TIMEOUT = 300 # max timeout in seconds for manager

logger = getLogger("MultiProcessor")


Expand Down
9 changes: 5 additions & 4 deletions src/maggma/cli/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from monty.json import jsanitize
from monty.serialization import MontyDecoder

from maggma.cli.multiprocessing import multi, MANAGER_TIMEOUT
from maggma.cli.multiprocessing import multi
from maggma.cli.settings import CLISettings
from maggma.core import Builder
from maggma.utils import tqdm, Timeout

Expand All @@ -22,7 +23,7 @@
except ImportError:
raise ImportError("Both pika and aio-pika are required to use RabbitMQ as a broker")

WORKER_TIMEOUT = 3600 # max timeout in seconds for a worker
settings = CLISettings()


def find_port():
Expand Down Expand Up @@ -192,7 +193,7 @@ def handle_dead_workers(connection, workers, channel, worker_queue):
if len(workers) == 1:
# Use global timeout
identity = list(workers.keys())[0]
if (perf_counter() - workers[identity]["last_ping"]) >= WORKER_TIMEOUT:
if (perf_counter() - workers[identity]["last_ping"]) >= settings.WORKER_TIMEOUT:
attempt_graceful_shutdown(connection, workers, channel, worker_queue)
raise RuntimeError("Worker has timed out. Stopping distributed build.")

Expand Down Expand Up @@ -241,7 +242,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool, queue_prefix:
running = True
while running:
# Wait for work from manager
with Timeout(seconds=MANAGER_TIMEOUT):
with Timeout(seconds=settings.MANAGER_TIMEOUT):
_, _, body = channel.basic_get(queue=worker_queue, auto_ack=True)

if body is not None:
Expand Down
18 changes: 18 additions & 0 deletions src/maggma/cli/settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pydantic import BaseSettings, Field


class CLISettings(BaseSettings):
    """Timeout settings shared by the maggma CLI's distributed build modules.

    Each field may be overridden through an environment variable that carries
    the ``MAGGMA_`` prefix, for example ``MAGGMA_WORKER_TIMEOUT``.
    """

    WORKER_TIMEOUT: int = Field(
        default=3600,
        description="Timeout in seconds for a distributed worker",
    )

    MANAGER_TIMEOUT: int = Field(
        default=900,
        description="Timeout in seconds for the worker manager",
    )

    class Config:
        # Prefix prepended to each field name when reading environment variables.
        env_prefix = "MAGGMA_"
        # Inputs that match no declared field are dropped instead of rejected.
        extra = "ignore"