Skip to content

Commit

Permalink
remove priority queue leaving everything in the resource queue
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jul 26, 2024
1 parent 2ec41fa commit 138b8a7
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 31 deletions.
19 changes: 5 additions & 14 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,23 +302,15 @@ def __init__(self,
self.batching_interval = batching_interval
self.batching_threshold = batching_threshold

self.pending_priority_queue = queue.Queue() # type: queue.Queue[TaggedMonitoringMessage]
self.pending_priority_queue: queue.Queue[TaggedMonitoringMessage] = queue.Queue()
self.pending_node_queue = queue.Queue() # type: queue.Queue[MonitoringMessage]
self.pending_block_queue = queue.Queue() # type: queue.Queue[MonitoringMessage]
self.pending_resource_queue = queue.Queue() # type: queue.Queue[MonitoringMessage]

def start(self,
priority_queue: "mpq.Queue[TaggedMonitoringMessage]",
resource_queue: "mpq.Queue[MonitoringMessage]") -> None:

self._kill_event = threading.Event()
self._priority_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
priority_queue, self._kill_event,),
name="Monitoring-migrate-priority",
daemon=True,
)
self._priority_queue_pull_thread.start()

self._resource_queue_pull_thread = threading.Thread(target=self._migrate_logs_to_internal,
args=(
Expand Down Expand Up @@ -354,18 +346,18 @@ def start(self,
while (not self._kill_event.is_set() or
self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or
self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or
priority_queue.qsize() != 0 or resource_queue.qsize() != 0):
resource_queue.qsize() != 0):

"""
WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages)
"""
try:
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}, {}""".format(
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format(
self._kill_event.is_set(),
self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0,
self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0,
priority_queue.qsize() != 0, resource_queue.qsize() != 0))
resource_queue.qsize() != 0))

# This is the list of resource messages which can be reprocessed as if they
# had just arrived because the corresponding first task message has been
Expand Down Expand Up @@ -684,7 +676,6 @@ def close(self) -> None:
@wrap_with_logs(target="database_manager")
@typeguard.typechecked
def dbm_starter(exception_q: "mpq.Queue[Tuple[str, str]]",
priority_msgs: "mpq.Queue[TaggedMonitoringMessage]",
resource_msgs: "mpq.Queue[MonitoringMessage]",
db_url: str,
logdir: str,
Expand All @@ -701,7 +692,7 @@ def dbm_starter(exception_q: "mpq.Queue[Tuple[str, str]]",
logdir=logdir,
logging_level=logging_level)
logger.info("Starting dbm in dbm starter")
dbm.start(priority_msgs, resource_msgs)
dbm.start(resource_msgs)
except KeyboardInterrupt:
logger.exception("KeyboardInterrupt signal caught")
dbm.close()
Expand Down
14 changes: 4 additions & 10 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import time
from multiprocessing import Event, Process
from multiprocessing.queues import Queue
from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast
from typing import TYPE_CHECKING, Any, Literal, Optional, Tuple, Union, cast

import typeguard

Expand Down Expand Up @@ -138,10 +138,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.exception_q: Queue[Tuple[str, str]]
self.exception_q = SizedQueue(maxsize=10)

self.priority_msgs: Queue[Tuple[Any, int]]
self.priority_msgs = SizedQueue()

self.resource_msgs: Queue[AddressedMonitoringMessage]
self.resource_msgs: Queue[Union[AddressedMonitoringMessage, Tuple[Literal["STOP"], Literal[0]]]]
self.resource_msgs = SizedQueue()

self.router_exit_event: ms.Event
Expand All @@ -150,7 +147,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.router_proc = ForkProcess(target=router_starter,
kwargs={"comm_q": comm_q,
"exception_q": self.exception_q,
"priority_msgs": self.priority_msgs,
"resource_msgs": self.resource_msgs,
"exit_event": self.router_exit_event,
"hub_address": self.hub_address,
Expand All @@ -166,7 +162,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.router_proc.start()

self.dbm_proc = ForkProcess(target=dbm_starter,
args=(self.exception_q, self.priority_msgs, self.resource_msgs,),
args=(self.exception_q, self.resource_msgs,),
kwargs={"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"db_url": self.logging_endpoint,
Expand Down Expand Up @@ -242,7 +238,7 @@ def close(self) -> None:
logger.debug("Finished waiting for router termination")
if len(exception_msgs) == 0:
logger.debug("Sending STOP to DBM")
self.priority_msgs.put(("STOP", 0))
self.resource_msgs.put(("STOP", 0))
else:
logger.debug("Not sending STOP to DBM, because there were DBM exceptions")
logger.debug("Waiting for DB termination")
Expand All @@ -260,8 +256,6 @@ def close(self) -> None:
logger.info("Closing monitoring multiprocessing queues")
self.exception_q.close()
self.exception_q.join_thread()
self.priority_msgs.close()
self.priority_msgs.join_thread()
self.resource_msgs.close()
self.resource_msgs.join_thread()
logger.info("Closed monitoring multiprocessing queues")
Expand Down
10 changes: 3 additions & 7 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ def __init__(self,
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
priority_msgs: "Queue[AddressedMonitoringMessage]",
resource_msgs: "Queue[AddressedMonitoringMessage]",
exit_event: Event,
):
Expand All @@ -56,7 +55,7 @@ def __init__(self,
Logging level as defined in the logging module. Default: logging.INFO
atexit_timeout : float, optional
The amount of time in seconds to terminate the hub without receiving any messages, after the last dfk workflow message is received.
*_msgs : Queue
XXX TODO not-so-many-left-now... *_msgs : Queue
Four multiprocessing queues to receive messages, routed by type tag, and sometimes modified according to type tag.
exit_event : Event
Expand Down Expand Up @@ -102,7 +101,6 @@ def __init__(self,
min_port=zmq_port_range[0],
max_port=zmq_port_range[1])

self.priority_msgs = priority_msgs
self.resource_msgs = resource_msgs
self.exit_event = exit_event

Expand Down Expand Up @@ -178,9 +176,9 @@ def start_zmq_listener(self) -> None:
elif msg[0] == MessageType.RESOURCE_INFO or msg[0] == MessageType.BLOCK_INFO:
self.resource_msgs.put(msg_0)
elif msg[0] == MessageType.TASK_INFO:
self.priority_msgs.put(msg_0)
self.resource_msgs.put(msg_0)
elif msg[0] == MessageType.WORKFLOW_INFO:
self.priority_msgs.put(msg_0)
self.resource_msgs.put(msg_0)
else:
# There is a type: ignore here because if msg[0]
# is of the correct type, this code is unreachable,
Expand Down Expand Up @@ -208,7 +206,6 @@ def start_zmq_listener(self) -> None:
def router_starter(*,
comm_q: "Queue[Union[Tuple[int, int], str]]",
exception_q: "Queue[Tuple[str, str]]",
priority_msgs: "Queue[AddressedMonitoringMessage]",
resource_msgs: "Queue[AddressedMonitoringMessage]",
exit_event: Event,

Expand All @@ -227,7 +224,6 @@ def router_starter(*,
logdir=logdir,
logging_level=logging_level,
run_id=run_id,
priority_msgs=priority_msgs,
resource_msgs=resource_msgs,
exit_event=exit_event)
except Exception as e:
Expand Down

0 comments on commit 138b8a7

Please sign in to comment.