Skip to content

Commit

Permalink
Move all monitoring.py logging to lazy format style
Browse files Browse the repository at this point in the history
The is based on a review comment from @khk-globus on PR #3672
  • Loading branch information
benclifford committed Nov 1, 2024
1 parent 6dd5845 commit 0e65d78
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,15 +169,15 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
daemon=True,
)
self.dbm_proc.start()
logger.info("Started the router process {} and DBM process {}".format(self.router_proc.pid, self.dbm_proc.pid))
logger.info("Started the router process %s and DBM process %s", self.router_proc.pid, self.dbm_proc.pid)

self.filesystem_proc = Process(target=filesystem_receiver,
args=(self.logdir, self.resource_msgs, dfk_run_dir),
name="Monitoring-Filesystem-Process",
daemon=True
)
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")
logger.info("Started filesystem radio receiver process %s", self.filesystem_proc.pid)

self.radio = MultiprocessingQueueRadioSender(self.resource_msgs)

Expand All @@ -190,7 +190,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
raise MonitoringHubStartError()

if isinstance(comm_q_result, str):
logger.error(f"MonitoringRouter sent an error message: {comm_q_result}")
logger.error("MonitoringRouter sent an error message: %s", comm_q_result)
raise RuntimeError(f"MonitoringRouter failed to start: {comm_q_result}")

udp_port, zmq_port = comm_q_result
Expand All @@ -202,7 +202,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.hub_zmq_port = zmq_port

def send(self, message: TaggedMonitoringMessage) -> None:
logger.debug("Sending message type {}".format(message[0]))
logger.debug("Sending message type %s", message[0])
self.radio.send(message)

def close(self) -> None:
Expand All @@ -219,10 +219,9 @@ def close(self) -> None:
if exception_msgs:
for exception_msg in exception_msgs:
logger.error(
"{} process delivered an exception: {}. Terminating all monitoring processes immediately.".format(
exception_msg[0],
exception_msg[1]
)
"%s process delivered an exception: %s. Terminating all monitoring processes immediately.",
exception_msg[0],
exception_msg[1]
)
self.router_proc.terminate()
self.dbm_proc.terminate()
Expand Down Expand Up @@ -269,7 +268,7 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[TaggedMonitoringMessage]",
base_path = f"{run_dir}/monitor-fs-radio/"
tmp_dir = f"{base_path}/tmp/"
new_dir = f"{base_path}/new/"
logger.debug(f"Creating new and tmp paths under {base_path}")
logger.debug("Creating new and tmp paths under %s", base_path)

os.makedirs(tmp_dir, exist_ok=True)
os.makedirs(new_dir, exist_ok=True)
Expand All @@ -280,15 +279,15 @@ def filesystem_receiver(logdir: str, q: "queue.Queue[TaggedMonitoringMessage]",
# iterate over files in new_dir
for filename in os.listdir(new_dir):
try:
logger.info(f"Processing filesystem radio file {filename}")
logger.info("Processing filesystem radio file %s", filename)
full_path_filename = f"{new_dir}/{filename}"
with open(full_path_filename, "rb") as f:
message = deserialize(f.read())
logger.debug(f"Message received is: {message}")
logger.debug("Message received is: %s", message)
assert isinstance(message, tuple)
q.put(cast(TaggedMonitoringMessage, message))
os.remove(full_path_filename)
except Exception:
logger.exception(f"Exception processing {filename} - probably will be retried next iteration")
logger.exception("Exception processing %s - probably will be retried next iteration", filename)

time.sleep(1) # whats a good time for this poll?

0 comments on commit 0e65d78

Please sign in to comment.