Skip to content

Commit

Permalink
Fix KubernetesPodOperator reattach when not deleting pods (#18070)
Browse files Browse the repository at this point in the history
This change:

- ignores any pods already marked as complete instead of trying to
  reattach to them.
- properly marks all 'finished' unsuccessful pods that won't
  be deleted.

Combined, these allow `is_delete_operator_pod=False` and
`reattach_on_restart=True` to function together properly during retries.
  • Loading branch information
jedcunningham authored Sep 8, 2021
1 parent 13e7d4a commit b8d06e8
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 3 deletions.
12 changes: 9 additions & 3 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,10 @@ def handle_pod_overlap(

@staticmethod
def _get_pod_identifying_label_string(labels) -> str:
filtered_labels = {label_id: label for label_id, label in labels.items() if label_id != 'try_number'}
return ','.join(label_id + '=' + label for label_id, label in sorted(filtered_labels.items()))
label_strings = [
f'{label_id}={label}' for label_id, label in sorted(labels.items()) if label_id != 'try_number'
]
return ','.join(label_strings) + ',already_checked!=True'

@staticmethod
def _try_numbers_match(context, pod) -> bool:
Expand Down Expand Up @@ -516,6 +518,7 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
)

self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
final_state = None
try:
launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds)
final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
Expand All @@ -528,6 +531,8 @@ def create_new_pod_for_operator(self, labels, launcher) -> Tuple[State, k8s.V1Po
if self.is_delete_operator_pod:
self.log.debug("Deleting pod for task %s", self.task_id)
launcher.delete_pod(self.pod)
elif final_state != State.SUCCESS:
self.patch_already_checked(self.pod)
return final_state, remote_pod, result

def patch_already_checked(self, pod: k8s.V1Pod):
Expand All @@ -553,7 +558,8 @@ def monitor_launched_pod(self, launcher, pod) -> Tuple[State, Optional[str]]:
if self.log_events_on_failure:
for event in launcher.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)
self.patch_already_checked(pod)
if not self.is_delete_operator_pod:
self.patch_already_checked(pod)
raise AirflowException(f'Pod returned a failure: {final_state}')
return final_state, remote_pod, result

Expand Down
89 changes: 89 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,3 +658,92 @@ def test_push_xcom_pod_info(self):
pod_namespace = ti.xcom_pull(task_ids=k.task_id, key='pod_namespace')
assert pod_name and pod_name == pod.metadata.name
assert pod_namespace and pod_namespace == pod.metadata.namespace

def test_previous_pods_ignored_for_reattached(self):
"""
When looking for pods to possibly reattach to,
ignore pods from previous tries that were properly finished
"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
)
self.run_pod(k)
self.client_mock.return_value.list_namespaced_pod.assert_called_once()
_, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args
assert 'already_checked!=True' in kwargs['label_selector']

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
@mock.patch(
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
".KubernetesPodOperator.patch_already_checked"
)
def test_mark_created_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod):
"""If we aren't deleting pods and have a failure, mark it so we don't reattach to it"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
is_delete_operator_pod=False,
)
self.monitor_mock.return_value = (State.FAILED, None, None)
context = self.create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
@mock.patch(
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
".KubernetesPodOperator.patch_already_checked"
)
def test_mark_created_pod_if_not_deleted_during_exception(
self, mock_patch_already_checked, mock_delete_pod
):
"""If we aren't deleting pods and have an exception, mark it so we don't reattach to it"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
is_delete_operator_pod=False,
)
self.monitor_mock.side_effect = AirflowException("oops")
context = self.create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod")
@mock.patch(
"airflow.providers.cncf.kubernetes.operators.kubernetes_pod"
".KubernetesPodOperator.patch_already_checked"
)
def test_mark_reattached_pod_if_not_deleted(self, mock_patch_already_checked, mock_delete_pod):
"""If we aren't deleting pods and have a failure, mark it so we don't reattach to it"""
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
name="test",
task_id="task",
is_delete_operator_pod=False,
)
# Run it first to easily get the pod
pod = self.run_pod(k)

# Now try and "reattach"
mock_patch_already_checked.reset_mock()
mock_delete_pod.reset_mock()
self.client_mock.return_value.list_namespaced_pod.return_value.items = [pod]
self.monitor_mock.return_value = (State.FAILED, None, None)

context = self.create_context(k)
with pytest.raises(AirflowException):
k.execute(context=context)
mock_patch_already_checked.assert_called_once()
mock_delete_pod.assert_not_called()

0 comments on commit b8d06e8

Please sign in to comment.