Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use kubernetes queue in kubernetes hybrid executors #23048

Merged
merged 7 commits into from
May 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: Kuberne
self._job_id: Optional[int] = None
self.celery_executor = celery_executor
self.kubernetes_executor = kubernetes_executor
self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE

@property
def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
Expand Down
9 changes: 6 additions & 3 deletions airflow/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ def __init__(self):
self.scheduler_job_id: Optional[str] = None
self.event_scheduler: Optional[EventScheduler] = None
self.last_handled: Dict[TaskInstanceKey, float] = {}
self.kubernetes_queue: Optional[str] = None
super().__init__(parallelism=self.kube_config.parallelism)

@provide_session
Expand All @@ -456,9 +457,11 @@ def clear_not_launched_queued_tasks(self, session=None) -> None:
self.log.debug("Clearing tasks that have not been launched")
if not self.kube_client:
raise AirflowException(NOT_STARTED_MESSAGE)
queued_tis: List[TaskInstance] = (
session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED).all()
)

query = session.query(TaskInstance).filter(TaskInstance.state == State.QUEUED)
if self.kubernetes_queue:
query = query.filter(TaskInstance.queue == self.kubernetes_queue)
queued_tis: List[TaskInstance] = query.all()
self.log.info('Found %s queued task instances', len(queued_tis))

# Go through the "last seen" dictionary and clean out old entries
Expand Down
1 change: 1 addition & 0 deletions airflow/executors/local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, local_executor: LocalExecutor, kubernetes_executor: Kubernete
self._job_id: Optional[str] = None
self.local_executor = local_executor
self.kubernetes_executor = kubernetes_executor
self.kubernetes_executor.kubernetes_queue = self.KUBERNETES_QUEUE

@property
def queued_tasks(self) -> Dict[TaskInstanceKey, QueuedTaskInstanceType]:
Expand Down
10 changes: 10 additions & 0 deletions tests/executors/test_celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from parameterized import parameterized

from airflow.configuration import conf
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.executors.kubernetes_executor import KubernetesExecutor
Expand Down Expand Up @@ -213,3 +214,12 @@ def test_job_id_setter(self):
job_id = 'this-job-id'
cel_k8s_exec.job_id = job_id
assert cel_exec.job_id == k8s_exec.job_id == cel_k8s_exec.job_id == job_id

def test_kubernetes_executor_knows_its_queue(self):
    """Wrapping a Kubernetes executor assigns it the configured Kubernetes queue.

    Both inner executors are mocks; the only thing checked is that, after
    ``CeleryKubernetesExecutor`` is constructed, the ``kubernetes_queue``
    attribute on the Kubernetes mock equals the ``kubernetes_queue`` option
    of the ``celery_kubernetes_executor`` config section.
    """
    celery_stub = mock.MagicMock()
    k8s_stub = mock.MagicMock()

    # Constructed only for its side effect on ``k8s_stub``.
    CeleryKubernetesExecutor(celery_stub, k8s_stub)

    configured_queue = conf.get('celery_kubernetes_executor', 'kubernetes_queue')
    assert k8s_stub.kubernetes_queue == configured_queue
38 changes: 37 additions & 1 deletion tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,17 @@ def test_clear_not_launched_queued_tasks_not_launched(self, dag_maker, create_du
),
)

def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_dag, session):
@pytest.mark.parametrize(
'task_queue, kubernetes_queue',
[
pytest.param('default', None),
pytest.param('kubernetes', None),
pytest.param('kubernetes', 'kubernetes'),
],
)
def test_clear_not_launched_queued_tasks_launched(
self, dag_maker, create_dummy_dag, session, task_queue, kubernetes_queue
):
"""Leave the state alone if a pod already exists"""
mock_kube_client = mock.MagicMock()
mock_kube_client.list_namespaced_pod.return_value = k8s.V1PodList(items=["something"])
Expand All @@ -732,9 +742,11 @@ def test_clear_not_launched_queued_tasks_launched(self, dag_maker, create_dummy_
ti = dag_run.task_instances[0]
ti.state = State.QUEUED
ti.queued_by_job_id = 1
ti.queue = task_queue
session.flush()

executor = self.kubernetes_executor
executor.kubernetes_queue = kubernetes_queue
executor.kube_client = mock_kube_client
executor.clear_not_launched_queued_tasks(session=session)

Expand Down Expand Up @@ -800,6 +812,30 @@ def list_namespaced_pod(*args, **kwargs):
any_order=True,
)

def test_clear_not_launched_queued_tasks_not_launched_other_queue(
    self, dag_maker, create_dummy_dag, session
):
    """A queued TI without a pod is left alone when it is not on the executor's queue.

    The executor is restricted to the ``'kubernetes'`` queue, so the task
    instance must stay ``QUEUED`` and the Kubernetes client must never be
    asked to list pods.
    """
    kube_client_stub = mock.MagicMock()
    # No pod exists for the task instance.
    kube_client_stub.list_namespaced_pod.return_value = k8s.V1PodList(items=[])

    create_dummy_dag(dag_id="test_clear", task_id="task1", with_dagrun_type=None)
    queued_ti = dag_maker.create_dagrun().task_instances[0]
    # NOTE(review): ``queue`` is deliberately not set here; presumably the
    # task's default queue differs from 'kubernetes' -- confirm.
    queued_ti.state = State.QUEUED
    queued_ti.queued_by_job_id = 1
    session.flush()

    k8s_executor = self.kubernetes_executor
    k8s_executor.kubernetes_queue = 'kubernetes'
    k8s_executor.kube_client = kube_client_stub
    k8s_executor.clear_not_launched_queued_tasks(session=session)

    queued_ti.refresh_from_db()
    assert queued_ti.state == State.QUEUED
    assert kube_client_stub.list_namespaced_pod.call_count == 0


class TestKubernetesJobWatcher(unittest.TestCase):
def setUp(self):
Expand Down
7 changes: 7 additions & 0 deletions tests/executors/test_local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ def test_slots_available(self):

# Should be equal to Local Executor default parallelism.
assert local_kubernetes_executor.slots_available == conf.getint('core', 'PARALLELISM')

def test_kubernetes_executor_knows_its_queue(self):
    """Wrapping a Kubernetes executor assigns it the configured Kubernetes queue.

    Both inner executors are mocks; the only thing checked is that, after
    ``LocalKubernetesExecutor`` is constructed, the ``kubernetes_queue``
    attribute on the Kubernetes mock equals the ``kubernetes_queue`` option
    of the ``local_kubernetes_executor`` config section.
    """
    local_stub = mock.MagicMock()
    k8s_stub = mock.MagicMock()

    # Constructed only for its side effect on ``k8s_stub``.
    LocalKubernetesExecutor(local_stub, k8s_stub)

    configured_queue = conf.get('local_kubernetes_executor', 'kubernetes_queue')
    assert k8s_stub.kubernetes_queue == configured_queue