Move the ManagerLost and VersionMismatch exception classes out of interchange.py
into errors.py (interchange.py now imports them from there), and add a local test
for issue 3495 that stops a manager process and inspects the resulting error text.

diff --git a/parsl/executors/high_throughput/errors.py b/parsl/executors/high_throughput/errors.py
index 4db7907523..9916ec506f 100644
--- a/parsl/executors/high_throughput/errors.py
+++ b/parsl/executors/high_throughput/errors.py
@@ -1,3 +1,36 @@
+import time
+
+
+class ManagerLost(Exception):
+    """
+    Task lost due to manager loss. Manager is considered lost when multiple heartbeats
+    have been missed.
+    """
+    def __init__(self, manager_id: bytes, hostname: str) -> None:
+        self.manager_id = manager_id
+        self.tstamp = time.time()
+        self.hostname = hostname
+
+    def __str__(self) -> str:
+        return (
+            f"Task failure due to loss of manager {self.manager_id.decode()} on"
+            f" host {self.hostname}"
+        )
+
+
+class VersionMismatch(Exception):
+    """Manager and Interchange versions do not match"""
+    def __init__(self, interchange_version: str, manager_version: str):
+        self.interchange_version = interchange_version
+        self.manager_version = manager_version
+
+    def __str__(self) -> str:
+        return (
+            f"Manager version info {self.manager_version} does not match interchange"
+            f" version info {self.interchange_version}, causing a critical failure"
+        )
+
+
 class WorkerLost(Exception):
     """Exception raised when a worker is lost
     """
diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py
index 9fe94dbabd..819836e95f 100644
--- a/parsl/executors/high_throughput/interchange.py
+++ b/parsl/executors/high_throughput/interchange.py
@@ -17,6 +17,7 @@
 
 from parsl import curvezmq
 from parsl.app.errors import RemoteExceptionWrapper
+from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
 from parsl.executors.high_throughput.manager_record import ManagerRecord
 from parsl.monitoring.message_type import MessageType
 from parsl.process_loggers import wrap_with_logs
@@ -31,32 +32,6 @@
 logger = logging.getLogger(LOGGER_NAME)
 
 
-class ManagerLost(Exception):
-    ''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats
-    have been missed.
-    '''
-    def __init__(self, manager_id: bytes, hostname: str) -> None:
-        self.manager_id = manager_id
-        self.tstamp = time.time()
-        self.hostname = hostname
-
-    def __str__(self) -> str:
-        return "Task failure due to loss of manager {} on host {}".format(self.manager_id.decode(), self.hostname)
-
-
-class VersionMismatch(Exception):
-    ''' Manager and Interchange versions do not match
-    '''
-    def __init__(self, interchange_version: str, manager_version: str):
-        self.interchange_version = interchange_version
-        self.manager_version = manager_version
-
-    def __str__(self) -> str:
-        return "Manager version info {} does not match interchange version info {}, causing a critical failure".format(
-            self.manager_version,
-            self.interchange_version)
-
-
 class Interchange:
     """ Interchange is a task orchestrator for distributed systems.
 
diff --git a/parsl/tests/test_serialization/test_3495_deserialize_managerlost.py b/parsl/tests/test_serialization/test_3495_deserialize_managerlost.py
new file mode 100644
index 0000000000..3d35b110c2
--- /dev/null
+++ b/parsl/tests/test_serialization/test_3495_deserialize_managerlost.py
@@ -0,0 +1,47 @@
+import os
+import signal
+
+import pytest
+
+import parsl
+from parsl import Config, HighThroughputExecutor
+
+
+@parsl.python_app
+def get_manager_pgid():
+    import os
+    return os.getpgid(os.getpid())
+
+
+@parsl.python_app
+def lose_manager():
+    import os
+    import signal
+
+    manager_pid = os.getppid()
+    os.kill(manager_pid, signal.SIGSTOP)
+
+
+@pytest.mark.local
+def test_manager_lost_system_failure(tmpd_cwd):
+    hte = HighThroughputExecutor(
+        label="htex_local",
+        address="127.0.0.1",
+        max_workers_per_node=2,
+        cores_per_worker=1,
+        worker_logdir_root=str(tmpd_cwd),
+        heartbeat_period=1,
+        heartbeat_threshold=1,
+    )
+    c = Config(executors=[hte], strategy='simple', strategy_period=0.1)
+
+    with parsl.load(c):
+        manager_pgid = get_manager_pgid().result()
+        try:
+            lose_manager().result()
+        except Exception as e:
+            assert "ManagerLost" not in str(e), f"Issue 3495: {e}"
+        finally:
+            # Allow process to clean itself up
+            os.killpg(manager_pgid, signal.SIGCONT)
+            os.killpg(manager_pgid, signal.SIGTERM)