Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kubernetes executor running slots leak fix #36240

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,15 @@ def _change_state(
if TYPE_CHECKING:
assert self.kube_scheduler

if state == TaskInstanceState.ADOPTED:
# When the task pod is adopted by another scheduler,
# then remove the task from the current scheduler running queue.
dirrao marked this conversation as resolved.
Show resolved Hide resolved
try:
self.running.remove(key)
except KeyError:
self.log.debug("TI key not in running: %s", key)
return

if state == TaskInstanceState.RUNNING:
self.event_buffer[key] = state, None
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,15 @@ def process_status(
pod = event["object"]
annotations_string = annotations_for_logging_task_metadata(annotations)
"""Process status response."""
if status == "Pending":
if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
dirrao marked this conversation as resolved.
Show resolved Hide resolved
# This will happen only when the task pods are adopted by another scheduler.
dirrao marked this conversation as resolved.
Show resolved Hide resolved
# So, there is no change in the pod state.
# However, need to free the executor slot from the current scheduler.
self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string)
self.watcher_queue.put(
(pod_name, namespace, TaskInstanceState.ADOPTED, annotations, resource_version)
)
elif status == "Pending":
# deletion_timestamp is set by kube server when a graceful deletion is requested.
# since kube server have received request to delete pod set TI state failed
if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
Expand Down
3 changes: 3 additions & 0 deletions airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class TaskInstanceState(str, Enum):
REMOVED = "removed" # Task vanished from DAG before it ran
SCHEDULED = "scheduled" # Task should run and will be handed to executor soon

# Set by executor
ADOPTED = "adopted"

dirrao marked this conversation as resolved.
Show resolved Hide resolved
# Set by the task instance itself
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,25 @@ def test_change_state_none(
finally:
executor.end()

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
)
def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
executor.running = {key}
executor._change_state(key, TaskInstanceState.ADOPTED, "pod_name", "default")
assert len(executor.event_buffer) == 0
assert len(executor.running) == 0
mock_delete_pod.assert_not_called()
finally:
executor.end()

@pytest.mark.db_test
@pytest.mark.parametrize(
"multi_namespace_mode_namespace_list, watchers_keys",
Expand Down Expand Up @@ -1431,12 +1450,31 @@ def test_process_status_succeeded_dedup_timestamp(self):
self._run()
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_succeeded_type_delete(self):
self.pod.status.phase = "Succeeded"
@pytest.mark.parametrize(
"ti_state",
[
TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED,
TaskInstanceState.RUNNING,
TaskInstanceState.QUEUED,
TaskInstanceState.UP_FOR_RETRY,
],
)
def test_process_status_pod_adopted(self, ti_state):
self.pod.status.phase = ti_state
self.events.append({"type": "DELETED", "object": self.pod})
self.pod.metadata.deletion_timestamp = None

self._run()
self.watcher.watcher_queue.put.assert_not_called()
self.watcher.watcher_queue.put.assert_called_once_with(
(
self.pod.metadata.name,
self.watcher.namespace,
TaskInstanceState.ADOPTED,
self.core_annotations,
self.pod.metadata.resource_version,
)
)

def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"
Expand Down
Loading