Skip to content

Commit

Permalink
Prevent KubernetesJobWatcher getting stuck on resource too old (#23521)
Browse files Browse the repository at this point in the history
* Prevent KubernetesJobWatcher getting stuck on resource too old

If the watch fails because "resource too old" the
KubernetesJobWatcher should not retry with the same resource version
as that will end up in loop where there is no progress.

* Reset ResourceVersion().resource_version to 0

(cherry picked from commit dee05b2ebca6ab66f1b447837e11fe204f98b2df)

GitOrigin-RevId: 5a94a165b3654afdf468704f0af1821ae3e8aa0c
  • Loading branch information
ecerulm authored and Cloud Composer Team committed Sep 12, 2024
1 parent 4ba1f90 commit eb04d4f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
3 changes: 3 additions & 0 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def run(self) -> None:
time.sleep(1)
except Exception:
self.log.exception('Unknown error in KubernetesJobWatcher. Failing')
self.resource_version = "0"
ResourceVersion().resource_version = "0"
raise
else:
self.log.warning(
Expand Down Expand Up @@ -288,6 +290,7 @@ def _health_check_kube_watcher(self):
self.log.error(
'Error while health checking kube watcher process. Process died for unknown reasons'
)
ResourceVersion().resource_version = "0"
self.kube_watcher = self._make_kube_watcher()

def run_next(self, next_job: KubernetesJobType) -> None:
Expand Down
34 changes: 34 additions & 0 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
AirflowKubernetesScheduler,
KubernetesExecutor,
KubernetesJobWatcher,
ResourceVersion,
create_pod_id,
get_base_pod_from_template,
)
Expand Down Expand Up @@ -957,3 +958,36 @@ def test_process_error_event_for_raise_if_not_410(self):
f"Kubernetes failure for {raw_object['reason']} "
f"with code {raw_object['code']} and message: {raw_object['message']}"
)

def test_recover_from_resource_too_old(self):
# too old resource
mock_underscore_run = mock.MagicMock()

def effect():
yield '500'
while True:
yield Exception('sentinel')

mock_underscore_run.side_effect = effect()

self.watcher._run = mock_underscore_run

with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'):
try:
# self.watcher._run() is mocked and return "500" as last resource_version
self.watcher.run()
except Exception as e:
assert e.args == ('sentinel',)

# both resource_version should be 0 after _run raises and exception
assert self.watcher.resource_version == '0'
assert ResourceVersion().resource_version == '0'

# check that in the next run, _run is invoked with resource_version = 0
mock_underscore_run.reset_mock()
try:
self.watcher.run()
except Exception as e:
assert e.args == ('sentinel',)

mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)

0 comments on commit eb04d4f

Please sign in to comment.