From a5b068665eebe8a07ad222b4f25b75ec688f1ab3 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 01:28:37 +0100 Subject: [PATCH 01/19] add the new conf to s3 handler to delete local log when it is true and log is successfully uploaded --- .../amazon/aws/log/s3_task_handler.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index 098f17a28a95c..bbfeaaedbcd2b 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -19,6 +19,7 @@ import os import pathlib +import shutil from airflow.compat.functools import cached_property from airflow.configuration import conf @@ -36,9 +37,16 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): trigger_should_wrap = True - def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None): + def __init__( + self, + base_log_folder: str, + s3_log_folder: str, + filename_template: str | None = None, + delete_local_copy: bool = False, + ): super().__init__(base_log_folder, filename_template) self.remote_base = s3_log_folder + self.delete_local_copy = delete_local_copy self.log_relative_path = "" self._hook = None self.closed = False @@ -84,7 +92,9 @@ def close(self): if os.path.exists(local_loc): # read log and remove old logs to get just the latest additions log = pathlib.Path(local_loc).read_text() - self.s3_write(log, remote_loc) + write_to_s3 = self.s3_write(log, remote_loc) + if write_to_s3 and self.delete_local_copy: + shutil.rmtree(os.path.dirname(local_loc)) # Mark closed so we don't double write if close is called twice self.closed = True @@ -164,16 +174,17 @@ def s3_read(self, remote_log_location: str, return_error: bool = False) -> str: return msg return "" - def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1): + def 
s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1) -> bool: """ - Writes the log to the remote_log_location. Fails silently if no hook - was created. + Writes the log to the remote_log_location and return `True` when done. Fails silently + and return `False` if no log was created. :param log: the log to write to the remote_log_location :param remote_log_location: the log's location in remote storage :param append: if False, any existing log file is overwritten. If True, the new log is appended to any existing logs. :param max_retry: Maximum number of times to retry on upload failure + :return: whether the log is successfully written to remote location or not. """ try: if append and self.s3_log_exists(remote_log_location): @@ -181,6 +192,7 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_ log = "\n".join([old_log, log]) if old_log else log except Exception: self.log.exception("Could not verify previous log to append") + return False # Default to a single retry attempt because s3 upload failures are # rare but occasionally occur. 
Multiple retry attempts are unlikely @@ -199,3 +211,5 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_ self.log.warning("Failed attempt to write logs to %s, will retry", remote_log_location) else: self.log.exception("Could not write logs to %s", remote_log_location) + return False + return True From 4941b027ba7e3e9284b2a47efcc45c29a179a3cd Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 01:33:21 +0100 Subject: [PATCH 02/19] add the new conf to GCS handler to delete local log when it is true and log is successfully uploaded --- .../google/cloud/log/gcs_task_handler.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 4523cddc5f442..9c472d3d1837f 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -19,6 +19,7 @@ import logging import os +import shutil from pathlib import Path from typing import Collection @@ -63,6 +64,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): :param gcp_scopes: Comma-separated string containing OAuth2 scopes :param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials will be used. 
:param delete_local_copy: Whether local log files should be deleted after they are uploaded when using + remote logging
""" try: blob = storage.Blob.from_string(remote_log_location, self.client) @@ -232,6 +240,8 @@ def gcs_write(self, log, remote_log_location): blob.upload_from_string(log, content_type="text/plain") except Exception as e: self.log.error("Could not write logs to %s: %s", remote_log_location, e) + return False + return True @staticmethod def no_log_found(exc): From 5dd3dae94f7225f21021ec06fac9383a850a61c2 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 01:40:17 +0100 Subject: [PATCH 03/19] add the new conf to OSS handler to delete local log when it is true and log is successfully uploaded --- .../alibaba/cloud/log/oss_task_handler.py | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py index c443b4e014392..7af100857142d 100644 --- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -20,6 +20,7 @@ import contextlib import os import pathlib +import shutil from airflow.compat.functools import cached_property from airflow.configuration import conf @@ -35,10 +36,17 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from OSS remote storage. 
""" - def __init__(self, base_log_folder, oss_log_folder, filename_template=None): + def __init__( + self, + base_log_folder, + oss_log_folder, + filename_template=None, + delete_local_copy: bool = False, + ): self.log.info("Using oss_task_handler for remote logging...") super().__init__(base_log_folder, filename_template) (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder) + self.delete_local_copy = delete_local_copy self.log_relative_path = "" self._hook = None self.closed = False @@ -92,7 +100,9 @@ def close(self): if os.path.exists(local_loc): # read log and remove old logs to get just the latest additions log = pathlib.Path(local_loc).read_text() - self.oss_write(log, remote_loc) + oss_write = self.oss_write(log, remote_loc) + if oss_write and self.delete_local_copy: + shutil.rmtree(os.path.dirname(local_loc)) # Mark closed so we don't double write if close is called twice self.closed = True @@ -154,15 +164,16 @@ def oss_read(self, remote_log_location, return_error=False): if return_error: return msg - def oss_write(self, log, remote_log_location, append=True): + def oss_write(self, log, remote_log_location, append=True) -> bool: """ - Writes the log to the remote_log_location. Fails silently if no hook - was created. + Writes the log to the remote_log_location and return `True` when done. Fails silently + and return `False` if no log was created. :param log: the log to write to the remote_log_location :param remote_log_location: the log's location in remote storage :param append: if False, any existing log file is overwritten. If True, the new log is appended to any existing logs. + :return: whether the log is successfully written to remote location or not. 
""" oss_remote_log_location = f"{self.base_folder}/{remote_log_location}" pos = 0 @@ -180,3 +191,5 @@ def oss_write(self, log, remote_log_location, append=True): str(pos), str(append), ) + return False + return True From b06a4e1dc1f5e06f5d3d7818153549e7101b1568 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 02:03:57 +0100 Subject: [PATCH 04/19] add unit test for s3 task handler --- .../amazon/aws/log/test_s3_task_handler.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py index aeca09d36d6a7..ea734deba3c4d 100644 --- a/tests/providers/amazon/aws/log/test_s3_task_handler.py +++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py @@ -236,3 +236,18 @@ def test_close_no_upload(self): with pytest.raises(ClientError): boto3.resource("s3").Object("bucket", self.remote_log_key).get() + + @pytest.mark.parametrize( + "delete_local_copy, expected_existence_of_local_copy", [(True, False), (False, True)] + ) + def test_close_with_delete_local_copy_conf(self, delete_local_copy, expected_existence_of_local_copy): + handler = S3TaskHandler( + self.local_log_location, self.remote_log_base, delete_local_copy=delete_local_copy + ) + + handler.log.info("test") + handler.set_context(self.ti) + assert handler.upload_on_close + + handler.close() + assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy From c77ca648455b885032707b4483bb59538a7a6f40 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 02:04:32 +0100 Subject: [PATCH 05/19] add unit test for gcs task handler --- .../google/cloud/log/test_gcs_task_handler.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index 690ae9dc335e4..da7fe74133ec9 100644 --- 
a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -18,6 +18,7 @@ import copy import logging +import os import tempfile from unittest import mock from unittest.mock import MagicMock @@ -246,3 +247,36 @@ def test_write_to_remote_on_close_failed_read_old_logs(self, mock_blob, mock_cli ], any_order=False, ) + + @pytest.mark.parametrize( + "delete_local_copy, expected_existence_of_local_copy", [(True, False), (False, True)] + ) + @mock.patch( + "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", + return_value=("TEST_CREDENTIALS", "TEST_PROJECT_ID"), + ) + @mock.patch("google.cloud.storage.Client") + @mock.patch("google.cloud.storage.Blob") + def test_close_with_delete_local_copy_conf( + self, + mock_blob, + mock_client, + mock_creds, + local_log_location, + delete_local_copy, + expected_existence_of_local_copy, + ): + mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT" + + handler = GCSTaskHandler( + base_log_folder=local_log_location, + gcs_log_folder="gs://bucket/remote/log/location", + delete_local_copy=delete_local_copy, + ) + + handler.log.info("test") + handler.set_context(self.ti) + assert handler.upload_on_close + + handler.close() + assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy From d8086b5ceb8e6cc11565c8c45f50d55ae627d849 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 02:26:31 +0100 Subject: [PATCH 06/19] add unit test for oss task handler --- .../cloud/log/test_oss_task_handler.py | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py index 2cf999849143f..983c0e2004de3 100644 --- a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py +++ b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py @@ -17,10 +17,16 @@ # 
under the License. from __future__ import annotations +import os from unittest import mock from unittest.mock import PropertyMock +import pytest + from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler +from airflow.utils.state import TaskInstanceState +from airflow.utils.timezone import datetime +from tests.test_utils.db import clear_db_dags, clear_db_runs OSS_TASK_HANDLER_STRING = "airflow.providers.alibaba.cloud.log.oss_task_handler.{}" MOCK_OSS_CONN_ID = "mock_id" @@ -37,6 +43,20 @@ def setup_method(self): self.oss_log_folder = f"oss://{MOCK_BUCKET_NAME}/airflow/logs" self.oss_task_handler = OSSTaskHandler(self.base_log_folder, self.oss_log_folder) + @pytest.fixture(autouse=True) + def task_instance(self, create_task_instance): + self.ti = ti = create_task_instance( + dag_id="dag_for_testing_oss_task_handler", + task_id="task_for_testing_oss_task_handler", + execution_date=datetime(2020, 1, 1), + state=TaskInstanceState.RUNNING, + ) + ti.try_number = 1 + ti.raw = False + yield + clear_db_runs() + clear_db_dags() + @mock.patch(OSS_TASK_HANDLER_STRING.format("conf.get")) @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSHook")) def test_hook(self, mock_service, mock_conf_get): @@ -130,3 +150,20 @@ def test_oss_write_into_remote_non_existing_file_not_via_append(self, mock_servi mock_service.return_value.append_string.assert_called_once_with( MOCK_BUCKET_NAME, MOCK_CONTENT, "airflow/logs/1.log", 0 ) + + @pytest.mark.parametrize( + "delete_local_copy, expected_existence_of_local_copy", [(True, False), (False, True)] + ) + @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSTaskHandler.hook"), new_callable=PropertyMock) + def test_close_with_delete_local_copy_conf( + self, mock_service, tmp_path_factory, delete_local_copy, expected_existence_of_local_copy + ): + local_log_path = str(tmp_path_factory.mktemp("local-oss-log-location")) + handler = OSSTaskHandler(local_log_path, self.oss_log_folder, delete_local_copy=delete_local_copy) + + 
handler.log.info("test") + handler.set_context(self.ti) + assert handler.upload_on_close + + handler.close() + assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy From ad9a99a809ccceef7352c7acc76fdd93b7abf175 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 15:40:04 +0100 Subject: [PATCH 07/19] add a new config and use it as default value for --- airflow/config_templates/airflow_local_settings.py | 1 - airflow/config_templates/config.yml | 8 ++++++++ airflow/config_templates/default_airflow.cfg | 4 ++++ .../providers/alibaba/cloud/log/oss_task_handler.py | 10 +++++++++- airflow/providers/amazon/aws/log/s3_task_handler.py | 10 +++++++++- airflow/providers/google/cloud/log/gcs_task_handler.py | 10 +++++++++- .../providers/microsoft/azure/log/wasb_task_handler.py | 9 ++++++++- 7 files changed, 47 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 39d809e5167ed..f6a2cdbf64156 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -252,7 +252,6 @@ "wasb_log_folder": REMOTE_BASE_LOG_FOLDER, "wasb_container": "airflow-logs", "filename_template": FILENAME_TEMPLATE, - "delete_local_copy": False, }, } diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index eed20e351c9a0..9230b7227c9ba 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -598,6 +598,14 @@ logging: type: string example: ~ default: "" + delete_local_logs: + description: | + Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after + they are uploaded to remote location + version_added: 2.6.0 + type: string + example: ~ + default: "False" google_key_path: description: | Path to Google Credential JSON file. 
If omitted, authorization based on `the Application Default diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index a4b03c8764dfe..8f2ddfb30e033 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -340,6 +340,10 @@ remote_logging = False # reading logs, not writing them. remote_log_conn_id = +# Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after +# they are uploaded to remote location +delete_local_logs = False + # Path to Google Credential JSON file. If omitted, authorization based on `the Application Default # Credentials # `__ will diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py index 7af100857142d..13e1ca84b28bc 100644 --- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -22,11 +22,19 @@ import pathlib import shutil +from packaging.version import Version + from airflow.compat.functools import cached_property from airflow.configuration import conf from airflow.providers.alibaba.cloud.hooks.oss import OSSHook from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.version import version + +if Version(version) < Version("2.6"): + DEFAULT_DELETE_LOCAL_COPY = False +else: + DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") class OSSTaskHandler(FileTaskHandler, LoggingMixin): @@ -41,7 +49,7 @@ def __init__( base_log_folder, oss_log_folder, filename_template=None, - delete_local_copy: bool = False, + delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, ): self.log.info("Using oss_task_handler for remote logging...") super().__init__(base_log_folder, filename_template) diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py 
b/airflow/providers/amazon/aws/log/s3_task_handler.py index bbfeaaedbcd2b..20ad0d9f988fd 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -21,11 +21,19 @@ import pathlib import shutil +from packaging.version import Version + from airflow.compat.functools import cached_property from airflow.configuration import conf from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.version import version + +if Version(version) < Version("2.6"): + DEFAULT_DELETE_LOCAL_COPY = False +else: + DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") class S3TaskHandler(FileTaskHandler, LoggingMixin): @@ -42,7 +50,7 @@ def __init__( base_log_folder: str, s3_log_folder: str, filename_template: str | None = None, - delete_local_copy: bool = False, + delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, ): super().__init__(base_log_folder, filename_template) self.remote_base = s3_log_folder diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 9c472d3d1837f..d55dce1019ac7 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -25,6 +25,7 @@ # not sure why but mypy complains on missing `storage` but it is clearly there and is importable from google.cloud import storage # type: ignore[attr-defined] +from packaging.version import Version from airflow.compat.functools import cached_property from airflow.configuration import conf @@ -34,6 +35,7 @@ from airflow.providers.google.common.consts import CLIENT_INFO from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.version import version _DEFAULT_SCOPESS = frozenset( [ @@ -44,6 +46,12 
@@ logger = logging.getLogger(__name__) +if Version(version) < Version("2.6"): + DEFAULT_DELETE_LOCAL_COPY = False +else: + DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") + + class GCSTaskHandler(FileTaskHandler, LoggingMixin): """ GCSTaskHandler is a python log handler that handles and reads @@ -80,7 +88,7 @@ def __init__( gcp_keyfile_dict: dict | None = None, gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS, project_id: str | None = None, - delete_local_copy: bool = False, + delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, ): super().__init__(base_log_folder, filename_template) self.remote_base = gcs_log_folder diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index 52af3171c2d6c..f0f1e9f34d4be 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -23,11 +23,18 @@ from typing import TYPE_CHECKING, Any from azure.core.exceptions import HttpResponseError +from packaging.version import Version from airflow.compat.functools import cached_property from airflow.configuration import conf from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.version import version + +if Version(version) < Version("2.6"): + DEFAULT_DELETE_LOCAL_COPY = False +else: + DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") class WasbTaskHandler(FileTaskHandler, LoggingMixin): @@ -44,8 +51,8 @@ def __init__( base_log_folder: str, wasb_log_folder: str, wasb_container: str, - delete_local_copy: str, *, + delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, filename_template: str | None = None, ) -> None: super().__init__(base_log_folder, filename_template) From e7153b0668c6945e8d0f0f6ff5e623b8b8755cd9 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 19:49:53 +0100 Subject: 
[PATCH 08/19] improve gcs task handler tests --- .../providers/google/cloud/log/test_gcs_task_handler.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index da7fe74133ec9..5f491aa225079 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -19,7 +19,6 @@ import copy import logging import os -import tempfile from unittest import mock from unittest.mock import MagicMock @@ -49,10 +48,8 @@ def task_instance(self, create_task_instance): clear_db_dags() @pytest.fixture(autouse=True) - def local_log_location(self): - with tempfile.TemporaryDirectory() as td: - self.local_log_location = td - yield td + def local_log_location(self, tmp_path_factory): + return str(tmp_path_factory.mktemp("local-gcs-log-location")) @pytest.fixture(autouse=True) def gcs_task_handler(self, create_log_template, local_log_location): @@ -128,7 +125,7 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client, "*** * gs://bucket/remote/log/location/1.log\n" "*** Unable to read remote log Failed to connect\n" "*** Found local files:\n" - f"*** * {self.local_log_location}/1.log\n" + f"*** * {self.gcs_task_handler.local_base}/1.log\n" ) assert metadata == {"end_of_log": True, "log_pos": 0} mock_blob.from_string.assert_called_once_with( From 5fedcc0ac8803528df19358d0f324b3b068f7509 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 23:04:49 +0100 Subject: [PATCH 09/19] add remote_task_handler_kwargs conf and add its content as kwargs for remote task handlers --- .../airflow_local_settings.py | 2 ++ airflow/config_templates/config.yml | 10 +++++++++ airflow/config_templates/default_airflow.cfg | 6 ++++++ tests/core/test_logging_config.py | 21 +++++++++++++++++++ 4 files changed, 39 insertions(+) diff --git 
a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index f6a2cdbf64156..86482d3d05468 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -203,6 +203,7 @@ # WASB buckets should start with "wasb" # just to help Airflow select correct handler REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER") + REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={}) if REMOTE_BASE_LOG_FOLDER.startswith("s3://"): S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = { @@ -314,3 +315,4 @@ "section 'elasticsearch' if you are using Elasticsearch. In the other case, " "'remote_base_log_folder' option in the 'logging' section." ) + DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 9230b7227c9ba..5a6b4edbb6a0b 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -628,6 +628,16 @@ logging: type: string example: ~ default: "" + remote_task_handler_kwargs: + description: | + The remote_task_handler_kwargs param is loaded into a dictionary and passed to __init__ of remote + task handler and it overrides the values provided by Airflow config. For example if you set + `remote_base_log_folder=False` and you provide ``{{"delete_local_copy": true}}``, then the local + log files will be deleted after they are uploaded to remote location. 
+ version_added: 2.6.0 + type: string + example: ~ + default: '{"delete_local_copy": true}' encrypt_s3_logs: description: | Use server-side encryption for logs stored in S3 diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 8f2ddfb30e033..b5a0a87175d99 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -358,6 +358,12 @@ google_key_path = # Stackdriver logs should start with "stackdriver://" remote_base_log_folder = +# The remote_task_handler_kwargs param is loaded into a dictionary and passed to __init__ of remote +# task handler and it overrides the values provided by Airflow config. For example if you set +# `remote_base_log_folder=False` and you provide ``{{"delete_local_copy": true}}``, then the local +# log files will be deleted after they are uploaded to remote location. +remote_task_handler_kwargs = {"delete_local_copy": true} + # Use server-side encryption for logs stored in S3 encrypt_s3_logs = False diff --git a/tests/core/test_logging_config.py b/tests/core/test_logging_config.py index 70636ca67ec23..0d8fbbed214ce 100644 --- a/tests/core/test_logging_config.py +++ b/tests/core/test_logging_config.py @@ -316,3 +316,24 @@ def test_log_group_arns_remote_logging_with_cloudwatch_handler( airflow_local_settings.DEFAULT_LOGGING_CONFIG["handlers"]["task"]["log_group_arn"] == log_group_arn ) + + def test_loading_remote_logging_with_kwargs(self): + """Test if logging can be configured successfully with kwargs""" + from airflow.config_templates import airflow_local_settings + from airflow.logging_config import configure_logging + from airflow.utils.log.s3_task_handler import S3TaskHandler + + with conf_vars( + { + ("logging", "remote_logging"): "True", + ("logging", "remote_log_conn_id"): "some_s3", + ("logging", "remote_base_log_folder"): "s3://some-folder", + ("logging", "remote_task_handler_kwargs"): '{"delete_local_copy": true}', + } + ): + 
importlib.reload(airflow_local_settings) + configure_logging() + + logger = logging.getLogger("airflow.task") + assert isinstance(logger.handlers[0], S3TaskHandler) + assert getattr(logger.handlers[0], "delete_local_copy") is True From 893c7f750f9ef51c2f4b906c4547e22f4560d941 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 23:08:18 +0100 Subject: [PATCH 10/19] update s3 task handler to use kwargs and convert default value parsing to method in order to simplify testing --- .../amazon/aws/log/s3_task_handler.py | 25 +++++++++++-------- .../amazon/aws/log/test_s3_task_handler.py | 14 +++++++---- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index 20ad0d9f988fd..20754075a2dac 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -28,12 +28,17 @@ from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.version import version -if Version(version) < Version("2.6"): - DEFAULT_DELETE_LOCAL_COPY = False -else: - DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") + +def get_default_delete_local_copy(): + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not + TODO: delete this function when min airflow version >= 2.6 + """ + from airflow.version import version + + if Version(version) < Version("2.6"): + return False + return conf.getboolean("logging", "delete_local_logs") class S3TaskHandler(FileTaskHandler, LoggingMixin): @@ -46,19 +51,17 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): trigger_should_wrap = True def __init__( - self, - base_log_folder: str, - s3_log_folder: str, - filename_template: str | None = None, - delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, + 
self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None, **kwargs ): super().__init__(base_log_folder, filename_template) self.remote_base = s3_log_folder - self.delete_local_copy = delete_local_copy self.log_relative_path = "" self._hook = None self.closed = False self.upload_on_close = True + self.delete_local_copy = ( + kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() + ) @cached_property def hook(self): diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py index ea734deba3c4d..c5223bd21b3c1 100644 --- a/tests/providers/amazon/aws/log/test_s3_task_handler.py +++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py @@ -238,12 +238,16 @@ def test_close_no_upload(self): boto3.resource("s3").Object("bucket", self.remote_log_key).get() @pytest.mark.parametrize( - "delete_local_copy, expected_existence_of_local_copy", [(True, False), (False, True)] + "delete_local_copy, expected_existence_of_local_copy, airflow_version", + [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], ) - def test_close_with_delete_local_copy_conf(self, delete_local_copy, expected_existence_of_local_copy): - handler = S3TaskHandler( - self.local_log_location, self.remote_log_base, delete_local_copy=delete_local_copy - ) + def test_close_with_delete_local_logs_conf( + self, delete_local_copy, expected_existence_of_local_copy, airflow_version + ): + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( + "airflow.version.version", airflow_version + ): + handler = S3TaskHandler(self.local_log_location, self.remote_log_base) handler.log.info("test") handler.set_context(self.ti) From 9a6bde120474c08361e1343cf97c7c9873ac2757 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 23:09:47 +0100 Subject: [PATCH 11/19] do the same thing for gcs --- 
.../google/cloud/log/gcs_task_handler.py | 20 ++++++++++++------- .../google/cloud/log/test_gcs_task_handler.py | 17 +++++++++------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index d55dce1019ac7..303145310f10a 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -35,7 +35,6 @@ from airflow.providers.google.common.consts import CLIENT_INFO from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.version import version _DEFAULT_SCOPESS = frozenset( [ @@ -46,10 +45,15 @@ logger = logging.getLogger(__name__) -if Version(version) < Version("2.6"): - DEFAULT_DELETE_LOCAL_COPY = False -else: - DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") +def get_default_delete_local_copy(): + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not + TODO: delete this function when min airflow version >= 2.6 + """ + from airflow.version import version + + if Version(version) < Version("2.6"): + return False + return conf.getboolean("logging", "delete_local_logs") class GCSTaskHandler(FileTaskHandler, LoggingMixin): @@ -88,7 +92,7 @@ def __init__( gcp_keyfile_dict: dict | None = None, gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS, project_id: str | None = None, - delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, + **kwargs, ): super().__init__(base_log_folder, filename_template) self.remote_base = gcs_log_folder @@ -99,7 +103,9 @@ def __init__( self.gcp_keyfile_dict = gcp_keyfile_dict self.scopes = gcp_scopes self.project_id = project_id - self.delete_local_copy = delete_local_copy + self.delete_local_copy = ( + kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() + ) @cached_property def 
hook(self) -> GCSHook | None: diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index 5f491aa225079..ead895a3340db 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -246,7 +246,8 @@ def test_write_to_remote_on_close_failed_read_old_logs(self, mock_blob, mock_cli ) @pytest.mark.parametrize( - "delete_local_copy, expected_existence_of_local_copy", [(True, False), (False, True)] + "delete_local_copy, expected_existence_of_local_copy, airflow_version", + [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], ) @mock.patch( "airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id", @@ -262,14 +263,16 @@ def test_close_with_delete_local_copy_conf( local_log_location, delete_local_copy, expected_existence_of_local_copy, + airflow_version, ): mock_blob.from_string.return_value.download_as_bytes.return_value = b"CONTENT" - - handler = GCSTaskHandler( - base_log_folder=local_log_location, - gcs_log_folder="gs://bucket/remote/log/location", - delete_local_copy=delete_local_copy, - ) + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( + "airflow.version.version", airflow_version + ): + handler = GCSTaskHandler( + base_log_folder=local_log_location, + gcs_log_folder="gs://bucket/remote/log/location", + ) handler.log.info("test") handler.set_context(self.ti) From c26ca4eeee34fefad30216b6108f37ac2a6154bf Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 23:10:38 +0100 Subject: [PATCH 12/19] do the same thing for oss --- .../alibaba/cloud/log/oss_task_handler.py | 27 ++++++++++--------- .../cloud/log/test_oss_task_handler.py | 16 ++++++++--- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py 
b/airflow/providers/alibaba/cloud/log/oss_task_handler.py index 13e1ca84b28bc..512eda90c69e1 100644 --- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -29,12 +29,17 @@ from airflow.providers.alibaba.cloud.hooks.oss import OSSHook from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.version import version -if Version(version) < Version("2.6"): - DEFAULT_DELETE_LOCAL_COPY = False -else: - DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") + +def get_default_delete_local_copy(): + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not + TODO: delete this function when min airflow version >= 2.6 + """ + from airflow.version import version + + if Version(version) < Version("2.6"): + return False + return conf.getboolean("logging", "delete_local_logs") class OSSTaskHandler(FileTaskHandler, LoggingMixin): @@ -44,21 +49,17 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from OSS remote storage. 
""" - def __init__( - self, - base_log_folder, - oss_log_folder, - filename_template=None, - delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, - ): + def __init__(self, base_log_folder, oss_log_folder, filename_template=None, **kwargs): self.log.info("Using oss_task_handler for remote logging...") super().__init__(base_log_folder, filename_template) (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder) - self.delete_local_copy = delete_local_copy self.log_relative_path = "" self._hook = None self.closed = False self.upload_on_close = True + self.delete_local_copy = ( + kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() + ) @cached_property def hook(self): diff --git a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py index 983c0e2004de3..0d0348d8aa07a 100644 --- a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py +++ b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py @@ -26,6 +26,7 @@ from airflow.providers.alibaba.cloud.log.oss_task_handler import OSSTaskHandler from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime +from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_runs OSS_TASK_HANDLER_STRING = "airflow.providers.alibaba.cloud.log.oss_task_handler.{}" @@ -152,14 +153,23 @@ def test_oss_write_into_remote_non_existing_file_not_via_append(self, mock_servi ) @pytest.mark.parametrize( - "delete_local_copy, expected_existence_of_local_copy", [(True, False), (False, True)] + "delete_local_copy, expected_existence_of_local_copy, airflow_version", + [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], ) @mock.patch(OSS_TASK_HANDLER_STRING.format("OSSTaskHandler.hook"), new_callable=PropertyMock) def test_close_with_delete_local_copy_conf( - self, mock_service, tmp_path_factory, 
delete_local_copy, expected_existence_of_local_copy + self, + mock_service, + tmp_path_factory, + delete_local_copy, + expected_existence_of_local_copy, + airflow_version, ): local_log_path = str(tmp_path_factory.mktemp("local-oss-log-location")) - handler = OSSTaskHandler(local_log_path, self.oss_log_folder, delete_local_copy=delete_local_copy) + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( + "airflow.version.version", airflow_version + ): + handler = OSSTaskHandler(local_log_path, self.oss_log_folder) handler.log.info("test") handler.set_context(self.ti) From 3192e559603010654bcede5d61ac8c7c0a62d210 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 23:12:40 +0100 Subject: [PATCH 13/19] do the same thing for wasb, add a condition on success write data and improve existing tests --- .../microsoft/azure/log/wasb_task_handler.py | 29 ++++++++++------ .../azure/log/test_wasb_task_handler.py | 34 +++++++++++++++++-- 2 files changed, 50 insertions(+), 13 deletions(-) diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index f0f1e9f34d4be..aab84a04c5499 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -29,12 +29,17 @@ from airflow.configuration import conf from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.version import version -if Version(version) < Version("2.6"): - DEFAULT_DELETE_LOCAL_COPY = False -else: - DEFAULT_DELETE_LOCAL_COPY = conf.getboolean("logging", "delete_local_logs") + +def get_default_delete_local_copy(): + """Load delete_local_logs conf if Airflow version > 2.6 and return False if not + TODO: delete this function when min airflow version >= 2.6 + """ + from airflow.version import version + + if Version(version) < Version("2.6"): + return 
False + return conf.getboolean("logging", "delete_local_logs") class WasbTaskHandler(FileTaskHandler, LoggingMixin): @@ -52,8 +57,8 @@ def __init__( wasb_log_folder: str, wasb_container: str, *, - delete_local_copy: bool = DEFAULT_DELETE_LOCAL_COPY, filename_template: str | None = None, + **kwargs, ) -> None: super().__init__(base_log_folder, filename_template) self.wasb_container = wasb_container @@ -62,7 +67,9 @@ def __init__( self._hook = None self.closed = False self.upload_on_close = True - self.delete_local_copy = delete_local_copy + self.delete_local_copy = ( + kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy() + ) @cached_property def hook(self): @@ -114,9 +121,9 @@ def close(self) -> None: # read log and remove old logs to get just the latest additions with open(local_loc) as logfile: log = logfile.read() - self.wasb_write(log, remote_loc, append=True) + wasb_write = self.wasb_write(log, remote_loc, append=True) - if self.delete_local_copy: + if wasb_write and self.delete_local_copy: shutil.rmtree(os.path.dirname(local_loc)) # Mark closed so we don't double write if close is called twice self.closed = True @@ -216,7 +223,7 @@ def wasb_read(self, remote_log_location: str, return_error: bool = False): return msg return "" - def wasb_write(self, log: str, remote_log_location: str, append: bool = True) -> None: + def wasb_write(self, log: str, remote_log_location: str, append: bool = True) -> bool: """ Writes the log to the remote_log_location. Fails silently if no hook was created. 
@@ -234,3 +241,5 @@ def wasb_write(self, log: str, remote_log_location: str, append: bool = True) -> self.hook.load_string(log, self.wasb_container, remote_log_location, overwrite=True) except Exception: self.log.exception("Could not write logs to %s", remote_log_location) + return False + return True diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py index 60b6947f619ae..73001ae21bfc2 100644 --- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py +++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py @@ -17,6 +17,7 @@ from __future__ import annotations import copy +import os import tempfile from pathlib import Path from unittest import mock @@ -65,9 +66,6 @@ def setup_method(self): delete_local_copy=True, ) - def teardown_method(self): - self.wasb_task_handler.close() - @conf_vars({("logging", "remote_log_conn_id"): "wasb_default"}) @mock.patch("airflow.providers.microsoft.azure.hooks.wasb.BlobServiceClient") def test_hook(self, mock_service): @@ -175,3 +173,33 @@ def test_write_raises(self): mock_error.assert_called_once_with( "Could not write logs to %s", "remote/log/location/1.log", exc_info=True ) + + @pytest.mark.parametrize( + "delete_local_copy, expected_existence_of_local_copy, airflow_version", + [(True, False, "2.6.0"), (False, True, "2.6.0"), (True, True, "2.5.0"), (False, True, "2.5.0")], + ) + @mock.patch("airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler.wasb_write") + def test_close_with_delete_local_logs_conf( + self, + wasb_write_mock, + ti, + tmp_path_factory, + delete_local_copy, + expected_existence_of_local_copy, + airflow_version, + ): + with conf_vars({("logging", "delete_local_logs"): str(delete_local_copy)}), mock.patch( + "airflow.version.version", airflow_version + ): + handler = WasbTaskHandler( + base_log_folder=str(tmp_path_factory.mktemp("local-s3-log-location")), + 
wasb_log_folder=self.wasb_log_folder, + wasb_container=self.container_name, + ) + wasb_write_mock.return_value = True + handler.log.info("test") + handler.set_context(ti) + assert handler.upload_on_close + + handler.close() + assert os.path.exists(handler.handler.baseFilename) == expected_existence_of_local_copy From 0bd84966c895d43305524c42d8a5956f83df4d27 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 23:23:22 +0100 Subject: [PATCH 14/19] fix a mistake in config file description --- airflow/config_templates/config.yml | 2 +- airflow/config_templates/default_airflow.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 5a6b4edbb6a0b..a6a9582f28724 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -632,7 +632,7 @@ logging: description: | The remote_task_handler_kwargs param is loaded into a dictionary and passed to __init__ of remote task handler and it overrides the values provided by Airflow config. For example if you set - `remote_base_log_folder=False` and you provide ``{{"delete_local_copy": true}}``, then the local + `delete_local_logs=False` and you provide ``{{"delete_local_copy": true}}``, then the local log files will be deleted after they are uploaded to remote location. version_added: 2.6.0 type: string diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index b5a0a87175d99..10f68ebe10e3c 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -360,7 +360,7 @@ remote_base_log_folder = # The remote_task_handler_kwargs param is loaded into a dictionary and passed to __init__ of remote # task handler and it overrides the values provided by Airflow config. 
For example if you set -# `remote_base_log_folder=False` and you provide ``{{"delete_local_copy": true}}``, then the local +# `delete_local_logs=False` and you provide ``{{"delete_local_copy": true}}``, then the local # log files will be deleted after they are uploaded to remote location. remote_task_handler_kwargs = {"delete_local_copy": true} From 60a1549c2a59447ebda05a0193e8eaec6b24c14b Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Sun, 26 Feb 2023 23:29:24 +0100 Subject: [PATCH 15/19] add delete_local_logs to logging tasks doc --- .../logging-monitoring/logging-tasks.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst index c7d5819b31b83..a12ade359ca9c 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst @@ -25,6 +25,15 @@ Core Airflow provides an interface FileTaskHandler, which writes task logs to fi services (:doc:`apache-airflow-providers:index`) and some of them provide handlers that extend the logging capability of Apache Airflow. You can see all of these providers in :doc:`apache-airflow-providers:core-extensions/logging`. +When using S3, GCS, WASB or OSS remote logging service, you can delete the local log files after +they are uploaded to the remote location, by setting th config: + .. 
code-block:: ini + + [logging] + remote_logging = True + remote_base_log_folder = schema://path/to/remote/log + delete_local_logs = True + Configuring logging ------------------- From d5aeb69381192d92e85afe0db8406deb6b3f16f1 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Fri, 3 Mar 2023 02:07:59 +0100 Subject: [PATCH 16/19] fix remote_task_handler_kwargs default value --- airflow/config_templates/config.yml | 4 ++-- airflow/config_templates/default_airflow.cfg | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 240a6888e2248..a0a83bec79a45 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -644,8 +644,8 @@ logging: log files will be deleted after they are uploaded to remote location. version_added: 2.6.0 type: string - example: ~ - default: '{"delete_local_copy": true}' + example: '{"delete_local_copy": true}' + default: "" encrypt_s3_logs: description: | Use server-side encryption for logs stored in S3 diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c92c095c4ed96..1b29dbb145de2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -367,7 +367,8 @@ remote_base_log_folder = # task handler and it overrides the values provided by Airflow config. For example if you set # `delete_local_logs=False` and you provide ``{{"delete_local_copy": true}}``, then the local # log files will be deleted after they are uploaded to remote location. 
-remote_task_handler_kwargs = {"delete_local_copy": true} +# Example: remote_task_handler_kwargs = {{"delete_local_copy": true}} +remote_task_handler_kwargs = # Use server-side encryption for logs stored in S3 encrypt_s3_logs = False From b476b2a33fbe5e0bbedb5e8c734312a4800a11b0 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Fri, 3 Mar 2023 22:01:08 +0100 Subject: [PATCH 17/19] remove indentation to fix build docs --- .../logging-monitoring/logging-tasks.rst | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst index a12ade359ca9c..1d6f6db72ca11 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst @@ -27,12 +27,13 @@ capability of Apache Airflow. You can see all of these providers in :doc:`apache When using S3, GCS, WASB or OSS remote logging service, you can delete the local log files after they are uploaded to the remote location, by setting th config: - .. code-block:: ini - [logging] - remote_logging = True - remote_base_log_folder = schema://path/to/remote/log - delete_local_logs = True +.. 
code-block:: ini + + [logging] + remote_logging = True + remote_base_log_folder = schema://path/to/remote/log + delete_local_logs = True Configuring logging ------------------- From 06813d32a9e32e9175da562be3c822caeebbdca3 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Fri, 3 Mar 2023 22:45:05 +0100 Subject: [PATCH 18/19] fix spelling --- .../logging-monitoring/logging-tasks.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst index 1d6f6db72ca11..f3da6dbabede7 100644 --- a/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst +++ b/docs/apache-airflow/administration-and-deployment/logging-monitoring/logging-tasks.rst @@ -26,7 +26,7 @@ services (:doc:`apache-airflow-providers:index`) and some of them provide handle capability of Apache Airflow. You can see all of these providers in :doc:`apache-airflow-providers:core-extensions/logging`. When using S3, GCS, WASB or OSS remote logging service, you can delete the local log files after -they are uploaded to the remote location, by setting th config: +they are uploaded to the remote location, by setting the config: .. 
code-block:: ini From 062c5e72c752f8ae66f1694f1d34769c0f25052f Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Mon, 6 Mar 2023 23:45:04 +0100 Subject: [PATCH 19/19] Apply suggestions from code review Co-authored-by: Niko Oliveira --- airflow/config_templates/config.yml | 2 +- airflow/config_templates/default_airflow.cfg | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index a0a83bec79a45..2e96bee26d74e 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -609,7 +609,7 @@ logging: delete_local_logs: description: | Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after - they are uploaded to remote location + they are uploaded to the remote location. version_added: 2.6.0 type: string example: ~ diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 1b29dbb145de2..fe4a538b0ebd9 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -346,7 +346,7 @@ remote_logging = False remote_log_conn_id = # Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after -# they are uploaded to remote location +# they are uploaded to the remote location. delete_local_logs = False # Path to Google Credential JSON file. If omitted, authorization based on `the Application Default