Mention data interval and add timetable how-to
uranusjr committed Aug 17, 2021
1 parent 2935be1 commit 9238ce6
Showing 9 changed files with 416 additions and 13 deletions.
92 changes: 92 additions & 0 deletions airflow/example_dags/example_workday_timetable.py
@@ -0,0 +1,92 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Example DAG demostrating how to implement a custom timetable for a DAG."""

# [START howto_timetable]
from datetime import timedelta
from typing import Optional

from pendulum import Date, DateTime, Time, timezone

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable

UTC = timezone("UTC")


class AfterWorkdayTimetable(Timetable):

    # [START howto_timetable_infer_data_interval]
    def infer_data_interval(self, run_after: DateTime) -> DataInterval:
        weekday = run_after.weekday()
        if weekday in (0, 6):  # Monday and Sunday -- interval is last Friday.
            days_since_friday = (run_after.weekday() - 4) % 7
            delta = timedelta(days=days_since_friday)
        else:  # Otherwise the interval is yesterday.
            delta = timedelta(days=1)
        start = DateTime.combine((run_after - delta).date(), Time.min).replace(tzinfo=UTC)
        return DataInterval(start=start, end=(start + timedelta(days=1)))

    # [END howto_timetable_infer_data_interval]
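
# Hypothetical usage, for illustration (not part of the example file itself):
#
#   timetable = AfterWorkdayTimetable()
#   interval = timetable.infer_data_interval(DateTime(2021, 8, 16, tzinfo=UTC))  # a Monday
#   # The inferred interval maps back to the preceding Friday:
#   # 2021-08-13T00:00 UTC to 2021-08-14T00:00 UTC.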

    # [START howto_timetable_next_dagrun_info]
    def next_dagrun_info(
        self,
        last_automated_dagrun: Optional[DateTime],
        restriction: TimeRestriction,
    ) -> Optional[DagRunInfo]:
        if last_automated_dagrun is not None:
            # There was a previous run on the regular schedule.
            # last_automated_dagrun is the last run's logical date.
            weekday = last_automated_dagrun.weekday()
            if 0 <= weekday < 4:  # Monday through Thursday -- next is tomorrow.
                delta = timedelta(days=1)
            else:  # Week is ending -- skip to next Monday.
                delta = timedelta(days=(7 - weekday))
            start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min).replace(tzinfo=UTC)
        else:  # This is the first ever run on the regular schedule.
            if restriction.earliest is None:  # No start_date. Don't schedule.
                return None
            start = restriction.earliest
            if start.time() != Time.min:
                # If earliest does not fall on midnight, skip to the next day.
                start = DateTime.combine(start.date(), Time.min).replace(tzinfo=UTC) + timedelta(days=1)
            if not restriction.catchup:
                # If the DAG has catchup=False, today is the earliest to consider.
                start = max(start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
            weekday = start.weekday()
            if weekday in (5, 6):  # If 'start' is in the weekend, go to next Monday.
                delta = timedelta(days=(7 - weekday))
                start = start + delta
        if restriction.latest is not None and start > restriction.latest:
            # Over the DAG's scheduled end; don't schedule.
            return None
        return DagRunInfo.interval(start=start, end=(start + timedelta(days=1)))

    # [END howto_timetable_next_dagrun_info]
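
# Hypothetical walk-through, for illustration (not part of the example file
# itself): if the last run's logical date was Friday 2021-08-13, its weekday
# is 4, so delta becomes timedelta(days=3) and the next run covers Monday
# 2021-08-16 to Tuesday 2021-08-17.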


with DAG(
    dag_id="example_workday_timetable",  # required by DAG; taken from the example's file name
    timetable=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
) as dag:
    DummyOperator(task_id="run_this")


if __name__ == "__main__":
    dag.cli()

# [END howto_timetable]
2 changes: 1 addition & 1 deletion airflow/timetables/schedules.py
@@ -88,7 +88,7 @@ class CronSchedule(Schedule):
"""Schedule things from a cron expression.
The implementation extends on croniter to add timezone awareness. This is
because crontier works only with naive timestamps, and cannot consider DST
because croniter works only with naive timestamps, and cannot consider DST
when determining the next/previous time.
"""

13 changes: 11 additions & 2 deletions docs/apache-airflow/concepts/dags.rst
@@ -148,11 +148,20 @@ The ``schedule_interval`` argument takes any value that is a valid `Crontab <htt
with DAG("my_daily_dag", schedule_interval="0 * * * *"):
...

-Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a :doc:`DAG Run </dag-run>`. DAG Runs can run in parallel for the same DAG, and each has a defined ``execution_date``, which identifies the *logical* date and time it is running for - not the *actual* time when it was started.
+.. tip::
+
+    For more information on ``schedule_interval`` values, see :doc:`DAG Run </dag-run>`.
+
+    If ``schedule_interval`` is not enough to express the DAG's schedule, see :doc:`Timetables </howto/timetable>`.
+
+Every time you run a DAG, you are creating a new instance of that DAG which Airflow calls a :doc:`DAG Run </dag-run>`. DAG Runs can run in parallel for the same DAG, and each has a defined data interval, which identifies the *logical* date and time range it is running for - not the *actual* time when it was started.

As an example of why this is useful, consider writing a DAG that processes a daily set of experimental data. It's been rewritten, and you want to run it on the previous 3 months of data - no problem, since Airflow can *backfill* the DAG and run copies of it for every day in those previous 3 months, all at once.

-Those DAG Runs will all have been started on the same actual day, but their ``execution_date`` values will cover those last 3 months, and that's what all the tasks, operators and sensors inside the DAG look at when they run.
+Those DAG Runs will all have been started on the same actual day, but each DAG
+run will have one data interval covering a single day in that 3 month period,
+and that data interval is what all the tasks, operators and sensors inside the
+DAG look at when they run.
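
As an illustration (a minimal sketch, not part of this commit's changes; it
assumes Airflow 2.2+, where ``data_interval_start`` and ``data_interval_end``
are available as template variables), a task can read its run's data interval
through templated fields::

    BashOperator(
        task_id="print_data_interval",
        bash_command="echo {{ data_interval_start }} to {{ data_interval_end }}",
    )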

In much the same way a DAG instantiates into a DAG Run every time it's run, Tasks specified inside a DAG also instantiate into :ref:`Task Instances <concepts:task-instances>` along with it.

34 changes: 25 additions & 9 deletions docs/apache-airflow/dag-run.rst
@@ -54,17 +54,33 @@ Cron Presets
Your DAG will be instantiated for each schedule along with a corresponding
DAG Run entry in the database backend.

-.. note::
+Data Interval
+-------------

-    If you run a DAG on a schedule_interval of one day, the run stamped 2020-01-01
-    will be triggered soon after 2020-01-01T23:59. In other words, the job instance is
-    started once the period it covers has ended. The ``execution_date`` available in the context
-    will also be 2020-01-01.
+Each DAG run in Airflow has an assigned "data interval" that represents the time
+range it operates in. For a DAG scheduled with ``@daily``, for example, each of
+its data intervals would start at midnight of each day and end at midnight of the
+next day.

-    The first DAG Run is created based on the minimum ``start_date`` for the tasks in your DAG.
-    Subsequent DAG Runs are created by the scheduler process, based on your DAG’s ``schedule_interval``,
-    sequentially. If your start_date is 2020-01-01 and schedule_interval is @daily, the first run
-    will be created on 2020-01-02 i.e., after your start date has passed.
+A DAG run is scheduled *after* its associated data interval has ended, to ensure
+the run is able to collect all the data within the time period. Therefore, a run
+covering the data period of 2020-01-01 will not start to run until 2020-01-01
+has ended, i.e. after 2020-01-02 00:00:00.

+All dates in Airflow are tied to the data interval concept in some way. The
+"logical date" (also called ``execution_date`` in Airflow versions prior to 2.2)
+of a DAG run, for example, usually denotes the start of the data interval, not
+when the DAG is actually executed.

+Similarly, since the ``start_date`` argument for the DAG and its tasks points to
+the same logical date, it marks the start of *the DAG's first data interval*, not
+when tasks in the DAG will start running. In other words, a DAG run will only be
+scheduled one interval after ``start_date``.
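
As a minimal sketch of how these dates line up (illustrative, not part of the
commit's diff)::

    # assumes: from datetime import datetime
    with DAG("my_daily_dag", start_date=datetime(2020, 1, 1), schedule_interval="@daily") as dag:
        ...

Here the first data interval spans 2020-01-01T00:00 to 2020-01-02T00:00, so the
first run is created shortly after 2020-01-02T00:00 and carries the logical
date 2020-01-01 -- exactly one interval after ``start_date``.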

+.. tip::
+
+    If ``schedule_interval`` is not enough to express your DAG's schedule,
+    logical date, or data interval, see :doc:`Customizing timetables </howto/timetable>`.

Re-run DAG
''''''''''
Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/howto/index.rst
@@ -33,6 +33,7 @@ configuring an Airflow environment.
    set-config
    set-up-database
    operator/index
+   timetable
    customize-state-colors-ui
    customize-dag-ui-page-instance-name
    custom-operator
