From 2d1507758a7183f603b8fd055b5aaddf0a682d3f Mon Sep 17 00:00:00 2001 From: sjmiller609 Date: Fri, 20 Sep 2019 10:57:20 -0400 Subject: [PATCH] [ASTRO-364] Handle timeout watching for tasks to finish (#64) (cherry picked from commit e6debf33462802a2a249695a6ea2bac71fe9f89b) (cherry picked from commit 564e85eaadda55797e25a8de2882bc23685ea5ad) (cherry picked from commit 0d438e6a8761425bfdbf0ee0aba33ed5047cdd54) --- airflow/executors/kubernetes_executor.py | 58 +++++++++++++++++++----- 1 file changed, 46 insertions(+), 12 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 7bbdc985ebcee..75e02f53b3310 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -255,6 +255,21 @@ def _validate(self): class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): + """Initialize KubernetesJobWatcher, a background process that informs the + AirflowKubernetesScheduler when tasks are completed. + + :param namespace: The namespace which will contain all tasks + :type namespace: str + :param watcher_queue: Used to inform the Scheduler of completed tasks + :type watcher_queue: multiprocessing.Queue + :param resource_version: A counter to indicate how many times a kubernetes resource changed + :type resource_version: str, but looks like an int, for example "0" + :param worker_uuid: A label selector used to locate pods that belong to this executor + :type worker_uuid: str + :param kube_config: Configuration for Kubernetes + :type kube_config: KubeConfig + + """ multiprocessing.Process.__init__(self) self.namespace = namespace self.worker_uuid = worker_uuid @@ -277,8 +292,8 @@ def run(self): self.log.exception('Unknown error in KubernetesJobWatcher. Failing') raise else: - self.log.warning('Watch died gracefully, starting back up with: ' - 'last resource_version: %s', self.resource_version) + self.log.info('Watcher will start back up with: ' + 'last resource_version: %s', self.resource_version) def _run(self, kube_client, resource_version, worker_uuid, kube_config): self.log.info( @@ -288,15 +303,34 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): watcher = watch.Watch() kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)} + if resource_version: kwargs['resource_version'] = resource_version + if kube_config.kube_client_request_args: for key, value in kube_config.kube_client_request_args.items(): kwargs[key] = value - last_resource_version = None - for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, - **kwargs): + if resource_version: + last_resource_version = resource_version + else: + last_resource_version = None + + event_generator = watcher.stream(kube_client.list_namespaced_pod, + self.namespace, + **kwargs) + while True: + try: + event = next(event_generator) + except StopIteration: + break + except ReadTimeoutError: + self.log.info("Timed out waiting for an event.") + break + except HTTPError: + self.log.info("Terminating connection to kube-api.") + break + task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -382,13 +416,11 @@ def _make_kube_watcher(self): watcher.start() return watcher - def _health_check_kube_watcher(self): - if self.kube_watcher.is_alive(): - pass - else: + def _ensure_kube_watcher(self): + if not self.kube_watcher.is_alive(): + settings.Stats.incr("executor.kube_watcher.restarts") self.log.error( - 'Error while health checking kube watcher process. ' - 'Process died for unknown reasons') + 'Kubernetes job watcher process stopped. Restarting') self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job): @@ -446,7 +478,7 @@ def sync(self): :return: """ - self._health_check_kube_watcher() + self._ensure_kube_watcher() while True: try: task = self.watcher_queue.get_nowait() @@ -455,6 +487,8 @@ def sync(self): finally: self.watcher_queue.task_done() except Empty: + # When self.watcher_queue is empty, + # this function returns break def process_watcher_task(self, task):