Skip to content

Commit

Permalink
[ASTRO-364] Handle timeout watching for tasks to finish (apache#64)
Browse files Browse the repository at this point in the history
(cherry picked from commit e6debf3)
(cherry picked from commit 564e85e)
  • Loading branch information
sjmiller609 authored and kaxil committed Apr 16, 2020
1 parent 6ed59bf commit 0d438e6
Showing 1 changed file with 46 additions and 12 deletions.
58 changes: 46 additions & 12 deletions airflow/contrib/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,21 @@ def _validate(self):
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs"""
def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config):
"""Initialize KubernetesJobWatcher, a background process that informs the
AirflowKubernetesScheduler when tasks are completed.
:param namespace: The namespace which will contain all tasks
:type namespace: str
:param watcher_queue: Used to inform the Scheduler of completed tasks
:type watcher_queue: multiprocessing.Queue
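:param resource_version: Version marker that changes whenever a Kubernetes resource changes; when set, it is passed to the watch so the watcher starts back up from the last version seen
:type resource_version: str (an opaque value that often looks like an int, for example "0")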
:param worker_uuid: A label selector used to locate pods that belong to this executor
:type worker_uuid: str
:param kube_config: Configuration for Kubernetes
:type kube_config: KubeConfig
"""
multiprocessing.Process.__init__(self)
self.namespace = namespace
self.worker_uuid = worker_uuid
Expand All @@ -351,8 +366,8 @@ def run(self):
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
raise
else:
self.log.warning('Watch died gracefully, starting back up with: '
'last resource_version: %s', self.resource_version)
self.log.info('Watcher will start back up with: '
'last resource_version: %s', self.resource_version)

def _run(self, kube_client, resource_version, worker_uuid, kube_config):
self.log.info(
Expand All @@ -362,15 +377,34 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config):
watcher = watch.Watch()

kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)}

if resource_version:
kwargs['resource_version'] = resource_version

if kube_config.kube_client_request_args:
for key, value in kube_config.kube_client_request_args.items():
kwargs[key] = value

last_resource_version = None
for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace,
**kwargs):
if resource_version:
last_resource_version = resource_version
else:
last_resource_version = None

event_generator = watcher.stream(kube_client.list_namespaced_pod,
self.namespace,
**kwargs)
while True:
try:
event = next(event_generator)
except StopIteration:
break
except ReadTimeoutError:
self.log.info("Timed out waiting for an event.")
break
except HTTPError:
self.log.info("Terminating connection to kube-api.")
break

task = event['object']
self.log.info(
'Event: %s had an event of type %s',
Expand Down Expand Up @@ -457,13 +491,11 @@ def _make_kube_watcher(self):
watcher.start()
return watcher

def _health_check_kube_watcher(self):
if self.kube_watcher.is_alive():
pass
else:
def _ensure_kube_watcher(self):
if not self.kube_watcher.is_alive():
settings.Stats.incr("executor.kube_watcher.restarts")
self.log.error(
'Error while health checking kube watcher process. '
'Process died for unknown reasons')
'Kubernetes job watcher process stopped. Restarting')
self.kube_watcher = self._make_kube_watcher()

def run_next(self, next_job):
Expand Down Expand Up @@ -511,7 +543,7 @@ def sync(self):
:return:
"""
self._health_check_kube_watcher()
self._ensure_kube_watcher()
while True:
try:
task = self.watcher_queue.get_nowait()
Expand All @@ -520,6 +552,8 @@ def sync(self):
finally:
self.watcher_queue.task_done()
except Empty:
# When self.watcher_queue is empty,
# this function returns
break

def process_watcher_task(self, task):
Expand Down

0 comments on commit 0d438e6

Please sign in to comment.