Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement send_callback method for CeleryKubernetesExecutor and LocalKubernetesExecutor #23617

Merged
merged 3 commits into from
May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow/executors/celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
from typing import Dict, List, Optional, Set, Union

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.celery_executor import CeleryExecutor
Expand All @@ -35,6 +37,7 @@ class CeleryKubernetesExecutor(LoggingMixin):
"""

supports_ad_hoc_ti_run: bool = True
callback_sink: Optional[BaseCallbackSink] = None

KUBERNETES_QUEUE = conf.get('celery_kubernetes_executor', 'kubernetes_queue')

Expand Down Expand Up @@ -204,3 +207,12 @@ def debug_dump(self) -> None:
self.celery_executor.debug_dump()
self.log.info("Dumping KubernetesExecutor state")
self.kubernetes_executor.debug_dump()

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.

:param request: Callback request to be executed.
"""
if not self.callback_sink:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)
12 changes: 12 additions & 0 deletions airflow/executors/local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
# under the License.
from typing import Dict, List, Optional, Set, Union

from airflow.callbacks.base_callback_sink import BaseCallbackSink
from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
from airflow.executors.kubernetes_executor import KubernetesExecutor
Expand All @@ -35,6 +37,7 @@ class LocalKubernetesExecutor(LoggingMixin):
"""

supports_ad_hoc_ti_run: bool = True
callback_sink: Optional[BaseCallbackSink] = None

KUBERNETES_QUEUE = conf.get('local_kubernetes_executor', 'kubernetes_queue')

Expand Down Expand Up @@ -203,3 +206,12 @@ def debug_dump(self) -> None:
self.local_executor.debug_dump()
self.log.info("Dumping KubernetesExecutor state")
self.kubernetes_executor.debug_dump()

def send_callback(self, request: CallbackRequest) -> None:
"""Sends callback for execution.

:param request: Callback request to be executed.
"""
if not self.callback_sink:
raise ValueError("Callback sink is not ready.")
self.callback_sink.send(request)
12 changes: 12 additions & 0 deletions tests/executors/test_celery_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from parameterized import parameterized

from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.celery_executor import CeleryExecutor
from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
Expand Down Expand Up @@ -223,3 +224,14 @@ def test_kubernetes_executor_knows_its_queue(self):
assert k8s_executor_mock.kubernetes_queue == conf.get(
'celery_kubernetes_executor', 'kubernetes_queue'
)

def test_send_callback(self):
cel_exec = CeleryExecutor()
k8s_exec = KubernetesExecutor()
cel_k8s_exec = CeleryKubernetesExecutor(cel_exec, k8s_exec)
cel_k8s_exec.callback_sink = mock.MagicMock()

callback = CallbackRequest(full_filepath="fake")
cel_k8s_exec.send_callback(callback)

cel_k8s_exec.callback_sink.send.assert_called_once_with(callback)
12 changes: 12 additions & 0 deletions tests/executors/test_local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
from unittest import mock

from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.local_executor import LocalExecutor
from airflow.executors.local_kubernetes_executor import LocalKubernetesExecutor
Expand Down Expand Up @@ -67,3 +68,14 @@ def test_kubernetes_executor_knows_its_queue(self):
LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)

assert k8s_executor_mock.kubernetes_queue == conf.get('local_kubernetes_executor', 'kubernetes_queue')

def test_send_callback(self):
local_executor_mock = mock.MagicMock()
k8s_executor_mock = mock.MagicMock()
local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
local_k8s_exec.callback_sink = mock.MagicMock()

callback = CallbackRequest(full_filepath="fake")
local_k8s_exec.send_callback(callback)

local_k8s_exec.callback_sink.send.assert_called_once_with(callback)