diff --git a/stimela/backends/kube/pod_proxy.py b/stimela/backends/kube/pod_proxy.py index 4183232f..cb31c6f1 100644 --- a/stimela/backends/kube/pod_proxy.py +++ b/stimela/backends/kube/pod_proxy.py @@ -84,10 +84,15 @@ def __init__(self, kube: KubeBackendOptions, podname: str, image_name: str, comm self._exit_logging_threads = False self._aux_pod_threads = {} self._mounts = {} + # accumulate list of exceptions that are raised in the updater thread + self._exceptions = [] def _status_updater(self): while not self._exit_logging_threads: - update_process_status() + try: + update_process_status() + except BackendError as exc: + self._exceptions.append(exc) time.sleep(1) def start_status_update_thread(self): @@ -96,6 +101,18 @@ def start_status_update_thread(self): self._aux_pod_threads["status"] = thread, None, "status update thread" thread.start() + def check_status(self): + """ + Checks if status thread has caught any exceptions, and re-raises them if so. + Returns True if no exceptions. + """ + if self._exceptions: + if len(self._exceptions) == 1: + raise self._exceptions[0] + else: + raise BackendError("k8s backend errors", *self._exceptions) + return True + @property def session_init_container(self): if self._session_init_container is None: diff --git a/stimela/backends/kube/run_kube.py b/stimela/backends/kube/run_kube.py index 6e2268b3..c32c12ff 100644 --- a/stimela/backends/kube/run_kube.py +++ b/stimela/backends/kube/run_kube.py @@ -79,6 +79,7 @@ def elapsed(since=None): pathlib.Path(numba_cache_dir).mkdir(parents=True, exist_ok=True) pod_created = dask_job_created = volumes_provisioned = port_forward_proc = None + bailout_with_exceptions = [] def k8s_event_handler(event): objkind = event.involved_object.kind @@ -228,7 +229,7 @@ def dprint(level, *args): dprint(2, "Responce: ", resp) pod_created = resp connected = True - while True: + while pod.check_status(): try: resp = kube_api.read_namespaced_pod_status(name=podname, namespace=namespace, 
_request_timeout=(1, 1)) @@ -279,7 +280,7 @@ def dprint(level, *args): job_status = None connected = True # wait for dask job to start up - while True: + while pod.check_status(): try: resp = custom_obj_api.get_namespaced_custom_object_status(group, version, namespace, plural, name=dask_job_name, _request_timeout=(1, 1)) @@ -361,7 +362,7 @@ def dprint(level, *args): connected = True last_log_timestamp = None seen_logs = set() - while retcode is None: + while retcode is None and pod.check_status(): try: for entry in kube_api.read_namespaced_pod_log(name=podname, namespace=namespace, container="job", follow=True, timestamps=True,