diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 704745e43365..b84a3ab4e5ef 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -592,7 +592,9 @@ def execute_sync(self, context: Context): container_logs=self.container_logs, follow_logs=True, ) - else: + if not self.get_logs or ( + self.container_logs is not True and self.base_container_name not in self.container_logs + ): self.pod_manager.await_container_completion( pod=self.pod, container_name=self.base_container_name ) diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index 3c1b752f7173..7e7123585b7f 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1232,6 +1232,46 @@ def test_task_skip_when_pod_exit_with_certain_code( with pytest.raises(expected_exc): self.run_pod(k) + @patch(f"{POD_MANAGER_CLASS}.extract_xcom") + @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start") + @patch(f"{POD_MANAGER_CLASS}.await_container_completion") + @patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs") + @patch(HOOK_CLASS) + def test_get_logs_but_not_for_base_container( + self, + hook_mock, + mock_fetch_log, + mock_await_container_completion, + mock_await_xcom_sidecar, + mock_extract_xcom, + ): + k = KubernetesPodOperator( + namespace="default", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels={"foo": "bar"}, + name="test", + task_id="task", + do_xcom_push=True, + container_logs=["some_init_container"], + get_logs=True, + ) + mock_extract_xcom.return_value = "{}" + remote_pod_mock = MagicMock() + remote_pod_mock.status.phase = "Succeeded" + self.await_pod_mock.return_value = remote_pod_mock + pod = self.run_pod(k) + + # check that the base container is not included in the logs + mock_fetch_log.assert_called_once_with( + pod=pod, container_logs=["some_init_container"], follow_logs=True + ) + # check that KPO waits for the base container to complete before proceeding to extract XCom + mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base") + # check that we wait for the xcom sidecar to start before extracting XCom + mock_await_xcom_sidecar.assert_called_once_with(pod=pod) + class TestSuppress: def test__suppress(self, caplog):