Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Find Pod Before Cleanup In KubernetesPodOperator Execution #22092

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ def execute(self, context: 'Context'):
pod_request_obj=self.pod_request_obj,
context=context,
)
# finding remote pod to ensure it exists
dstandish marked this conversation as resolved.
Show resolved Hide resolved
remote_pod = self.find_pod(self.pod.metadata.namespace, context=context)
self.await_pod_start(pod=self.pod)

if self.get_logs:
Expand Down Expand Up @@ -437,16 +439,18 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
with _suppress(Exception):
for event in self.pod_manager.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason, event.message)
with _suppress(Exception):
self.process_pod_deletion(pod)
if remote_pod is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if we care, but if the create succeeds but the find fails, we can leave the pod with this approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it fair to assume that that if the pod find fails then the pod doesn’t exist and we don’t need to delete it?

with _suppress(Exception):
self.process_pod_deletion(pod)
michaelmicheal marked this conversation as resolved.
Show resolved Hide resolved
error_message = get_container_termination_message(remote_pod, self.BASE_CONTAINER_NAME)
error_message = "\n" + error_message if error_message else ""
raise AirflowException(
f'Pod {pod and pod.metadata.name} returned a failure:{error_message}\n{remote_pod}'
)
else:
with _suppress(Exception):
self.process_pod_deletion(pod)
if remote_pod is not None:
michaelmicheal marked this conversation as resolved.
Show resolved Hide resolved
with _suppress(Exception):
self.process_pod_deletion(pod)

def process_pod_deletion(self, pod):
if self.is_delete_operator_pod:
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def get_container_termination_message(pod: V1Pod, container_name: str):
container_statuses = pod.status.container_statuses
container_status = next(iter([x for x in container_statuses if x.name == container_name]), None)
return container_status.state.terminated.message if container_status else None
except AttributeError:
except (AttributeError, TypeError):
return None


Expand Down
2 changes: 1 addition & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ def test_envs_from_secrets(self, await_pod_completion_mock, create_pod):
do_xcom_push=False,
)
# THEN
await_pod_completion_mock.return_value = None
await_pod_completion_mock.side_effect = AirflowException
context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context)
Expand Down
30 changes: 28 additions & 2 deletions tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,10 @@ def test_image_pull_policy_correctly_set(self):
assert pod.spec.containers[0].image_pull_policy == "Always"

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod")
def test_pod_delete_even_on_launcher_error(self, delete_pod_mock):
@mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod")
def test_pod_delete_even_on_launcher_error(self, find_pod_mock, delete_pod_mock):
remote_pod_mock = MagicMock()
find_pod_mock.return_value = remote_pod_mock
michaelmicheal marked this conversation as resolved.
Show resolved Hide resolved
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
Expand All @@ -286,6 +289,30 @@ def test_pod_delete_even_on_launcher_error(self, delete_pod_mock):
k.execute(context=context)
assert delete_pod_mock.called

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod")
@mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.find_pod")
def test_pod_not_deleting_non_existing_pod(self, find_pod_mock, delete_pod_mock):

find_pod_mock.return_value = None
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
in_cluster=False,
do_xcom_push=False,
cluster_context="default",
is_delete_operator_pod=True,
)
self.create_mock.side_effect = AirflowException("fake failure")
with pytest.raises(AirflowException):
context = create_context(k)
k.execute(context=context)
delete_pod_mock.assert_not_called()

@pytest.mark.parametrize('randomize', [True, False])
def test_provided_pod_name(self, randomize):
name_base = "test"
Expand Down Expand Up @@ -790,7 +817,6 @@ def test_previous_pods_ignored_for_reattached(self):
task_id="task",
)
self.run_pod(k)
k.client.list_namespaced_pod.assert_called_once()
_, kwargs = k.client.list_namespaced_pod.call_args
assert 'already_checked!=True' in kwargs['label_selector']

Expand Down