Skip to content

Commit

Permalink
Kubernetes executor running slots leak fix (#36240)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: gopal <[email protected]>
(cherry picked from commit 49108e1)
  • Loading branch information
dirrao authored and ephraimbuddy committed Jan 11, 2024
1 parent 185c9ff commit a6a305f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 10 deletions.
16 changes: 14 additions & 2 deletions airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@
raise
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.utils.event_scheduler import EventScheduler
Expand Down Expand Up @@ -450,14 +453,23 @@ def sync(self) -> None:
def _change_state(
self,
key: TaskInstanceKey,
state: TaskInstanceState | None,
state: TaskInstanceState | str | None,
pod_name: str,
namespace: str,
session: Session = NEW_SESSION,
) -> None:
if TYPE_CHECKING:
assert self.kube_scheduler

if state == ADOPTED:
# When the task pod is adopted by another executor,
# then remove the task from the current executor running queue.
try:
self.running.remove(key)
except KeyError:
self.log.debug("TI key not in running: %s", key)
return

if state == TaskInstanceState.RUNNING:
self.event_buffer[key] = state, None
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union

ADOPTED = "adopted"
if TYPE_CHECKING:
from airflow.executors.base_executor import CommandType
from airflow.models.taskinstance import TaskInstanceKey
Expand All @@ -27,10 +28,10 @@
KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]

# key, pod state, pod_name, namespace, resource_version
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[TaskInstanceState], str, str, str]
KubernetesResultsType = Tuple[TaskInstanceKey, Optional[Union[TaskInstanceState, str]], str, str, str]

# pod_name, namespace, pod state, annotations, resource_version
KubernetesWatchType = Tuple[str, str, Optional[TaskInstanceState], Dict[str, str], str]
KubernetesWatchType = Tuple[str, str, Optional[Union[TaskInstanceState, str]], Dict[str, str], str]

ALL_NAMESPACES = "ALL_NAMESPACES"
POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

try:
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
ALL_NAMESPACES,
POD_EXECUTOR_DONE_KEY,
)
Expand Down Expand Up @@ -220,7 +221,13 @@ def process_status(
pod = event["object"]
annotations_string = annotations_for_logging_task_metadata(annotations)
"""Process status response."""
if status == "Pending":
if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
# This will happen only when the task pods are adopted by another executor.
# So, there is no change in the pod state.
# However, need to free the executor slot from the current executor.
self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string)
self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version))
elif status == "Pending":
# deletion_timestamp is set by kube server when a graceful deletion is requested.
# since kube server have received request to delete pod set TI state failed
if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
KubernetesExecutor,
PodReconciliationError,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
)
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
AirflowKubernetesScheduler,
KubernetesJobWatcher,
Expand Down Expand Up @@ -644,6 +647,25 @@ def test_change_state_none(
finally:
executor.end()

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
@mock.patch(
"airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
)
def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
executor = self.kubernetes_executor
executor.start()
try:
key = ("dag_id", "task_id", "run_id", "try_number2")
executor.running = {key}
executor._change_state(key, ADOPTED, "pod_name", "default")
assert len(executor.event_buffer) == 0
assert len(executor.running) == 0
mock_delete_pod.assert_not_called()
finally:
executor.end()

@pytest.mark.db_test
@pytest.mark.parametrize(
"multi_namespace_mode_namespace_list, watchers_keys",
Expand Down Expand Up @@ -1372,12 +1394,31 @@ def test_process_status_succeeded_dedup_timestamp(self):
self._run()
self.watcher.watcher_queue.put.assert_not_called()

def test_process_status_succeeded_type_delete(self):
self.pod.status.phase = "Succeeded"
@pytest.mark.parametrize(
"ti_state",
[
TaskInstanceState.SUCCESS,
TaskInstanceState.FAILED,
TaskInstanceState.RUNNING,
TaskInstanceState.QUEUED,
TaskInstanceState.UP_FOR_RETRY,
],
)
def test_process_status_pod_adopted(self, ti_state):
self.pod.status.phase = ti_state
self.events.append({"type": "DELETED", "object": self.pod})
self.pod.metadata.deletion_timestamp = None

self._run()
self.watcher.watcher_queue.put.assert_not_called()
self.watcher.watcher_queue.put.assert_called_once_with(
(
self.pod.metadata.name,
self.watcher.namespace,
ADOPTED,
self.core_annotations,
self.pod.metadata.resource_version,
)
)

def test_process_status_running_deleted(self):
self.pod.status.phase = "Running"
Expand Down

0 comments on commit a6a305f

Please sign in to comment.