From 261e435028f8d99dd2b9cda234f16b61f704b84f Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 3 Jun 2022 14:16:58 +0800 Subject: [PATCH 01/14] Apply per-run log templates to log handlers --- .../airflow_local_settings.py | 1 - airflow/models/dagrun.py | 6 ++--- .../alibaba/cloud/log/oss_task_handler.py | 2 +- .../amazon/aws/log/cloudwatch_task_handler.py | 3 ++- .../amazon/aws/log/s3_task_handler.py | 3 ++- .../elasticsearch/log/es_task_handler.py | 20 +++++++++++--- .../google/cloud/log/gcs_task_handler.py | 2 +- .../microsoft/azure/log/wasb_task_handler.py | 3 ++- airflow/utils/log/file_task_handler.py | 26 +++++++++++++------ airflow/utils/log/log_reader.py | 2 +- 10 files changed, 46 insertions(+), 22 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index b2752c2be7c25..6684fd18e51a0 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -82,7 +82,6 @@ 'class': 'airflow.utils.log.file_task_handler.FileTaskHandler', 'formatter': 'airflow', 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), - 'filename_template': FILENAME_TEMPLATE, 'filters': ['mask_secrets'], }, 'processor': { diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index eeec4d5b099b2..a6698f8ce38ea 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1065,11 +1065,11 @@ def schedule_tis(self, schedulable_tis: Iterable[TI], session: Session = NEW_SES return count @provide_session - def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str: + def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate: if self.log_template_id is None: # DagRun created before LogTemplate introduction. - template = session.query(LogTemplate.filename).order_by(LogTemplate.id).limit(1).scalar() + template = session.query(LogTemplate).order_by(LogTemplate.id).first() else: - template = session.query(LogTemplate.filename).filter_by(id=self.log_template_id).scalar() + template = session.query(LogTemplate).filter_by(id=self.log_template_id).one_or_none() if template is None: raise AirflowException( f"No log_template entry found for ID {self.log_template_id!r}. " diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py index bb1d801ac9821..2162fa1a3a9ed 100644 --- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -38,7 +38,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from OSS remote storage. """ - def __init__(self, base_log_folder, oss_log_folder, filename_template): + def __init__(self, base_log_folder, oss_log_folder, *, filename_template=None): self.log.info("Using oss_task_handler for remote logging...") super().__init__(base_log_folder, filename_template) (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder) diff --git a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py index c975a2cb83fc6..7d4f81006b380 100644 --- a/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py +++ b/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py @@ -17,6 +17,7 @@ # under the License. 
import sys from datetime import datetime +from typing import Optional import watchtower @@ -42,7 +43,7 @@ class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin): :param filename_template: template for file name (local storage) or log stream name (remote) """ - def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: str): + def __init__(self, base_log_folder: str, log_group_arn: str, filename_template: Optional[str] = None): super().__init__(base_log_folder, filename_template) split_arn = log_group_arn.split(':') diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index ce3da88f16916..60b2e289919d4 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -18,6 +18,7 @@ import os import pathlib import sys +from typing import Optional if sys.version_info >= (3, 8): from functools import cached_property @@ -36,7 +37,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from S3 remote storage. """ - def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: str): + def __init__(self, base_log_folder: str, s3_log_folder: str, *, filename_template: Optional[str] = None): super().__init__(base_log_folder, filename_template) self.remote_base = s3_log_folder self.log_relative_path = '' diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 83c1163d80c87..2d65592c85f09 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -18,6 +18,7 @@ import logging import sys +import warnings from collections import defaultdict from datetime import datetime from operator import attrgetter @@ -36,6 +37,7 @@ from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.json_formatter import JSONFormatter from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin +from airflow.utils.session import create_session # Elasticsearch hosted log type EsLogMsgType = List[Tuple[str, str]] @@ -65,8 +67,6 @@ class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMix def __init__( self, base_log_folder: str, - filename_template: str, - log_id_template: str, end_of_log_mark: str, write_stdout: bool, json_format: bool, @@ -76,6 +76,9 @@ def __init__( host: str = "localhost:9200", frontend: str = "localhost:5601", es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"), + *, + filename_template: Optional[str] = None, + log_id_template: Optional[str] = None, ): """ :param base_log_folder: base folder to store logs locally @@ -88,6 +91,12 @@ def __init__( self.client = elasticsearch.Elasticsearch([host], **es_kwargs) # type: ignore[attr-defined] + if log_id_template is not None: + warnings.warn( + "Passing log_id_template to the log handler is deprecated and has not effect", + DeprecationWarning, + ) + self.log_id_template = log_id_template self.frontend = frontend self.mark_end_on_close = True @@ -103,7 +112,10 @@ def __init__( self.handler: Union[logging.FileHandler, logging.StreamHandler] # type: ignore[assignment] def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: - dag_run = ti.get_dagrun() + with create_session() as session: + dag_run = ti.get_dagrun(session=session) + log_id_template = dag_run.get_log_template(session=session).elasticsearch_id + dag = ti.task.dag assert dag 
is not None # For Mypy. try: @@ -126,7 +138,7 @@ def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: data_interval_end = "" execution_date = dag_run.execution_date.isoformat() - return self.log_id_template.format( + return log_id_template.format( dag_id=ti.dag_id, task_id=ti.task_id, run_id=getattr(ti, "run_id", ""), diff --git a/airflow/providers/google/cloud/log/gcs_task_handler.py b/airflow/providers/google/cloud/log/gcs_task_handler.py index 92d133d109af5..81f1426d75154 100644 --- a/airflow/providers/google/cloud/log/gcs_task_handler.py +++ b/airflow/providers/google/cloud/log/gcs_task_handler.py @@ -67,7 +67,7 @@ def __init__( *, base_log_folder: str, gcs_log_folder: str, - filename_template: str, + filename_template: Optional[str] = None, gcp_key_path: Optional[str] = None, gcp_keyfile_dict: Optional[dict] = None, gcp_scopes: Optional[Collection[str]] = _DEFAULT_SCOPESS, diff --git a/airflow/providers/microsoft/azure/log/wasb_task_handler.py b/airflow/providers/microsoft/azure/log/wasb_task_handler.py index 9ec0cdf646fc4..f5e89c2c21a68 100644 --- a/airflow/providers/microsoft/azure/log/wasb_task_handler.py +++ b/airflow/providers/microsoft/azure/log/wasb_task_handler.py @@ -44,8 +44,9 @@ def __init__( base_log_folder: str, wasb_log_folder: str, wasb_container: str, - filename_template: str, delete_local_copy: str, + *, + filename_template: Optional[str] = None, ) -> None: super().__init__(base_log_folder, filename_template) self.wasb_container = wasb_container diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index e0561991b255f..f81dd4cc4cd63 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -18,6 +18,7 @@ """File logging handler for tasks.""" import logging import os +import warnings from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Optional, Tuple @@ -28,6 +29,7 @@ from airflow.utils.context import Context from airflow.utils.helpers import parse_template_string, render_template_to_string from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler +from airflow.utils.session import create_session if TYPE_CHECKING: from airflow.models import TaskInstance @@ -44,11 +46,15 @@ class FileTaskHandler(logging.Handler): :param filename_template: template filename string """ - def __init__(self, base_log_folder: str, filename_template: str): + def __init__(self, base_log_folder: str, filename_template: Optional[str] = None): super().__init__() self.handler: Optional[logging.FileHandler] = None self.local_base = base_log_folder - self.filename_template, self.filename_jinja_template = parse_template_string(filename_template) + if filename_template is not None: + warnings.warn( + "Passing filename_template to FileTaskHandler is deprecated and has not effect", + DeprecationWarning, + ) def set_context(self, ti: "TaskInstance"): """ @@ -75,15 +81,19 @@ def close(self): self.handler.close() def _render_filename(self, ti: "TaskInstance", try_number: int) -> str: - if self.filename_jinja_template: + with create_session() as session: + dag_run = ti.get_dagrun(session=session) + template = dag_run.get_log_template(session=session).filename + str_tpl, jinja_tpl = parse_template_string(template) + + if jinja_tpl: if hasattr(ti, "task"): context = ti.get_template_context() else: - context = Context(ti=ti, ts=ti.get_dagrun().logical_date.isoformat()) + context = Context(ti=ti, ts=dag_run.logical_date.isoformat()) context["try_number"] = 
try_number - return render_template_to_string(self.filename_jinja_template, context) - elif self.filename_template: - dag_run = ti.get_dagrun() + return render_template_to_string(jinja_tpl, context) + elif str_tpl: dag = ti.task.dag assert dag is not None # For Mypy. try: @@ -98,7 +108,7 @@ def _render_filename(self, ti: "TaskInstance", try_number: int) -> str: data_interval_end = data_interval[1].isoformat() else: data_interval_end = "" - return self.filename_template.format( + return str_tpl.format( dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id, diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index 396ab90a324f5..f241c22df188d 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -121,6 +121,6 @@ def render_log_filename( attachment_filename = render_log_filename( ti=ti, try_number="all" if try_number is None else try_number, - filename_template=dagrun.get_log_filename_template(session=session), + filename_template=dagrun.get_log_template(session=session).filename, ) return attachment_filename From b3393219b31183226f309938e1f8c53e40e927ba Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Fri, 3 Jun 2022 20:14:45 +0800 Subject: [PATCH 02/14] Fix log template rendering in some tests --- tests/task/task_runner/test_task_runner.py | 1 + tests/utils/log/test_log_reader.py | 12 +++-- tests/utils/test_log_handlers.py | 59 +++++++++++++++------- 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/tests/task/task_runner/test_task_runner.py b/tests/task/task_runner/test_task_runner.py index ab140e05f598f..fc5f3cc894650 100644 --- a/tests/task/task_runner/test_task_runner.py +++ b/tests/task/task_runner/test_task_runner.py @@ -36,6 +36,7 @@ def test_should_have_valid_imports(self, import_path): def test_should_support_core_task_runner(self, mock_subprocess): ti = mock.MagicMock(map_index=-1, run_as_user=None) ti.get_template_context.return_value = {"ti": ti} + ti.get_dagrun.return_value.get_log_template.return_value.filename = "blah" local_task_job = mock.MagicMock(task_instance=ti) task_runner = get_task_runner(local_task_job) diff --git a/tests/utils/log/test_log_reader.py b/tests/utils/log/test_log_reader.py index 4e6e942741df8..9a76ada725d33 100644 --- a/tests/utils/log/test_log_reader.py +++ b/tests/utils/log/test_log_reader.py @@ -29,6 +29,7 @@ from airflow import settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DagRun +from airflow.models.tasklog import LogTemplate from airflow.operators.python import PythonOperator from airflow.timetables.base import DataInterval from airflow.utils import timezone @@ -44,6 +45,7 @@ class TestLogView: DAG_ID = "dag_log_reader" TASK_ID = "task_log_reader" DEFAULT_DATE = timezone.datetime(2017, 9, 1) + FILENAME_TEMPLATE = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log" @pytest.fixture(autouse=True) def log_dir(self): @@ -70,9 +72,7 @@ def settings_folder(self): def configure_loggers(self, log_dir, settings_folder): logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) logging_config["handlers"]["task"]["base_log_folder"] = log_dir - logging_config["handlers"]["task"][ - "filename_template" - ] = "{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(':', '.') }}/{{ try_number }}.log" + logging_config["handlers"]["task"]["filename_template"] = self.FILENAME_TEMPLATE settings_file = os.path.join(settings_folder, "airflow_local_settings.py") with open(settings_file, "w") as 
handle: new_logging_file = f"LOGGING_CONFIG = {logging_config}" @@ -93,6 +93,10 @@ def prepare_log_files(self, log_dir): @pytest.fixture(autouse=True) def prepare_db(self, create_task_instance): + session = settings.Session() + log_template = LogTemplate(filename=self.FILENAME_TEMPLATE, elasticsearch_id="") + session.add(log_template) + session.commit() ti = create_task_instance( dag_id=self.DAG_ID, task_id=self.TASK_ID, @@ -107,6 +111,8 @@ def prepare_db(self, create_task_instance): yield clear_db_runs() clear_db_dags() + session.delete(log_template) + session.commit() def test_test_read_log_chunks_should_read_one_try(self): task_log_reader = TaskLogReader() diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index f4b4f7b2e31d7..0beb42d5e1b2e 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -23,8 +23,10 @@ import pytest +from airflow import settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, DagRun, TaskInstance +from airflow.models.tasklog import LogTemplate from airflow.operators.python import PythonOperator from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import set_context @@ -219,33 +221,54 @@ def task_callable(ti): @pytest.fixture() -def filename_rendering_ti(session, create_task_instance): - return create_task_instance( - dag_id='dag_for_testing_filename_rendering', - task_id='task_for_testing_filename_rendering', - run_type=DagRunType.SCHEDULED, - execution_date=DEFAULT_DATE, - session=session, - ) +def create_log_template(request): + session = settings.Session() + + def _create_log_template(filename_template): + log_template = LogTemplate(filename=filename_template, elasticsearch_id="") + session.add(log_template) + session.commit() + + def _delete_log_template(): + session.delete(log_template) + session.commit() + + request.addfinalizer(_delete_log_template) + + return _create_log_template class TestFilenameRendering: - def test_python_formatting(self, filename_rendering_ti): - expected_filename = ( - f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/' - f'{DEFAULT_DATE.isoformat()}/42.log' + def test_python_formatting(self, create_log_template, create_task_instance): + create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log") + filename_rendering_ti = create_task_instance( + dag_id="dag_for_testing_filename_rendering", + task_id="task_for_testing_filename_rendering", + run_type=DagRunType.SCHEDULED, + execution_date=DEFAULT_DATE, ) - fth = FileTaskHandler('', '{dag_id}/{task_id}/{execution_date}/{try_number}.log') + expected_filename = ( + f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/" + f"{DEFAULT_DATE.isoformat()}/42.log" + ) + fth = FileTaskHandler("") rendered_filename = fth._render_filename(filename_rendering_ti, 42) assert expected_filename == rendered_filename - def test_jinja_rendering(self, filename_rendering_ti): - expected_filename = ( - f'dag_for_testing_filename_rendering/task_for_testing_filename_rendering/' - f'{DEFAULT_DATE.isoformat()}/42.log' + def test_jinja_rendering(self, create_log_template, create_task_instance): + create_log_template("{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log") + filename_rendering_ti = create_task_instance( + dag_id="dag_for_testing_filename_rendering", + task_id="task_for_testing_filename_rendering", + run_type=DagRunType.SCHEDULED, + 
execution_date=DEFAULT_DATE, ) - fth = FileTaskHandler('', '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log') + expected_filename = ( + f"dag_for_testing_filename_rendering/task_for_testing_filename_rendering/" + f"{DEFAULT_DATE.isoformat()}/42.log" + ) + fth = FileTaskHandler("") rendered_filename = fth._render_filename(filename_rendering_ti, 42) assert expected_filename == rendered_filename From 63c4e148ab720cae9f92312d1adf38b7b4944e81 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Fri, 3 Jun 2022 11:47:28 -0600 Subject: [PATCH 03/14] Fix tests --- airflow/config_templates/default_test.cfg | 1 - .../endpoints/test_log_endpoint.py | 8 +-- tests/conftest.py | 21 ++++++ .../elasticsearch/log/test_es_task_handler.py | 68 ++++++++----------- tests/utils/test_log_handlers.py | 22 ------ .../task_for_testing_log_view/1.log | 1 + .../attempt=1.log | 1 + tests/www/views/test_views_log.py | 3 - 8 files changed, 57 insertions(+), 68 deletions(-) create mode 100644 tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log create mode 100644 tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 2f9b6fa264b13..83260d0d5250f 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -54,7 +54,6 @@ base_log_folder = {AIRFLOW_HOME}/logs logging_level = INFO celery_logging_level = WARN fab_logging_level = WARN -log_filename_template = {{{{ ti.dag_id }}}}/{{{{ ti.task_id }}}}/{{{{ ts }}}}/{{{{ try_number }}}}.log log_processor_filename_template = {{{{ filename }}}}.log dag_processor_manager_log_location = {AIRFLOW_HOME}/logs/dag_processor_manager/dag_processor_manager.log worker_log_server_port = 8793 diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py b/tests/api_connexion/endpoints/test_log_endpoint.py index 614e1fa3a19db..1b226be96f985 100644 --- a/tests/api_connexion/endpoints/test_log_endpoint.py +++ b/tests/api_connexion/endpoints/test_log_endpoint.py @@ -99,7 +99,7 @@ def setup_attrs(self, configured_app, configure_loggers, dag_maker, session) -> self.ti.hostname = 'localhost' @pytest.fixture - def configure_loggers(self, tmp_path): + def configure_loggers(self, tmp_path, create_log_template): self.log_dir = tmp_path dir_path = tmp_path / self.DAG_ID / self.TASK_ID / self.default_time.replace(':', '.') @@ -112,9 +112,9 @@ def configure_loggers(self, tmp_path): logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG) logging_config['handlers']['task']['base_log_folder'] = self.log_dir - logging_config['handlers']['task'][ - 'filename_template' - ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log' + create_log_template( + '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log' + ) logging.config.dictConfig(logging_config) diff --git a/tests/conftest.py b/tests/conftest.py index 1328ea01d55f3..b20c0be494087 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -34,6 +34,9 @@ os.environ["AWS_DEFAULT_REGION"] = os.environ.get("AWS_DEFAULT_REGION") or "us-east-1" os.environ["CREDENTIALS_DIR"] = os.environ.get('CREDENTIALS_DIR') or "/files/airflow-breeze-config/keys" +from airflow import settings # noqa: E402 +from airflow.models.tasklog import LogTemplate # noqa: E402 + from 
tests.test_utils.perf.perf_kit.sqlalchemy import ( # noqa isort:skip count_queries, trace_queries, @@ -769,3 +772,21 @@ def _get(dag_id): return dag return _get + + +@pytest.fixture() +def create_log_template(request): + session = settings.Session() + + def _create_log_template(filename_template, elasticsearch_id=""): + log_template = LogTemplate(filename=filename_template, elasticsearch_id=elasticsearch_id) + session.add(log_template) + session.commit() + + def _delete_log_template(): + session.delete(log_template) + session.commit() + + request.addfinalizer(_delete_log_template) + + return _create_log_template diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 03eab3dbb78a6..e26a78fe77e83 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -58,9 +58,11 @@ class TestElasticsearchTaskHandler: EXECUTION_DATE = datetime(2016, 1, 1) LOG_ID = f'{DAG_ID}-{TASK_ID}-2016-01-01T00:00:00+00:00-1' JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_date(EXECUTION_DATE)}-1' + FILENAME_TEMPLATE = '{try_number}.log' @pytest.fixture() - def ti(self, create_task_instance): + def ti(self, create_task_instance, create_log_template): + create_log_template(self.FILENAME_TEMPLATE, '{dag_id}-{task_id}-{execution_date}-{try_number}') yield get_ti( dag_id=self.DAG_ID, task_id=self.TASK_ID, @@ -73,8 +75,6 @@ def ti(self, create_task_instance): @elasticmock def setup(self): self.local_log_location = 'local/log/location' - self.filename_template = '{try_number}.log' - self.log_id_template = '{dag_id}-{task_id}-{execution_date}-{try_number}' self.end_of_log_mark = 'end_of_log\n' self.write_stdout = False self.json_format = False @@ -82,15 +82,13 @@ def setup(self): self.host_field = 'host' self.offset_field = 'offset' self.es_task_handler = ElasticsearchTaskHandler( - self.local_log_location, - self.filename_template, - self.log_id_template, - self.end_of_log_mark, - self.write_stdout, - self.json_format, - self.json_fields, - self.host_field, - self.offset_field, + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + json_format=self.json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, ) self.es = elasticsearch.Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}]) @@ -115,15 +113,13 @@ def test_client_with_config(self): assert es_conf == expected_dict # ensure creating with configs does not fail ElasticsearchTaskHandler( - self.local_log_location, - self.filename_template, - self.log_id_template, - self.end_of_log_mark, - self.write_stdout, - self.json_format, - self.json_fields, - self.host_field, - self.offset_field, + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + json_format=self.json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, es_kwargs=es_conf, ) @@ -395,7 +391,7 @@ def test_close(self, ti): self.es_task_handler.set_context(ti) self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: # end_of_log_mark may contain characters like '\n' which is needed to # have the log uploaded but will not be stored in 
elasticsearch. @@ -409,7 +405,7 @@ def test_close_no_mark_end(self, ti): self.es_task_handler.set_context(ti) self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert self.end_of_log_mark not in log_file.read() assert self.es_task_handler.closed @@ -419,7 +415,7 @@ def test_close_closed(self, ti): self.es_task_handler.set_context(ti) self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert 0 == len(log_file.read()) @@ -428,7 +424,7 @@ def test_close_with_no_handler(self, ti): self.es_task_handler.handler = None self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert 0 == len(log_file.read()) assert self.es_task_handler.closed @@ -438,7 +434,7 @@ def test_close_with_no_stream(self, ti): self.es_task_handler.handler.stream = None self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert self.end_of_log_mark in log_file.read() assert self.es_task_handler.closed @@ -447,7 +443,7 @@ def test_close_with_no_stream(self, ti): self.es_task_handler.handler.stream.close() self.es_task_handler.close() with open( - os.path.join(self.local_log_location, self.filename_template.format(try_number=1)) + os.path.join(self.local_log_location, self.FILENAME_TEMPLATE.format(try_number=1)) ) as log_file: assert self.end_of_log_mark in log_file.read() assert self.es_task_handler.closed @@ -478,15 +474,13 @@ def test_clean_date(self): ) def test_get_external_log_url(self, ti, json_format, es_frontend, expected_url): es_task_handler = ElasticsearchTaskHandler( - self.local_log_location, - self.filename_template, - self.log_id_template, - self.end_of_log_mark, - self.write_stdout, - json_format, - self.json_fields, - self.host_field, - self.offset_field, + base_log_folder=self.local_log_location, + end_of_log_mark=self.end_of_log_mark, + write_stdout=self.write_stdout, + json_format=json_format, + json_fields=self.json_fields, + host_field=self.host_field, + offset_field=self.offset_field, frontend=es_frontend, ) url = es_task_handler.get_external_log_url(ti, ti.try_number) @@ -508,8 +502,6 @@ def test_dynamic_offset(self, stdout_mock, ti): # arrange handler = ElasticsearchTaskHandler( base_log_folder=self.local_log_location, - filename_template=self.filename_template, - log_id_template=self.log_id_template, end_of_log_mark=self.end_of_log_mark, write_stdout=True, json_format=True, diff --git a/tests/utils/test_log_handlers.py b/tests/utils/test_log_handlers.py index 0beb42d5e1b2e..28b9c7cf1aa22 100644 --- a/tests/utils/test_log_handlers.py +++ b/tests/utils/test_log_handlers.py @@ -21,12 +21,8 @@ import os import re -import pytest - -from airflow import settings from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG from airflow.models import DAG, DagRun, TaskInstance -from airflow.models.tasklog import LogTemplate from airflow.operators.python import PythonOperator from 
airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.logging_mixin import set_context @@ -220,24 +216,6 @@ def task_callable(ti): os.remove(log_filename) -@pytest.fixture() -def create_log_template(request): - session = settings.Session() - - def _create_log_template(filename_template): - log_template = LogTemplate(filename=filename_template, elasticsearch_id="") - session.add(log_template) - session.commit() - - def _delete_log_template(): - session.delete(log_template) - session.commit() - - request.addfinalizer(_delete_log_template) - - return _create_log_template - - class TestFilenameRendering: def test_python_formatting(self, create_log_template, create_task_instance): create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log") diff --git a/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log new file mode 100644 index 0000000000000..bc10ef7880290 --- /dev/null +++ b/tests/www/test_logs/dag_for_testing_log_view/scheduled__2017-09-01T00:00:00+00:00/task_for_testing_log_view/1.log @@ -0,0 +1 @@ +Log for testing. diff --git a/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log new file mode 100644 index 0000000000000..bc10ef7880290 --- /dev/null +++ b/tests/www/test_logs/dag_id=dag_for_testing_log_view/run_id=scheduled__2017-09-01T00:00:00+00:00/task_id=task_for_testing_log_view/attempt=1.log @@ -0,0 +1 @@ +Log for testing. diff --git a/tests/www/views/test_views_log.py b/tests/www/views/test_views_log.py index f697cd3772c28..82b30f9d218e5 100644 --- a/tests/www/views/test_views_log.py +++ b/tests/www/views/test_views_log.py @@ -85,9 +85,6 @@ def factory(): logging_config['handlers']['task']['base_log_folder'] = str( pathlib.Path(__file__, "..", "..", "test_logs").resolve(), ) - logging_config['handlers']['task'][ - 'filename_template' - ] = '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts | replace(":", ".") }}/{{ try_number }}.log' with tempfile.TemporaryDirectory() as settings_dir: local_settings = pathlib.Path(settings_dir, "airflow_local_settings.py") From 9efc43053085df2a46e7f91603cc1acfdd645550 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Fri, 3 Jun 2022 11:52:57 -0600 Subject: [PATCH 04/14] Simplify --- airflow/models/dagrun.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index a6698f8ce38ea..824473b020765 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1069,7 +1069,7 @@ def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate: if self.log_template_id is None: # DagRun created before LogTemplate introduction. template = session.query(LogTemplate).order_by(LogTemplate.id).first() else: - template = session.query(LogTemplate).filter_by(id=self.log_template_id).one_or_none() + template = session.query(LogTemplate).get(self.log_template_id) if template is None: raise AirflowException( f"No log_template entry found for ID {self.log_template_id!r}. 
" From 3f65634f134166107f8a2e99d3c3b2c78e0656c2 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Sat, 4 Jun 2022 00:29:33 -0600 Subject: [PATCH 05/14] Fix more tests - cloudwatch should probably be refactored cloudwatch should probably be refactored as s3 was --- .../cloud/log/test_oss_task_handler.py | 5 +-- .../aws/log/test_cloudwatch_task_handler.py | 19 ++++---- .../amazon/aws/log/test_s3_task_handler.py | 43 ++++++++++++------- 3 files changed, 40 insertions(+), 27 deletions(-) diff --git a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py index 24eb73b92e92f..30e8cc32b9b23 100644 --- a/tests/providers/alibaba/cloud/log/test_oss_task_handler.py +++ b/tests/providers/alibaba/cloud/log/test_oss_task_handler.py @@ -35,10 +35,7 @@ class TestOSSTaskHandler(unittest.TestCase): def setUp(self): self.base_log_folder = 'local/airflow/logs/1.log' self.oss_log_folder = f'oss://{MOCK_BUCKET_NAME}/airflow/logs' - self.filename_template = '{try_number}.log' - self.oss_task_handler = OSSTaskHandler( - self.base_log_folder, self.oss_log_folder, self.filename_template - ) + self.oss_task_handler = OSSTaskHandler(self.base_log_folder, self.oss_log_folder) @mock.patch(OSS_TASK_HANDLER_STRING.format('conf.get')) @mock.patch(OSS_TASK_HANDLER_STRING.format('OSSHook')) diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py index dbd2ae28d5ad7..eb97d257737f9 100644 --- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py @@ -27,6 +27,7 @@ from airflow.operators.empty import EmptyOperator from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudwatchTaskHandler +from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timezone import datetime from tests.test_utils.config import conf_vars @@ -52,11 +53,9 @@ def setUp(self): self.remote_log_group = 'log_group_name' self.region_name = 'us-west-2' self.local_log_location = 'local/log/location' - self.filename_template = '{dag_id}/{task_id}/{execution_date}/{try_number}.log' self.cloudwatch_task_handler = CloudwatchTaskHandler( self.local_log_location, f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}", - self.filename_template, ) self.cloudwatch_task_handler.hook @@ -65,21 +64,26 @@ def setUp(self): task_id = 'task_for_testing_cloudwatch_log_handler' self.dag = DAG(dag_id=dag_id, start_date=date) task = EmptyOperator(task_id=task_id, dag=self.dag) - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test") - self.ti = TaskInstance(task=task) + dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + self.ti = TaskInstance(task=task, run_id=dag_run.run_id) self.ti.dag_run = dag_run self.ti.try_number = 1 self.ti.state = State.RUNNING - self.remote_log_stream = f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log'.replace( - ':', '_' - ) + with create_session() as session: + session.merge(dag_run) + + self.remote_log_stream = ( + f'dag_id={dag_id}/run_id={dag_run.run_id}/task_id={task_id}/attempt={self.ti.try_number}.log' + ).replace(':', '_') moto.moto_api._internal.models.moto_api_backend.reset() self.conn = boto3.client('logs', region_name=self.region_name) def tearDown(self): 
self.cloudwatch_task_handler.handler = None + with create_session() as session: + session.query(DagRun).delete() def test_hook(self): assert isinstance(self.cloudwatch_task_handler.hook, AwsLogsHook) @@ -89,7 +93,6 @@ def test_hook_raises(self): handler = CloudwatchTaskHandler( self.local_log_location, f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}", - self.filename_template, ) with mock.patch.object(handler.log, 'error') as mock_error: diff --git a/tests/providers/amazon/aws/log/test_s3_task_handler.py b/tests/providers/amazon/aws/log/test_s3_task_handler.py index b647f95ba4360..958ded562edfc 100644 --- a/tests/providers/amazon/aws/log/test_s3_task_handler.py +++ b/tests/providers/amazon/aws/log/test_s3_task_handler.py @@ -18,7 +18,6 @@ import contextlib import os -import unittest from unittest import mock from unittest.mock import ANY @@ -29,6 +28,7 @@ from airflow.operators.empty import EmptyOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler +from airflow.utils.session import create_session from airflow.utils.state import State from airflow.utils.timezone import datetime from tests.test_utils.config import conf_vars @@ -41,32 +41,39 @@ mock_s3 = None -@unittest.skipIf(mock_s3 is None, "Skipping test because moto.mock_s3 is not available") -@mock_s3 -class TestS3TaskHandler(unittest.TestCase): +@pytest.fixture(autouse=True, scope="module") +def s3mock(): + with mock_s3(): + yield + + +@pytest.mark.skipif(mock_s3 is None, reason="Skipping test because moto.mock_s3 is not available") +class TestS3TaskHandler: @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'}) - def setUp(self): - super().setUp() + @pytest.fixture(autouse=True) + def setup(self, create_log_template): self.remote_log_base = 's3://bucket/remote/log/location' self.remote_log_location = 's3://bucket/remote/log/location/1.log' self.remote_log_key = 'remote/log/location/1.log' self.local_log_location = 'local/log/location' - self.filename_template = '{try_number}.log' - self.s3_task_handler = S3TaskHandler( - self.local_log_location, self.remote_log_base, self.filename_template - ) + create_log_template('{try_number}.log') + self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base) # Vivfy the hook now with the config override assert self.s3_task_handler.hook is not None date = datetime(2016, 1, 1) self.dag = DAG('dag_for_testing_s3_task_handler', start_date=date) task = EmptyOperator(task_id='task_for_testing_s3_log_handler', dag=self.dag) - dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test") - self.ti = TaskInstance(task=task) + dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="manual") + with create_session() as session: + session.add(dag_run) + session.commit() + session.refresh(dag_run) + + self.ti = TaskInstance(task=task, run_id=dag_run.run_id) self.ti.dag_run = dag_run self.ti.try_number = 1 self.ti.state = State.RUNNING - self.addCleanup(self.dag.clear) self.conn = boto3.client('s3') # We need to create the bucket since this is all in Moto's 'virtual' @@ -74,7 +81,13 @@ def setUp(self): moto.moto_api._internal.models.moto_api_backend.reset() self.conn.create_bucket(Bucket="bucket") - def tearDown(self): + yield + + self.dag.clear() + + with create_session() as session: + session.query(DagRun).delete() + if self.s3_task_handler.handler: with contextlib.suppress(Exception): 
os.remove(self.s3_task_handler.handler.baseFilename) @@ -85,7 +98,7 @@ def test_hook(self): @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'}) def test_hook_raises(self): - handler = S3TaskHandler(self.local_log_location, self.remote_log_base, self.filename_template) + handler = S3TaskHandler(self.local_log_location, self.remote_log_base) with mock.patch.object(handler.log, 'error') as mock_error: with mock.patch("airflow.providers.amazon.aws.hooks.s3.S3Hook") as mock_hook: mock_hook.side_effect = Exception('Failed to connect') From a8aac3ad6c2a68e96c2aa69f0234dc3a85f80ac5 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Sat, 4 Jun 2022 00:40:58 -0600 Subject: [PATCH 06/14] Refactor cloudwatch --- .../aws/log/test_cloudwatch_task_handler.py | 37 +++++++++++++------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py index eb97d257737f9..b3175bcdb1166 100644 --- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py @@ -16,11 +16,11 @@ # specific language governing permissions and limitations # under the License. import time -import unittest from datetime import datetime as dt from unittest import mock from unittest.mock import ANY, call +import pytest from watchtower import CloudWatchLogHandler from airflow.models import DAG, DagRun, TaskInstance @@ -45,14 +45,21 @@ def get_time_str(time_in_milliseconds): return dt_time.strftime("%Y-%m-%d %H:%M:%S,000") -@unittest.skipIf(mock_logs is None, "Skipping test because moto.mock_logs is not available") -@mock_logs -class TestCloudwatchTaskHandler(unittest.TestCase): +@pytest.fixture(autouse=True, scope="module") +def logmock(): + with mock_logs(): + yield + + +@pytest.mark.skipif(mock_logs is None, reason="Skipping test because moto.mock_logs is not available") +class TestCloudwatchTaskHandler: @conf_vars({('logging', 'remote_log_conn_id'): 'aws_default'}) - def setUp(self): + @pytest.fixture(autouse=True) + def setup(self, create_log_template): self.remote_log_group = 'log_group_name' self.region_name = 'us-west-2' self.local_log_location = 'local/log/location' + create_log_template('{dag_id}/{task_id}/{execution_date}/{try_number}.log') self.cloudwatch_task_handler = CloudwatchTaskHandler( self.local_log_location, f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}", @@ -65,22 +72,28 @@ def setUp(self): self.dag = DAG(dag_id=dag_id, start_date=date) task = EmptyOperator(task_id=task_id, dag=self.dag) dag_run = DagRun(dag_id=self.dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") + with create_session() as session: + session.add(dag_run) + session.commit() + session.refresh(dag_run) + self.ti = TaskInstance(task=task, run_id=dag_run.run_id) self.ti.dag_run = dag_run self.ti.try_number = 1 self.ti.state = State.RUNNING - with create_session() as session: - session.merge(dag_run) - - self.remote_log_stream = ( - f'dag_id={dag_id}/run_id={dag_run.run_id}/task_id={task_id}/attempt={self.ti.try_number}.log' - ).replace(':', '_') + self.remote_log_stream = (f'{dag_id}/{task_id}/{date.isoformat()}/{self.ti.try_number}.log').replace( + ':', '_' + ) moto.moto_api._internal.models.moto_api_backend.reset() self.conn = boto3.client('logs', region_name=self.region_name) - def tearDown(self): + yield + + with create_session() as session: + session.query(DagRun).delete() + 
self.cloudwatch_task_handler.handler = None with create_session() as session: session.query(DagRun).delete() From cb9947960aa6849cfe6fe8ab827269b6e6945391 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Sun, 5 Jun 2022 10:58:04 +0800 Subject: [PATCH 07/14] Fix GCS task handler tests --- tests/providers/google/cloud/log/test_gcs_task_handler.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/providers/google/cloud/log/test_gcs_task_handler.py b/tests/providers/google/cloud/log/test_gcs_task_handler.py index 6517be8f31245..b443a9f8ec8ef 100644 --- a/tests/providers/google/cloud/log/test_gcs_task_handler.py +++ b/tests/providers/google/cloud/log/test_gcs_task_handler.py @@ -49,13 +49,11 @@ def local_log_location(self): yield td @pytest.fixture(autouse=True) - def gcs_task_handler(self, local_log_location): - self.remote_log_base = "gs://bucket/remote/log/location" - self.filename_template = "{try_number}.log" + def gcs_task_handler(self, create_log_template, local_log_location): + create_log_template("{try_number}.log") self.gcs_task_handler = GCSTaskHandler( base_log_folder=local_log_location, - gcs_log_folder=self.remote_log_base, - filename_template=self.filename_template, + gcs_log_folder="gs://bucket/remote/log/location", ) yield self.gcs_task_handler From e72c137b877682be9350659dc177f697acc3ca74 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Sun, 5 Jun 2022 11:01:49 +0800 Subject: [PATCH 08/14] Fix and refactor WASB tests --- .../azure/log/test_wasb_task_handler.py | 41 ++++++++----------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py index 4fe967671729d..3c92aa78aaa28 100644 --- a/tests/providers/microsoft/azure/log/test_wasb_task_handler.py +++ b/tests/providers/microsoft/azure/log/test_wasb_task_handler.py @@ -22,23 +22,25 @@ from airflow.providers.microsoft.azure.hooks.wasb import WasbHook from airflow.providers.microsoft.azure.log.wasb_task_handler import WasbTaskHandler -from airflow.utils.state import State +from airflow.utils.state import TaskInstanceState from airflow.utils.timezone import datetime from tests.test_utils.config import conf_vars from tests.test_utils.db import clear_db_dags, clear_db_runs +DEFAULT_DATE = datetime(2020, 8, 10) + class TestWasbTaskHandler: @pytest.fixture(autouse=True) - def ti(self, create_task_instance): - date = datetime(2020, 8, 10) + def ti(self, create_task_instance, create_log_template): + create_log_template("{try_number}.log") ti = create_task_instance( dag_id='dag_for_testing_wasb_task_handler', task_id='task_for_testing_wasb_log_handler', - execution_date=date, - start_date=date, - dagrun_state=State.RUNNING, - state=State.RUNNING, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE, + dagrun_state=TaskInstanceState.RUNNING, + state=TaskInstanceState.RUNNING, ) ti.try_number = 1 ti.hostname = 'localhost' @@ -52,12 +54,10 @@ def setup_method(self): self.remote_log_location = 'remote/log/location/1.log' self.local_log_location = 'local/log/location' self.container_name = "wasb-container" - self.filename_template = '{try_number}.log' self.wasb_task_handler = WasbTaskHandler( base_log_folder=self.local_log_location, wasb_log_folder=self.wasb_log_folder, wasb_container=self.container_name, - filename_template=self.filename_template, delete_local_copy=True, ) @@ -68,9 +68,7 @@ def test_hook(self, mock_service): @conf_vars({('logging', 'remote_log_conn_id'): 
'wasb_default'}) def test_hook_raises(self): - handler = WasbTaskHandler( - self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True - ) + handler = self.wasb_task_handler with mock.patch.object(handler.log, 'error') as mock_error: with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook: mock_hook.side_effect = AzureHttpError("failed to connect", 404) @@ -120,15 +118,14 @@ def test_wasb_read(self, mock_hook, ti): [{'end_of_log': True}], ) - def test_wasb_read_raises(self): - handler = WasbTaskHandler( - self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True - ) + @mock.patch( + "airflow.providers.microsoft.azure.hooks.wasb.WasbHook", + **{"return_value.read_file.side_effect": AzureHttpError("failed to connect", 404)}, + ) + def test_wasb_read_raises(self, mock_hook): + handler = self.wasb_task_handler with mock.patch.object(handler.log, 'error') as mock_error: - with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook: - mock_hook.return_value.read_file.side_effect = AzureHttpError("failed to connect", 404) - - handler.wasb_read(self.remote_log_location, return_error=True) + handler.wasb_read(self.remote_log_location, return_error=True) mock_error.assert_called_once_with( 'Could not read logs from remote/log/location/1.log', exc_info=True, @@ -164,9 +161,7 @@ def test_write_when_append_is_false(self, mock_hook): ) def test_write_raises(self): - handler = WasbTaskHandler( - self.local_log_location, self.wasb_log_folder, self.container_name, self.filename_template, True - ) + handler = self.wasb_task_handler with mock.patch.object(handler.log, 'error') as mock_error: with mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbHook") as mock_hook: mock_hook.return_value.load_string.side_effect = AzureHttpError("failed to connect", 404) From fba540b3f7ce018e194f414050c87997a2542511 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Sun, 5 Jun 2022 11:14:01 +0800 Subject: [PATCH 09/14] Add ID template compat on Elastic log handler --- .../elasticsearch/log/es_task_handler.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 2d65592c85f09..7a22a0af0d6c0 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -32,7 +32,8 @@ from elasticsearch_dsl import Search from airflow.configuration import conf -from airflow.models import TaskInstance +from airflow.models.dagrun import DagRun +from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.json_formatter import JSONFormatter @@ -42,6 +43,11 @@ # Elasticsearch hosted log type EsLogMsgType = List[Tuple[str, str]] +# Compatibility: Airflow 2.3.2 uses this method, which accesses the LogTemplate +# model to record the log ID template used. If this function does not exist, the +# task handler should use the log_id_template attribute instead. 
+USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template")
+
 
 class ElasticsearchTaskHandler(FileTaskHandler, ExternalLoggingMixin, LoggingMixin):
     """
@@ -91,13 +97,13 @@ def __init__(
 
         self.client = elasticsearch.Elasticsearch([host], **es_kwargs)  # type: ignore[attr-defined]
 
-        if log_id_template is not None:
+        if USE_PER_RUN_LOG_ID and log_id_template is not None:
             warnings.warn(
                 "Passing log_id_template to the log handler is deprecated and has not effect",
                 DeprecationWarning,
             )
 
-        self.log_id_template = log_id_template
+        self.log_id_template = log_id_template  # Only used on Airflow < 2.3.2.
         self.frontend = frontend
         self.mark_end_on_close = True
         self.end_of_log_mark = end_of_log_mark
@@ -114,7 +120,10 @@ def __init__(
     def _render_log_id(self, ti: TaskInstance, try_number: int) -> str:
         with create_session() as session:
             dag_run = ti.get_dagrun(session=session)
-            log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
+            if USE_PER_RUN_LOG_ID:
+                log_id_template = dag_run.get_log_template(session=session).elasticsearch_id
+            else:
+                log_id_template = self.log_id_template
 
         dag = ti.task.dag
         assert dag is not None  # For Mypy.

From d1dc70d56f51fcbd096eb5aff19301e10dca558d Mon Sep 17 00:00:00 2001
From: Tzu-ping Chung
Date: Sun, 5 Jun 2022 11:15:48 +0800
Subject: [PATCH 10/14] Typo

---
 airflow/providers/elasticsearch/log/es_task_handler.py | 2 +-
 airflow/utils/log/file_task_handler.py                 | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 7a22a0af0d6c0..f8311f7bde43b 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -99,7 +99,7 @@ def __init__(
 
         if USE_PER_RUN_LOG_ID and log_id_template is not None:
             warnings.warn(
-                "Passing log_id_template to the log handler is deprecated and has not effect",
+                "Passing log_id_template to ElasticsearchTaskHandler is deprecated and has no effect",
                 DeprecationWarning,
             )
 
diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py
index f81dd4cc4cd63..db34ea5f6b5b6 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -52,7 +52,7 @@ def __init__(self, base_log_folder: str, filename_template: Optional[str] = None
         self.local_base = base_log_folder
         if filename_template is not None:
             warnings.warn(
-                "Passing filename_template to FileTaskHandler is deprecated and has not effect",
+                "Passing filename_template to FileTaskHandler is deprecated and has no effect",
                 DeprecationWarning,
             )

From a70e2af1bf6c1d16c2c1bb003f295224294dbff7 Mon Sep 17 00:00:00 2001
From: Tzu-ping Chung
Date: Mon, 6 Jun 2022 09:31:00 +0800
Subject: [PATCH 11/14] We don't expect the function until 2.3.3

---
 airflow/providers/elasticsearch/log/es_task_handler.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index f8311f7bde43b..64fce0df53c15 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -43,9 +43,9 @@
 # Elasticsearch hosted log type
 EsLogMsgType = List[Tuple[str, str]]
 
-# Compatibility: Airflow 2.3.2 uses this method, which accesses the LogTemplate
-# model to record the log ID template used.
If this function does not exist, the -# task handler should use the log_id_template attribute instead. +# Compatibility: Airflow 2.3.3 and up uses this method, which accesses the +# LogTemplate model to record the log ID template used. If this function does +# not exist, the task handler should use the log_id_template attribute instead. USE_PER_RUN_LOG_ID = hasattr(DagRun, "get_log_template") From 3b216e88caa25fea2c30f2b413b07b9504a1ad01 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 6 Jun 2022 09:33:01 +0800 Subject: [PATCH 12/14] Retain perfect compatibility where possible --- airflow/providers/alibaba/cloud/log/oss_task_handler.py | 2 +- airflow/providers/amazon/aws/log/s3_task_handler.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/alibaba/cloud/log/oss_task_handler.py b/airflow/providers/alibaba/cloud/log/oss_task_handler.py index 2162fa1a3a9ed..393847f93a5bd 100644 --- a/airflow/providers/alibaba/cloud/log/oss_task_handler.py +++ b/airflow/providers/alibaba/cloud/log/oss_task_handler.py @@ -38,7 +38,7 @@ class OSSTaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from OSS remote storage. """ - def __init__(self, base_log_folder, oss_log_folder, *, filename_template=None): + def __init__(self, base_log_folder, oss_log_folder, filename_template=None): self.log.info("Using oss_task_handler for remote logging...") super().__init__(base_log_folder, filename_template) (self.bucket_name, self.base_folder) = OSSHook.parse_oss_url(oss_log_folder) diff --git a/airflow/providers/amazon/aws/log/s3_task_handler.py b/airflow/providers/amazon/aws/log/s3_task_handler.py index 60b2e289919d4..cb8e0a2883773 100644 --- a/airflow/providers/amazon/aws/log/s3_task_handler.py +++ b/airflow/providers/amazon/aws/log/s3_task_handler.py @@ -37,7 +37,7 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): uploads to and reads from S3 remote storage. 
""" - def __init__(self, base_log_folder: str, s3_log_folder: str, *, filename_template: Optional[str] = None): + def __init__(self, base_log_folder: str, s3_log_folder: str, filename_template: Optional[str] = None): super().__init__(base_log_folder, filename_template) self.remote_base = s3_log_folder self.log_relative_path = '' From 502779e794c2b92a60c46f4dd4053b31eaa936d5 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Mon, 6 Jun 2022 08:49:37 -0600 Subject: [PATCH 13/14] Remove dup cloudwatch cleanup code --- tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py index b3175bcdb1166..8b23218c8cc39 100644 --- a/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py +++ b/tests/providers/amazon/aws/log/test_cloudwatch_task_handler.py @@ -91,9 +91,6 @@ def setup(self, create_log_template): yield - with create_session() as session: - session.query(DagRun).delete() - self.cloudwatch_task_handler.handler = None with create_session() as session: session.query(DagRun).delete() From ae19ddee140da36c9abe039fa4bb3ba487f9e895 Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Mon, 6 Jun 2022 11:22:11 -0600 Subject: [PATCH 14/14] Add back get_log_filename_template for backcompat --- airflow/models/dagrun.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 824473b020765..c66e24c536d9b 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -1076,3 +1076,12 @@ def get_log_template(self, *, session: Session = NEW_SESSION) -> LogTemplate: f"Please make sure you set up the metadatabase correctly." ) return template + + @provide_session + def get_log_filename_template(self, *, session: Session = NEW_SESSION) -> str: + warnings.warn( + "This method is deprecated. Please use get_log_template instead.", + DeprecationWarning, + stacklevel=2, + ) + return self.get_log_template(session=session).filename