Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix "Reason: Expired: too old resource version: 379140622 (380367990)" #23504

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def _run(
for key, value in kube_config.kube_client_request_args.items():
kwargs[key] = value

last_resource_version: Optional[str] = None
last_resource_version: str = "0"
if self.multi_namespace_mode:
list_worker_pods = functools.partial(
watcher.stream, kube_client.list_pod_for_all_namespaces, **kwargs
Expand Down Expand Up @@ -167,7 +167,9 @@ def _run(
resource_version=task.metadata.resource_version,
event=event,
)
last_resource_version = task.metadata.resource_version
task_resource_version = task.metadata.resource_version
if task_resource_version:
last_resource_version = str(max(int(last_resource_version), int(task_resource_version)))

return last_resource_version

Expand Down
32 changes: 30 additions & 2 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ def setUp(self):
)
self.events = []

def _run(self):
def _run(self, assert_resource_version=True):
with mock.patch('airflow.executors.kubernetes_executor.watch') as mock_watch:
mock_watch.Watch.return_value.stream.return_value = self.events
latest_resource_version = self.watcher._run(
Expand All @@ -875,7 +875,9 @@ def _run(self):
self.watcher.scheduler_job_id,
self.watcher.kube_config,
)
assert self.pod.metadata.resource_version == latest_resource_version
if assert_resource_version:
assert self.pod.metadata.resource_version == latest_resource_version
return latest_resource_version

def assert_watcher_queue_called_once_with_state(self, state):
self.watcher.watcher_queue.put.assert_called_once_with(
Expand Down Expand Up @@ -957,3 +959,29 @@ def test_process_error_event_for_raise_if_not_410(self):
f"Kubernetes failure for {raw_object['reason']} "
f"with code {raw_object['code']} and message: {raw_object['message']}"
)

def test_last_resource_version_even_if_watch_unsorted(self):
pod1 = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name="pod1",
annotations={"airflow-worker": "bar", **self.core_annotations},
namespace="airflow",
resource_version="900",
),
status=k8s.V1PodStatus(phase="Running"),
)
self.events.append({"type": 'MODIFIED', "object": pod1})

pod2 = k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name="pod2",
annotations={"airflow-worker": "bar", **self.core_annotations},
namespace="airflow",
resource_version="800",
),
status=k8s.V1PodStatus(phase="Running"),
)
self.events.append({"type": 'MODIFIED', "object": pod2})

resource_version = self._run(False)
assert resource_version == "900"