Skip to content

Commit

Permalink
Handle kubernetes watcher stream disconnection
Browse files Browse the repository at this point in the history
Currently, when kubernetes watch stream times out and we get error 410,
we just return resourve version '0' which is not the latest version.

From the documentation, timing out is expected and we should handle it
by performing a list>watch>relist operation so we can continue watching
from the latest resource version. See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes

This PR follows the list>watch>relist pattern
  • Loading branch information
ephraimbuddy committed Apr 23, 2021
1 parent 150f225 commit 464b49e
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 23 deletions.
79 changes: 56 additions & 23 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,44 @@
KubernetesWatchType = Tuple[str, str, Optional[str], Dict[str, str], str]


class ResourceVersion:
"""Singleton for tracking resourceVersion from Kubernetes"""
class Borg:
"""Borg class making all instances share state"""

_instance = None
resource_version = "0"
_shared_state = {}

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self.__dict__ = self._shared_state


class ResourceVersion(Borg):
"""Track resourceVersion from Kubernetes"""

def __init__(self, *, kube_client=None, namespace=None, resource_version=None):
Borg.__init__(self)
if resource_version:
# Update the state
self._shared_state.update(resource_version=resource_version)
if not hasattr(self, 'resource_version'):
if not (kube_client and namespace):
raise AirflowException("kube_client and namespace is required")
re_version = get_resource_version(kube_client, namespace)
self._shared_state.update(resource_version=re_version)

@classmethod
def _drop(cls):
"""Clear shared state (For testing purposes)"""
cls._shared_state = {}


def get_resource_version(kube_client: client.CoreV1Api, namespace):
"""
List pods to get the latest resource version
See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
"""
pod_list = kube_client.list_namespaced_pod(namespace)
resource_version = pod_list.metadata.resource_version
return resource_version


class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
Expand All @@ -84,7 +112,7 @@ def __init__(
namespace: Optional[str],
multi_namespace_mode: bool,
watcher_queue: 'Queue[KubernetesWatchType]',
resource_version: Optional[str],
resource_version: str,
scheduler_job_id: Optional[str],
kube_config: Configuration,
):
Expand Down Expand Up @@ -123,21 +151,20 @@ def run(self) -> None:
def _run(
self,
kube_client: client.CoreV1Api,
resource_version: Optional[str],
resource_version: str,
scheduler_job_id: str,
kube_config: Any,
) -> Optional[str]:
self.log.info('Event: and now my watch begins starting at resource_version: %s', resource_version)
watcher = watch.Watch()

kwargs = {'label_selector': f'airflow-worker={scheduler_job_id}'}
if resource_version:
kwargs['resource_version'] = resource_version
kwargs['resource_version'] = resource_version
if kube_config.kube_client_request_args:
for key, value in kube_config.kube_client_request_args.items():
kwargs[key] = value

last_resource_version: Optional[str] = None
last_resource_version: str = resource_version
if self.multi_namespace_mode:
list_worker_pods = functools.partial(
watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
Expand All @@ -150,7 +177,7 @@ def _run(
task = event['object']
self.log.info('Event: %s had an event of type %s', task.metadata.name, event['type'])
if event['type'] == 'ERROR':
return self.process_error(event)
return self.process_error(event, kube_client)
annotations = task.metadata.annotations
task_instance_related_annotations = {
'dag_id': annotations['dag_id'],
Expand All @@ -171,16 +198,21 @@ def _run(

return last_resource_version

def process_error(self, event: Any) -> str:
def process_error(
self,
event: Any,
kube_client: client.CoreV1Api,
) -> str:
"""Process error response"""
self.log.error('Encountered Error response from k8s list namespaced pod stream => %s', event)
raw_object = event['raw_object']
if raw_object['code'] == 410:
self.log.info(
'Kubernetes resource version is too old, must reset to 0 => %s', (raw_object['message'],)
'Kubernetes resource version is too old, '
'relisting pods to get the latest version. Error => %s',
(raw_object['message'],),
)
# Return resource version 0
return '0'
return get_resource_version(kube_client, self.namespace)
raise AirflowException(
'Kubernetes failure for %s with code %s and message: %s'
% (raw_object['reason'], raw_object['code'], raw_object['message'])
Expand Down Expand Up @@ -265,7 +297,10 @@ def run_pod_async(self, pod: V1Pod, **kwargs):
return resp

def _make_kube_watcher(self) -> KubernetesJobWatcher:
resource_version = ResourceVersion().resource_version
resource_instance = ResourceVersion(
kube_client=self.kube_client, namespace=self.kube_config.kube_namespace
)
resource_version = resource_instance.resource_version # pylint: disable=no-member
watcher = KubernetesJobWatcher(
watcher_queue=self.watcher_queue,
namespace=self.kube_config.kube_namespace,
Expand Down Expand Up @@ -548,7 +583,6 @@ def sync(self) -> None:
if not self.task_queue:
raise AirflowException(NOT_STARTED_MESSAGE)
self.kube_scheduler.sync()

last_resource_version = None
while True: # pylint: disable=too-many-nested-blocks
try:
Expand All @@ -571,9 +605,8 @@ def sync(self) -> None:
self.result_queue.task_done()
except Empty:
break

resource_instance = ResourceVersion()
resource_instance.resource_version = last_resource_version or resource_instance.resource_version
if last_resource_version:
_ = ResourceVersion(resource_version=last_resource_version)

# pylint: disable=too-many-nested-blocks
for _ in range(self.kube_config.worker_pods_creation_batch_size):
Expand Down
37 changes: 37 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
AirflowKubernetesScheduler,
KubernetesExecutor,
KubernetesJobWatcher,
ResourceVersion,
create_pod_id,
get_base_pod_from_template,
)
Expand Down Expand Up @@ -596,3 +597,39 @@ def test_process_status_catchall(self):

self._run()
self.watcher.watcher_queue.put.assert_not_called()

@mock.patch('airflow.executors.kubernetes_executor.get_resource_version')
def test_process_error_event(self, mock_get_resource_version):
mock_get_resource_version.return_value = '43334'
self.pod.status.phase = 'Pending'
self.pod.metadata.resource_version = '43334'
raw_object = {"code": 410, "message": "too old resource version: 27272 (43334)"}
self.events.append({"type": "ERROR", "object": self.pod, "raw_object": raw_object})
self._run()
assert mock_get_resource_version.called


class TestResourceVersion(unittest.TestCase):
# pylint: disable=no-member
def tearDown(self) -> None:
ResourceVersion._drop()

def test_can_update_with_resource_version_arg(self):
resource_instance = ResourceVersion(resource_version='4567')
assert resource_instance.resource_version == '4567'

@mock.patch('airflow.executors.kubernetes_executor.get_resource_version')
def test_different_instance_share_state(self, mock_get_resource_version):
kube_client = mock.MagicMock()
mock_get_resource_version.return_value = '4566'
resource_instance = ResourceVersion(kube_client=kube_client, namespace='mynamespace')
resource_instance2 = ResourceVersion(kube_client=kube_client, namespace='mynamespace')

assert resource_instance.resource_version == '4566'
assert resource_instance2.resource_version == '4566'
resource_instance3 = ResourceVersion(resource_version='6787')
resource_instance4 = ResourceVersion(kube_client=kube_client, namespace='mynamespace')
assert resource_instance.resource_version == '6787'
assert resource_instance2.resource_version == '6787'
assert resource_instance3.resource_version == '6787'
assert resource_instance4.resource_version == '6787'

0 comments on commit 464b49e

Please sign in to comment.