From 060345c0d982765e39da5fa8b2e2c6a01e89e394 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Tue, 21 Sep 2021 21:36:04 +0800 Subject: [PATCH] Add docs for AIP 39: Timetables (#17552) --- airflow/example_dags/plugins/__init__.py | 16 + airflow/example_dags/plugins/workday.py | 90 ++++++ .../elasticsearch/log/es_task_handler.py | 26 +- airflow/sentry.py | 14 +- airflow/timetables/interval.py | 2 +- .../logging/index.rst | 4 +- .../operators/cloud/gcs.rst | 7 +- docs/apache-airflow/best-practices.rst | 160 ++++++---- docs/apache-airflow/concepts/dags.rst | 31 +- docs/apache-airflow/concepts/operators.rst | 8 +- docs/apache-airflow/concepts/overview.rst | 2 +- docs/apache-airflow/concepts/scheduler.rst | 10 +- docs/apache-airflow/concepts/tasks.rst | 6 +- docs/apache-airflow/dag-run.rst | 57 ++-- docs/apache-airflow/faq.rst | 35 +- .../howto/define_extra_link.rst | 12 +- docs/apache-airflow/howto/index.rst | 1 + .../howto/operator/external_task_sensor.rst | 3 +- docs/apache-airflow/howto/operator/python.rst | 2 +- docs/apache-airflow/howto/timetable.rst | 298 ++++++++++++++++++ docs/apache-airflow/lineage.rst | 4 +- .../logging-monitoring/errors.rst | 9 +- .../logging-monitoring/logging-tasks.rst | 2 +- docs/apache-airflow/plugins.rst | 7 +- docs/apache-airflow/python-api-ref.rst | 11 + docs/apache-airflow/timezone.rst | 12 +- docs/apache-airflow/tutorial.rst | 31 +- docs/conf.py | 11 +- docs/spelling_wordlist.txt | 4 + tests/core/test_sentry.py | 8 +- .../elasticsearch/log/test_es_task_handler.py | 6 +- 31 files changed, 719 insertions(+), 170 deletions(-) create mode 100644 airflow/example_dags/plugins/__init__.py create mode 100644 airflow/example_dags/plugins/workday.py create mode 100644 docs/apache-airflow/howto/timetable.rst diff --git a/airflow/example_dags/plugins/__init__.py b/airflow/example_dags/plugins/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/example_dags/plugins/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py new file mode 100644 index 0000000000000..3b39a6ef61cea --- /dev/null +++ b/airflow/example_dags/plugins/workday.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Plugin to demonstrate timetable registration and accommodate example DAGs."""
+
+# [START howto_timetable]
+from datetime import timedelta
+from typing import Optional
+
+from pendulum import Date, DateTime, Time, timezone
+
+from airflow.plugins_manager import AirflowPlugin
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+
+UTC = timezone("UTC")
+
+
+class AfterWorkdayTimetable(Timetable):
+
+    # [START howto_timetable_infer_data_interval]
+    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+        weekday = run_after.weekday()
+        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
+            days_since_friday = (run_after.weekday() - 4) % 7
+            delta = timedelta(days=days_since_friday)
+        else:  # Otherwise the interval is yesterday.
+            delta = timedelta(days=1)
+        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
+        return DataInterval(start=start, end=(start + timedelta(days=1)))
+
+    # [END howto_timetable_infer_data_interval]
+
+    # [START howto_timetable_next_dagrun_info]
+    def next_dagrun_info(
+        self,
+        *,
+        last_automated_data_interval: Optional[DataInterval],
+        restriction: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
+            last_start = last_automated_data_interval.start
+            last_start_weekday = last_start.weekday()
+            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
+                delta = timedelta(days=1)
+            else:  # Last run on Friday -- skip to next Monday.
+                delta = timedelta(days=(7 - last_start_weekday))
+            next_start = DateTime.combine((last_start + delta).date(), Time.min)
+        else:  # This is the first ever run on the regular schedule.
+            next_start = restriction.earliest
+            if next_start is None:  # No start_date. Don't schedule.
+                return None
+            if not restriction.catchup:
+                # If the DAG has catchup=False, today is the earliest to consider.
+                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
+            elif next_start.time() != Time.min:
+                # If earliest does not fall on midnight, skip to the next day.
+                next_day = next_start.date() + timedelta(days=1)
+                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
+            next_start_weekday = next_start.weekday()
+            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
+                delta = timedelta(days=(7 - next_start_weekday))
+                next_start = next_start + delta
+        if restriction.latest is not None and next_start > restriction.latest:
+            return None  # Over the DAG's scheduled end; don't schedule.
+ return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1))) + + # [END howto_timetable_next_dagrun_info] + + +class WorkdayTimetablePlugin(AirflowPlugin): + name = "workday_timetable_plugin" + timetables = [AfterWorkdayTimetable] + + +# [END howto_timetable] diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 1a562805213fc..cd0897153dfdc 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -101,25 +101,37 @@ def __init__( self.context_set = False def _render_log_id(self, ti: TaskInstance, try_number: int) -> str: + dag_run = ti.dag_run + if self.json_format: - execution_date = self._clean_execution_date(ti.execution_date) + data_interval_start = self._clean_date(dag_run.data_interval_start) + data_interval_end = self._clean_date(dag_run.data_interval_end) + execution_date = self._clean_date(dag_run.execution_date) else: - execution_date = ti.execution_date.isoformat() + data_interval_start = dag_run.data_interval_start.isoformat() + data_interval_end = dag_run.data_interval_end.isoformat() + execution_date = dag_run.execution_date.isoformat() return self.log_id_template.format( - dag_id=ti.dag_id, task_id=ti.task_id, execution_date=execution_date, try_number=try_number + dag_id=ti.dag_id, + task_id=ti.task_id, + run_id=ti.run_id, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + execution_date=execution_date, + try_number=try_number, ) @staticmethod - def _clean_execution_date(execution_date: datetime) -> str: + def _clean_date(value: datetime) -> str: """ - Clean up an execution date so that it is safe to query in elasticsearch + Clean up a date value so that it is safe to query in elasticsearch by removing reserved characters. # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters :param execution_date: execution date of the dag run. 
""" - return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f") + return value.strftime("%Y_%m_%dT%H_%M_%S_%f") def _group_logs_by_host(self, logs): grouped_logs = defaultdict(list) @@ -270,7 +282,7 @@ def set_context(self, ti: TaskInstance) -> None: extras={ 'dag_id': str(ti.dag_id), 'task_id': str(ti.task_id), - 'execution_date': self._clean_execution_date(ti.execution_date), + 'execution_date': self._clean_date(ti.execution_date), 'try_number': str(ti.try_number), 'log_id': self._render_log_id(ti, ti.try_number), }, diff --git a/airflow/sentry.py b/airflow/sentry.py index a6b2adf891fd5..19d83f636c6b4 100644 --- a/airflow/sentry.py +++ b/airflow/sentry.py @@ -59,7 +59,10 @@ def flush(self): class ConfiguredSentry(DummySentry): """Configure Sentry SDK.""" - SCOPE_TAGS = frozenset(("task_id", "dag_id", "execution_date", "operator", "try_number")) + SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date")) + SCOPE_TASK_TAGS = frozenset(("operator",)) + SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number")) + SCOPE_TAGS = SCOPE_DAG_RUN_TAGS | SCOPE_TASK_TAGS | SCOPE_TASK_INSTANCE_TAGS SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration")) UNSUPPORTED_SENTRY_OPTIONS = frozenset( @@ -116,14 +119,17 @@ def __init__(self): def add_tagging(self, task_instance): """Function to add tagging for a task_instance.""" + dag_run = task_instance.dag_run task = task_instance.task with sentry_sdk.configure_scope() as scope: - for tag_name in self.SCOPE_TAGS: + for tag_name in self.SCOPE_TASK_INSTANCE_TAGS: attribute = getattr(task_instance, tag_name) - if tag_name == "operator": - attribute = task.__class__.__name__ scope.set_tag(tag_name, attribute) + for tag_name in self.SCOPE_DAG_RUN_TAGS: + attribute = getattr(dag_run, tag_name) + scope.set_tag(tag_name, attribute) + scope.set_tag("operator", task.__class__.__name__) @provide_session def add_breadcrumbs(self, task_instance, session=None): diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index 8f6132ae170bd..8f5c3f1032a29 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -114,7 +114,7 @@ class CronDataIntervalTimetable(_DataIntervalTimetable): a five/six-segment representation, or one of ``cron_presets``. The implementation extends on croniter to add timezone awareness. This is - because crontier works only with naive timestamps, and cannot consider DST + because croniter works only with naive timestamps, and cannot consider DST when determining the next/previous time. Don't pass ``@once`` in here; use ``OnceTimetable`` instead. 
diff --git a/docs/apache-airflow-providers-elasticsearch/logging/index.rst b/docs/apache-airflow-providers-elasticsearch/logging/index.rst index e558db52fd83b..5e16ccbdc0e78 100644 --- a/docs/apache-airflow-providers-elasticsearch/logging/index.rst +++ b/docs/apache-airflow-providers-elasticsearch/logging/index.rst @@ -38,7 +38,7 @@ First, to use the handler, ``airflow.cfg`` must be configured as follows: [elasticsearch] host = : - log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} + log_id_template = {dag_id}-{task_id}-{run_id}-{try_number} end_of_log_mark = end_of_log write_stdout = json_fields = @@ -56,7 +56,7 @@ To output task logs to stdout in JSON format, the following config could be used [elasticsearch] host = : - log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number} + log_id_template = {dag_id}-{task_id}-{run_id}-{try_number} end_of_log_mark = end_of_log write_stdout = True json_format = True diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst index 5fa330c7a7d6c..0a5e2435407ea 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst @@ -58,11 +58,10 @@ GCSTimeSpanFileTransformOperator Use the :class:`~airflow.providers.google.cloud.operators.gcs.GCSTimeSpanFileTransformOperator` -to transform files that were modified in a specific time span. The time span is defined -by the DAG instance logical execution timestamp (``execution_date``, start of time span) -and the timestamp when the next DAG instance execution is scheduled (end of time span). If a DAG +to transform files that were modified in a specific time span (the data interval). +The time span is defined by the time span's start and end timestamps. If a DAG does not have a *next* DAG instance scheduled, the time span end infinite, meaning the operator -processes all files older than ``execution_date``. +processes all files older than ``data_interval_start``. .. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs_timespan_file_transform.py :language: python diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 013699b886674..dba73ec859400 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -43,21 +43,26 @@ Please follow our guide on :ref:`custom Operators `. Creating a task --------------- -You should treat tasks in Airflow equivalent to transactions in a database. This implies that you should never produce -incomplete results from your tasks. An example is not to produce incomplete data in ``HDFS`` or ``S3`` at the end of a task. - -Airflow can retry a task if it fails. Thus, the tasks should produce the same outcome on every re-run. -Some of the ways you can avoid producing a different result - - -* Do not use INSERT during a task re-run, an INSERT statement might lead to duplicate rows in your database. - Replace it with UPSERT. -* Read and write in a specific partition. Never read the latest available data in a task. - Someone may update the input data between re-runs, which results in different outputs. - A better way is to read the input data from a specific partition. You can use ``execution_date`` as a partition. - You should follow this partitioning method while writing data in S3/HDFS, as well. -* The Python datetime ``now()`` function gives the current datetime object. 
- This function should never be used inside a task, especially to do the critical computation, as it leads to different outcomes on each run. - It's fine to use it, for example, to generate a temporary log. +You should treat tasks in Airflow equivalent to transactions in a database. This +implies that you should never produce incomplete results from your tasks. An +example is not to produce incomplete data in ``HDFS`` or ``S3`` at the end of a +task. + +Airflow can retry a task if it fails. Thus, the tasks should produce the same +outcome on every re-run. Some of the ways you can avoid producing a different +result - + +* Do not use INSERT during a task re-run, an INSERT statement might lead to + duplicate rows in your database. Replace it with UPSERT. +* Read and write in a specific partition. Never read the latest available data + in a task. Someone may update the input data between re-runs, which results in + different outputs. A better way is to read the input data from a specific + partition. You can use ``data_interval_start`` as a partition. You should + follow this partitioning method while writing data in S3/HDFS as well. +* The Python datetime ``now()`` function gives the current datetime object. This + function should never be used inside a task, especially to do the critical + computation, as it leads to different outcomes on each run. It's fine to use + it, for example, to generate a temporary log. .. tip:: @@ -270,77 +275,94 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write uni .. code-block:: python - from airflow.models import DagBag - import unittest + import pytest + from airflow.models import DagBag - class TestHelloWorldDAG(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.dagbag = DagBag() - def test_dag_loaded(self): - dag = self.dagbag.get_dag(dag_id="hello_world") - assert self.dagbag.import_errors == {} - assert dag is not None - assert len(dag.tasks) == 1 + @pytest.fixture() + def dagbag(self): + return DagBag() + + + def test_dag_loaded(self, dagbag): + dag = dagbag.get_dag(dag_id="hello_world") + assert dagbag.import_errors == {} + assert dag is not None + assert len(dag.tasks) == 1 + **Unit test a DAG structure:** This is an example test want to verify the structure of a code-generated DAG against a dict object .. code-block:: python - import unittest + def assert_dag_dict_equal(source, dag): + assert dag.task_dict.keys() == source.keys() + for task_id, downstream_list in source.items(): + assert dag.has_task(task_id) + task = dag.get_task(task_id) + assert task.downstream_task_ids == set(downstream_list) - class testClass(unittest.TestCase): - def assertDagDictEqual(self, source, dag): - assert dag.task_dict.keys() == source.keys() - for task_id, downstream_list in source.items(): - assert dag.has_task(task_id) - task = dag.get_task(task_id) - assert task.downstream_task_ids == set(downstream_list) + def test_dag(): + assert_dag_dict_equal( + { + "DummyInstruction_0": ["DummyInstruction_1"], + "DummyInstruction_1": ["DummyInstruction_2"], + "DummyInstruction_2": ["DummyInstruction_3"], + "DummyInstruction_3": [], + }, + dag, + ) - def test_dag(self): - self.assertDagDictEqual( - { - "DummyInstruction_0": ["DummyInstruction_1"], - "DummyInstruction_1": ["DummyInstruction_2"], - "DummyInstruction_2": ["DummyInstruction_3"], - "DummyInstruction_3": [], - }, - dag, - ) **Unit test for custom operator:** .. 
code-block:: python - import unittest - from airflow.utils.state import State - - DEFAULT_DATE = "2019-10-03" - TEST_DAG_ID = "test_my_custom_operator" - - - class MyCustomOperatorTest(unittest.TestCase): - def setUp(self): - self.dag = DAG( - TEST_DAG_ID, - schedule_interval="@daily", - default_args={"start_date": DEFAULT_DATE}, - ) - self.op = MyCustomOperator( - dag=self.dag, - task_id="test", - prefix="s3://bucket/some/prefix", - ) - self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE) - - def test_execute_no_trigger(self): - self.ti.run(ignore_ti_state=True) - assert self.ti.state == State.SUCCESS - # Assert something related to tasks results + import datetime + + import pytest + + from airflow.utils.state import DagRunState + from airflow.utils.types import DagRunType + + DATA_INTERVAL_START = datetime.datetime(2021, 9, 13) + DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) + + TEST_DAG_ID = "my_custom_operator_dag" + TEST_TASK_ID = "my_custom_operator_task" + + + @pytest.fixture() + def dag(): + with DAG( + dag_id=TEST_DAG_ID, + schedule_interval="@daily", + default_args={"start_date": DATA_INTERVAL_START}, + ) as dag: + MyCustomOperator( + task_id=TEST_TASK_ID, + prefix="s3://bucket/some/prefix", + ) + return dag + + + def test_my_custom_operator_execute_no_trigger(dag): + dagrun = dag.create_dagrun( + state=DagRunState.RUNNING, + execution_date=DATA_INTERVAL_START, + data_interval=(DATA_INTERVAL_START, DATA_INTERVAL_END), + start_date=DATA_INTERVAL_END, + run_type=DagRunType.MANUAL, + ) + ti = dagrun.get_task_instance(task_id=TEST_TASK_ID) + ti.task = dag.get_task(task_id=TEST_TASK_ID) + ti.run(ignore_ti_state=True) + assert ti.state == State.SUCCESS + # Assert something related to tasks results. + Self-Checks ------------ diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst index 89a0995beca77..b8718956aba5a 100644 --- a/docs/apache-airflow/concepts/dags.rst +++ b/docs/apache-airflow/concepts/dags.rst @@ -150,13 +150,30 @@ The ``schedule_interval`` argument takes any value that is a valid `Crontab `. DAG Runs can run in parallel for the same DAG, and each has a defined ``execution_date``, which identifies the *logical* date and time it is running for - not the *actual* time when it was started. +.. tip:: -As an example of why this is useful, consider writing a DAG that processes a daily set of experimental data. It's been rewritten, and you want to run it on the previous 3 months of data - no problem, since Airflow can *backfill* the DAG and run copies of it for every day in those previous 3 months, all at once. + For more information on ``schedule_interval`` values, see :doc:`DAG Run `. -Those DAG Runs will all have been started on the same actual day, but their ``execution_date`` values will cover those last 3 months, and that's what all the tasks, operators and sensors inside the DAG look at when they run. + If ``schedule_interval`` is not enough to express the DAG's schedule, see :doc:`Timetables `. -In much the same way a DAG instantiates into a DAG Run every time it's run, Tasks specified inside a DAG also instantiate into :ref:`Task Instances ` along with it. +Every time you run a DAG, you are creating a new instance of that DAG which +Airflow calls a :doc:`DAG Run `. DAG Runs can run in parallel for the +same DAG, and each has a defined data interval, which identifies the period of +data the tasks should operate on. 
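For illustration, a minimal sketch of such a DAG (the ``dag_id`` and command here are made up) can read each run's interval through the ``data_interval_start`` and ``data_interval_end`` template variables:

.. code-block:: python

    import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(
        dag_id="example_data_interval",
        start_date=datetime.datetime(2021, 1, 1),
        schedule_interval="@daily",
    ) as dag:
        # Each DAG Run renders its own data interval into the command.
        BashOperator(
            task_id="print_interval",
            bash_command="echo {{ data_interval_start }} to {{ data_interval_end }}",
        )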
+ +As an example of why this is useful, consider writing a DAG that processes a +daily set of experimental data. It's been rewritten, and you want to run it on +the previous 3 months of data---no problem, since Airflow can *backfill* the DAG +and run copies of it for every day in those previous 3 months, all at once. + +Those DAG Runs will all have been started on the same actual day, but each DAG +run will have one data interval covering a single day in that 3 month period, +and that data interval is all the tasks, operators and sensors inside the DAG +look at when they run. + +In much the same way a DAG instantiates into a DAG Run every time it's run, +Tasks specified inside a DAG are also instantiated into +:ref:`Task Instances ` along with it. DAG Assignment @@ -279,7 +296,7 @@ As with the callable for ``BranchPythonOperator``, this method should return the """ Run an extra branch on the first day of the month """ - if context['execution_date'].day == 1: + if context['data_interval_start'].day == 1: return ['daily_task_id', 'monthly_task_id'] else: return 'daily_task_id' @@ -319,7 +336,7 @@ Depends On Past You can also say a task can only run if the *previous* run of the task in the previous DAG Run succeeded. To use this, you just need to set the ``depends_on_past`` argument on your Task to ``True``. -Note that if you are running the DAG at the very start of its life - specifically, that the ``execution_date`` matches the ``start_date`` - then the Task will still run, as there is no previous run to depend on. +Note that if you are running the DAG at the very start of its life---specifically, its first ever *automated* run---then the Task will still run, as there is no previous run to depend on. .. _concepts:trigger-rules: @@ -612,7 +629,7 @@ in which one DAG can depend on another: - waiting - :class:`~airflow.sensors.external_task_sensor.ExternalTaskSensor` Additional difficulty is that one DAG could wait for or trigger several runs of the other DAG -with different execution dates. The **Dag Dependencies** view +with different data intervals. The **Dag Dependencies** view ``Menu -> Browse -> DAG Dependencies`` helps visualize dependencies between DAGs. The dependencies are calculated by the scheduler during DAG serialization and the webserver uses them to build the dependency graph. diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst index 25146356becad..695e9d39c4cd3 100644 --- a/docs/apache-airflow/concepts/operators.rst +++ b/docs/apache-airflow/concepts/operators.rst @@ -66,20 +66,20 @@ Jinja Templating ---------------- Airflow leverages the power of `Jinja Templating `_ and this can be a powerful tool to use in combination with :ref:`macros `. -For example, say you want to pass the execution date as an environment variable to a Bash script using the ``BashOperator``: +For example, say you want to pass the start of the data interval as an environment variable to a Bash script using the ``BashOperator``: .. code-block:: python - # The execution date as YYYY-MM-DD + # The start of the data interval as YYYY-MM-DD date = "{{ ds }}" t = BashOperator( task_id="test_env", bash_command="/tmp/test.sh ", dag=dag, - env={"EXECUTION_DATE": date}, + env={"DATA_INTERVAL_START": date}, ) -Here, ``{{ ds }}`` is a templated variable, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the execution date will be available as an environment variable named ``EXECUTION_DATE`` in your Bash script. 
+Here, ``{{ ds }}`` is a templated variable, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the data interval's start date will be available as an environment variable named ``DATA_INTERVAL_START`` in your Bash script. You can use Jinja templating with every parameter that is marked as "templated" in the documentation. Template substitution occurs just before the ``pre_execute`` function of your operator is called. diff --git a/docs/apache-airflow/concepts/overview.rst b/docs/apache-airflow/concepts/overview.rst index 7571992b1ab58..b9d9f1dbe89d3 100644 --- a/docs/apache-airflow/concepts/overview.rst +++ b/docs/apache-airflow/concepts/overview.rst @@ -60,7 +60,7 @@ Internally, these are all actually subclasses of Airflow's ``BaseOperator``, and Control Flow ------------ -:doc:`dags` are designed to be run many times, and multiple runs of them can happen in parallel. DAGs are parameterized, always including a date they are "running for" (the ``execution_date``), but with other optional parameters as well. +:doc:`dags` are designed to be run many times, and multiple runs of them can happen in parallel. DAGs are parameterized, always including an interval they are "running for" (the :ref:`data interval `), but with other optional parameters as well. :doc:`tasks` have dependencies declared on each other. You'll see this in a DAG either using the ``>>`` and ``<<`` operators:: diff --git a/docs/apache-airflow/concepts/scheduler.rst b/docs/apache-airflow/concepts/scheduler.rst index 3a54b0269746c..2376d1e118b04 100644 --- a/docs/apache-airflow/concepts/scheduler.rst +++ b/docs/apache-airflow/concepts/scheduler.rst @@ -54,19 +54,19 @@ In the UI, it appears as if Airflow is running your tasks a day **late** .. note:: - If you run a DAG on a ``schedule_interval`` of one day, the run with ``execution_date`` ``2019-11-21`` triggers soon after ``2019-11-21T23:59``. + If you run a DAG on a ``schedule_interval`` of one day, the run with data interval starting on ``2019-11-21`` triggers after ``2019-11-21T23:59``. - **Let’s Repeat That**, the scheduler runs your job one ``schedule_interval`` AFTER the start date, at the END of the period. + **Let’s Repeat That**, the scheduler runs your job one ``schedule_interval`` AFTER the start date, at the END of the interval. You should refer to :doc:`/dag-run` for details on scheduling a DAG. Triggering DAG with Future Date ------------------------------- -If you want to use 'external trigger' to run future-dated execution dates, set ``allow_trigger_in_future = True`` in ``scheduler`` section in ``airflow.cfg``. +If you want to use 'external trigger' to run future-dated data intervals, set ``allow_trigger_in_future = True`` in ``scheduler`` section in ``airflow.cfg``. This only has effect if your DAG has no ``schedule_interval``. -If you keep default ``allow_trigger_in_future = False`` and try 'external trigger' to run future-dated execution dates, -the scheduler won't execute it now but the scheduler will execute it in the future once the current date rolls over to the execution date. +If you keep default ``allow_trigger_in_future = False`` and try 'external trigger' to run future-dated data intervals, +the scheduler won't execute it now but the scheduler will execute it in the future once the current date rolls over to the start of the data interval. .. 
_scheduler:ha: diff --git a/docs/apache-airflow/concepts/tasks.rst b/docs/apache-airflow/concepts/tasks.rst index c1db8416d53f7..efc709050b11f 100644 --- a/docs/apache-airflow/concepts/tasks.rst +++ b/docs/apache-airflow/concepts/tasks.rst @@ -59,7 +59,7 @@ Task Instances Much in the same way that a DAG is instantiated into a :ref:`DAG Run ` each time it runs, the tasks under a DAG are instantiated into *Task Instances*. -An instance of a Task is a specific run of that task for a given DAG (and thus for a given ``execution_date``). They are also the representation of a Task that has *state*, representing what stage of the lifecycle it is in. +An instance of a Task is a specific run of that task for a given DAG (and thus for a given data interval). They are also the representation of a Task that has *state*, representing what stage of the lifecycle it is in. .. _concepts:task-states: @@ -97,9 +97,9 @@ Firstly, it can have *upstream* and *downstream* tasks:: task1 >> task2 >> task3 -When a DAG runs, it will create instances for each of these tasks that are upstream/downstream of each other, but which all have the same ``execution_date``. +When a DAG runs, it will create instances for each of these tasks that are upstream/downstream of each other, but which all have the same data interval. -There may also be instances of the *same task*, but for different values of ``execution_date`` - from other runs of the same DAG. We call these *previous* and *next* - it is a different relationship to *upstream* and *downstream*! +There may also be instances of the *same task*, but for different data intervals - from other runs of the same DAG. We call these *previous* and *next* - it is a different relationship to *upstream* and *downstream*! .. note:: diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index 5d47a0be46657..f340fe2928004 100644 --- a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -54,17 +54,36 @@ Cron Presets Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend. -.. note:: - If you run a DAG on a schedule_interval of one day, the run stamped 2020-01-01 - will be triggered soon after 2020-01-01T23:59. In other words, the job instance is - started once the period it covers has ended. The ``execution_date`` available in the context - will also be 2020-01-01. +.. _data-interval: - The first DAG Run is created based on the minimum ``start_date`` for the tasks in your DAG. - Subsequent DAG Runs are created by the scheduler process, based on your DAG’s ``schedule_interval``, - sequentially. If your start_date is 2020-01-01 and schedule_interval is @daily, the first run - will be created on 2020-01-02 i.e., after your start date has passed. +Data Interval +------------- + +Each DAG run in Airflow has an assigned "data interval" that represents the time +range it operates in. For a DAG scheduled with ``@daily``, for example, each of +its data interval would start at midnight of each day and end at midnight of the +next day. + +A DAG run is usually scheduled *after* its associated data interval has ended, +to ensure the run is able to collect all the data within the time period. In +other words, a run covering the data period of 2020-01-01 generally does not +start to run until 2020-01-01 has ended, i.e. after 2020-01-02 00:00:00. + +All dates in Airflow are tied to the data interval concept in some way. 
The
+"logical date" (also called ``execution_date`` in Airflow versions prior to 2.2)
+of a DAG run, for example, denotes the start of the data interval, not when the
+DAG is actually executed.
+
+Similarly, since the ``start_date`` argument for the DAG and its tasks points to
+the same logical date, it marks the start of *the DAG's first data interval*, not
+when tasks in the DAG will start running. In other words, a DAG run will only be
+scheduled one interval after ``start_date``.
+
+.. tip::
+
+    If ``schedule_interval`` is not enough to express your DAG's schedule,
+    logical date, or data interval, see :doc:`/howto/timetable`.
 
 Re-run DAG
 ''''''''''
 
 Catchup
 -------
 
 An Airflow DAG with a ``start_date``, possibly an ``end_date``, and a ``schedule_interval`` defines a series of intervals which the scheduler turns into individual DAG Runs and executes. The scheduler, by default, will
-kick off a DAG Run for any interval that has not been run since the last execution date (or has been cleared). This concept is called Catchup.
+kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.
 
 If your DAG is not written to handle its catchup (i.e., not limited to the interval, but instead to ``Now`` for instance.), then you will want to turn catchup off. This can be done by setting ``catchup = False`` in DAG or ``catchup_by_default = False``
@@ -114,9 +133,11 @@ in the configuration file. When turned off, the scheduler creates a DAG run only
         catchup=False,
     )
 
-In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM,
-(or from the command line), a single DAG Run will be created, with an `execution_date` of 2016-01-01,
-and the next one will be created just after midnight on the morning of 2016-01-03 with an execution date of 2016-01-02.
+In the example above, if the DAG is picked up by the scheduler daemon on
+2016-01-02 at 6 AM, (or from the command line), a single DAG Run will be created
+with a data interval between 2016-01-01 and 2016-01-02, and the next one will be created
+just after midnight on the morning of 2016-01-03 with a data interval between
+2016-01-02 and 2016-01-03.
 
 If the ``dag.catchup`` value had been ``True`` instead, the scheduler would have created a DAG Run for each completed interval between 2015-12-01 and 2016-01-02 (but not yet one for 2016-01-02,
@@ -158,12 +179,12 @@ The executor will re-run it.
 
 There are multiple options you can select to re-run -
 
-* **Past** - All the instances of the task in the runs before the current DAG's execution date
-* **Future** - All the instances of the task in the runs after the current DAG's execution date
+* **Past** - All the instances of the task in the runs before the DAG's most recent data interval
+* **Future** - All the instances of the task in the runs after the DAG's most recent data interval
 * **Upstream** - The upstream tasks in the current DAG
 * **Downstream** - The downstream tasks in the current DAG
 * **Recursive** - All the tasks in the child DAGs and parent DAGs
-* **Failed** - Only the failed tasks in the current DAG
+* **Failed** - Only the failed tasks in the DAG's most recent run
 
 You can also clear the task through CLI using the command:
 
@@ -188,10 +209,10 @@ Note that DAG Runs can also be created manually through the CLI. Just run the
 
 ..
code-block:: bash
 
-    airflow dags trigger --exec-date execution_date run_id
+    airflow dags trigger --exec-date logical_date run_id
 
 The DAG Runs created externally to the scheduler get associated with the trigger’s timestamp and are displayed
-in the UI alongside scheduled DAG runs. The execution date passed inside the DAG can be specified using the ``-e`` argument.
+in the UI alongside scheduled DAG runs. The logical date passed inside the DAG can be specified using the ``-e`` argument.
 The default is the current date in the UTC timezone.
 
 In addition, you can also manually trigger a DAG Run using the web UI (tab **DAGs** -> column **Links** -> button **Trigger Dag**)
diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst
index 645c24258143a..599a1f683c150 100644
--- a/docs/apache-airflow/faq.rst
+++ b/docs/apache-airflow/faq.rst
@@ -216,12 +216,23 @@ actually start. If this were not the case, the backfill just would not start.
 What does ``execution_date`` mean?
 ----------------------------------
 
-Airflow was developed as a solution for ETL needs. In the ETL world, you typically summarize data. So, if you want to
-summarize data for 2016-02-19, You would do it at 2016-02-20 midnight UTC, which would be right after all data for
-2016-02-19 becomes available.
-
-This datetime value is available to you as :ref:`Template variables` with various formats in Jinja templated
-fields. They are also included in the context dictionary given to an Operator's execute function.
+*Execution date* or ``execution_date`` is a historical name for what is called a
+*logical date*, and also usually the start of the data interval represented by a
+DAG run.
+
+Airflow was developed as a solution for ETL needs. In the ETL world, you
+typically summarize data. So, if you want to summarize data for ``2016-02-19``,
+you would do it at ``2016-02-20`` midnight UTC, which would be right after all
+data for ``2016-02-19`` becomes available. This interval between midnights of
+``2016-02-19`` and ``2016-02-20`` is called the *data interval*, and since it
+represents data in the date of ``2016-02-19``, this date is also called the
+run's *logical date*, or the date that this DAG run is executed for, thus
+*execution date*.
+
+For backward compatibility, a datetime value ``execution_date`` is still provided
+as :ref:`Template variables` with various formats in Jinja
+templated fields, and in Airflow's Python API. It is also included in the
+context dictionary given to an Operator's execute function.
 
 .. code-block:: python
 
@@ -229,7 +240,12 @@ fields. They are also included in the context dictionary given to an Operator's
     def execute(self, context):
         logging.info(context["execution_date"])
 
-Note that ``ds`` refers to date_string, not date start as may be confusing to some.
+However, you should always use ``data_interval_start`` or ``data_interval_end``
+if possible, since those names are semantically more correct and less prone to
+misunderstandings.
+
+Note that ``ds`` (the YYYY-MM-DD form of ``data_interval_start``) refers to
+*date* **string**, not *date* **start** as may be confusing to some.
 
 
 How to create DAGs dynamically?
@@ -295,7 +311,8 @@ commonly attempted in ``user_defined_macros``.
     bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)
 
-This will echo "day={{ ds }}" instead of "day=2020-01-01" for a dagrun with the execution date 2020-01-01 00:00:00.
+This will echo "day={{ ds }}" instead of "day=2020-01-01" for a DAG run with a +``data_interval_start`` of 2020-01-01 00:00:00. .. code-block:: python @@ -310,7 +327,7 @@ Why ``next_ds`` or ``prev_ds`` might not contain expected values? - When scheduling DAG, the ``next_ds`` ``next_ds_nodash`` ``prev_ds`` ``prev_ds_nodash`` are calculated using ``execution_date`` and ``schedule_interval``. If you set ``schedule_interval`` as ``None`` or ``@once``, the ``next_ds``, ``next_ds_nodash``, ``prev_ds``, ``prev_ds_nodash`` values will be set to ``None``. -- When manually triggering DAG, the schedule will be ignored, and ``prev_ds == next_ds == ds`` +- When manually triggering DAG, the schedule will be ignored, and ``prev_ds == next_ds == ds``. Task execution interactions diff --git a/docs/apache-airflow/howto/define_extra_link.rst b/docs/apache-airflow/howto/define_extra_link.rst index 223cd424387d7..631bf7e6a770f 100644 --- a/docs/apache-airflow/howto/define_extra_link.rst +++ b/docs/apache-airflow/howto/define_extra_link.rst @@ -95,10 +95,10 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope operators = [GCSToS3Operator] def get_link(self, operator, dttm): - return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format( + return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{logical_date}".format( dag_id=operator.dag_id, task_id=operator.task_id, - execution_date=dttm, + logical_date=dttm, ) @@ -121,6 +121,7 @@ Console, but if we wanted to change that link we could: from airflow.plugins_manager import AirflowPlugin from airflow.models.baseoperator import BaseOperatorLink + from airflow.models.xcom import XCom from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator # Change from https to http just to display the override @@ -136,8 +137,11 @@ Console, but if we wanted to change that link we could: operators = [BigQueryOperator] def get_link(self, operator, dttm): - ti = TaskInstance(task=operator, execution_date=dttm) - job_id = ti.xcom_pull(task_ids=operator.task_id, key="job_id") + job_id = XCom.get_one( + execution_date=dttm, + task_id=operator.task_id, + key="job_id", + ) return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else "" diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst index cf2ec014bd1cf..afea741f57dd2 100644 --- a/docs/apache-airflow/howto/index.rst +++ b/docs/apache-airflow/howto/index.rst @@ -33,6 +33,7 @@ configuring an Airflow environment. set-config set-up-database operator/index + timetable customize-ui custom-operator create-custom-decorator diff --git a/docs/apache-airflow/howto/operator/external_task_sensor.rst b/docs/apache-airflow/howto/operator/external_task_sensor.rst index 28987f439d059..f6ae421969bec 100644 --- a/docs/apache-airflow/howto/operator/external_task_sensor.rst +++ b/docs/apache-airflow/howto/operator/external_task_sensor.rst @@ -29,7 +29,8 @@ tasks on the same DAG. For example: on a daily DAG. - Different teams are responsible for different DAGs, but these DAGs have some cross-DAG dependencies. -- A task may depend on another task on the same DAG, but for a different ``execution_date``. +- A task may depend on another task on the same DAG, but for a different ``execution_date`` + (start of the data interval). - Use ``execution_delta`` for tasks running at different times, like ``execution_delta=timedelta(hours=1)`` to check against a task that runs 1 hour earlier. 
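A minimal sketch of that last case (the DAG and task ids are made up, and the sensor belongs inside the downstream DAG's definition) could look like:

.. code-block:: python

    from datetime import timedelta

    from airflow.sensors.external_task import ExternalTaskSensor

    # Wait for "upstream_task" in "upstream_dag", whose runs are scheduled
    # one hour earlier than this DAG's runs.
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="upstream_dag",
        external_task_id="upstream_task",
        execution_delta=timedelta(hours=1),
    )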
diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index 212d9a2270c2c..ec56b702eabbd 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -77,7 +77,7 @@ Unfortunately we currently do not support to serialize ``var`` and ``ti`` / ``ta with the underlying library. For airflow context variables make sure that you either have access to Airflow through setting ``system_site_packages`` to ``True`` or add ``apache-airflow`` to the ``requirements`` argument. Otherwise you won't have access to the most context variables of Airflow in ``op_kwargs``. -If you want the context related to datetime objects like ``execution_date`` you can add ``pendulum`` and +If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and ``lazy_object_proxy``. Templating diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst new file mode 100644 index 0000000000000..8c1354f59cc47 --- /dev/null +++ b/docs/apache-airflow/howto/timetable.rst @@ -0,0 +1,298 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +Customizing DAG Scheduling with Timetables +========================================== + +A DAG's scheduling strategy is determined by its internal "timetable". This +timetable can be created by specifying the DAG's ``schedule_interval`` argument, +as described in :doc:`DAG Run `. The timetable also dictates the data +interval and the logical time of each run created for the DAG. + +However, there are situations when a cron expression or simple ``timedelta`` +periods cannot properly express the schedule. Some of the examples are: + +* Data intervals with "holes" between. (Instead of continuous, as both the cron + expression and ``timedelta`` schedules represent.) +* Run tasks at different times each day. For example, an astronomer may find it + useful to run a task at dawn to process data collected from the previous + night-time period. +* Schedules not following the Gregorian calendar. For example, create a run for + each month in the `Traditional Chinese Calendar`_. This is conceptually + similar to the sunset case above, but for a different time scale. +* Rolling windows, or overlapping data intervals. For example, one may want to + have a run each day, but make each run cover the period of the previous seven + days. It is possible to "hack" this with a cron expression, but a custom data + interval would be a more natural representation. + +.. _`Traditional Chinese Calendar`: https://en.wikipedia.org/wiki/Chinese_calendar + + +For our example, let's say a company wants to run a job after each weekday to +process data collected during the work day. 
The first intuitive answer to this +would be ``schedule_interval="0 0 * * 1-5"`` (midnight on Monday to Friday), but +this means data collected on Friday will *not* be processed right after Friday +ends, but on the next Monday, and that run's interval would be from midnight +Friday to midnight *Monday*. + +This is, therefore, an example in the "holes" category above; the intended +schedule should not include the two weekend days. What we want is: + +* Schedule a run for each Monday, Tuesday, Wednesday, Thursday, and Friday. The + run's data interval would cover from midnight of each day, to midnight of the + next day (e.g. 2021-01-01 00:00:00 to 2021-01-02 00:00:00). +* Each run would be created right after the data interval ends. The run covering + Monday happens on midnight Tuesday and so on. The run covering Friday happens + on midnight Saturday. No runs happen on midnights Sunday and Monday. + +For simplicity, we will only deal with UTC datetimes in this example. + + +Timetable Registration +---------------------- + +A timetable must be a subclass of :class:`~airflow.timetables.base.Timetable`, +and be registered as a part of a :doc:`plugin `. The following is a +skeleton for us to implement a new timetable: + +.. code-block:: python + + from airflow.plugins_manager import AirflowPlugin + from airflow.timetables.base import Timetable + + + class AfterWorkdayTimetable(Timetable): + pass + + + class WorkdayTimetablePlugin(AirflowPlugin): + name = "workday_timetable_plugin" + timetables = [AfterWorkdayTimetable] + +Next, we'll start putting code into ``AfterWorkdayTimetable``. After the +implementation is finished, we should be able to use the timetable in our DAG +file: + +.. code-block:: python + + from airflow import DAG + + + with DAG(timetable=AfterWorkdayTimetable(), tags=["example", "timetable"]) as dag: + ... + + +Define Scheduling Logic +----------------------- + +When Airflow's scheduler encounters a DAG, it calls one of the two methods to +know when to schedule the DAG's next run. + +* ``next_dagrun_info``: The scheduler uses this to learn the timetable's regular + schedule, i.e. the "one for every workday, run at the end of it" part in our + example. +* ``infer_data_interval``: When a DAG run is manually triggered (from the web + UI, for example), the scheduler uses this method to learn about how to + reverse-infer the out-of-schedule run's data interval. + +We'll start with ``infer_data_interval`` since it's the easier of the two: + +.. exampleinclude:: /../../airflow/example_dags/plugins/workday.py + :language: python + :dedent: 4 + :start-after: [START howto_timetable_infer_data_interval] + :end-before: [END howto_timetable_infer_data_interval] + +The method accepts one argument ``run_after``, a ``pendulum.DateTime`` object +that indicates when the DAG is externally triggered. Since our timetable creates +a data interval for each complete work day, the data interval inferred here +should usually start at the midnight one day prior to ``run_after``, but if +``run_after`` falls on a Sunday or Monday (i.e. the prior day is Saturday or +Sunday), it should be pushed further back to the previous Friday. Once we know +the start of the interval, the end is simply one full day after it. We then +create a :class:`~airflow.timetables.base.DataInterval` object to describe this +interval. + +Next is the implementation of ``next_dagrun_info``: + +.. 
exampleinclude:: /../../airflow/example_dags/plugins/workday.py + :language: python + :dedent: 4 + :start-after: [START howto_timetable_next_dagrun_info] + :end-before: [END howto_timetable_next_dagrun_info] + +This method accepts two arguments. ``last_automated_dagrun`` is a +:class:`~airflow.timetables.base.DataInterval` instance indicating the data +interval of this DAG's previous non-manually-triggered run, or ``None`` if this +is the first time ever the DAG is being scheduled. ``restriction`` encapsulates +how the DAG and its tasks specify the schedule, and contains three attributes: + +* ``earliest``: The earliest time the DAG may be scheduled. This is a + ``pendulum.DateTime`` calculated from all the ``start_date`` arguments from + the DAG and its tasks, or ``None`` if there are no ``start_date`` arguments + found at all. +* ``latest``: Similar to ``earliest``, this is the latest time the DAG may be + scheduled, calculated from ``end_date`` arguments. +* ``catchup``: A boolean reflecting the DAG's ``catchup`` argument. + +.. note:: + + Both ``earliest`` and ``latest`` apply to the DAG run's logical date + (the *start* of the data interval), not when the run will be scheduled + (usually after the end of the data interval). + +If there was a run scheduled previously, we should now schedule for the next +weekday, i.e. plus one day if the previous run was on Monday through Thursday, +or three days if it was on Friday. If there was not a previous scheduled run, +however, we pick the next workday's midnight after ``restriction.earliest`` +(unless it *is* a workday's midnight; in which case it's used directly). +``restriction.catchup`` also needs to be considered---if it's ``False``, we +can't schedule before the current time, even if ``start_date`` values are in the +past. Finally, if our calculated data interval is later than +``restriction.latest``, we must respect it and not schedule a run by returning +``None``. + +If we decide to schedule a run, we need to describe it with a +:class:`~airflow.timetables.base.DagRunInfo`. This type has two arguments and +attributes: + +* ``data_interval``: A :class:`~airflow.timetables.base.DataInterval` instance + describing the next run's data interval. +* ``run_after``: A ``pendulum.DateTime`` instance that tells the scheduler when + the DAG run can be scheduled. + +A ``DagRunInfo`` can be created like this: + +.. code-block:: python + + info = DagRunInfo( + data_interval=DataInterval(start=start, end=end), + run_after=run_after, + ) + +Since we typically want to schedule a run as soon as the data interval ends, +``end`` and ``run_after`` above are generally the same. ``DagRunInfo`` therefore +provides a shortcut for this: + +.. code-block:: python + + info = DagRunInfo.interval(start=start, end=end) + assert info.data_interval.end == info.run_after # Always True. + +For reference, here's our plugin and DAG files in their entirety: + +.. exampleinclude:: /../../airflow/example_dags/plugins/workday.py + :language: python + :start-after: [START howto_timetable] + :end-before: [END howto_timetable] + +.. 
code-block:: python + + import datetime + + from airflow import DAG + from airflow.example_dags.plugins.workday import AfterWorkdayTimetable + from airflow.operators.dummy import DummyOperator + + + with DAG( + dag_id="example_workday_timetable", + start_date=datetime.datetime(2021, 1, 1), + timetable=AfterWorkdayTimetable(), + tags=["example", "timetable"], + ) as dag: + DummyOperator(task_id="run_this") + + +Parameterized Timetables +------------------------ + +Sometimes we need to pass some run-time arguments to the timetable. Continuing +with our ``AfterWorkdayTimetable`` example, maybe we have DAGs running on +different timezones, and we want to schedule some DAGs at 8am the next day, +instead of on midnight. Instead of creating a separate timetable for each +purpose, we'd want to do something like: + +.. code-block:: python + + class SometimeAfterWorkdayTimetable(Timetable): + def __init__(self, schedule_at: Time) -> None: + self._schedule_at = schedule_at + + def next_dagrun_info(self, last_automated_dagrun, restriction): + ... + end = start + timedelta(days=1) + return DagRunInfo( + data_interval=DataInterval(start=start, end=end), + run_after=DateTime.combine(end.date(), self._schedule_at), + ) + +However, since the timetable is a part of the DAG, we need to tell Airflow how +to serialize it with the context we provide in ``__init__``. This is done by +implementing two additional methods on our timetable class: + +.. code-block:: python + + class SometimeAfterWorkdayTimetable(Timetable): + ... + + def serialize(self) -> Dict[str, Any]: + return {"schedule_at": self._schedule_at.isoformat()} + + @classmethod + def deserialize(cls, value: Dict[str, Any]) -> Timetable: + return cls(Time.fromisoformat(value["schedule_at"])) + +When the DAG is being serialized, ``serialize`` is called to obtain a +JSON-serializable value. That value is passed to ``deserialize`` when the +serialized DAG is accessed by the scheduler to reconstruct the timetable. + + +Timetable Display in UI +======================= + +By default, a custom timetable is displayed by their class name in the UI (e.g. +the *Schedule* column in the "DAGs" table. It is possible to customize this +by overriding the ``summary`` property. This is especially useful for +parameterized timetables to include arguments provided in ``__init__``. For +our ``SometimeAfterWorkdayTimetable`` class, for example, we could have: + +.. code-block:: python + + @property + def summary(self) -> str: + return f"after each workday, at {self._schedule_at}" + +So for a DAG declared like this: + +.. code-block:: python + + with DAG( + timetable=SometimeAfterWorkdayTimetable(Time(8)), # 8am. + ..., + ) as dag: + ... + +The *Schedule* column would say ``after each workday, at 08:00:00``. + + +.. seealso:: + + Module :mod:`airflow.timetables.base` + The public interface is heavily documented to explain what should be + implemented by subclasses. diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst index 227a386a333c1..3680010a08b9c 100644 --- a/docs/apache-airflow/lineage.rst +++ b/docs/apache-airflow/lineage.rst @@ -57,7 +57,7 @@ works. f_in = File(url="/tmp/whole_directory/") outlets = [] for file in FILE_CATEGORIES: - f_out = File(url="/tmp/{}/{{{{ execution_date }}}}".format(file)) + f_out = File(url="/tmp/{}/{{{{ data_interval_start }}}}".format(file)) outlets.append(f_out) run_this = BashOperator( @@ -73,7 +73,7 @@ for the downstream task. .. 
note:: Operators can add inlets and outlets automatically if the operator supports it. In the example DAG task ``run_this`` (task_id=``run_me_first``) is a BashOperator that takes 3 inlets: ``CAT1``, ``CAT2``, ``CAT3``, that are -generated from a list. Note that ``execution_date`` is a templated field and will be rendered when the task is running. +generated from a list. Note that ``data_interval_start`` is a templated field and will be rendered when the task is running. .. note:: Behind the scenes Airflow prepares the lineage metadata as part of the ``pre_execute`` method of a task. When the task has finished execution ``post_execute`` is called and lineage metadata is pushed into XCOM. Thus if you are creating diff --git a/docs/apache-airflow/logging-monitoring/errors.rst b/docs/apache-airflow/logging-monitoring/errors.rst index c8c303da6e66c..7a5df129585fa 100644 --- a/docs/apache-airflow/logging-monitoring/errors.rst +++ b/docs/apache-airflow/logging-monitoring/errors.rst @@ -52,14 +52,19 @@ Name Description ======================================= ================================================== ``dag_id`` Dag name of the dag that failed ``task_id`` Task name of the task that failed -``execution_date`` Execution date when the task failed +``data_interval_start`` Start of data interval when the task failed +``data_interval_end`` End of data interval when the task failed ``operator`` Operator name of the task that failed ======================================= ================================================== +For backward compatibility, an additional tag ``execution_date`` is also +available to represent the logical date. The tag should be considered deprecated +in favor of ``data_interval_start``. + + Breadcrumbs ------------ - When a task fails with an error `breadcrumbs `__ will be added for the other tasks in the current dag run. ======================================= ============================================================== diff --git a/docs/apache-airflow/logging-monitoring/logging-tasks.rst b/docs/apache-airflow/logging-monitoring/logging-tasks.rst index c1b8636aa3a2b..aa5a6fa2e34e2 100644 --- a/docs/apache-airflow/logging-monitoring/logging-tasks.rst +++ b/docs/apache-airflow/logging-monitoring/logging-tasks.rst @@ -38,7 +38,7 @@ directory. .. note:: For more information on setting the configuration, see :doc:`/howto/set-config` -The following convention is followed while naming logs: ``{dag_id}/{task_id}/{execution_date}/{try_number}.log`` +The following convention is followed while naming logs: ``{dag_id}/{task_id}/{logical_date}/{try_number}.log`` In addition, users can supply a remote location to store current logs and backups. diff --git a/docs/apache-airflow/plugins.rst b/docs/apache-airflow/plugins.rst index d59659e7fd83b..3543aef05dfed 100644 --- a/docs/apache-airflow/plugins.rst +++ b/docs/apache-airflow/plugins.rst @@ -139,6 +139,9 @@ looks like: # buttons. operator_extra_links = [] + # A list of timetable classes to register so they can be used in DAGs. + timetables = [] + You can derive it by inheritance (please refer to the example below). In the example, all options have been defined as class attributes, but you can also define them as properties if you need to perform additional initialization. Please note ``name`` inside this class must be specified. @@ -244,10 +247,10 @@ definitions in Airflow. 
@@ -244,10 +247,10 @@ definitions in Airflow.

         operators = [GCSToS3Operator]

         def get_link(self, operator, dttm):
-            return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format(
+            return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{logical_date}".format(
                 dag_id=operator.dag_id,
                 task_id=operator.task_id,
-                execution_date=dttm,
+                logical_date=dttm,
             )

diff --git a/docs/apache-airflow/python-api-ref.rst b/docs/apache-airflow/python-api-ref.rst
index e79842ebc8e39..591bb84f5ecca 100644
--- a/docs/apache-airflow/python-api-ref.rst
+++ b/docs/apache-airflow/python-api-ref.rst
@@ -137,3 +137,14 @@ All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.
   :maxdepth: 1

   _api/airflow/secrets/index
+
+Timetables
+----------
+Custom timetable implementations provide Airflow's scheduler additional logic to
+schedule DAG runs in ways not possible with built-in schedule expressions.
+
+.. toctree::
+  :includehidden:
+  :maxdepth: 1
+
+  _api/airflow/timetables/index
diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst
index 2a191f2dc70c2..8a371e717ebde 100644
--- a/docs/apache-airflow/timezone.rst
+++ b/docs/apache-airflow/timezone.rst
@@ -140,10 +140,12 @@ using ``pendulum``.
     op = DummyOperator(task_id="dummy", dag=dag)
     print(dag.timezone)  # <Timezone [Europe/Amsterdam]>

-Please note that while it is possible to set a ``start_date`` and ``end_date`` for Tasks always the DAG timezone
-or global timezone (in that order) will be used to calculate the next execution date. Upon first encounter
-the start date or end date will be converted to UTC using the timezone associated with start_date or end_date,
-then for calculations this timezone information will be disregarded.
+Please note that while it is possible to set a ``start_date`` and ``end_date``
+for Tasks, the DAG timezone or global timezone (in that order) will always be
+used to calculate data intervals. Upon first encounter, the start date or end
+date will be converted to UTC using the timezone associated with ``start_date``
+or ``end_date``, then for calculations this timezone information will be
+disregarded.

 Templates
 '''''''''
@@ -156,7 +158,7 @@ It is left up to the DAG to handle this.
     import pendulum

     local_tz = pendulum.timezone("Europe/Amsterdam")
-    local_tz.convert(execution_date)
+    local_tz.convert(logical_date)

 Cron schedules
 ''''''''''''''
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst
index 70f89fd2c09ce..c541d0e488467 100644
--- a/docs/apache-airflow/tutorial.rst
+++ b/docs/apache-airflow/tutorial.rst
@@ -288,11 +288,17 @@ Let's run a few commands to validate this script further.
 Testing
 '''''''

-Let's test by running the actual task instances for a specific date. The
-date specified in this context is called ``execution_date``. This is the
-*logical* date, which simulates the scheduler running your task or dag at
-a specific date and time, even though it *physically* will run now (
-or as soon as its dependencies are met).
+Let's test by running the actual task instances for a specific date. The date
+specified in this context is called the *logical date* (also called *execution
+date* for historical reasons), which simulates the scheduler running your task
+or DAG for a specific date and time, even though it *physically* will run now
+(or as soon as its dependencies are met).
+
+We said the scheduler runs your task *for* a specific date and time, not *at*.
+This is because each run of a DAG conceptually represents not a specific date
+and time, but an interval between two times, called a
+:ref:`data interval <data-interval>`. A DAG run's logical date is the start of
+its data interval.
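+
+As a rough illustration (this snippet is not part of the tutorial pipeline),
+for a DAG scheduled ``@daily``, the run covering 2021-01-01 relates to its
+data interval like this:
+
+.. code-block:: python
+
+    import pendulum
+
+    logical_date = pendulum.datetime(2021, 1, 1, tz="UTC")
+    data_interval_start = logical_date  # The same moment.
+    data_interval_end = logical_date.add(days=1)  # Midnight of the next day.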

 .. code-block:: bash
@@ -320,10 +326,11 @@ their log to stdout (on screen), does not bother with dependencies, and
 does not communicate state (running, success, failed, ...) to the database. It simply allows testing a single
 task instance.

-The same applies to ``airflow dags test [dag_id] [execution_date]``, but on a DAG level. It performs a single
-DAG run of the given DAG id. While it does take task dependencies into account, no state is registered in the
-database. It is convenient for locally testing a full run of your DAG, given that e.g. if one of your tasks
-expects data at some location, it is available.
+The same applies to ``airflow dags test [dag_id] [logical_date]``, but on a DAG
+level. It performs a single DAG run of the given DAG id. While it does take task
+dependencies into account, no state is registered in the database. It is
+convenient for locally testing a full run of your DAG, provided that any data
+your tasks expect at some location is already available.

 Backfill
 ''''''''
@@ -335,9 +342,9 @@ are interested in tracking the progress visually as your backfill progresses.

 Note that if you use ``depends_on_past=True``, individual task instances will
 depend on the success of their previous task instance (that is, previous
-according to ``execution_date``). Task instances with ``execution_date==start_date``
-will disregard this dependency because there would be no
-past task instances created for them.
+according to the logical date). Task instances with their logical dates equal to
+``start_date`` will disregard this dependency because there would be no past
+task instances created for them.

 You may also want to consider ``wait_for_downstream=True`` when using ``depends_on_past=True``.
 While ``depends_on_past=True`` causes a task instance to depend on the success
diff --git a/docs/conf.py b/docs/conf.py
index b3af13c103f8c..8afba7fed2601 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -215,7 +215,16 @@ def _get_rst_filepath_from_path(filepath: str):
     name = os.path.basename(path)
     if os.path.isfile(path) and not path.endswith(_allowed_top_level):
         exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}")
-    browsable_packages = ["operators", "hooks", "sensors", "providers", "executors", "models", "secrets"]
+    browsable_packages = [
+        "hooks",
+        "executors",
+        "models",
+        "operators",
+        "providers",
+        "secrets",
+        "sensors",
+        "timetables",
+    ]
     if os.path.isdir(path) and name not in browsable_packages:
         exclude_patterns.append(f"_api/airflow/{name}")
     else:
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 66d6b29c6c964..df81f30e166ef 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -228,6 +228,7 @@ Memorystore
 Mesos
 MessageAttributes
 Metastore
+Midnights
 Mixin
 Mongo
 Moto
@@ -259,6 +260,7 @@ PTarget
 Pagerduty
 Papermill
 Parallelize
+Parameterized
 Parameterizing
 Paramiko
 Params
@@ -578,6 +580,7 @@ createDisposition
 creationTimestamp
 credssp
 cron
+croniter
 cronjob
 crontab
 crypto
@@ -953,6 +956,7 @@ mget
 microservice
 microsoft
 middleware
+midnights
 milton
 minicluster
 minikube
diff --git a/tests/core/test_sentry.py b/tests/core/test_sentry.py
index bb962ccd2b38a..99fac4064f154 100644
--- a/tests/core/test_sentry.py
+++ b/tests/core/test_sentry.py
@@ -29,6 +29,8 @@
 from tests.test_utils.config import conf_vars

 EXECUTION_DATE = timezone.utcnow()
+SCHEDULE_INTERVAL = datetime.timedelta(days=1)
+DATA_INTERVAL = (EXECUTION_DATE, EXECUTION_DATE + SCHEDULE_INTERVAL)
 DAG_ID = "test_dag"
 TASK_ID = "test_task"
 OPERATOR = "PythonOperator"
@@ -37,6 +39,8 @@
 TEST_SCOPE = {
     "dag_id": DAG_ID,
     "task_id": TASK_ID,
+    "data_interval_start": DATA_INTERVAL[0],
+    "data_interval_end": DATA_INTERVAL[1],
     "execution_date": EXECUTION_DATE,
     "operator": OPERATOR,
     "try_number": TRY_NUMBER,
@@ -62,10 +66,10 @@ class TestSentryHook:
     @pytest.fixture
     def task_instance(self, dag_maker):
         # Mock the Dag
-        with dag_maker(DAG_ID):
+        with dag_maker(DAG_ID, schedule_interval=SCHEDULE_INTERVAL):
             task = PythonOperator(task_id=TASK_ID, python_callable=int)

-        dr = dag_maker.create_dagrun(execution_date=EXECUTION_DATE)
+        dr = dag_maker.create_dagrun(data_interval=DATA_INTERVAL, execution_date=EXECUTION_DATE)
         ti = dr.task_instances[0]
         ti.state = STATE
         ti.task = task
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 004a8bcca4461..604e5aa44a164 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -43,7 +43,7 @@ class TestElasticsearchTaskHandler:
     TASK_ID = 'task_for_testing_es_log_handler'
     EXECUTION_DATE = datetime(2016, 1, 1)
     LOG_ID = f'{DAG_ID}-{TASK_ID}-2016-01-01T00:00:00+00:00-1'
-    JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_execution_date(EXECUTION_DATE)}-1'
+    JSON_LOG_ID = f'{DAG_ID}-{TASK_ID}-{ElasticsearchTaskHandler._clean_date(EXECUTION_DATE)}-1'

     @pytest.fixture()
     def ti(self, create_task_instance):
@@ -406,8 +406,8 @@ def test_render_log_id(self, ti):
         self.es_task_handler.json_format = True
         assert self.JSON_LOG_ID == self.es_task_handler._render_log_id(ti, 1)

-    def test_clean_execution_date(self):
-        clean_execution_date = self.es_task_handler._clean_execution_date(datetime(2016, 7, 8, 9, 10, 11, 12))
+    def test_clean_date(self):
+        clean_execution_date = self.es_task_handler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
         assert '2016_07_08T09_10_11_000012' == clean_execution_date

     @pytest.mark.parametrize(