diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 9b9de71681cf6..c76cf58f418d4 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -109,6 +109,8 @@ def run(self) -> None: time.sleep(1) except Exception: self.log.exception('Unknown error in KubernetesJobWatcher. Failing') + self.resource_version = "0" + ResourceVersion().resource_version = "0" raise else: self.log.warning( @@ -288,6 +290,7 @@ def _health_check_kube_watcher(self): self.log.error( 'Error while health checking kube watcher process. Process died for unknown reasons' ) + ResourceVersion().resource_version = "0" self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job: KubernetesJobType) -> None: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index a677fe598b47f..a332a2fd6adfa 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -39,6 +39,7 @@ AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher, + ResourceVersion, create_pod_id, get_base_pod_from_template, ) @@ -957,3 +958,36 @@ def test_process_error_event_for_raise_if_not_410(self): f"Kubernetes failure for {raw_object['reason']} " f"with code {raw_object['code']} and message: {raw_object['message']}" ) + + def test_recover_from_resource_too_old(self): + # too old resource + mock_underscore_run = mock.MagicMock() + + def effect(): + yield '500' + while True: + yield Exception('sentinel') + + mock_underscore_run.side_effect = effect() + + self.watcher._run = mock_underscore_run + + with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'): + try: + # self.watcher._run() is mocked and returns "500" as last resource_version + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + # both resource_version should be 0 after _run raises an exception + assert 
self.watcher.resource_version == '0' + assert ResourceVersion().resource_version == '0' + + # check that in the next run, _run is invoked with resource_version = 0 + mock_underscore_run.reset_mock() + try: + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)