diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 3b4366dca98c0..411b621ce8c2c 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -400,6 +400,8 @@ def execute(self, context: 'Context'): pod_request_obj=self.pod_request_obj, context=context, ) + # get remote pod for use in cleanup methods + remote_pod = self.find_pod(self.pod.metadata.namespace, context=context) self.await_pod_start(pod=self.pod) if self.get_logs: @@ -438,7 +440,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): for event in self.pod_manager.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) with _suppress(Exception): - self.process_pod_deletion(pod) + self.process_pod_deletion(remote_pod) error_message = get_container_termination_message(remote_pod, self.BASE_CONTAINER_NAME) error_message = "\n" + error_message if error_message else "" raise AirflowException( @@ -446,14 +448,15 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): ) else: with _suppress(Exception): - self.process_pod_deletion(pod) + self.process_pod_deletion(remote_pod) def process_pod_deletion(self, pod): - if self.is_delete_operator_pod: - self.log.info("Deleting pod: %s", pod.metadata.name) - self.pod_manager.delete_pod(pod) - else: - self.log.info("skipping deleting pod: %s", pod.metadata.name) + if pod is not None: + if self.is_delete_operator_pod: + self.log.info("Deleting pod: %s", pod.metadata.name) + self.pod_manager.delete_pod(pod) + else: + self.log.info("skipping deleting pod: %s", pod.metadata.name) def _build_find_pod_label_selector(self, context: Optional[dict] = None, *, exclude_checked=True) -> str: labels = self._get_ti_pod_labels(context, include_try_number=False) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 83c3cead09a36..0010355aba03e 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -87,7 +87,7 @@ def get_container_termination_message(pod: V1Pod, container_name: str): container_statuses = pod.status.container_statuses container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) return container_status.state.terminated.message if container_status else None - except AttributeError: + except (AttributeError, TypeError): return None diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 49928274517ac..da615b37902dd 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -660,7 +660,7 @@ def test_envs_from_secrets(self, await_pod_completion_mock, create_pod): do_xcom_push=False, ) # THEN - await_pod_completion_mock.return_value = None + await_pod_completion_mock.side_effect = AirflowException context = create_context(k) with pytest.raises(AirflowException): k.execute(context) diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index cac1b8a1b587a..b771361d1a679 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -266,7 +266,8 @@ def test_image_pull_policy_correctly_set(self): assert pod.spec.containers[0].image_pull_policy == "Always" @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") - def test_pod_delete_even_on_launcher_error(self, delete_pod_mock): + @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod") + def test_pod_delete_even_on_launcher_error(self, find_pod_mock, delete_pod_mock): k = KubernetesPodOperator( namespace="default", image="ubuntu:16.04", @@ -286,6 +287,30 @@ def test_pod_delete_even_on_launcher_error(self, delete_pod_mock): k.execute(context=context) assert delete_pod_mock.called + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") + @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod") + def test_pod_not_deleting_non_existing_pod(self, find_pod_mock, delete_pod_mock): + + find_pod_mock.return_value = None + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + cluster_context="default", + is_delete_operator_pod=True, + ) + self.create_mock.side_effect = AirflowException("fake failure") + with pytest.raises(AirflowException): + context = create_context(k) + k.execute(context=context) + delete_pod_mock.assert_not_called() + @pytest.mark.parametrize('randomize', [True, False]) def test_provided_pod_name(self, randomize): name_base = "test" @@ -790,7 +815,6 @@ def test_previous_pods_ignored_for_reattached(self): task_id="task", ) self.run_pod(k) - k.client.list_namespaced_pod.assert_called_once() _, kwargs = k.client.list_namespaced_pod.call_args assert 'already_checked!=True' in kwargs['label_selector']