diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 0df4e064c6ca3..1a009ac65f488 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -29,7 +29,7 @@ import kubernetes from kubernetes import watch, client from kubernetes.client.rest import ApiException -from urllib3.exceptions import HTTPError +from urllib3.exceptions import HTTPError, ReadTimeoutError from airflow.configuration import conf from airflow.contrib.kubernetes.istio import Istio @@ -320,6 +320,21 @@ def _validate(self): class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): + """Initialize KubernetesJobWatcher, a background process that informs the + AirflowKubernetesScheduler when tasks are completed. + + :param namespace: The namespace which will contain all tasks + :type namespace: str + :param watcher_queue: Used to inform the Scheduler of completed tasks + :type watcher_queue: multiprocessing.Queue + :param resource_version: A counter to indicate how many times a kubernetes resource changed + :type resource_version: str, but looks like an int, for example "0" + :param worker_uuid: A label selector used to locate pods that belong to this executor + :type worker_uuid: str + :param kube_config: Configuration for Kubernetes + :type kube_config: KubeConfig + + """ multiprocessing.Process.__init__(self) self.namespace = namespace self.worker_uuid = worker_uuid @@ -339,8 +354,8 @@ def run(self): self.log.exception('Unknown error in KubernetesJobWatcher. Failing') raise else: - self.log.warning('Watch died gracefully, starting back up with: ' - 'last resource_version: %s', self.resource_version) + self.log.info('Watcher will start back up with: ' + 'last resource_version: %s', self.resource_version) def _run(self, kube_client, resource_version, worker_uuid, kube_config): self.log.info( @@ -350,15 +365,34 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): watcher = watch.Watch() kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)} + if resource_version: kwargs['resource_version'] = resource_version + if kube_config.kube_client_request_args: for key, value in kube_config.kube_client_request_args.items(): kwargs[key] = value - last_resource_version = None - for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, - **kwargs): + if resource_version: + last_resource_version = resource_version + else: + last_resource_version = None + + event_generator = watcher.stream(kube_client.list_namespaced_pod, + self.namespace, + **kwargs) + while True: + try: + event = next(event_generator) + except StopIteration: + break + except ReadTimeoutError: + self.log.info("Timed out waiting for an event.") + break + except HTTPError: + self.log.info("Terminating connection to kube-api.") + break + task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -437,13 +471,11 @@ def _make_kube_watcher(self): watcher.start() return watcher - def _health_check_kube_watcher(self): - if self.kube_watcher.is_alive(): - pass - else: + def _ensure_kube_watcher(self): + if not self.kube_watcher.is_alive(): + settings.Stats.incr("executor.kube_watcher.restarts") self.log.error( - 'Error while health checking kube watcher process. ' - 'Process died for unknown reasons') + 'Kubernetes job watcher process stopped. Restarting') self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job): @@ -491,7 +523,7 @@ def sync(self): :return: """ - self._health_check_kube_watcher() + self._ensure_kube_watcher() while True: try: task = self.watcher_queue.get_nowait() @@ -500,6 +532,8 @@ def sync(self): finally: self.watcher_queue.task_done() except Empty: + # When self.watcher_queue is empty, + # this function returns break def process_watcher_task(self, task):