From eb04d4fab6da5b807e54b3a69e0a88c7f70a0bdc Mon Sep 17 00:00:00 2001 From: Ruben Laguna Date: Wed, 11 May 2022 08:25:49 +0200 Subject: [PATCH] Prevent KubernetesJobWatcher getting stuck on resource too old (#23521) * Prevent KubernetesJobWatcher getting stuck on resource too old If the watch fails because of "resource too old", the KubernetesJobWatcher should not retry with the same resource version, as that will end up in a loop where there is no progress. * Reset ResourceVersion().resource_version to 0 (cherry picked from commit dee05b2ebca6ab66f1b447837e11fe204f98b2df) GitOrigin-RevId: 5a94a165b3654afdf468704f0af1821ae3e8aa0c --- airflow/executors/kubernetes_executor.py | 3 ++ tests/executors/test_kubernetes_executor.py | 34 +++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 9b9de71681..c76cf58f41 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -109,6 +109,8 @@ def run(self) -> None: time.sleep(1) except Exception: self.log.exception('Unknown error in KubernetesJobWatcher. Failing') + self.resource_version = "0" + ResourceVersion().resource_version = "0" raise else: self.log.warning( @@ -288,6 +290,7 @@ def _health_check_kube_watcher(self): self.log.error( 'Error while health checking kube watcher process. 
Process died for unknown reasons' ) + ResourceVersion().resource_version = "0" self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job: KubernetesJobType) -> None: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 1a4160d9ce..954f4f0600 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -39,6 +39,7 @@ AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher, + ResourceVersion, create_pod_id, get_base_pod_from_template, ) @@ -957,3 +958,36 @@ def test_process_error_event_for_raise_if_not_410(self): f"Kubernetes failure for {raw_object['reason']} " f"with code {raw_object['code']} and message: {raw_object['message']}" ) + + def test_recover_from_resource_too_old(self): + # too old resource + mock_underscore_run = mock.MagicMock() + + def effect(): + yield '500' + while True: + yield Exception('sentinel') + + mock_underscore_run.side_effect = effect() + + self.watcher._run = mock_underscore_run + + with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'): + try: + # self.watcher._run() is mocked and returns "500" as the last resource_version + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + # both resource_version values should be 0 after _run raises an exception + assert self.watcher.resource_version == '0' + assert ResourceVersion().resource_version == '0' + + # check that in the next run, _run is invoked with resource_version = 0 + mock_underscore_run.reset_mock() + try: + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)