From f3399a0607a6c9ca23a9b5d40b5b8efdb857af5f Mon Sep 17 00:00:00 2001 From: Michael Petro Date: Thu, 2 Jun 2022 16:17:57 -0400 Subject: [PATCH 1/4] Deleting pod in cleanup if remote pod is found --- .../kubernetes/operators/kubernetes_pod.py | 12 +++++--- .../test_kubernetes_pod_operator.py | 2 +- .../operators/test_kubernetes_pod.py | 30 +++++++++++++++++-- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 3b4366dca98c0..31b9fbe39b4ce 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -400,6 +400,8 @@ def execute(self, context: 'Context'): pod_request_obj=self.pod_request_obj, context=context, ) + # finding remote pod to ensure it exists + remote_pod = self.find_pod(self.pod.metadata.namespace, context=context) self.await_pod_start(pod=self.pod) if self.get_logs: @@ -437,16 +439,18 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): with _suppress(Exception): for event in self.pod_manager.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) - with _suppress(Exception): - self.process_pod_deletion(pod) + if remote_pod is not None: + with _suppress(Exception): + self.process_pod_deletion(pod) error_message = get_container_termination_message(remote_pod, self.BASE_CONTAINER_NAME) error_message = "\n" + error_message if error_message else "" raise AirflowException( f'Pod {pod and pod.metadata.name} returned a failure:{error_message}\n{remote_pod}' ) else: - with _suppress(Exception): - self.process_pod_deletion(pod) + if remote_pod is not None: + with _suppress(Exception): + self.process_pod_deletion(pod) def process_pod_deletion(self, pod): if self.is_delete_operator_pod: diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 49928274517ac..da615b37902dd 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -660,7 +660,7 @@ def test_envs_from_secrets(self, await_pod_completion_mock, create_pod): do_xcom_push=False, ) # THEN - await_pod_completion_mock.return_value = None + await_pod_completion_mock.side_effect = AirflowException context = create_context(k) with pytest.raises(AirflowException): k.execute(context) diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index cac1b8a1b587a..ed6f726dcda23 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -266,7 +266,10 @@ def test_image_pull_policy_correctly_set(self): assert pod.spec.containers[0].image_pull_policy == "Always" @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") - def test_pod_delete_even_on_launcher_error(self, delete_pod_mock): + @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod") + def test_pod_delete_even_on_launcher_error(self, find_pod_mock, delete_pod_mock): + remote_pod_mock = MagicMock() + find_pod_mock.return_value = remote_pod_mock k = KubernetesPodOperator( namespace="default", image="ubuntu:16.04", @@ -286,6 +289,30 @@ def test_pod_delete_even_on_launcher_error(self, delete_pod_mock): k.execute(context=context) assert delete_pod_mock.called + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") + @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod") + def test_pod_not_deleting_non_existing_pod(self, find_pod_mock, delete_pod_mock): + + find_pod_mock.return_value = None + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + in_cluster=False, + do_xcom_push=False, + cluster_context="default", + is_delete_operator_pod=True, + ) + self.create_mock.side_effect = AirflowException("fake failure") + with pytest.raises(AirflowException): + context = create_context(k) + k.execute(context=context) + delete_pod_mock.assert_not_called() + @pytest.mark.parametrize('randomize', [True, False]) def test_provided_pod_name(self, randomize): name_base = "test" @@ -790,7 +817,6 @@ def test_previous_pods_ignored_for_reattached(self): task_id="task", ) self.run_pod(k) - k.client.list_namespaced_pod.assert_called_once() _, kwargs = k.client.list_namespaced_pod.call_args assert 'already_checked!=True' in kwargs['label_selector'] From 8856aa822015d198d91d8195b707c5289cd883dc Mon Sep 17 00:00:00 2001 From: Michael Petro Date: Thu, 2 Jun 2022 18:42:06 -0400 Subject: [PATCH 2/4] Adding type error catch in get_container_termination_message --- airflow/providers/cncf/kubernetes/utils/pod_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 83c3cead09a36..0010355aba03e 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -87,7 +87,7 @@ def get_container_termination_message(pod: V1Pod, container_name: str): container_statuses = pod.status.container_statuses container_status = next(iter([x for x in container_statuses if x.name == container_name]), None) return container_status.state.terminated.message if container_status else None - except AttributeError: + except (AttributeError, TypeError): return None From 97f15b858b832c9683f34db1848f5296ffe1bd1b Mon Sep 17 00:00:00 2001 From: Michael Petro Date: Thu, 16 Jun 2022 10:45:48 -0400 Subject: [PATCH 3/4] Passing remote pod to process_pod_deletion and remove redundant mock --- .../kubernetes/operators/kubernetes_pod.py | 21 +++++++++---------- .../operators/test_kubernetes_pod.py | 2 -- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 31b9fbe39b4ce..0970445560a8c 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -439,25 +439,24 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): with _suppress(Exception): for event in self.pod_manager.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) - if remote_pod is not None: - with _suppress(Exception): - self.process_pod_deletion(pod) + with _suppress(Exception): + self.process_pod_deletion(remote_pod) error_message = get_container_termination_message(remote_pod, self.BASE_CONTAINER_NAME) error_message = "\n" + error_message if error_message else "" raise AirflowException( f'Pod {pod and pod.metadata.name} returned a failure:{error_message}\n{remote_pod}' ) else: - if remote_pod is not None: - with _suppress(Exception): - self.process_pod_deletion(pod) + with _suppress(Exception): + self.process_pod_deletion(remote_pod) def process_pod_deletion(self, pod): - if self.is_delete_operator_pod: - self.log.info("Deleting pod: %s", pod.metadata.name) - self.pod_manager.delete_pod(pod) - else: - self.log.info("skipping deleting pod: %s", pod.metadata.name) + if pod is not None: + if self.is_delete_operator_pod: + self.log.info("Deleting pod: %s", pod.metadata.name) + self.pod_manager.delete_pod(pod) + else: + self.log.info("skipping deleting pod: %s", pod.metadata.name) def _build_find_pod_label_selector(self, context: Optional[dict] = None, *, exclude_checked=True) -> str: labels = self._get_ti_pod_labels(context, include_try_number=False) diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index ed6f726dcda23..b771361d1a679 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -268,8 +268,6 @@ def test_image_pull_policy_correctly_set(self): @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod") def test_pod_delete_even_on_launcher_error(self, find_pod_mock, delete_pod_mock): - remote_pod_mock = MagicMock() - find_pod_mock.return_value = remote_pod_mock k = KubernetesPodOperator( namespace="default", image="ubuntu:16.04", From 3f747cf721718df31efc639e9597f8ae38c37bc6 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 16 Jun 2022 08:52:38 -0700 Subject: [PATCH 4/4] Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py --- airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 0970445560a8c..411b621ce8c2c 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -400,7 +400,7 @@ def execute(self, context: 'Context'): pod_request_obj=self.pod_request_obj, context=context, ) - # finding remote pod to ensure it exists + # get remote pod for use in cleanup methods remote_pod = self.find_pod(self.pod.metadata.namespace, context=context) self.await_pod_start(pod=self.pod)