From 4b2b51e5e61e77659a5e996f283e75e9d7389760 Mon Sep 17 00:00:00 2001
From: Mark Andreev
Date: Sat, 9 Nov 2024 14:29:10 +0000
Subject: [PATCH] Fix logs with leading spaces in the Docker operator (#33692) (#43840)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Python 3.11’s multi-line error arrows don’t display correctly in Airflow’s DockerOperator logs due to leading spaces being removed, making error messages hard to read.

Before fix:

return self.main(*args, **kwargs)
^^^^^^^^^^^^^^^^

After fix:

return self.main(*args, **kwargs)
       ^^^^^^^^^^^^^^^^

Fixes: #33692
--- .../providers/docker/operators/docker.py | 21 ++++++++---- .../tests/docker/operators/test_docker.py | 32 ++++++++++++++++++- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/providers/src/airflow/providers/docker/operators/docker.py b/providers/src/airflow/providers/docker/operators/docker.py index f9d3ea472ff1..01d07ffd9811 100644 --- a/providers/src/airflow/providers/docker/operators/docker.py +++ b/providers/src/airflow/providers/docker/operators/docker.py @@ -47,6 +47,8 @@ from airflow.utils.types import NOTSET, ArgNotSet if TYPE_CHECKING: + from logging import Logger + from docker import APIClient from docker.types import DeviceRequest @@ -62,6 +64,16 @@ def stringify(line: str | bytes): return line +def fetch_logs(log_stream, log: Logger): + log_lines = [] + for log_chunk in log_stream: + log_chunk = stringify(log_chunk).rstrip() + log_lines.append(log_chunk) + for log_chunk_line in log_chunk.split("\n"): + log.info("%s", log_chunk_line) + return log_lines + + class DockerOperator(BaseOperator): """ Execute a command inside a docker container. 
@@ -426,16 +438,11 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[ tty=self.tty, hostname=self.hostname, ) - logstream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True) + log_stream = self.cli.attach(container=self.container["Id"], stdout=True, stderr=True, stream=True) try: self.cli.start(self.container["Id"]) - log_lines = [] - for log_chunk in logstream: - log_chunk = stringify(log_chunk).strip() - log_lines.append(log_chunk) - for log_chunk_line in log_chunk.split("\n"): - self.log.info("%s", log_chunk_line) + log_lines = fetch_logs(log_stream, self.log) result = self.cli.wait(self.container["Id"]) if result["StatusCode"] in self.skip_on_exit_code: diff --git a/providers/tests/docker/operators/test_docker.py b/providers/tests/docker/operators/test_docker.py index 9dbc84657f4f..8919fc0962d6 100644 --- a/providers/tests/docker/operators/test_docker.py +++ b/providers/tests/docker/operators/test_docker.py @@ -28,7 +28,7 @@ from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException from airflow.providers.docker.exceptions import DockerContainerFailedException -from airflow.providers.docker.operators.docker import DockerOperator +from airflow.providers.docker.operators.docker import DockerOperator, fetch_logs from airflow.utils.task_instance_session import set_current_task_instance_session TEST_CONN_ID = "docker_test_connection" @@ -865,3 +865,33 @@ def test_partial_deprecated_skip_exit_code_ambiguous( pytest.raises(ValueError, match="Conflicting `skip_on_exit_code` provided"), ): ti.render_templates() + + @pytest.mark.parametrize( + "log_lines, expected_lines", + [ + pytest.param( + [ + "return self.main(*args, **kwargs)", + " ^^^^^^^^^^^^^^^^", + ], + [ + "return self.main(*args, **kwargs)", + " ^^^^^^^^^^^^^^^^", + ], + id="should-not-remove-leading-spaces", + ), + pytest.param( + [ + " ^^^^^^^^^^^^^^^^ ", + ], + [ + " ^^^^^^^^^^^^^^^^", + ], + 
id="should-remove-trailing-spaces", + ), + ], + ) + @mock.patch("logging.Logger") + def test_fetch_logs(self, logger_mock, log_lines, expected_lines): + fetch_logs(log_lines, logger_mock) + assert logger_mock.info.call_args_list == [call("%s", line) for line in expected_lines]