Skip to content

Commit

Permalink
to remove this msg[1]['run_id'] = self.run_id
Browse files Browse the repository at this point in the history
from the monitoring router, the interchange should fully construct these
messages - c.f. how the task wrapper knows the run id

so add run id as an interchange parameter and populate there

... which means htex needs to know about run_ids

... which is fine as it knows about monitoring generally, in order to
send the messages into monitoring...

this is tested by
test_row_counts
which fails without the run_id populated
due to:

> sqlite3.IntegrityError: NOT NULL constraint failed: node.run_id

in the database manager.
  • Loading branch information
benclifford committed Jul 26, 2024
1 parent 138b8a7 commit 7e9d4a9
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 10 deletions.
2 changes: 1 addition & 1 deletion parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(self, config: Config) -> None:
if self.monitoring:
if self.monitoring.logdir is None:
self.monitoring.logdir = self.run_dir
self.monitoring.start(self.run_id, self.run_dir, self.config.run_dir)
self.monitoring.start(self.run_dir, self.config.run_dir)

self.time_began = datetime.datetime.now()
self.time_completed: Optional[datetime.datetime] = None
Expand Down
1 change: 1 addition & 0 deletions parsl/executors/high_throughput/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ def _start_local_interchange_process(self) -> None:
"poll_period": self.poll_period,
"logging_level": logging.DEBUG if self.worker_debug else logging.INFO,
"cert_dir": self.cert_dir,
"run_id": self.run_id,
}

config_pickle = pickle.dumps(interchange_config)
Expand Down
4 changes: 4 additions & 0 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def __init__(self,
logging_level: int,
poll_period: int,
cert_dir: Optional[str],
run_id: str,
) -> None:
"""
Parameters
Expand Down Expand Up @@ -124,6 +125,8 @@ def __init__(self,
self.command_channel.connect("tcp://{}:{}".format(client_address, client_ports[2]))
logger.info("Connected to client")

self.run_id = run_id

self.hub_address = hub_address
self.hub_zmq_port = hub_zmq_port

Expand Down Expand Up @@ -224,6 +227,7 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender
d: Dict = cast(Dict, manager.copy())
d['timestamp'] = datetime.datetime.now()
d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])
d['run_id'] = self.run_id

monitoring_radio.send((MessageType.NODE_INFO, d))

Expand Down
3 changes: 1 addition & 2 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __init__(self,
self.resource_monitoring_enabled = resource_monitoring_enabled
self.resource_monitoring_interval = resource_monitoring_interval

def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:
def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> None:

logger.debug("Starting MonitoringHub")

Expand Down Expand Up @@ -154,7 +154,6 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
"zmq_port_range": self.hub_port_range,
"logdir": self.logdir,
"logging_level": logging.DEBUG if self.monitoring_debug else logging.INFO,
"run_id": run_id
},
name="Monitoring-Router-Process",
daemon=True,
Expand Down
7 changes: 1 addition & 6 deletions parsl/monitoring/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ def __init__(self,

monitoring_hub_address: str = "127.0.0.1",
logdir: str = ".",
run_id: str,
logging_level: int = logging.INFO,
atexit_timeout: int = 3, # in seconds
resource_msgs: "Queue[AddressedMonitoringMessage]",
Expand Down Expand Up @@ -69,7 +68,6 @@ def __init__(self,

self.hub_address = hub_address
self.atexit_timeout = atexit_timeout
self.run_id = run_id

self.loop_freq = 10.0 # milliseconds

Expand Down Expand Up @@ -171,7 +169,6 @@ def start_zmq_listener(self) -> None:
msg_0 = (msg, 0)

if msg[0] == MessageType.NODE_INFO:
msg[1]['run_id'] = self.run_id
self.resource_msgs.put(msg_0)
elif msg[0] == MessageType.RESOURCE_INFO or msg[0] == MessageType.BLOCK_INFO:
self.resource_msgs.put(msg_0)
Expand Down Expand Up @@ -214,16 +211,14 @@ def router_starter(*,
zmq_port_range: Tuple[int, int],

logdir: str,
logging_level: int,
run_id: str) -> None:
logging_level: int) -> None:
setproctitle("parsl: monitoring router")
try:
router = MonitoringRouter(hub_address=hub_address,
udp_port=udp_port,
zmq_port_range=zmq_port_range,
logdir=logdir,
logging_level=logging_level,
run_id=run_id,
resource_msgs=resource_msgs,
exit_event=exit_event)
except Exception as e:
Expand Down
3 changes: 2 additions & 1 deletion parsl/tests/test_htex/test_zmq_binding.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def make_interchange(*, interchange_address: Optional[str], cert_dir: Optional[s
heartbeat_threshold=60,
logdir=".",
logging_level=logging.INFO,
poll_period=10)
poll_period=10,
run_id="test_run_id")


@pytest.fixture
Expand Down

0 comments on commit 7e9d4a9

Please sign in to comment.