Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix KubernetesPodOperator reattach when not deleting pods #18070

Merged
merged 1 commit into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,10 @@ def handle_pod_overlap(

@staticmethod
def _get_pod_identifying_label_string(labels) -> str:
filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
return ','.join(label_id + '=' + label for label_id, label in sorted(filtered_labels.items()))
label_strings = [
f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
]
return ','.join(label_strings) + ',already_checked!=True'

@staticmethod
def _try_numbers_match(context, pod) -> bool:
Expand Down Expand Up @@ -516,6 +518,7 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
)

self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
final_state = None
try:
launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
Expand All @@ -528,6 +531,8 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
if self.is_delete_operator_pod:
self.log.debug("Deleting pod for task %s", self.task_id)
launcher.delete_pod(self.pod)
elif final_state != State.SUCCESS:
self.patch_already_checked(self.pod)
return final_state, remote_pod, result

def patch_already_checked(self, pod: k8s.V1Pod):
Expand All @@ -553,7 +558,8 @@ def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]:
if self.log_events_on_failure:
for event in launcher.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)
self.patch_already_checked(pod)
if not self.is_delete_operator_pod:
self.patch_already_checked(pod)
raise AirflowException(f'Pod returned a failure: {final_state}')
return final_state, remote_pod, result

Expand Down
89 changes: 89 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,3 +658,92 @@ def test_push_xcom_pod_info(self):
pod_namespace = ti.xcom_pull(task_ids=k.task_id, key='pod_namespace')
assert pod_name and pod_name == pod.metadata.name
assert pod_namespace and pod_namespace == pod.metadata.namespace

def test_previous_pods_ignored_for_reattached(self):
"""
When looking for pods to possibly reattach to,
ignore pods from previous tries that were properly finished
"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
)
self.run_pod(k)
self.client_mock.return_value.list_namespaced_pod.assert_called_once()
_, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
assert 'already_checked!=True' in kwargs['label_selector']

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
@mock.patch(
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
".KubernetesPodOperator.patch_already_checked"
)
def test_mark_created_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod):
"""If we aren't deleting pods and have a failure, mark it so we don't reattach to it"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
is_delete_operator_pod=False,
)
self.monitor_mock.return_value = (State.FAILED, None, None)
context = self.create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
@mock.patch(
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
".KubernetesPodOperator.patch_already_checked"
)
def test_mark_created_pod_if_not_deleted_during_exception(
self, mock_patch_already_checked, mock_delete_pod
):
"""If we aren't deleting pods and have an exception, mark it so we don't reattach to it"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
is_delete_operator_pod=False,
)
self.monitor_mock.side_effect = AirflowException("oops")
context = self.create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
@mock.patch(
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
".KubernetesPodOperator.patch_already_checked"
)
def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod):
"""If we aren't deleting pods and have a failure, mark it so we don't reattach to it"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
is_delete_operator_pod=False,
)
# Run it first to easily get the pod
pod = self.run_pod(k)

# Now try and "reattach"
mock_patch_already_checked.reset_mock()
mock_delete_pod.reset_mock()
self.client_mock.return_value.list_namespaced_pod.return_value.items = [pod]
self.monitor_mock.return_value = (State.FAILED, None, None)

context = self.create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()