Skip to content

Commit

Permalink
fixes #188
Browse files Browse the repository at this point in the history
  • Loading branch information
o-smirnov committed Nov 9, 2023
1 parent 7eed821 commit 158cb25
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
19 changes: 18 additions & 1 deletion stimela/backends/kube/pod_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,15 @@ def __init__(self, kube: KubeBackendOptions, podname: str, image_name: str, comm
self._exit_logging_threads = False
self._aux_pod_threads = {}
self._mounts = {}
# accumulate list of exceptions that are raised in the updater thread
self._exceptions = []

def _status_updater(self):
while not self._exit_logging_threads:
update_process_status()
try:
update_process_status()
except BackendError as exc:
self._exceptions.append(exc)
time.sleep(1)

def start_status_update_thread(self):
Expand All @@ -96,6 +101,18 @@ def start_status_update_thread(self):
self._aux_pod_threads["status"] = thread, None, "status update thread"
thread.start()

def check_status(self):
"""
Checks if status thread has caught any exceptions, and re-reises them if so.
Returns True of no exceptions.
"""
if self._exceptions:
if len(self._exceptions) == 1:
raise self._exceptions[0]
else:
raise BackendError("k8s backend errors", *self._exceptions)
return True

@property
def session_init_container(self):
if self._session_init_container is None:
Expand Down
7 changes: 4 additions & 3 deletions stimela/backends/kube/run_kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def elapsed(since=None):
pathlib.Path(numba_cache_dir).mkdir(parents=True, exist_ok=True)

pod_created = dask_job_created = volumes_provisioned = port_forward_proc = None
bailout_with_exceptions = []

def k8s_event_handler(event):
objkind = event.involved_object.kind
Expand Down Expand Up @@ -228,7 +229,7 @@ def dprint(level, *args):
dprint(2, "Responce: ", resp)
pod_created = resp
connected = True
while True:
while pod.check_status():
try:
resp = kube_api.read_namespaced_pod_status(name=podname, namespace=namespace,
_request_timeout=(1, 1))
Expand Down Expand Up @@ -279,7 +280,7 @@ def dprint(level, *args):
job_status = None
connected = True
# wait for dask job to start up
while True:
while pod.check_status():
try:
resp = custom_obj_api.get_namespaced_custom_object_status(group, version,
namespace, plural, name=dask_job_name, _request_timeout=(1, 1))
Expand Down Expand Up @@ -361,7 +362,7 @@ def dprint(level, *args):
connected = True
last_log_timestamp = None
seen_logs = set()
while retcode is None:
while retcode is None and pod.check_status():
try:
for entry in kube_api.read_namespaced_pod_log(name=podname, namespace=namespace, container="job",
follow=True, timestamps=True,
Expand Down

0 comments on commit 158cb25

Please sign in to comment.