Skip to content

Commit

Permalink
Remove unused source address on some monitoring messages (#3666)
Browse files Browse the repository at this point in the history
This was an extra protocol complication, the source address was never
used beyond immediate logging, and the source address sometimes didn't
exist with a few different filler values used instead.

## Type of change

- Code maintenance/cleanup
  • Loading branch information
benclifford authored Oct 28, 2024
1 parent d0a5f11 commit 20faa35
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 21 deletions.
2 changes: 1 addition & 1 deletion parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ def _migrate_logs_to_internal(self, logs_queue: queue.Queue, kill_event: threadi
logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s",
kill_event.is_set(), logs_queue.qsize() != 0)
try:
x, addr = logs_queue.get(timeout=0.1)
x = logs_queue.get(timeout=0.1)
except queue.Empty:
continue
else:
Expand Down
10 changes: 5 additions & 5 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import AddressedMonitoringMessage
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import deserialize
Expand Down Expand Up @@ -138,7 +138,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.exception_q: Queue[Tuple[str, str]]
self.exception_q = SizedQueue(maxsize=10)

self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]]
self.resource_msgs: Queue[Union[TaggedMonitoringMessage, Literal["STOP"]]]
self.resource_msgs = SizedQueue()

self.router_exit_event: ms.Event
Expand Down Expand Up @@ -237,7 +237,7 @@ def close(self) -> None:
logger.debug("Finished waiting for router termination")
if len(exception_msgs) == 0:
logger.debug("Sending STOP to DBM")
self.resource_msgs.put(("STOP", 0))
self.resource_msgs.put("STOP")
else:
logger.debug("Not sending STOP to DBM, because there were DBM exceptions")
logger.debug("Waiting for DB termination")
Expand All @@ -261,7 +261,7 @@ def close(self) -> None:


@wrap_with_logs
def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]", run_dir: str) -> None:
def filesystem_receiver(logdir: str, q: "queue.Queue[TaggedMonitoringMessage]", run_dir: str) -> None:
logger = set_file_logger("{}/monitoring_filesystem_radio.log".format(logdir),
name="monitoring_filesystem_radio",
level=logging.INFO)
Expand All @@ -288,7 +288,7 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[AddressedMonitoringMessage]
message = deserialize(f.read())
logger.debug(f"Message received is: {message}")
assert isinstance(message, tuple)
q.put(cast(AddressedMonitoringMessage, message))
q.put(cast(TaggedMonitoringMessage, message))
os.remove(full_path_filename)
except Exception:
logger.exception(f"Exception processing {filename} - probably will be retried next iteration")
Expand Down
4 changes: 2 additions & 2 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def send(self, message: object) -> None:

tmp_filename = f"{self.tmp_path}/{unique_id}"
new_filename = f"{self.new_path}/{unique_id}"
buffer = (message, "NA")
buffer = message

# this will write the message out then atomically
# move it into new/, so that a partially written
Expand Down Expand Up @@ -187,7 +187,7 @@ def __init__(self, queue: Queue) -> None:
self.queue = queue

def send(self, message: object) -> None:
self.queue.put((message, 0))
self.queue.put(message)


class ZMQRadioSender(MonitoringRadioSender):
Expand Down
11 changes: 4 additions & 7 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import zmq

from parsl.log_utils import set_file_logger
from parsl.monitoring.types import AddressedMonitoringMessage, TaggedMonitoringMessage
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.process_loggers import wrap_with_logs
from parsl.utils import setproctitle

Expand Down Expand Up @@ -125,7 +125,7 @@ def start_udp_listener(self) -> None:
data, addr = self.udp_sock.recvfrom(2048)
resource_msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, resource_msg))
self.resource_msgs.put((resource_msg, addr))
self.resource_msgs.put(resource_msg)
except socket.timeout:
pass

Expand All @@ -136,7 +136,7 @@ def start_udp_listener(self) -> None:
data, addr = self.udp_sock.recvfrom(2048)
msg = pickle.loads(data)
self.logger.debug("Got UDP Message from {}: {}".format(addr, msg))
self.resource_msgs.put((msg, addr))
self.resource_msgs.put(msg)
last_msg_received_time = time.time()
except socket.timeout:
pass
Expand All @@ -160,10 +160,7 @@ def start_zmq_listener(self) -> None:
assert len(msg) >= 1, "ZMQ Receiver expects tuples of length at least 1, got {}".format(msg)
assert len(msg) == 2, "ZMQ Receiver expects message tuples of exactly length 2, got {}".format(msg)

msg_0: AddressedMonitoringMessage
msg_0 = (msg, 0)

self.resource_msgs.put(msg_0)
self.resource_msgs.put(msg)
except zmq.Again:
pass
except Exception:
Expand Down
9 changes: 3 additions & 6 deletions parsl/monitoring/types.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
from typing import Any, Dict, Tuple, Union
from typing import Any, Dict, Tuple

from typing_extensions import TypeAlias

from parsl.monitoring.message_type import MessageType

# A basic parsl monitoring message is wrapped by up to two wrappers:
# The basic monitoring message dictionary can first be tagged, giving
# a TaggedMonitoringMessage, and then that can be further tagged with
# an often unused sender address, giving an AddressedMonitoringMessage.
# A MonitoringMessage dictionary can be tagged, giving a
# TaggedMonitoringMessage.

MonitoringMessage: TypeAlias = Dict[str, Any]
TaggedMonitoringMessage: TypeAlias = Tuple[MessageType, MonitoringMessage]
AddressedMonitoringMessage: TypeAlias = Tuple[TaggedMonitoringMessage, Union[str, int]]

0 comments on commit 20faa35

Please sign in to comment.