Skip to content

Commit

Permalink
Free up the *Radio namespace for future config structures
Browse files Browse the repository at this point in the history
Ongoing monitoring radio work (see PR #3315) introduces per-radio
configuration classes using *Radio names.

This PR frees up the *Radio namespace for that use, by renaming
non-user-exposed internal classes out of the way.
  • Loading branch information
benclifford committed Jul 18, 2024
1 parent a060dba commit 7837587
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 25 deletions.
8 changes: 4 additions & 4 deletions parsl/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from typing_extensions import Literal, Self

from parsl.monitoring.radios import MonitoringRadio
from parsl.monitoring.radios import MonitoringRadioSender


class ParslExecutor(metaclass=ABCMeta):
Expand Down Expand Up @@ -52,7 +52,7 @@ def __init__(
*,
hub_address: Optional[str] = None,
hub_zmq_port: Optional[int] = None,
monitoring_radio: Optional[MonitoringRadio] = None,
monitoring_radio: Optional[MonitoringRadioSender] = None,
run_dir: str = ".",
run_id: Optional[str] = None,
):
Expand Down Expand Up @@ -147,11 +147,11 @@ def hub_zmq_port(self, value: Optional[int]) -> None:
self._hub_zmq_port = value

@property
def monitoring_radio(self) -> Optional[MonitoringRadio]:
def monitoring_radio(self) -> Optional[MonitoringRadioSender]:
"""Local radio for sending monitoring messages
"""
return self._monitoring_radio

@monitoring_radio.setter
def monitoring_radio(self, value: Optional[MonitoringRadio]) -> None:
def monitoring_radio(self, value: Optional[MonitoringRadioSender]) -> None:
self._monitoring_radio = value
4 changes: 2 additions & 2 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from parsl.log_utils import set_file_logger
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import MultiprocessingQueueRadio
from parsl.monitoring.radios import MultiprocessingQueueRadioSender
from parsl.monitoring.router import router_starter
from parsl.monitoring.types import AddressedMonitoringMessage
from parsl.multiprocessing import ForkProcess, SizedQueue
Expand Down Expand Up @@ -187,7 +187,7 @@ def start(self, run_id: str, dfk_run_dir: str, config_run_dir: Union[str, os.Pat
self.filesystem_proc.start()
logger.info(f"Started filesystem radio receiver process {self.filesystem_proc.pid}")

self.radio = MultiprocessingQueueRadio(self.block_msgs)
self.radio = MultiprocessingQueueRadioSender(self.block_msgs)

try:
comm_q_result = comm_q.get(block=True, timeout=120)
Expand Down
14 changes: 7 additions & 7 deletions parsl/monitoring/radios.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
logger = logging.getLogger(__name__)


class MonitoringRadio(metaclass=ABCMeta):
class MonitoringRadioSender(metaclass=ABCMeta):
@abstractmethod
def send(self, message: object) -> None:
pass


class FilesystemRadio(MonitoringRadio):
"""A MonitoringRadio that sends messages over a shared filesystem.
class FilesystemRadioSender(MonitoringRadioSender):
"""A MonitoringRadioSender that sends messages over a shared filesystem.
The messsage directory structure is based on maildir,
https://en.wikipedia.org/wiki/Maildir
Expand All @@ -36,7 +36,7 @@ class FilesystemRadio(MonitoringRadio):
This avoids a race condition of reading partially written messages.
This radio is likely to give higher shared filesystem load compared to
the UDPRadio, but should be much more reliable.
the UDP radio, but should be much more reliable.
"""

def __init__(self, *, monitoring_url: str, source_id: int, timeout: int = 10, run_dir: str):
Expand Down Expand Up @@ -66,7 +66,7 @@ def send(self, message: object) -> None:
os.rename(tmp_filename, new_filename)


class HTEXRadio(MonitoringRadio):
class HTEXRadioSender(MonitoringRadioSender):

def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
"""
Expand Down Expand Up @@ -120,7 +120,7 @@ def send(self, message: object) -> None:
return


class UDPRadio(MonitoringRadio):
class UDPRadioSender(MonitoringRadioSender):

def __init__(self, monitoring_url: str, source_id: int, timeout: int = 10):
"""
Expand Down Expand Up @@ -174,7 +174,7 @@ def send(self, message: object) -> None:
return


class MultiprocessingQueueRadio(MonitoringRadio):
class MultiprocessingQueueRadioSender(MonitoringRadioSender):
"""A monitoring radio which connects over a multiprocessing Queue.
This radio is intended to be used on the submit side, where components
in the submit process, or processes launched by multiprocessing, will have
Expand Down
24 changes: 12 additions & 12 deletions parsl/monitoring/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@

from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios import (
FilesystemRadio,
HTEXRadio,
MonitoringRadio,
UDPRadio,
FilesystemRadioSender,
HTEXRadioSender,
MonitoringRadioSender,
UDPRadioSender,
)
from parsl.multiprocessing import ForkProcess
from parsl.process_loggers import wrap_with_logs
Expand Down Expand Up @@ -100,17 +100,17 @@ def wrapped(*args: List[Any], **kwargs: Dict[str, Any]) -> Any:
return (wrapped, args, new_kwargs)


def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadio:
radio: MonitoringRadio
def get_radio(radio_mode: str, monitoring_hub_url: str, task_id: int, run_dir: str) -> MonitoringRadioSender:
radio: MonitoringRadioSender
if radio_mode == "udp":
radio = UDPRadio(monitoring_hub_url,
source_id=task_id)
radio = UDPRadioSender(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "htex":
radio = HTEXRadio(monitoring_hub_url,
source_id=task_id)
radio = HTEXRadioSender(monitoring_hub_url,
source_id=task_id)
elif radio_mode == "filesystem":
radio = FilesystemRadio(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
radio = FilesystemRadioSender(monitoring_url=monitoring_hub_url,
source_id=task_id, run_dir=run_dir)
else:
raise RuntimeError(f"Unknown radio mode: {radio_mode}")
return radio
Expand Down

0 comments on commit 7837587

Please sign in to comment.