(fix): HybridExecutor tasks of other executor rescheduled in kubernetes executor (apache#43003)
pavansharma36 authored Oct 20, 2024
1 parent f0740b3 commit 57500b6
Showing 2 changed files with 233 additions and 2 deletions.
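The first hunk below changes the Kubernetes executor's clear_not_launched_queued_tasks so that, when hybrid executors are available (TaskInstance has an executor column), queued task instances that belong to another executor are no longer picked up and rescheduled by the Kubernetes executor. The following is a minimal standalone sketch of the filtering rule the diff introduces; the function name and the plain-string executor values are illustrative, not provider code.

# Sketch of the ownership rule added by this commit (illustrative, not the
# provider implementation): a queued TI may be rescheduled by the Kubernetes
# executor only if it targets that executor, or if it has no executor set and
# KubernetesExecutor is the configured default.
def belongs_to_kubernetes_executor(ti_executor, default_executor):
    KUBERNETES_EXECUTOR = "KubernetesExecutor"  # assumed value of the executor_constants name
    if ti_executor == KUBERNETES_EXECUTOR:
        return True
    # A NULL executor means "use the configured default executor".
    return ti_executor is None and default_executor == KUBERNETES_EXECUTOR

# Example: a queued CeleryExecutor task is left alone even when Kubernetes is the default.
assert not belongs_to_kubernetes_executor("CeleryExecutor", "KubernetesExecutor")
assert belongs_to_kubernetes_executor(None, "KubernetesExecutor")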
@@ -36,7 +36,7 @@
from typing import TYPE_CHECKING, Any, Sequence

from kubernetes.dynamic import DynamicClient
from sqlalchemy import select, update
from sqlalchemy import or_, select, update

from airflow.cli.cli_config import (
ARG_DAG_ID,
@@ -52,6 +52,7 @@
)
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_constants import KUBERNETES_EXECUTOR
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
ADOPTED,
POD_EXECUTOR_DONE_KEY,
@@ -229,13 +230,30 @@ def clear_not_launched_queued_tasks(self, session: Session = NEW_SESSION) -> None:
assert self.kube_client
from airflow.models.taskinstance import TaskInstance

hybrid_executor_enabled = hasattr(TaskInstance, "executor")
default_executor = None
if hybrid_executor_enabled:
from airflow.executors.executor_loader import ExecutorLoader

default_executor = str(ExecutorLoader.get_default_executor_name())

with Stats.timer("kubernetes_executor.clear_not_launched_queued_tasks.duration"):
self.log.debug("Clearing tasks that have not been launched")
query = select(TaskInstance).where(
TaskInstance.state == TaskInstanceState.QUEUED, TaskInstance.queued_by_job_id == self.job_id
TaskInstance.state == TaskInstanceState.QUEUED,
TaskInstance.queued_by_job_id == self.job_id,
)
if self.kubernetes_queue:
query = query.where(TaskInstance.queue == self.kubernetes_queue)
elif hybrid_executor_enabled and KUBERNETES_EXECUTOR == default_executor:
query = query.where(
or_(
TaskInstance.executor == KUBERNETES_EXECUTOR,
TaskInstance.executor.is_(None),
),
)
elif hybrid_executor_enabled:
query = query.where(TaskInstance.executor == KUBERNETES_EXECUTOR)
queued_tis: list[TaskInstance] = session.scalars(query).all()
self.log.info("Found %s queued task instances", len(queued_tis))

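For readers unfamiliar with SQLAlchemy's or_, here is a self-contained sketch of how the branch above composes the extra WHERE clause. The TI model is a toy stand-in for airflow.models.taskinstance.TaskInstance and its column sizes are assumptions; only the query shape mirrors the diff.

# Standalone SQLAlchemy 2.0 sketch of the filter added above (not provider code).
from typing import Optional

from sqlalchemy import String, or_, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class TI(Base):
    __tablename__ = "task_instance"
    id: Mapped[int] = mapped_column(primary_key=True)
    state: Mapped[str] = mapped_column(String(20))
    queued_by_job_id: Mapped[int]
    executor: Mapped[Optional[str]] = mapped_column(String(1000))


def queued_tis_query(job_id: int, kubernetes_is_default: bool):
    # Base filter: queued TIs that this scheduler job enqueued.
    query = select(TI).where(TI.state == "queued", TI.queued_by_job_id == job_id)
    if kubernetes_is_default:
        # Rows with executor IS NULL fall back to the default executor, so they are ours too.
        query = query.where(or_(TI.executor == "KubernetesExecutor", TI.executor.is_(None)))
    else:
        query = query.where(TI.executor == "KubernetesExecutor")
    return query


print(queued_tis_query(job_id=1, kubernetes_is_default=True))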
213 changes: 213 additions & 0 deletions providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -29,6 +29,12 @@
from urllib3 import HTTPResponse

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
KUBERNETES_EXECUTOR,
)
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.operators.empty import EmptyOperator
from airflow.providers.cncf.kubernetes import pod_generator
@@ -1277,6 +1283,7 @@ def test_kube_config_get_namespace_list(

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
@@ -1287,6 +1294,13 @@ def test_clear_not_launched_queued_tasks_not_launched(
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

@@ -1320,6 +1334,7 @@ def test_clear_not_launched_queued_tasks_not_launched(
],
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_launched(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
):
@@ -1350,6 +1365,13 @@ def test_clear_not_launched_queued_tasks_launched(
]
)

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

@@ -1376,6 +1398,7 @@ def test_clear_not_launched_queued_tasks_launched(

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_mapped_task(self, mock_kube_dynamic_client, dag_maker, session):
"""One mapped task has a launched pod - other does not."""

@@ -1410,6 +1433,13 @@ def get(*args, **kwargs):
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.side_effect = get

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
with dag_maker(dag_id="test_clear"):
op = BashOperator.partial(task_id="bash").expand(bash_command=["echo 0", "echo 1"])

@@ -1443,13 +1473,21 @@ def get(*args, **kwargs):
)

@pytest.mark.db_test
@conf_vars({("core", "executor"): CELERY_KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched_other_queue(
self, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=[])

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

@@ -1470,7 +1508,175 @@ def test_clear_not_launched_queued_tasks_not_launched_other_queue(
assert mock_kube_client.list_namespaced_pod.call_count == 0

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched_other_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
ti.executor = "CeleryExecutor"
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.executor == "CeleryExecutor"
assert ti.state == State.QUEUED
assert mock_kube_client.list_namespaced_pod.call_count == 0

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): CELERY_EXECUTOR})
def test_clear_not_launched_queued_tasks_not_launched_other_default_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.state == State.QUEUED
assert mock_kube_client.list_namespaced_pod.call_count == 0

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_launched_none_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.state == State.SCHEDULED
assert mock_kube_dynamic_client.return_value.get.call_count == 1

@pytest.mark.db_test
@pytest.mark.skipif(
not hasattr(TaskInstance, "executor"), reason="Hybrid executor added in later version"
)
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_launched_kubernetes_executor(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
"""Queued TI has no pod, but it is not queued for the k8s executor"""
mock_kube_client = mock.MagicMock()
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_pod_resource = mock.MagicMock()
mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
mock_kube_dynamic_client.return_value.get.return_value.items = []

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
ti.executor = KUBERNETES_EXECUTOR
session.flush()

executor = self.kubernetes_executor
executor.job_id = 1

executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

ti.refresh_from_db()
assert ti.state == State.SCHEDULED
assert mock_kube_dynamic_client.return_value.get.call_count == 1

@pytest.mark.db_test
@mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
@conf_vars({("core", "executor"): KUBERNETES_EXECUTOR})
def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
self, mock_kube_dynamic_client, dag_maker, create_dummy_dag, session
):
@@ -1479,6 +1685,13 @@ def test_clear_not_launched_queued_tasks_clear_only_by_job_id(
mock_kube_dynamic_client.return_value = mock.MagicMock()
mock_kube_dynamic_client.return_value.get.return_value = k8s.V1PodList(items=[])

        # This is a hack to use overridden conf vars, as executors seem to be loaded before the conf override.
if hasattr(TaskInstance, "executor"):
import importlib

from airflow.executors import executor_loader

importlib.reload(executor_loader)
create_dummy_dag(dag_id="test_clear_0", task_id="task0", with_dagrun_type=None)
dag_run = dag_maker.create_dagrun()

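The comment repeated throughout the new tests ("This is a hack to use overridden conf vars...") refers to the executor loader apparently resolving the default executor before the conf_vars override takes effect, so the tests reload the module to re-read [core] executor. A minimal sketch of that pattern, using the same Airflow modules the tests import:

# Sketch of the reload pattern used in the tests above. Assumption (taken from the
# test comment): executor_loader resolves the default executor before conf_vars
# applies its override, so a reload is needed to pick up the overridden value.
import importlib

from airflow.executors import executor_loader


def current_default_executor() -> str:
    importlib.reload(executor_loader)  # re-read the current [core] executor setting
    return str(executor_loader.ExecutorLoader.get_default_executor_name())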
