From 371ee6ea88479202ec8e1b942def40e1078ebaf6 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 22 Nov 2019 16:49:50 -0500 Subject: [PATCH 01/10] Adding fix for AIRFLOW-604 --- airflow/executors/kubernetes_executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index ae566bbcdc52d..7f652b7412f23 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -265,12 +265,12 @@ def run(self): def _run(self, kube_client, resource_version, worker_uuid, kube_config): self.log.info( - 'Event: and now my watch begins starting at resource_version: %s', - resource_version - ) + 'Event: and now my watch begins starting at resource_version: %s, ' + 'worker_uuid: %s', resource_version, worker_uuid) watcher = watch.Watch() - kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)} + kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), + 'timeout_seconds': 50 } if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: From e8261a774f63f96270a2e5591d1be284f5b73349 Mon Sep 17 00:00:00 2001 From: Max Date: Fri, 22 Nov 2019 17:00:47 -0500 Subject: [PATCH 02/10] Adding fix for AIRFLOW-6040 --- airflow/executors/kubernetes_executor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index ae566bbcdc52d..7f652b7412f23 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -265,12 +265,12 @@ def run(self): def _run(self, kube_client, resource_version, worker_uuid, kube_config): self.log.info( - 'Event: and now my watch begins starting at resource_version: %s', - resource_version - ) + 'Event: and now my watch begins starting at resource_version: %s, ' + 'worker_uuid: %s', resource_version, worker_uuid) watcher = watch.Watch() - kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)} + kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), + 'timeout_seconds': 50 } if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: From a48d5b757275a4a616631c144ff93980e37105ed Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Sat, 23 Nov 2019 10:54:08 -0500 Subject: [PATCH 03/10] Removing whitespace --- airflow/executors/kubernetes_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 7f652b7412f23..1460d13dc8f4a 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -270,7 +270,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): watcher = watch.Watch() kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), - 'timeout_seconds': 50 } + 'timeout_seconds': 50} if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: From 5db44cc42e9acfa060d684fe01db61679b892af6 Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Sat, 23 Nov 2019 11:44:01 -0500 Subject: [PATCH 04/10] Indentation fix --- airflow/executors/kubernetes_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 1460d13dc8f4a..e82b072394f8b 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -270,7 +270,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): watcher = watch.Watch() kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), - 'timeout_seconds': 50} + 'timeout_seconds': 50} if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: From f6dce89819506eefd11681d2a13a57c002f6e89c Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Mon, 25 Nov 2019 08:18:35 -0500 Subject: [PATCH 05/10] Over indention --- airflow/executors/kubernetes_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index e82b072394f8b..3ca18f642affb 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -270,7 +270,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): watcher = watch.Watch() kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), - 'timeout_seconds': 50} + 'timeout_seconds': 50} if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: From 0a0ab3d2e727df45ca86ef94d3f12a0ff41ec9e8 Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Mon, 25 Nov 2019 08:43:00 -0500 Subject: [PATCH 06/10] One more time to fix flake8 issue --- airflow/executors/kubernetes_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3ca18f642affb..fbf0fb9510e17 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -270,7 +270,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): watcher = watch.Watch() kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), - 'timeout_seconds': 50} + 'timeout_seconds': 50} if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: From 1bb409ee5677873acf7df6ab286e93162a6e3bca Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Sat, 14 Dec 2019 13:48:38 -0500 Subject: [PATCH 07/10] Updating timeout definition --- airflow/executors/kubernetes_executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index fbf0fb9510e17..611060b5820a4 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -268,9 +268,9 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): 'Event: and now my watch begins starting at resource_version: %s, ' 'worker_uuid: %s', resource_version, worker_uuid) watcher = watch.Watch() - + watcher_timeout = max(kube_config.kube_client_request_args.get('_request_timeout', [60, 60])[0] - 1, 1) kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), - 'timeout_seconds': 50} + 'timeout_seconds': watcher_timeout} if resource_version: kwargs['resource_version'] = resource_version if kube_config.kube_client_request_args: From 67a0f3982482c4029d43deddf680cc4c6b59b283 Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Sat, 14 Dec 2019 14:24:11 -0500 Subject: [PATCH 08/10] Cleaning up method and ensuring >0 timeout --- airflow/executors/kubernetes_executor.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 611060b5820a4..c69236164887a 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -268,14 +268,7 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): 'Event: and now my watch begins starting at resource_version: %s, ' 'worker_uuid: %s', resource_version, worker_uuid) watcher = watch.Watch() - watcher_timeout = max(kube_config.kube_client_request_args.get('_request_timeout', [60, 60])[0] - 1, 1) - kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid), - 'timeout_seconds': watcher_timeout} - if resource_version: - kwargs['resource_version'] = resource_version - if kube_config.kube_client_request_args: - for key, value in kube_config.kube_client_request_args.iteritems(): - kwargs[key] = value + kwargs = self._get_watcher_args(resource_version, worker_uuid, kube_config) last_resource_version = None for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, @@ -295,6 +288,18 @@ def _run(self, kube_client, resource_version, worker_uuid, kube_config): return last_resource_version + def _get_watcher_args(self, resource_version, worker_uuid, kube_config): + """Builds and returns the kwargs necessary for Watcher""" + kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)} + if resource_version: + kwargs['resource_version'] = resource_version + if kube_config.kube_client_request_args: + for key, value in kube_config.kube_client_request_args.iteritems(): + kwargs[key] = value + conn_timeout = kube_config.kube_client_request_args.get('_request_timeout', [60, 60])[0] + kwargs['timeout_seconds'] = conn_timeout - 1 if conn_timeout - 1 > 0 else 1 + return kwargs + def process_error(self, event): """Process error response""" self.log.error( From 72799cb92e49cdcf7054a06edd08954874f11d18 Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Sat, 14 Dec 2019 14:53:32 -0500 Subject: [PATCH 09/10] Adding method typings --- airflow/executors/kubernetes_executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 562fdd1b68b47..5272d333dec0a 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -308,7 +308,10 @@ def _run(self, return last_resource_version - def _get_watcher_args(self, resource_version, worker_uuid, kube_config): + def _get_watcher_args(self, + resource_version: Optional[str], + worker_uuid: str, + kube_config: Any) -> dict: """Builds and returns the kwargs necessary for Watcher""" kwargs = {'label_selector': 'airflow-worker={}'.format(worker_uuid)} if resource_version: From 981365c796a1400e80211df9ac2b53ad794cb2fa Mon Sep 17 00:00:00 2001 From: Max <1676208+maxirus@users.noreply.github.com> Date: Tue, 17 Dec 2019 08:35:20 -0500 Subject: [PATCH 10/10] Adding recommended change --- airflow/executors/kubernetes_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 5272d333dec0a..2ee617f3f5a8a 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -320,7 +320,7 @@ def _get_watcher_args(self, for key, value in kube_config.kube_client_request_args.iteritems(): kwargs[key] = value conn_timeout = kube_config.kube_client_request_args.get('_request_timeout', [60, 60])[0] - kwargs['timeout_seconds'] = conn_timeout - 1 if conn_timeout - 1 > 0 else 1 + kwargs['timeout_seconds'] = max(conn_timeout - 1, 1) return kwargs def process_error(self, event: Any) -> str: