diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 6cf8474989124..747f8b024e667 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -409,8 +409,10 @@ def handle_pod_overlap( @staticmethod def _get_pod_identifying_label_string(labels) -> str: - filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'} - return ','.join(label_id + '=' + label for label_id, label in sorted(filtered_labels.items())) + label_strings = [ + f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number' + ] + return ','.join(label_strings) + ',already_checked!=True' @staticmethod def _try_numbers_match(context, pod) -> bool: @@ -516,6 +518,7 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po ) self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict())) + final_state = None try: launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds) final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs) @@ -528,6 +531,8 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po if self.is_delete_operator_pod: self.log.debug("Deleting pod for task %s", self.task_id) launcher.delete_pod(self.pod) + elif final_state != State.SUCCESS: + self.patch_already_checked(self.pod) return final_state, remote_pod, result def patch_already_checked(self, pod: k8s.V1Pod): @@ -553,7 +558,8 @@ def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]: if self.log_events_on_failure: for event in launcher.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) - self.patch_already_checked(pod) + if not self.is_delete_operator_pod: + self.patch_already_checked(pod) raise AirflowException(f'Pod returned a failure: {final_state}') return final_state, remote_pod, result diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index 47a20f8985882..7f4e53bc19b3a 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -658,3 +658,92 @@ def test_push_xcom_pod_info(self): pod_namespace = ti.xcom_pull(task_ids=k.task_id, key='pod_namespace') assert pod_name and pod_name == pod.metadata.name assert pod_namespace and pod_namespace == pod.metadata.namespace + + def test_previous_pods_ignored_for_reattached(self): + """ + When looking for pods to possibly reattach to, + ignore pods from previous tries that were properly finished + """ + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + name="test", + task_id="task", + ) + self.run_pod(k) + self.client_mock.return_value.list_namespaced_pod.assert_called_once() + _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args + assert 'already_checked!=True' in kwargs['label_selector'] + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") + @mock.patch( + "airflow.providers.cncf.kubernetes.operators.kubernetes_pod" + ".KubernetesPodOperator.patch_already_checked" + ) + def test_mark_created_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod): + """If we aren't deleting pods and have a failure, mark it so we don't reattach to it""" + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + name="test", + task_id="task", + is_delete_operator_pod=False, + ) + self.monitor_mock.return_value = (State.FAILED, None, None) + context = self.create_context(k) + with pytest.raises(AirflowException): + k.execute(context=context) + mock_patch_already_checked.assert_called_once() + mock_delete_pod.assert_not_called() + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") + @mock.patch( + "airflow.providers.cncf.kubernetes.operators.kubernetes_pod" + ".KubernetesPodOperator.patch_already_checked" + ) + def test_mark_created_pod_if_not_deleted_during_exception( + self, mock_patch_already_checked, mock_delete_pod + ): + """If we aren't deleting pods and have an exception, mark it so we don't reattach to it""" + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + name="test", + task_id="task", + is_delete_operator_pod=False, + ) + self.monitor_mock.side_effect = AirflowException("oops") + context = self.create_context(k) + with pytest.raises(AirflowException): + k.execute(context=context) + mock_patch_already_checked.assert_called_once() + mock_delete_pod.assert_not_called() + + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") + @mock.patch( + "airflow.providers.cncf.kubernetes.operators.kubernetes_pod" + ".KubernetesPodOperator.patch_already_checked" + ) + def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod): + """If we aren't deleting pods and have a failure, mark it so we don't reattach to it""" + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + name="test", + task_id="task", + is_delete_operator_pod=False, + ) + # Run it first to easily get the pod + pod = self.run_pod(k) + + # Now try and "reattach" + mock_patch_already_checked.reset_mock() + mock_delete_pod.reset_mock() + self.client_mock.return_value.list_namespaced_pod.return_value.items = [pod] + self.monitor_mock.return_value = (State.FAILED, None, None) + + context = self.create_context(k) + with pytest.raises(AirflowException): + k.execute(context=context) + mock_patch_already_checked.assert_called_once() + mock_delete_pod.assert_not_called()