Skip to content

Commit

Permalink
[ASTRO-364] Handle timeout watching for tasks to finish (apache#64)
Browse files: browse the repository at this point in the history
(cherry picked from commit e6debf3)
(cherry picked from commit 564e85e)
(cherry picked from commit 0d438e6)
(cherry picked from commit 2d15077)
(cherry picked from commit 243d9f9)
  • Loading branch information
sjmiller609 authored and kaxil committed Dec 10, 2020
1 parent 3a1d023 commit 64a0545
Showing 1 changed file with 47 additions and 11 deletions.
58 changes: 47 additions & 11 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from dateutil import parser
from kubernetes import watch, client
from kubernetes.client.rest import ApiException
from urllib3.exceptions import HTTPError, ReadTimeoutError
from urllib3.exceptions import HTTPError, ReadTimeoutError, HTTPError

from airflow import settings
from airflow.configuration import conf
Expand Down Expand Up @@ -270,6 +270,21 @@ def __init__(self,
resource_version,
worker_uuid,
kube_config):
"""Initialize KubernetesJobWatcher, a background process that informs the
AirflowKubernetesScheduler when tasks are completed.
:param namespace: The namespace which will contain all tasks
:type namespace: str
:param watcher_queue: Used to inform the Scheduler of completed tasks
:type watcher_queue: multiprocessing.Queue
:param resource_version: Opaque Kubernetes version string marking where the watch should resume
:type resource_version: str (often numeric-looking, for example "0")
:param worker_uuid: A label selector used to locate pods that belong to this executor
:type worker_uuid: str
:param kube_config: Configuration for Kubernetes
:type kube_config: KubeConfig
"""
multiprocessing.Process.__init__(self)
self.namespace = namespace
self.multi_namespace_mode = multi_namespace_mode
Expand All @@ -293,8 +308,8 @@ def run(self):
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
raise
else:
self.log.warning('Watch died gracefully, starting back up with: '
'last resource_version: %s', self.resource_version)
self.log.info('Watcher will start back up with: '
'last resource_version: %s', self.resource_version)

def _run(self, kube_client, resource_version, worker_uuid, kube_config):
self.log.info(
Expand All @@ -304,8 +319,10 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
watcher = watch.Watch()

kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}

if resource_version:
kwargs['resource_version'] = resource_version

if kube_config.kube_client_request_args:
for key, value in kube_config.kube_client_request_args.items():
kwargs[key] = value
Expand All @@ -320,7 +337,26 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
kube_client.list_namespaced_pod,
self.namespace,
**kwargs)
for event in list_worker_pods():

if resource_version:
last_resource_version = resource_version
else:
last_resource_version = None

event_generator = list_worker_pods()

while True:
try:
event = next(event_generator)
except StopIteration:
break
except ReadTimeoutError:
self.log.info("Timed out waiting for an event.")
break
except HTTPError:
self.log.info("Terminating connection to kube-api.")
break

task = event['object']
self.log.info(
'Event: %s had an event of type %s',
Expand Down Expand Up @@ -410,13 +446,11 @@ def _make_kube_watcher(self):
watcher.start()
return watcher

def _health_check_kube_watcher(self):
if self.kube_watcher.is_alive():
pass
else:
def _ensure_kube_watcher(self):
if not self.kube_watcher.is_alive():
settings.Stats.incr("executor.kube_watcher.restarts")
self.log.error(
'Error while health checking kube watcher process. '
'Process died for unknown reasons')
'Kubernetes job watcher process stopped. Restarting')
self.kube_watcher = self._make_kube_watcher()

def run_next(self, next_job):
Expand Down Expand Up @@ -481,7 +515,7 @@ def sync(self):
:return:
"""
self._health_check_kube_watcher()
self._ensure_kube_watcher()
while True:
try:
task = self.watcher_queue.get_nowait()
Expand All @@ -490,6 +524,8 @@ def sync(self):
finally:
self.watcher_queue.task_done()
except Empty:
# When self.watcher_queue is empty,
# this function returns
break

def process_watcher_task(self, task):
Expand Down

0 comments on commit 64a0545

Please sign in to comment.