Skip to content

Commit

Permalink
Merge pull request #793 from materialsproject/bugfix/disable_worker_t…
Browse files Browse the repository at this point in the history
…imeouts

Disable worker timeouts by default
  • Loading branch information
Jason Munro authored Jun 6, 2023
2 parents d2b8bf0 + 46bc4b8 commit ba7d1fd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 20 deletions.
57 changes: 40 additions & 17 deletions src/maggma/cli/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def find_port():
return sock.getsockname()[1]


def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_workers: int):
def manager( # noqa: C901
url: str, port: int, builders: List[Builder], num_chunks: int, num_workers: int
):
"""
Really simple manager for distributed processing that uses a builder prechunk to modify
the builder and send out modified builders for each worker to run.
Expand All @@ -43,15 +45,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w
raise ValueError("Both num_chunks and num_workers must be non-zero")

logger.info(f"Binding to Manager URL {url}:{port}")
context = zmq.Context()
context.setsockopt(opt=zmq.SocketOption.ROUTER_MANDATORY, value=1)
context.setsockopt(opt=zmq.SNDHWM, value=0)
context.setsockopt(opt=zmq.RCVHWM, value=0)
socket = context.socket(zmq.ROUTER)
socket.bind(f"{url}:{port}")

poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
# Setup socket and polling
socket, poll = setup(url, port)

workers = {} # type: ignore

Expand All @@ -63,7 +59,10 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w

try:
builder.connect()
chunk_dicts = [{"chunk": d, "distributed": False, "completed": False} for d in builder.prechunk(num_chunks)]
chunk_dicts = [
{"chunk": d, "distributed": False, "completed": False}
for d in builder.prechunk(num_chunks)
]
pbar_distributed = tqdm(
total=len(chunk_dicts),
desc="Distributed chunks for {}".format(builder.__class__.__name__),
Expand All @@ -78,7 +77,9 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w

except NotImplementedError:
attempt_graceful_shutdown(workers, socket)
raise RuntimeError(f"Can't distribute process {builder.__class__.__name__} as no prechunk method exists.")
raise RuntimeError(
f"Can't distribute process {builder.__class__.__name__} as no prechunk method exists."
)

completed = False

Expand Down Expand Up @@ -117,15 +118,19 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w

# If everything is distributed, send EXIT to the worker
if all(chunk["distributed"] for chunk in chunk_dicts):
logger.debug(f"Sending exit signal to worker: {msg.split('_')[1]}")
logger.debug(
f"Sending exit signal to worker: {msg.split('_')[1]}"
)
socket.send_multipart([identity, b"", b"EXIT"])
workers.pop(identity)

elif "ERROR" in msg:
# Remove worker and requeue work sent to it
attempt_graceful_shutdown(workers, socket)
raise RuntimeError(
"At least one worker has stopped with error message: {}".format(msg.split("_")[1])
"At least one worker has stopped with error message: {}".format(
msg.split("_")[1]
)
)

elif msg == "PING":
Expand All @@ -135,7 +140,8 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w
workers[identity]["heartbeats"] += 1

# Decide if any workers are dead and need to be removed
handle_dead_workers(workers, socket)
if settings.WORKER_TIMEOUT is not None:
handle_dead_workers(workers, socket)

for work_index, chunk_dict in enumerate(chunk_dicts):
if not chunk_dict["distributed"]:
Expand Down Expand Up @@ -165,6 +171,19 @@ def manager(url: str, port: int, builders: List[Builder], num_chunks: int, num_w
attempt_graceful_shutdown(workers, socket)


def setup(url, port):
context = zmq.Context()
context.setsockopt(opt=zmq.SocketOption.ROUTER_MANDATORY, value=1)
context.setsockopt(opt=zmq.SNDHWM, value=0)
context.setsockopt(opt=zmq.RCVHWM, value=0)
socket = context.socket(zmq.ROUTER)
socket.bind(f"{url}:{port}")

poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
return socket, poll


def attempt_graceful_shutdown(workers, socket):
for identity in workers:
socket.send_multipart([identity, b"", b"EXIT"])
Expand Down Expand Up @@ -199,7 +218,9 @@ def handle_dead_workers(workers, socket):
z_score = 0.6745 * (workers[identity]["heartbeats"] - median) / mad
if z_score <= -3.5:
attempt_graceful_shutdown(workers, socket)
raise RuntimeError("At least one worker has timed out. Stopping distributed build.")
raise RuntimeError(
"At least one worker has timed out. Stopping distributed build."
)


def worker(url: str, port: int, num_processes: int, no_bars: bool):
Expand All @@ -210,7 +231,7 @@ def worker(url: str, port: int, num_processes: int, no_bars: bool):
identity = "%04X-%04X" % (randint(0, 0x10000), randint(0, 0x10000))
logger = getLogger(f"Worker {identity}")

logger.info(f"Connnecting to Manager at {url}:{port}")
logger.info(f"Connecting to Manager at {url}:{port}")
context = zmq.Context()
socket: zmq.Socket = context.socket(zmq.REQ)

Expand Down Expand Up @@ -275,4 +296,6 @@ def ping_manager(socket, poller):
message: bytes = socket.recv()
if message.decode("utf-8") != "PONG":
socket.close()
raise RuntimeError("Stopping work as manager did not respond to heartbeat from worker.")
raise RuntimeError(
"Stopping work as manager did not respond to heartbeat from worker."
)
5 changes: 2 additions & 3 deletions src/maggma/cli/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@


class CLISettings(BaseSettings):

WORKER_TIMEOUT: int = Field(
3600,
None,
description="Timeout in seconds for a distributed worker",
)

MANAGER_TIMEOUT: int = Field(
900,
3600,
description="Timeout in seconds for the worker manager",
)

Expand Down

0 comments on commit ba7d1fd

Please sign in to comment.