Skip to content

Commit

Permalink
split monitoring router into radio-specific receivers: zmq and UDP
Browse files Browse the repository at this point in the history
this is in preparation for pluggable/configurable radios: see PR #3315

in that future work, these receivers wont be launched unless configured... or at least, the UDP one won't (and similarly the filesystem one)

and in pluggable mode we would expect arbitrary new receivers to exist
so then theres no reason for these ones to be special

this opens up, in a future PR, the ability to not need to poll at such a high frequency - or at all... the poll-timeout is to allow the other poll to happen, but this PR makes those two polls happen in two threads.
  • Loading branch information
benclifford committed Jul 18, 2024
1 parent b741d4e commit 17a7497
Showing 1 changed file with 44 additions and 14 deletions.
58 changes: 44 additions & 14 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pickle
import queue
import socket
import threading
import time
from multiprocessing.synchronize import Event
from typing import Optional, Tuple, Union
Expand Down Expand Up @@ -108,7 +109,28 @@ def __init__(self,
self.resource_msgs = resource_msgs
self.exit_event = exit_event

@wrap_with_logs(target="monitoring_router")
def start(self) -> None:
self.logger.info("Starting UDP listener thread")
udp_radio_receiver_thread = threading.Thread(target=self.start_udp_listener)
udp_radio_receiver_thread.start()

self.logger.info("Starting ZMQ listener thread")
zmq_radio_receiver_thread = threading.Thread(target=self.start_zmq_listener)
zmq_radio_receiver_thread.start()

# exit when both of those have exiting
# TODO: this is to preserve the existing behaviour of start(), but it
# isn't necessarily the *right* thing to do...

self.logger.info("Joining on ZMQ listener thread")
zmq_radio_receiver_thread.join()
self.logger.info("Joining on UDP listener thread")
udp_radio_receiver_thread.join()
self.logger.info("Joined on both ZMQ and UDP listener threads")

@wrap_with_logs(target="monitoring_router")
def start_udp_listener(self) -> None:
try:
while not self.exit_event.is_set():
try:
Expand All @@ -119,6 +141,26 @@ def start(self) -> None:
except socket.timeout:
pass

self.logger.info("UDP listener draining")
last_msg_received_time = time.time()
while time.time() - last_msg_received_time < self.atexit_timeout:
try:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass

self.logger.info("UDP listener finishing normally")
finally:
self.logger.info("UDP listener finished")

@wrap_with_logs(target="monitoring_router")
def start_zmq_listener(self) -> None:
try:
while not self.exit_event.is_set():
try:
dfk_loop_start = time.time()
while time.time() - dfk_loop_start < 1.0: # TODO make configurable
Expand Down Expand Up @@ -161,21 +203,9 @@ def start(self) -> None:
# thing to do.
self.logger.warning("Failure processing a ZMQ message", exc_info=True)

self.logger.info("Monitoring router draining")
last_msg_received_time = time.time()
while time.time() - last_msg_received_time < self.atexit_timeout:
try:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put((msg, addr))
last_msg_received_time = time.time()
except socket.timeout:
pass

self.logger.info("Monitoring router finishing normally")
self.logger.info("ZMQ listener finishing normally")
finally:
self.logger.info("Monitoring router finished")
self.logger.info("ZMQ listener finished")


@wrap_with_logs
Expand Down

0 comments on commit 17a7497

Please sign in to comment.