diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index e449b96eaac4b..57b5af3dd81e1 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -25,14 +25,13 @@ import pytest from kubernetes.client import models as k8s +from kubernetes.client.rest import ApiException from urllib3 import HTTPResponse from airflow.utils import timezone from tests.test_utils.config import conf_vars try: - from kubernetes.client.rest import ApiException - from airflow.executors.kubernetes_executor import ( AirflowKubernetesScheduler, KubernetesExecutor, @@ -120,6 +119,71 @@ def test_execution_date_serialize_deserialize(self): assert datetime_obj == new_datetime_obj + @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed') + @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') + @mock.patch('airflow.executors.kubernetes_executor.client') + @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') + def test_delete_pod_successfully( + self, mock_watcher, mock_client, mock_kube_client + ): # pylint: disable=unused-argument + pod_id = "my-pod-1" + namespace = "my-namespace-1" + + mock_delete_namespace = mock.MagicMock() + mock_kube_client.return_value.delete_namespaced_pod = mock_delete_namespace + + kube_executor = KubernetesExecutor() + kube_executor.job_id = "test-job-id" + kube_executor.start() + kube_executor.kube_scheduler.delete_pod(pod_id, namespace) + + mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + + @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed') + @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') + @mock.patch('airflow.executors.kubernetes_executor.client') + @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') + def test_delete_pod_raises_404( + self, mock_watcher, mock_client, mock_kube_client + ): # pylint: disable=unused-argument + pod_id = "my-pod-1" + namespace = "my-namespace-2" + + mock_delete_namespace = mock.MagicMock() + mock_kube_client.return_value.delete_namespaced_pod = mock_delete_namespace + + # ApiException is raised because status is not 404 + mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=400) + kube_executor = KubernetesExecutor() + kube_executor.job_id = "test-job-id" + kube_executor.start() + + with pytest.raises(ApiException): + kube_executor.kube_scheduler.delete_pod(pod_id, namespace) + mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + + @unittest.skipIf(AirflowKubernetesScheduler is None, 'kubernetes python package is not installed') + @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') + @mock.patch('airflow.executors.kubernetes_executor.client') + @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') + def test_delete_pod_404_not_raised( + self, mock_watcher, mock_client, mock_kube_client + ): # pylint: disable=unused-argument + pod_id = "my-pod-1" + namespace = "my-namespace-3" + + mock_delete_namespace = mock.MagicMock() + mock_kube_client.return_value.delete_namespaced_pod = mock_delete_namespace + + # ApiException not raised because the status is 404 + mock_kube_client.return_value.delete_namespaced_pod.side_effect = ApiException(status=404) + kube_executor = KubernetesExecutor() + kube_executor.job_id = "test-job-id" + kube_executor.start() + + kube_executor.kube_scheduler.delete_pod(pod_id, namespace) + mock_delete_namespace.assert_called_with(pod_id, namespace, body=mock_client.V1DeleteOptions()) + class TestKubernetesExecutor(unittest.TestCase): """ @@ -160,7 +224,6 @@ def test_run_next_exception(self, mock_get_kube_client, mock_kubernetes_job_watc ('kubernetes', 'pod_template_file'): path, } with conf_vars(config): - kubernetes_executor = self.kubernetes_executor kubernetes_executor.start() # Execute a task while the Api Throws errors