diff --git a/airflow/example_dags/example_workday_timetable.py b/airflow/example_dags/example_workday_timetable.py
new file mode 100644
index 0000000000000..dd29b978b1919
--- /dev/null
+++ b/airflow/example_dags/example_workday_timetable.py
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Example DAG demonstrating how to implement a custom timetable for a DAG."""
+
+# [START howto_timetable]
+from datetime import timedelta
+from typing import Optional
+
+from pendulum import Date, DateTime, Time, timezone
+
+from airflow import DAG
+from airflow.operators.dummy import DummyOperator
+from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
+
+UTC = timezone("UTC")
+
+
+class AfterWorkdayTimetable(Timetable):
+
+    # [START howto_timetable_infer_data_interval]
+    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
+        weekday = run_after.weekday()
+        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
+            days_since_friday = (run_after.weekday() - 4) % 7
+            delta = timedelta(days=days_since_friday)
+        else:  # Otherwise the interval is yesterday.
+            delta = timedelta(days=1)
+        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
+        return DataInterval(start=start, end=(start + timedelta(days=1)))
+
+    # [END howto_timetable_infer_data_interval]
+
+    # [START howto_timetable_next_dagrun_info]
+    def next_dagrun_info(
+        self,
+        last_automated_dagrun: Optional[DateTime],
+        restriction: TimeRestriction,
+    ) -> Optional[DagRunInfo]:
+        if last_automated_dagrun is not None:
+            # There was a previous run on the regular schedule.
+            # last_automated_dagrun is the last run's logical date.
+            weekday = last_automated_dagrun.weekday()
+            if 0 <= weekday < 4:  # Monday through Thursday -- next is tomorrow.
+                delta = timedelta(days=1)
+            else:  # Week is ending -- skip to next Monday.
+                delta = timedelta(days=(7 - weekday))
+            start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min).replace(tzinfo=UTC)
+        else:  # This is the first ever run on the regular schedule.
+            if restriction.earliest is None:  # No start_date. Don't schedule.
+                return None
+            start = restriction.earliest
+            if start.time() != Time.min:
+                # If earliest does not fall on midnight, skip to the next day.
+                start = DateTime.combine(start.date(), Time.min).replace(tzinfo=UTC) + timedelta(days=1)
+            if not restriction.catchup:
+                # If the DAG has catchup=False, today is the earliest to consider.
+                start = max(start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
+            weekday = start.weekday()
+            if weekday in (5, 6):  # If 'start' is in the weekend, go to next Monday.
+                delta = timedelta(days=(7 - weekday))
+                start = start + delta
+        if restriction.latest is not None and start > restriction.latest:
+            # Over the DAG's scheduled end; don't schedule.
+            return None
+        return DagRunInfo.interval(start=start, end=(start + timedelta(days=1)))
+
+    # [END howto_timetable_next_dagrun_info]
+
+
+with DAG(dag_id="example_workday_timetable", start_date=DateTime(2021, 1, 1, tzinfo=UTC), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"]) as dag:
+    DummyOperator(task_id="run_this")
+
+
+if __name__ == "__main__":
+    dag.cli()
+
+# [END howto_timetable]
diff --git a/airflow/timetables/schedules.py b/airflow/timetables/schedules.py
index 8129a8845bf68..b6e7c614e0fb2 100644
--- a/airflow/timetables/schedules.py
+++ b/airflow/timetables/schedules.py
@@ -88,7 +88,7 @@ class CronSchedule(Schedule):
     """Schedule things from a cron expression.
 
     The implementation extends on croniter to add timezone awareness. This is
-    because crontier works only with naive timestamps, and cannot consider DST
+    because croniter works only with naive timestamps, and cannot consider DST
     when determining the next/previous time.
     """
 
diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst
index c564ef85b7efc..43f8cf86b4c71 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -148,11 +148,20 @@ The ``schedule_interval`` argument takes any value that is a valid `Crontab `.
 
-DAG Runs can run in parallel for the same DAG, and each has a defined ``execution_date``, which identifies the *logical* date and time it is running for - not the *actual* time when it was started.
+.. tip::
+
+    For more information on ``schedule_interval`` values, see :doc:`DAG Run `.
+
+    If ``schedule_interval`` is not enough to express the DAG's schedule, see :doc:`Timetables `.
+
+Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a :doc:`DAG Run `. DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the *logical* date and time range it is running for - not the *actual* time when it was started.
 
 As an example of why this is useful, consider writing a DAG that processes a daily set of experimental data. It's been rewritten, and you want to run it on the previous 3 months of data - no problem, since Airflow can *backfill* the DAG and run copies of it for every day in those previous 3 months, all at once.
 
-Those DAG Runs will all have been started on the same actual day, but their ``execution_date`` values will cover those last 3 months, and that's what all the tasks, operators and sensors inside the DAG look at when they run.
+Those DAG Runs will all have been started on the same actual day, but each DAG
+run will have one data interval covering a single day in that 3 month period,
+and that data interval is what all the tasks, operators and sensors inside the
+DAG look at when they run.
 
 In much the same way a DAG instantiates into a DAG Run every time it's run, Tasks specified inside a DAG also instantiate into :ref:`Task Instances ` along with it.
diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst
index 5d47a0be46657..fb196a66df7c0 100644
--- a/docs/apache-airflow/dag-run.rst
+++ b/docs/apache-airflow/dag-run.rst
@@ -54,17 +54,33 @@ Cron Presets
 
 Your DAG will be instantiated for each schedule along with a corresponding DAG Run entry in the database backend.
 
-.. note::
+Data Interval
+-------------
 
-    If you run a DAG on a schedule_interval of one day, the run stamped 2020-01-01
-    will be triggered soon after 2020-01-01T23:59. In other words, the job instance is
-    started once the period it covers has ended. The ``execution_date`` available in the context
-    will also be 2020-01-01.
+Each DAG run in Airflow has an assigned "data interval" that represents the time
+range it operates in. For a DAG scheduled with ``@daily``, for example, each of
+its data intervals would start at midnight of each day and end at midnight of
+the next day.
 
-    The first DAG Run is created based on the minimum ``start_date`` for the tasks in your DAG.
-    Subsequent DAG Runs are created by the scheduler process, based on your DAG’s ``schedule_interval``,
-    sequentially. If your start_date is 2020-01-01 and schedule_interval is @daily, the first run
-    will be created on 2020-01-02 i.e., after your start date has passed.
+A DAG run is scheduled *after* its associated data interval has ended, to ensure
+the run is able to collect all the data within the time period. Therefore, a run
+covering the data period of 2020-01-01 will not start to run until 2020-01-01
+has ended, i.e. after 2020-01-02 00:00:00.
+
+All dates in Airflow are tied to the data interval concept in some way. The
+"logical date" (also called ``execution_date`` in Airflow versions prior to 2.2)
+of a DAG run, for example, usually denotes the start of the data interval, not
+when the DAG is actually executed.
+
+Similarly, since the ``start_date`` argument for the DAG and its tasks points to
+the same logical date, it marks the start of *the DAG's first data interval*, not
+when tasks in the DAG will start running. In other words, a DAG run will only be
+scheduled one interval after ``start_date``.
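+
+For example, consider the hypothetical DAG below (a minimal sketch; the
+``dag_id`` is only for illustration), scheduled with ``@daily`` and starting at
+2020-01-01:
+
+.. code-block:: python
+
+    import pendulum
+
+    from airflow import DAG
+
+    with DAG(
+        dag_id="example_data_interval",  # Hypothetical name, for illustration only.
+        schedule_interval="@daily",
+        start_date=pendulum.datetime(2020, 1, 1, tz="UTC"),
+    ) as dag:
+        ...
+
+    # The first data interval covers 2020-01-01 00:00 to 2020-01-02 00:00 (UTC).
+    # Its run has logical date 2020-01-01, but is only scheduled once the
+    # interval has ended, i.e. soon after 2020-01-02 00:00:00.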
+
+.. tip::
+
+    If ``schedule_interval`` is not enough to express your DAG's schedule,
+    logical date, or data interval, see :doc:`Customizing timetables `.
 
 Re-run DAG
 ''''''''''
diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst
index efd5c481b5be0..9fb80fba108cd 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -33,6 +33,7 @@ configuring an Airflow environment.
     set-config
     set-up-database
     operator/index
+    timetable
     customize-state-colors-ui
     customize-dag-ui-page-instance-name
     custom-operator
diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst
new file mode 100644
index 0000000000000..bfb66c22d8fb8
--- /dev/null
+++ b/docs/apache-airflow/howto/timetable.rst
@@ -0,0 +1,261 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Customizing DAG Scheduling with Timetables
+==========================================
+
+A DAG's scheduling strategy is determined by its internal "timetable". This
+timetable can be created by specifying the DAG's ``schedule_interval`` argument,
+as described in :doc:`DAG Run `. The timetable also dictates the data
+interval and the logical time of each run created for the DAG.
+
+However, there are situations when a cron expression or simple ``timedelta``
+periods cannot properly express the schedule. Some examples are:
+
+* Data intervals with "holes" in between, instead of the continuous intervals
+  that both cron expressions and ``timedelta`` schedules represent.
+* Run tasks at different times each day. For example, an astronomer may find it
+  useful to run a task at sunset to process data collected from the previous
+  sunlight period.
+* Schedules not following the Gregorian calendar. For example, create a run for
+  each month in the `Traditional Chinese Calendar`_. This is conceptually
+  similar to the sunset case above, but for a different time scale.
+* Rolling windows, or overlapping data intervals. For example, one may want to
+  have a run each day, but make each run cover the period of the previous seven
+  days. It is possible to "hack" this with a cron expression, but a custom data
+  interval would be a more natural representation.
+
+.. _`Traditional Chinese Calendar`: https://en.wikipedia.org/wiki/Chinese_calendar
+
+
+For our example, let's say a company wants to run a job after each weekday to
+process data collected during the work day. The first intuitive answer to this
+would be ``schedule_interval="0 0 * * 1-5"`` (midnight on Monday to Friday), but
+this means data collected on Friday will *not* be processed right after Friday
+ends, but on the next Monday, and that run's interval would be from midnight
+Friday to midnight *Monday*.
+
+This is, therefore, an example in the "holes" category above; the intended
+schedule should not include the two weekend days. What we want is:
+
+* Schedule a run for each Monday, Tuesday, Wednesday, Thursday, and Friday. The
+  run's data interval would cover from midnight of each day, to midnight of the
+  next day (e.g. 2021-01-01 00:00:00 to 2021-01-02 00:00:00).
+* Each run would be created right after the data interval ends. The run covering
+  Monday happens on midnight Tuesday and so on. The run covering Friday happens
+  on midnight Saturday. No runs happen on midnights Sunday and Monday.
+
+For simplicity, we will only deal with UTC datetimes in this example.
+
+
+Define Scheduling Logic
+-----------------------
+
+When Airflow's scheduler encounters a DAG, it calls one of two methods to
+know when to schedule the DAG's next run.
+
+* ``next_dagrun_info``: The scheduler uses this to learn the timetable's regular
+  schedule, i.e. the "one for every workday, run at the end of it" part in our
+  example.
+* ``infer_data_interval``: When a DAG run is manually triggered (from the web
+  UI, for example), the scheduler uses this method to learn about how to
+  reverse-infer the out-of-schedule run's data interval.
+
+We'll start with ``infer_data_interval`` since it's the easier of the two:
+
+.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_timetable_infer_data_interval]
+    :end-before: [END howto_timetable_infer_data_interval]
+
+The method accepts one argument ``run_after``, a ``pendulum.DateTime`` object
+that indicates when the DAG is externally triggered. Since our timetable creates
+a data interval for each complete work day, the data interval inferred here
+should usually start at the midnight one day prior to ``run_after``, but if
+``run_after`` falls on a Sunday or Monday (i.e. the prior day is Saturday or
+Sunday), it should be pushed further back to the previous Friday. Once we know
+the start of the interval, the end is simply one full day after it. We then
+create a :class:`~airflow.timetables.base.DataInterval` object to describe this
+interval.
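+
+To see how the inference behaves, here is a quick sketch (assuming the
+``AfterWorkdayTimetable`` class above is in scope; the trigger times are
+arbitrary):
+
+.. code-block:: python
+
+    from pendulum import datetime
+
+    timetable = AfterWorkdayTimetable()
+
+    # Triggered on Wednesday 2021-09-08: the inferred interval is all of
+    # Tuesday, i.e. 2021-09-07 00:00 to 2021-09-08 00:00 (UTC).
+    timetable.infer_data_interval(run_after=datetime(2021, 9, 8, 15, 0))
+
+    # Triggered on Monday 2021-09-06: the prior day is Sunday, so the interval
+    # is pushed back to the previous Friday, 2021-09-03 00:00 to 2021-09-04 00:00.
+    timetable.infer_data_interval(run_after=datetime(2021, 9, 6, 9, 0))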
+
+Next is the implementation of ``next_dagrun_info``:
+
+.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_timetable_next_dagrun_info]
+    :end-before: [END howto_timetable_next_dagrun_info]
+
+This method accepts two arguments. ``last_automated_dagrun`` is a
+``pendulum.DateTime`` object indicating the logical date of this DAG's previous
+non-manually-triggered run, or ``None`` if this is the first time ever the DAG
+is being scheduled. ``restriction`` encapsulates how the DAG and its tasks
+specify the schedule, and contains three attributes:
+
+* ``earliest``: The earliest time the DAG may be scheduled. This is a
+  ``pendulum.DateTime`` calculated from all the ``start_date`` arguments of
+  the DAG and its tasks, or ``None`` if there are no ``start_date`` arguments
+  found at all.
+* ``latest``: Similar to ``earliest``, this is the latest time the DAG may be
+  scheduled, calculated from ``end_date`` arguments.
+* ``catchup``: A boolean reflecting the DAG's ``catchup`` argument.
+
+.. note::
+
+    Both ``earliest`` and ``latest`` apply to the DAG run's logical date
+    (the *start* of the data interval), not when the run will be scheduled
+    (usually after the end of the data interval).
+
+If there was a run scheduled previously, we should now schedule for the next
+weekday, i.e. plus one day if the previous run was on Monday through Thursday,
+or three days if it was on Friday. If there was not a previous scheduled run,
+however, we pick the next workday's midnight after ``restriction.earliest``
+(unless it *is* a workday's midnight, in which case it's used directly).
+``restriction.catchup`` also needs to be considered---if it's ``False``, we
+can't schedule before the current time, even if ``start_date`` values are in the
+past. Finally, if our calculated data interval is later than
+``restriction.latest``, we must respect it and not schedule a run by returning
+``None``.
+
+If we decide to schedule a run, we need to describe it with a
+:class:`~airflow.timetables.base.DagRunInfo`. This type has two arguments and
+attributes:
+
+* ``data_interval``: A :class:`~airflow.timetables.base.DataInterval` instance
+  like ``infer_data_interval``'s return value. This describes the next run's
+  data interval.
+* ``run_after``: A ``pendulum.DateTime`` instance that tells the scheduler when
+  the DAG run can be scheduled.
+
+.. note::
+
+    In case you're wondering---yes, the argument and return value of
+    ``infer_data_interval`` are also internally combined into a ``DagRunInfo``.
+
+A ``DagRunInfo`` can be created like this:
+
+.. code-block:: python
+
+    info = DagRunInfo(
+        data_interval=DataInterval(start=start, end=end),
+        run_after=run_after,
+    )
+
+But since we typically want to schedule a run as soon as the data interval ends,
+``end`` and ``run_after`` above are generally the same. ``DagRunInfo`` therefore
+provides a shortcut for this:
+
+.. code-block:: python
+
+    info = DagRunInfo.interval(start=start, end=end)
+    assert info.data_interval.end == info.run_after  # Always True.
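+
+As a quick sketch of how the scheduler would call this method (the
+``TimeRestriction`` values below are arbitrary placeholders, and the timetable
+is the ``AfterWorkdayTimetable`` defined above):
+
+.. code-block:: python
+
+    from pendulum import datetime
+
+    from airflow.timetables.base import TimeRestriction
+
+    timetable = AfterWorkdayTimetable()
+    restriction = TimeRestriction(earliest=None, latest=None, catchup=True)
+
+    # The previous run covered Friday 2021-09-03, so the next run should cover
+    # Monday: data interval 2021-09-06 00:00 to 2021-09-07 00:00 (UTC), with
+    # run_after at the interval's end.
+    info = timetable.next_dagrun_info(
+        last_automated_dagrun=datetime(2021, 9, 3),
+        restriction=restriction,
+    )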
+
+For reference, here's our DAG file in its entirety:
+
+.. exampleinclude:: /../../airflow/example_dags/example_workday_timetable.py
+    :language: python
+    :start-after: [START howto_timetable]
+    :end-before: [END howto_timetable]
+
+
+DAG Serialization and Parameterized Timetables
+----------------------------------------------
+
+Sometimes we need to pass some run-time arguments to the timetable. Continuing
+with our ``AfterWorkdayTimetable`` example, maybe we have DAGs running in
+different timezones, and we want to schedule some DAGs at 8am the next day,
+instead of at midnight. Instead of creating a separate timetable for each
+purpose, we'd want to do something like:
+
+.. code-block:: python
+
+    from datetime import timedelta
+
+    from pendulum import DateTime, Time
+
+
+    class SometimeAfterWorkdayTimetable(Timetable):
+        def __init__(self, schedule_at: Time) -> None:
+            self._schedule_at = schedule_at
+
+        def next_dagrun_info(self, last_automated_dagrun, restriction):
+            ...
+            end = start + timedelta(days=1)
+            return DagRunInfo(
+                data_interval=DataInterval(start=start, end=end),
+                run_after=DateTime.combine(end.date(), self._schedule_at).replace(
+                    tzinfo=end.tzinfo
+                ),
+            )
+
+However, since the timetable is a part of the DAG, we need to tell Airflow how
+to serialize it with the context we provide in ``__init__``. This is done by
+implementing two additional methods on our timetable class:
+
+.. code-block:: python
+
+    def serialize(self) -> Dict[str, Any]:
+        return {"schedule_at": self._schedule_at.isoformat()}
+
+    @classmethod
+    def deserialize(cls, value: Dict[str, Any]) -> Timetable:
+        return cls(Time.fromisoformat(value["schedule_at"]))
+
+When the DAG is being serialized, ``serialize`` is called to obtain a
+JSON-serializable value. That value is passed to ``deserialize`` when the
+serialized DAG is accessed by the scheduler to reconstruct the timetable.
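+
+As a minimal sketch of the round trip (assuming ``serialize`` and
+``deserialize`` above have been added to ``SometimeAfterWorkdayTimetable``):
+
+.. code-block:: python
+
+    from pendulum import Time
+
+    timetable = SometimeAfterWorkdayTimetable(Time(8))
+    data = timetable.serialize()  # {"schedule_at": "08:00:00"}
+
+    # The scheduler later reconstructs an equivalent timetable from that value.
+    restored = SometimeAfterWorkdayTimetable.deserialize(data)
+    assert restored._schedule_at == Time(8)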
+
+
+Timetable Display in UI
+-----------------------
+
+By default, a custom timetable is displayed by its class name in the UI (e.g.
+the *Schedule* column in the "DAGs" table). It is possible to customize this
+by overriding the ``summary`` property. This is especially useful for
+parameterized timetables to include arguments provided in ``__init__``. For
+our ``SometimeAfterWorkdayTimetable`` class, for example, we could have:
+
+.. code-block:: python
+
+    @property
+    def summary(self) -> str:
+        return f"after each workday, at {self._schedule_at}"
+
+So for a DAG declared like this:
+
+.. code-block:: python
+
+    from pendulum import Time
+
+
+    with DAG(
+        timetable=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
+        ...,
+    ) as dag:
+        ...
+
+The *Schedule* column would say ``after each workday, at 08:00:00``.
+
+
+.. seealso::
+
+    Module :mod:`airflow.timetables.base`
+        The public interface is heavily documented to explain what should
+        be implemented by subclasses.
diff --git a/docs/apache-airflow/python-api-ref.rst b/docs/apache-airflow/python-api-ref.rst
index e79842ebc8e39..591bb84f5ecca 100644
--- a/docs/apache-airflow/python-api-ref.rst
+++ b/docs/apache-airflow/python-api-ref.rst
@@ -137,3 +137,14 @@ All secrets backends derive from :class:`~airflow.secrets.BaseSecretsBackend`.
   :maxdepth: 1
 
   _api/airflow/secrets/index
+
+Timetables
+----------
+Custom timetable implementations provide Airflow's scheduler additional logic to
+schedule DAG runs in ways not possible with built-in schedule expressions.
+
+.. toctree::
+  :includehidden:
+  :maxdepth: 1
+
+  _api/airflow/timetables/index
diff --git a/docs/conf.py b/docs/conf.py
index af642b8ab3f73..7570dab8277a4 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -201,7 +201,16 @@ def _get_rst_filepath_from_path(filepath: str):
         name = os.path.basename(path)
         if os.path.isfile(path) and not path.endswith(_allowed_top_level):
             exclude_patterns.append(f"_api/airflow/{name.rpartition('.')[0]}")
-        browsable_packages = ["operators", "hooks", "sensors", "providers", "executors", "models", "secrets"]
+        browsable_packages = [
+            "hooks",
+            "executors",
+            "models",
+            "operators",
+            "providers",
+            "secrets",
+            "sensors",
+            "timetables",
+        ]
         if os.path.isdir(path) and name not in browsable_packages:
             exclude_patterns.append(f"_api/airflow/{name}")
 else:
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 24defa4064907..951b89d14fa22 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -227,6 +227,7 @@ Memorystore
 Mesos
 MessageAttributes
 Metastore
+Midnights
 Mixin
 Mongo
 Moto
@@ -256,6 +257,7 @@ PTarget
 Pagerduty
 Papermill
 Parallelize
+Parameterized
 Parameterizing
 Paramiko
 Params
@@ -573,6 +575,7 @@ createDisposition
 creationTimestamp
 credssp
 cron
+croniter
 cronjob
 crontab
 crypto
@@ -943,6 +946,7 @@ mget
 microservice
 microsoft
 middleware
+midnights
 milton
 minicluster
 minikube