Support deleting the local log files when using remote logging #29772

Merged · Mar 7, 2023

Changes from all commits · 24 commits (all by hussein-awala)
a5b0686  Feb 26, 2023  add the new conf to s3 handler to delete local log when it is true an…
4941b02  Feb 26, 2023  add the new conf to GCS handler to delete local log when it is true a…
5dd3dae  Feb 26, 2023  add the new conf to OSS handler to delete local log when it is true a…
b06a4e1  Feb 26, 2023  add unit test for s3 task handler
c77ca64  Feb 26, 2023  add unit test for gcs task handler
d8086b5  Feb 26, 2023  add unit test for oss task handler
ad9a99a  Feb 26, 2023  add a new config and use it as default value for
e7153b0  Feb 26, 2023  improve gcs task handler tests
5fedcc0  Feb 26, 2023  add remote_task_handler_kwargs conf and add its content as kwargs for…
893c7f7  Feb 26, 2023  update s3 task handler to use kwargs and convert default value parsin…
9a6bde1  Feb 26, 2023  do the same thing for gcs
c26ca4e  Feb 26, 2023  do the same thing for oss
3192e55  Feb 26, 2023  do the same thing for wasb, add a condition on success write data and…
0bd8496  Feb 26, 2023  fix a mistake in config file description
60a1549  Feb 26, 2023  add delete_local_logs to logging tasks doc
8b70c27  Feb 27, 2023  Merge branch 'main' into feat/delete_local_logs
0671e84  Mar 1, 2023   Merge branch 'main' into feat/delete_local_logs
46dc858  Mar 3, 2023   Merge branch 'main' into feat/delete_local_logs
d5aeb69  Mar 3, 2023   fix remote_task_handler_kwargs default value
fb392a7  Mar 3, 2023   Merge branch 'main' into feat/delete_local_logs
b476b2a  Mar 3, 2023   remove indentation to fix build docs
06813d3  Mar 3, 2023   fix spelling
062c5e7  Mar 6, 2023   Apply suggestions from code review
a4f40b3  Mar 7, 2023   Merge branch 'main' into feat/delete_local_logs
3 changes: 2 additions & 1 deletion airflow/config_templates/airflow_local_settings.py
@@ -203,6 +203,7 @@
# WASB buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})

if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
@@ -252,7 +253,6 @@
"wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
"wasb_container": "airflow-logs",
"filename_template": FILENAME_TEMPLATE,
"delete_local_copy": False,
},
}

@@ -315,3 +315,4 @@
"section 'elasticsearch' if you are using Elasticsearch. In the other case, "
"'remote_base_log_folder' option in the 'logging' section."
)
DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
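The one-line update above is what makes remote_task_handler_kwargs take effect. A minimal sketch of the merge, not part of the diff, with an illustrative handler entry standing in for Airflow's full DEFAULT_LOGGING_CONFIG:

# Illustrative handler entry; the real DEFAULT_LOGGING_CONFIG carries more keys.
DEFAULT_LOGGING_CONFIG = {
    "handlers": {
        "task": {
            "class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
            "base_log_folder": "/opt/airflow/logs",
            "s3_log_folder": "s3://my-bucket/logs",  # hypothetical bucket
        }
    }
}
REMOTE_TASK_HANDLER_KWARGS = {"delete_local_copy": True}  # parsed from the new option
DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
# The logging framework then instantiates the handler with delete_local_copy=True,
# overriding the default derived from [logging] delete_local_logs.
assert DEFAULT_LOGGING_CONFIG["handlers"]["task"]["delete_local_copy"] is True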
18 changes: 18 additions & 0 deletions airflow/config_templates/config.yml
@@ -606,6 +606,14 @@ logging:
type: string
example: ~
default: ""
delete_local_logs:
description: |
Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after
they are uploaded to the remote location.
version_added: 2.6.0
type: string
example: ~
default: "False"
google_key_path:
description: |
Path to Google Credential JSON file. If omitted, authorization based on `the Application Default
@@ -628,6 +636,16 @@ logging:
type: string
example: ~
default: ""
remote_task_handler_kwargs:
description: |
The remote_task_handler_kwargs param is loaded into a dictionary and passed to the __init__
of the remote task handler, and it overrides the values provided by the Airflow config. For
example, if you set `delete_local_logs=False` and you provide ``{{"delete_local_copy": true}}``,
then the local log files will be deleted after they are uploaded to the remote location.
version_added: 2.6.0
type: string
example: '{"delete_local_copy": true}'
default: ""
encrypt_s3_logs:
description: |
Use server-side encryption for logs stored in S3
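Stepping out of the diff for a moment: the remote_task_handler_kwargs value defined above is parsed with conf.getjson, exactly as the airflow_local_settings.py hunk earlier shows. A small sketch of that parsing, assuming the example value from the description:

from airflow.configuration import conf

# With remote_task_handler_kwargs = {"delete_local_copy": true} in airflow.cfg,
# getjson decodes the JSON string into a dict; an unset option falls back to {}.
kwargs = conf.getjson("logging", "remote_task_handler_kwargs", fallback={})
# e.g. {"delete_local_copy": True}, later applied on top of the task handler config.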
11 changes: 11 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -345,6 +345,10 @@ remote_logging = False
# reading logs, not writing them.
remote_log_conn_id =

# Whether the local log files for GCS, S3, WASB and OSS remote logging should be deleted after
# they are uploaded to the remote location.
delete_local_logs = False

# Path to Google Credential JSON file. If omitted, authorization based on `the Application Default
# Credentials
# <https://cloud.google.com/docs/authentication/production#finding_credentials_automatically>`__ will
@@ -359,6 +363,13 @@ google_key_path =
# Stackdriver logs should start with "stackdriver://"
remote_base_log_folder =

# The remote_task_handler_kwargs param is loaded into a dictionary and passed to the __init__
# of the remote task handler, and it overrides the values provided by the Airflow config. For
# example, if you set `delete_local_logs=False` and you provide ``{{"delete_local_copy": true}}``,
# then the local log files will be deleted after they are uploaded to the remote location.
# Example: remote_task_handler_kwargs = {{"delete_local_copy": true}}
remote_task_handler_kwargs =

# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False

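Taken together, a hypothetical [logging] section enabling the behavior end to end could look like this (the connection id and bucket name are illustrative):

[logging]
remote_logging = True
remote_log_conn_id = aws_default
remote_base_log_folder = s3://my-airflow-logs
delete_local_logs = True

delete_local_logs covers the GCS, S3, WASB and OSS handlers alike; remote_task_handler_kwargs is only needed when you want to override a single handler's keyword arguments directly.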
32 changes: 27 additions & 5 deletions airflow/providers/alibaba/cloud/log/oss_task_handler.py
@@ -20,6 +20,9 @@
import contextlib
import os
import pathlib
import shutil

from packaging.version import Version

from airflow.compat.functools import cached_property
from airflow.configuration import conf
@@ -28,21 +31,35 @@
from airflow.utils.log.logging_mixin import LoggingMixin


def get_default_delete_local_copy():
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not
TODO: delete this function when min airflow version >= 2.6
"""
from airflow.version import version

if Version(version) < Version("2.6"):
return False
return conf.getboolean("logging", "delete_local_logs")


class OSSTaskHandler(FileTaskHandler, LoggingMixin):
"""
OSSTaskHandler is a python log handler that handles and reads
task instance logs. It extends airflow FileTaskHandler and
uploads to and reads from OSS remote storage.
"""

def __init__(self, base_log_folder, oss_log_folder, filename_template=None):
def __init__(self, base_log_folder, oss_log_folder, filename_template=None, **kwargs):
self.log.info("Using oss_task_handler for remote logging...")
super().__init__(base_log_folder, filename_template)
(self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder)
self.log_relative_path = ""
self._hook = None
self.closed = False
self.upload_on_close = True
self.delete_local_copy = (
kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
)

@cached_property
def hook(self):
@@ -92,7 +109,9 @@ def close(self):
if os.path.exists(local_loc):
# read log and remove old logs to get just the latest additions
log = pathlib.Path(local_loc).read_text()
self.oss_write(log, remote_loc)
oss_write = self.oss_write(log, remote_loc)
if oss_write and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -154,15 +173,16 @@ def oss_read(self, remote_log_location, return_error=False):
if return_error:
return msg

def oss_write(self, log, remote_log_location, append=True):
def oss_write(self, log, remote_log_location, append=True) -> bool:
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
Writes the log to the remote_log_location and returns `True` when done. Fails silently
and returns `False` if no log was written.

:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:return: whether the log was successfully written to the remote location.
"""
oss_remote_log_location = f"{self.base_folder}/{remote_log_location}"
pos = 0
Expand All @@ -180,3 +200,5 @@ def oss_write(self, log, remote_log_location, append=True):
str(pos),
str(append),
)
return False
return True
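The same control flow recurs in the S3 and GCS handlers below: the write helper now reports success, and close() removes the local log directory only when the upload succeeded and deletion is enabled. A condensed, hypothetical sketch of the pattern (close_sketch is not a function from the PR):

import os
import pathlib
import shutil


def close_sketch(handler, local_loc: str, remote_loc: str) -> None:
    """Upload the local log, then delete the local copy only on success."""
    if os.path.exists(local_loc):
        log = pathlib.Path(local_loc).read_text()
        uploaded = handler.oss_write(log, remote_loc)  # the write helper now returns a bool
        if uploaded and handler.delete_local_copy:
            shutil.rmtree(os.path.dirname(local_loc))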
35 changes: 30 additions & 5 deletions airflow/providers/amazon/aws/log/s3_task_handler.py
@@ -19,6 +19,9 @@

import os
import pathlib
import shutil

from packaging.version import Version

from airflow.compat.functools import cached_property
from airflow.configuration import conf
@@ -27,6 +30,17 @@
from airflow.utils.log.logging_mixin import LoggingMixin


def get_default_delete_local_copy():
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not
TODO: delete this function when min airflow version >= 2.6
"""
from airflow.version import version

if Version(version) < Version("2.6"):
return False
return conf.getboolean("logging", "delete_local_logs")


class S3TaskHandler(FileTaskHandler, LoggingMixin):
"""
S3TaskHandler is a python log handler that handles and reads
@@ -36,13 +50,18 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):

trigger_should_wrap = True

def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None):
def __init__(
self, base_log_folder: str, s3_log_folder: str, filename_template: str | None = None, **kwargs
):
super().__init__(base_log_folder, filename_template)
self.remote_base = s3_log_folder
self.log_relative_path = ""
self._hook = None
self.closed = False
self.upload_on_close = True
self.delete_local_copy = (
kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
)

@cached_property
def hook(self):
@@ -84,7 +103,9 @@ def close(self):
if os.path.exists(local_loc):
# read log and remove old logs to get just the latest additions
log = pathlib.Path(local_loc).read_text()
self.s3_write(log, remote_loc)
write_to_s3 = self.s3_write(log, remote_loc)
if write_to_s3 and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -164,23 +185,25 @@ def s3_read(self, remote_log_location: str, return_error: bool = False) -> str:
return msg
return ""

def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1):
def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1) -> bool:
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
Writes the log to the remote_log_location and returns `True` when done. Fails silently
and returns `False` if no log was written.

:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:param max_retry: Maximum number of times to retry on upload failure
:return: whether the log was successfully written to the remote location.
"""
try:
if append and self.s3_log_exists(remote_log_location):
old_log = self.s3_read(remote_log_location)
log = "\n".join([old_log, log]) if old_log else log
except Exception:
self.log.exception("Could not verify previous log to append")
return False

# Default to a single retry attempt because s3 upload failures are
# rare but occasionally occur. Multiple retry attempts are unlikely
@@ -199,3 +222,5 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_
self.log.warning("Failed attempt to write logs to %s, will retry", remote_log_location)
else:
self.log.exception("Could not write logs to %s", remote_log_location)
return False
return True
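A hedged usage sketch of how the new kwarg reaches the handler, whether supplied via [logging] remote_task_handler_kwargs or passed directly (folder and bucket names are illustrative):

from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler

# An explicit delete_local_copy kwarg wins over the delete_local_logs-derived default.
handler = S3TaskHandler(
    base_log_folder="/opt/airflow/logs",
    s3_log_folder="s3://my-bucket/logs",
    delete_local_copy=True,
)
assert handler.delete_local_copy is True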
32 changes: 28 additions & 4 deletions airflow/providers/google/cloud/log/gcs_task_handler.py
@@ -19,11 +19,13 @@

import logging
import os
import shutil
from pathlib import Path
from typing import Collection

# not sure why but mypy complains on missing `storage` but it is clearly there and is importable
from google.cloud import storage # type: ignore[attr-defined]
from packaging.version import Version

from airflow.compat.functools import cached_property
from airflow.configuration import conf
@@ -43,6 +45,17 @@
logger = logging.getLogger(__name__)


def get_default_delete_local_copy():
"""Load delete_local_logs conf if Airflow version > 2.6 and return False if not
TODO: delete this function when min airflow version >= 2.6
"""
from airflow.version import version

if Version(version) < Version("2.6"):
return False
return conf.getboolean("logging", "delete_local_logs")


class GCSTaskHandler(FileTaskHandler, LoggingMixin):
"""
GCSTaskHandler is a python log handler that handles and reads
@@ -63,6 +76,8 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
:param gcp_scopes: Comma-separated string containing OAuth2 scopes
:param project_id: Project ID to read the secrets from. If not passed, the project ID from credentials
will be used.
:param delete_local_copy: Whether local log files should be deleted after they are uploaded when using
remote logging
"""

trigger_should_wrap = True
@@ -77,6 +92,7 @@ def __init__(
gcp_keyfile_dict: dict | None = None,
gcp_scopes: Collection[str] | None = _DEFAULT_SCOPESS,
project_id: str | None = None,
**kwargs,
):
super().__init__(base_log_folder, filename_template)
self.remote_base = gcs_log_folder
@@ -87,6 +103,9 @@
self.gcp_keyfile_dict = gcp_keyfile_dict
self.scopes = gcp_scopes
self.project_id = project_id
self.delete_local_copy = (
kwargs["delete_local_copy"] if "delete_local_copy" in kwargs else get_default_delete_local_copy()
)

@cached_property
def hook(self) -> GCSHook | None:
@@ -147,7 +166,9 @@ def close(self):
# read log and remove old logs to get just the latest additions
with open(local_loc) as logfile:
log = logfile.read()
self.gcs_write(log, remote_loc)
gcs_write = self.gcs_write(log, remote_loc)
if gcs_write and self.delete_local_copy:
shutil.rmtree(os.path.dirname(local_loc))

# Mark closed so we don't double write if close is called twice
self.closed = True
@@ -207,13 +228,14 @@ def _read(self, ti, try_number, metadata=None):

return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True}

def gcs_write(self, log, remote_log_location):
def gcs_write(self, log, remote_log_location) -> bool:
"""
Writes the log to the remote_log_location. Fails silently if no log
was created.
Writes the log to the remote_log_location and returns `True` when done. Fails silently
and returns `False` if no log was written.

:param log: the log to write to the remote_log_location
:param remote_log_location: the log's location in remote storage
:return: whether the log was successfully written to the remote location.
"""
try:
blob = storage.Blob.from_string(remote_log_location, self.client)
@@ -232,6 +254,8 @@ def gcs_write(self, log, remote_log_location):
blob.upload_from_string(log, content_type="text/plain")
except Exception as e:
self.log.error("Could not write logs to %s: %s", remote_log_location, e)
return False
return True

@staticmethod
def no_log_found(exc):