Skip to content

Commit

Permalink
Cleanup Docker operator logging (#33914)
Browse files Browse the repository at this point in the history
]

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
wolfdn and uranusjr authored Sep 4, 2023
1 parent a6a7a89 commit 3d27504
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 8 deletions.
45 changes: 45 additions & 0 deletions airflow/providers/docker/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Exceptions used by Docker provider."""
from __future__ import annotations

from airflow.exceptions import AirflowException, AirflowSkipException


class DockerContainerFailedException(AirflowException):
"""
Raised when a Docker container returns an error.
:param logs: The log output of the failed Docker container
"""

def __init__(self, message: str | None = None, logs: list[str | bytes] | None = None) -> None:
super().__init__(message)
self.logs = logs


class DockerContainerFailedSkipException(AirflowSkipException):
"""
Raised when a Docker container returns an error and task should be skipped.
:param logs: The log output of the failed Docker container
"""

def __init__(self, message: str | None = None, logs: list[str | bytes] | None = None) -> None:
super().__init__(message)
self.logs = logs
13 changes: 8 additions & 5 deletions airflow/providers/docker/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
from docker.types import LogConfig, Mount
from dotenv import dotenv_values

from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.docker.exceptions import (
DockerContainerFailedException,
DockerContainerFailedSkipException,
)
from airflow.providers.docker.hooks.docker import DockerHook

if TYPE_CHECKING:
Expand Down Expand Up @@ -403,12 +407,11 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> list[

result = self.cli.wait(self.container["Id"])
if result["StatusCode"] in self.skip_on_exit_code:
raise AirflowSkipException(
f"Docker container returned exit code {self.skip_on_exit_code}. Skipping."
raise DockerContainerFailedSkipException(
f"Docker container returned exit code {self.skip_on_exit_code}. Skipping.", logs=log_lines
)
elif result["StatusCode"] != 0:
joined_log_lines = "\n".join(log_lines)
raise AirflowException(f"Docker container failed: {result!r} lines {joined_log_lines}")
raise DockerContainerFailedException(f"Docker container failed: {result!r}", logs=log_lines)

if self.retrieve_output:
return self._attempt_to_retrieve_result()
Expand Down
7 changes: 4 additions & 3 deletions tests/providers/docker/operators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from docker.types import DeviceRequest, LogConfig, Mount

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.providers.docker.exceptions import DockerContainerFailedException
from airflow.providers.docker.operators.docker import DockerOperator

TEST_CONN_ID = "docker_test_connection"
Expand Down Expand Up @@ -553,19 +554,19 @@ def test_skip(self, extra_kwargs, actual_exit_code, expected_exc):
def test_execute_container_fails(self):
failed_msg = {"StatusCode": 1}
log_line = ["unicode container log 😁 ", b"byte string container log"]
expected_message = "Docker container failed: {failed_msg} lines {expected_log_output}"
expected_message = "Docker container failed: {failed_msg}"
self.client_mock.attach.return_value = log_line
self.client_mock.wait.return_value = failed_msg

operator = DockerOperator(image="ubuntu", owner="unittest", task_id="unittest")

with pytest.raises(AirflowException) as raised_exception:
with pytest.raises(DockerContainerFailedException) as raised_exception:
operator.execute(None)

assert str(raised_exception.value) == expected_message.format(
failed_msg=failed_msg,
expected_log_output=f'{log_line[0].strip()}\n{log_line[1].decode("utf-8")}',
)
assert raised_exception.value.logs == [log_line[0].strip(), log_line[1].decode("utf-8")]

def test_auto_remove_container_fails(self):
self.client_mock.wait.return_value = {"StatusCode": 1}
Expand Down

0 comments on commit 3d27504

Please sign in to comment.