Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Test Coverage for Kubernetes Executor #15617

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 66 additions & 3 deletions tests/executors/test_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@

import pytest
from kubernetes.client import models as k8s
from kubernetes.client.rest import ApiException
from urllib3 import HTTPResponse

from airflow.utils import timezone
from tests.test_utils.config import conf_vars

try:
from kubernetes.client.rest import ApiException

from airflow.executors.kubernetes_executor import (
AirflowKubernetesScheduler,
KubernetesExecutor,
Expand Down Expand Up @@ -120,6 +119,71 @@ def test_execution_date_serialize_deserialize(self):

assert datetime_obj == new_datetime_obj

@unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.client')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
def test_delete_pod_successfully(
self, mock_watcher, mock_client, mock_kube_client
): # pylint: disable=unused-argument
pod_id = "my-pod-1"
namespace = "my-namespace"

kube_client = mock.MagicMock()
mock_kube_client.return_value.delete_namespaced_pod = kube_client
Dr-Denzy marked this conversation as resolved.
Show resolved Hide resolved

kube_executor = KubernetesExecutor()
kube_executor.job_id = "test-job-id"
kube_executor.start()
kube_executor.kube_scheduler.delete_pod(pod_id, namespace)

kube_client.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())

@unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.client')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
def test_delete_pod_raises_404(
self, mock_watcher, mock_client, mock_kube_client
): # pylint: disable=unused-argument
pod_id = "my-pod-1"
namespace = "my-namespace"

kube_client = mock.MagicMock()
mock_kube_client.return_value.delete_namespaced_pod = kube_client

# ApiException is raised because status is not 404
mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=400)
kube_executor = KubernetesExecutor()
kube_executor.job_id = "test-job-id"
kube_executor.start()

with pytest.raises(ApiException):
kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
kube_client.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())

@unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed')
@mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
@mock.patch('airflow.executors.kubernetes_executor.client')
@mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
def test_delete_pod_404_not_raised(
self, mock_watcher, mock_client, mock_kube_client
): # pylint: disable=unused-argument
pod_id = "my-pod-1"
namespace = "my-namespace"

kube_client = mock.MagicMock()
mock_kube_client.return_value.delete_namespaced_pod = kube_client

# ApiException not raised because the status is 404
mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=404)
kube_executor = KubernetesExecutor()
kube_executor.job_id = "test-job-id"
kube_executor.start()

kube_executor.kube_scheduler.delete_pod(pod_id, namespace)
kube_client.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions())


class TestKubernetesExecutor(unittest.TestCase):
"""
Expand Down Expand Up @@ -160,7 +224,6 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc
('kubernetes', 'pod_template_file'): path,
}
with conf_vars(config):

kubernetes_executor = self.kubernetes_executor
kubernetes_executor.start()
# Execute a task while the Api Throws errors
Expand Down