diff --git a/parsl/executors/base.py b/parsl/executors/base.py index b00aa55680..941f392e9f 100644 --- a/parsl/executors/base.py +++ b/parsl/executors/base.py @@ -5,7 +5,7 @@ from typing_extensions import Literal, Self -from parsl.monitoring.radios import MonitoringRadio +from parsl.monitoring.radios import MonitoringRadioSender class ParslExecutor(metaclass=ABCMeta): @@ -52,7 +52,7 @@ def __init__( *, hub_address: Optional[str] = None, hub_zmq_port: Optional[int] = None, - monitoring_radio: Optional[MonitoringRadio] = None, + monitoring_radio: Optional[MonitoringRadioSender] = None, run_dir: str = ".", run_id: Optional[str] = None, ): @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None: self._hub_zmq_port = value @property - def monitoring_radio(self) -> Optional[MonitoringRadio]: + def monitoring_radio(self) -> Optional[MonitoringRadioSender]: """Local radio for sending monitoring messages """ return self._monitoring_radio @monitoring_radio.setter - def monitoring_radio(self, value: Optional[MonitoringRadio]) -> None: + def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None: self._monitoring_radio = value diff --git a/parsl/monitoring/monitoring.py b/parsl/monitoring/monitoring.py index 9133e35296..f86bf81e87 100644 --- a/parsl/monitoring/monitoring.py +++ b/parsl/monitoring/monitoring.py @@ -13,7 +13,7 @@ from parsl.log_utils import set_file_logger from parsl.monitoring.message_type import MessageType -from parsl.monitoring.radios import MultiprocessingQueueRadio +from parsl.monitoring.radios import MultiprocessingQueueRadioSender from parsl.monitoring.router import router_starter from parsl.monitoring.types import AddressedMonitoringMessage from parsl.multiprocessing import ForkProcess, SizedQueue @@ -187,7 +187,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat self.filesystem_proc.start() logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}") - self.radio = MultiprocessingQueueRadio(self.block_msgs) + self.radio = MultiprocessingQueueRadioSender(self.block_msgs) try: comm_q_result = comm_q.get(block=True, timeout=120) diff --git a/parsl/monitoring/radios.py b/parsl/monitoring/radios.py index 070869bdba..6c77fd37b1 100644 --- a/parsl/monitoring/radios.py +++ b/parsl/monitoring/radios.py @@ -15,14 +15,14 @@ logger = logging.getLogger(__name__) -class MonitoringRadio(metaclass=ABCMeta): +class MonitoringRadioSender(metaclass=ABCMeta): @abstractmethod def send(self, message: object) -> None: pass -class FilesystemRadio(MonitoringRadio): - """A MonitoringRadio that sends messages over a shared filesystem. +class FilesystemRadioSender(MonitoringRadioSender): + """A MonitoringRadioSender that sends messages over a shared filesystem. The messsage directory structure is based on maildir, https://en.wikipedia.org/wiki/Maildir @@ -36,7 +36,7 @@ class FilesystemRadio(MonitoringRadio): This avoids a race condition of reading partially written messages. This radio is likely to give higher shared filesystem load compared to - the UDPRadio, but should be much more reliable. + the UDP radio, but should be much more reliable. """ def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str): @@ -66,7 +66,7 @@ def send(self, message: object) -> None: os.rename(tmp_filename, new_filename) -class HTEXRadio(MonitoringRadio): +class HTEXRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -120,7 +120,7 @@ def send(self, message: object) -> None: return -class UDPRadio(MonitoringRadio): +class UDPRadioSender(MonitoringRadioSender): def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10): """ @@ -174,7 +174,7 @@ def send(self, message: object) -> None: return -class MultiprocessingQueueRadio(MonitoringRadio): +class MultiprocessingQueueRadioSender(MonitoringRadioSender): """A monitoring radio which connects over a multiprocessing Queue. This radio is intended to be used on the submit side, where components in the submit process, or processes launched by multiprocessing, will have diff --git a/parsl/monitoring/remote.py b/parsl/monitoring/remote.py index 98168aa858..055a013627 100644 --- a/parsl/monitoring/remote.py +++ b/parsl/monitoring/remote.py @@ -8,10 +8,10 @@ from parsl.monitoring.message_type import MessageType from parsl.monitoring.radios import ( - FilesystemRadio, - HTEXRadio, - MonitoringRadio, - UDPRadio, + FilesystemRadioSender, + HTEXRadioSender, + MonitoringRadioSender, + UDPRadioSender, ) from parsl.multiprocessing import ForkProcess from parsl.process_loggers import wrap_with_logs @@ -100,17 +100,17 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any: return (wrapped, args, new_kwargs) -def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio: - radio: MonitoringRadio +def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender: + radio: MonitoringRadioSender if radio_mode == "udp": - radio = UDPRadio(monitoring_hub_url, - source_id=task_id) + radio = UDPRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "htex": - radio = HTEXRadio(monitoring_hub_url, - source_id=task_id) + radio = HTEXRadioSender(monitoring_hub_url, + source_id=task_id) elif radio_mode == "filesystem": - radio = FilesystemRadio(monitoring_url=monitoring_hub_url, - source_id=task_id, run_dir=run_dir) + radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url, + source_id=task_id, run_dir=run_dir) else: raise RuntimeError(f"Unknown radio mode: {radio_mode}") return radio