Skip to content

Commit

Permalink
Fix waiting the base container when reading the logs of other containers
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala committed Aug 3, 2023
1 parent 1c7472d commit ca12377
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
5 changes: 1 addition & 4 deletions airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,10 +592,7 @@ def execute_sync(self, context: Context):
container_logs=self.container_logs,
follow_logs=True,
)
else:
self.pod_manager.await_container_completion(
pod=self.pod, container_name=self.base_container_name
)
self.pod_manager.await_container_completion(pod=self.pod, container_name=self.base_container_name)

if self.do_xcom_push:
self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
Expand Down
40 changes: 40 additions & 0 deletions tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -1232,6 +1232,46 @@ def test_task_skip_when_pod_exit_with_certain_code(
with pytest.raises(expected_exc):
self.run_pod(k)

@patch(f"{POD_MANAGER_CLASS}.extract_xcom")
@patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
@patch(f"{POD_MANAGER_CLASS}.await_container_completion")
@patch(f"{POD_MANAGER_CLASS}.fetch_requested_container_logs")
@patch(HOOK_CLASS)
def test_get_logs_but_not_for_base_container(
self,
hook_mock,
mock_fetch_log,
mock_await_container_completion,
mock_await_xcom_sidecar,
mock_extract_xcom,
):
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
do_xcom_push=True,
container_logs=["some_init_container"],
get_logs=True,
)
mock_extract_xcom.return_value = "{}"
remote_pod_mock = MagicMock()
remote_pod_mock.status.phase = "Succeeded"
self.await_pod_mock.return_value = remote_pod_mock
pod = self.run_pod(k)

# check that the base container is not included in the logs
mock_fetch_log.assert_called_once_with(
pod=pod, container_logs=["some_init_container"], follow_logs=True
)
# check that KPO waits for the base container to complete before proceeding to extract XCom
mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base")
# check that we wait for the xcom sidecar to start before extracting XCom
mock_await_xcom_sidecar.assert_called_once_with(pod=pod)


class TestSuppress:
def test__suppress(self, caplog):
Expand Down

0 comments on commit ca12377

Please sign in to comment.