Skip to content

Commit

Permalink
Move ManagerLost and VersionMismatch to errors.py
Browse files Browse the repository at this point in the history
Per the analysis in #3495, defining the `ManagerLost` and `VersionMismatch`
errors in `interchange.py` became a problem in #3463, where the interchange
began running as `__main__`.  This makes it difficult for Dill to serialize and
deserialize (serde) these classes correctly.

The organizational fix is simply to move these classes to an importable
location.  Exception classes are expected to be available in both local and
remote locations, which defining them in `__main__` can't easily guarantee.

Fixes: #3495
  • Loading branch information
khk-globus committed Jun 26, 2024
1 parent a128fdc commit 10c1b3a
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 26 deletions.
33 changes: 33 additions & 0 deletions parsl/executors/high_throughput/errors.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,36 @@
import time


class ManagerLost(Exception):
    """
    Task lost due to manager loss. Manager is considered lost when multiple heartbeats
    have been missed.

    Attributes:
        manager_id: Identifier of the lost manager, as raw bytes.
        tstamp: ``time.time()`` value captured when this exception was created.
        hostname: Host on which the lost manager was running.
    """
    def __init__(self, manager_id: bytes, hostname: str) -> None:
        self.manager_id = manager_id
        self.tstamp = time.time()
        self.hostname = hostname

    def __str__(self) -> str:
        # Formatting an error must never raise: bytes that are not valid UTF-8
        # become U+FFFD instead of triggering UnicodeDecodeError, which would
        # mask the manager-loss report itself.
        manager = self.manager_id.decode(errors="replace")
        return (
            f"Task failure due to loss of manager {manager} on"
            f" host {self.hostname}"
        )


class VersionMismatch(Exception):
    """Signals that the manager and interchange version info do not match.

    Both version strings are stored on the instance so the message can
    report them.
    """

    def __init__(self, interchange_version: str, manager_version: str):
        self.manager_version = manager_version
        self.interchange_version = interchange_version

    def __str__(self) -> str:
        # Assemble the message from its two halves: manager side first,
        # interchange side second.
        manager_part = f"Manager version info {self.manager_version} does not match interchange"
        interchange_part = f" version info {self.interchange_version}, causing a critical failure"
        return manager_part + interchange_part


class WorkerLost(Exception):
"""Exception raised when a worker is lost
"""
Expand Down
27 changes: 1 addition & 26 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from parsl import curvezmq
from parsl.app.errors import RemoteExceptionWrapper
from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.monitoring.message_type import MessageType
from parsl.process_loggers import wrap_with_logs
Expand All @@ -31,32 +32,6 @@
logger = logging.getLogger(LOGGER_NAME)


class ManagerLost(Exception):
    ''' Task lost due to manager loss. Manager is considered lost when multiple heartbeats
    have been missed.

    Attributes:
        manager_id: Identifier of the lost manager, as raw bytes.
        tstamp: ``time.time()`` value captured when this exception was created.
        hostname: Host on which the lost manager was running.
    '''
    def __init__(self, manager_id: bytes, hostname: str) -> None:
        self.manager_id = manager_id
        self.tstamp = time.time()
        self.hostname = hostname

    def __str__(self) -> str:
        # Decode leniently so that formatting the error can never raise
        # UnicodeDecodeError on an id that is not valid UTF-8.
        manager = self.manager_id.decode(errors="replace")
        return f"Task failure due to loss of manager {manager} on host {self.hostname}"


class VersionMismatch(Exception):
    ''' Signals that the manager and interchange version info do not match.

    Both version strings are stored on the instance so the message can
    report them.
    '''
    def __init__(self, interchange_version: str, manager_version: str):
        self.manager_version = manager_version
        self.interchange_version = interchange_version

    def __str__(self) -> str:
        template = "Manager version info {} does not match interchange version info {}, causing a critical failure"
        # Manager version fills the first slot, interchange version the second.
        versions = (self.manager_version, self.interchange_version)
        return template.format(*versions)


class Interchange:
""" Interchange is a task orchestrator for distributed systems.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import os
import signal

import pytest

import parsl
from parsl import Config, HighThroughputExecutor


@parsl.python_app
def get_manager_pgid():
    """Return the process group ID of the process that executes this app.

    NOTE(review): the name implies the executing worker shares its process
    group with the manager -- confirm against the HTEX process layout.
    """
    import os

    own_pid = os.getpid()
    return os.getpgid(own_pid)


@parsl.python_app
def lose_manager():
    """Suspend the parent of the process that executes this app.

    Sends ``SIGSTOP`` to the parent process ID. Presumably that parent is the
    manager, which then stops responding -- confirm against the HTEX process
    layout.
    """
    import os
    import signal

    os.kill(os.getppid(), signal.SIGSTOP)


@pytest.mark.local
def test_manager_lost_system_failure(tmpd_cwd):
    """Check that a task failing after its manager is stopped does not
    produce an error whose text mentions "ManagerLost" (issue 3495).

    The manager's process group is resumed and terminated afterwards so the
    stopped processes do not linger.
    """
    htex_options = dict(
        label="htex_local",
        address="127.0.0.1",
        max_workers_per_node=2,
        cores_per_worker=1,
        worker_logdir_root=str(tmpd_cwd),
        heartbeat_period=1,
        heartbeat_threshold=1,
    )
    executor = HighThroughputExecutor(**htex_options)
    config = Config(executors=[executor], strategy='simple', strategy_period=0.1)

    with parsl.load(config):
        manager_pgid = get_manager_pgid().result()
        try:
            stalled_task = lose_manager()
            stalled_task.result()
        except Exception as e:
            assert "ManagerLost" not in str(e), f"Issue 3495: {e}"
        finally:
            # Allow process to clean itself up: resume first so the
            # termination signal can actually be handled.
            for cleanup_signal in (signal.SIGCONT, signal.SIGTERM):
                os.killpg(manager_pgid, cleanup_signal)

0 comments on commit 10c1b3a

Please sign in to comment.