Skip to content

Commit

Permalink
Implement CronTriggerTimetable (#23662)
Browse files Browse the repository at this point in the history
Relates #15432 

I also have gone through the above discussion and the related documents introduced there:
- [AIP-39 Richer scheduler_interval](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval)
- [Scoping out a new feature for 2.1: improving schedule_interval](https://lists.apache.org/thread.html/rb4e004e68574e5fb77ee5b51f4fd5bfb4b3392d884c178bc767681bf%40%3Cdev.airflow.apache.org%3E)
- [[DISCUSS][AIP-39] Richer (and pluggable) schedule_interval on DAGs](https://lists.apache.org/thread.html/rf8eeb06493681f48dc9e82ce605a9c3a930cfee0b4ca19462a4e55b3%40%3Cdev.airflow.apache.org%3E)

The default behavior of [`CronDataIntervalTimetable`](https://github.com/apache/airflow/blob/2.2.5/airflow/timetables/interval.py#L116) is a little bit confusing - a DAG Run is triggered immediately after the DAG is enabled (unpaused), unlike normal cron's behavior.

Hence I'd like to add another cron timetable which does not care about "data interval" and which starts a DAG Run at the start of the period.

I know even without this PR, each Airflow user can [customize DAG scheduling with Timetables](https://airflow.apache.org/docs/apache-airflow/stable/howto/timetable.html#). However, I assume this PR worth adding to the main repository so that the users can use the timetable easier.
  • Loading branch information
mai-nakagawa authored Aug 2, 2022
1 parent 2550066 commit d004841
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 30 deletions.
92 changes: 69 additions & 23 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ class _DataIntervalTimetable(Timetable):
instance), and schedule a DagRun at the end of each interval.
"""

def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
"""Bound the earliest time a run can be scheduled.
This is called when ``catchup=False``. See docstring of subclasses for
exact skipping behaviour of a schedule.
"""
raise NotImplementedError()

def _align(self, current: DateTime) -> DateTime:
"""Align given time to the scheduled.
Expand All @@ -66,6 +58,14 @@ def _get_prev(self, current: DateTime) -> DateTime:
"""Get the last schedule before the current time."""
raise NotImplementedError()

def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
"""Bound the earliest time a run can be scheduled.
This is called when ``catchup=False``. See docstring of subclasses for
exact skipping behaviour of a schedule.
"""
raise NotImplementedError()

def next_dagrun_info(
self,
*,
Expand Down Expand Up @@ -114,19 +114,7 @@ def _is_schedule_fixed(expression: str) -> bool:
return next_b.minute == next_a.minute and next_b.hour == next_a.hour


class CronDataIntervalTimetable(_DataIntervalTimetable):
"""Timetable that schedules data intervals with a cron expression.
This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either
a five/six-segment representation, or one of ``cron_presets``.
The implementation extends on croniter to add timezone awareness. This is
because croniter works only with naive timestamps, and cannot consider DST
when determining the next/previous time.
Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
"""

class _CronMixin:
def __init__(self, cron: str, timezone: Union[str, Timezone]) -> None:
self._expression = cron_presets.get(cron, cron)

Expand All @@ -151,14 +139,16 @@ def __init__(self, cron: str, timezone: Union[str, Timezone]) -> None:
def deserialize(cls, data: Dict[str, Any]) -> "Timetable":
from airflow.serialization.serialized_objects import decode_timezone

return cls(data["expression"], decode_timezone(data["timezone"]))
# We ignore typing on the next line because mypy expects it to return _CronMixin type.
# However, this should return Timetable since it should only be called against a timetable subclass
return cls(data["expression"], decode_timezone(data["timezone"])) # type: ignore

def __eq__(self, other: Any) -> bool:
"""Both expression and timezone should match.
This is only for testing purposes and should not be relied on otherwise.
"""
if not isinstance(other, CronDataIntervalTimetable):
if not isinstance(other, type(self)):
return NotImplemented
return self._expression == other._expression and self._timezone == other._timezone

Expand Down Expand Up @@ -214,6 +204,20 @@ def _align(self, current: DateTime) -> DateTime:
return next_time
return current


class CronDataIntervalTimetable(_CronMixin, _DataIntervalTimetable):
"""Timetable that schedules data intervals with a cron expression.
This corresponds to ``schedule_interval=<cron>``, where ``<cron>`` is either
a five/six-segment representation, or one of ``cron_presets``.
The implementation extends on croniter to add timezone awareness. This is
because croniter works only with naive timestamps, and cannot consider DST
when determining the next/previous time.
Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
"""

def _skip_to_latest(self, earliest: Optional[DateTime]) -> DateTime:
"""Bound the earliest time a run can be scheduled.
Expand Down Expand Up @@ -245,6 +249,48 @@ def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval(start=self._get_prev(end), end=end)


class CronTriggerTimetable(_CronMixin, Timetable):
"""A cron-compliant timetable.
The main difference from ``CronDataIntervalTimetable`` is that a first
DAG Run is kicked off at the start of the period like a normal cron,
while a first DAG Run of ``CronDataIntervalTimetable`` starts immediately
after the DAG is registered.
Note that this timetable does not care the idea of *data interval*. It
means the value of ``data_interval_start``, ``data_interval_end`` and
legacy ``execution_date`` are the same - the time when a DAG run is triggered.
Don't pass ``@once`` in here; use ``OnceTimetable`` instead.
"""

def infer_manual_data_interval(self, *, run_after: DateTime) -> DataInterval:
return DataInterval.exact(run_after)

def next_dagrun_info(
self,
*,
last_automated_data_interval: Optional[DataInterval],
restriction: TimeRestriction,
) -> Optional[DagRunInfo]:
if restriction.catchup:
if last_automated_data_interval is None:
if restriction.earliest is None:
return None
next_start_time = self._align(restriction.earliest)
else:
next_start_time = self._get_next(last_automated_data_interval.end)
else:
current_time = DateTime.utcnow()
if restriction.earliest is not None and current_time < restriction.earliest:
next_start_time = self._align(restriction.earliest)
else:
next_start_time = self._align(current_time)
if restriction.latest is not None and restriction.latest < next_start_time:
return None
return DagRunInfo.exact(next_start_time)


class DeltaDataIntervalTimetable(_DataIntervalTimetable):
"""Timetable that schedules data intervals with a time delta.
Expand Down
86 changes: 80 additions & 6 deletions docs/apache-airflow/concepts/timetable.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ directly. The timetable also dictates the data interval and the logical time of
run created for the DAG.

Cron expressions and timedeltas are still supported (using
``CronDataIntervalTimetable`` and ``DeltaDataIntervalTimetable`` under the hood
`CronDataIntervalTimetable`_ and `DeltaDataIntervalTimetable`_ under the hood
respectively), however, there are situations where they cannot properly express
the schedule. Some examples are:

Expand Down Expand Up @@ -55,18 +55,28 @@ Built In Timetables
Airflow comes with several common timetables built in to cover the most common use cases. Additional timetables
may be available in plugins.

CronDataIntervalTimetable
^^^^^^^^^^^^^^^^^^^^^^^^^
.. _CronTriggerTimetable:

Set schedule based on a cron expression. Can be selected by providing a string that is a valid
cron expression to the ``schedule_interval`` parameter of a DAG as described in the :doc:`/concepts/dags` documentation.
CronTriggerTimetable
^^^^^^^^^^^^^^^^^^^^

A timetable which accepts a cron expression.

Note `CronDataIntervalTimetable`_ also accepts a cron expression. See `Differences between the two cron timetables`_.

.. code-block:: python
@dag(schedule_interval="0 1 * * 3") # At 01:00 on Wednesday.
from airflow.timetables.interval import CronTriggerTimetable
@dag(
timetable=CronTriggerTimetable(cron='0 1 * * 3', timezone='UTC'), # At 01:00 on Wednesday
)
def example_dag():
pass
.. _DeltaDataIntervalTimetable:

DeltaDataIntervalTimetable
^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand All @@ -79,6 +89,23 @@ Schedules data intervals with a time delta. Can be selected by providing a
def example_dag():
pass
.. _CronDataIntervalTimetable:

CronDataIntervalTimetable
^^^^^^^^^^^^^^^^^^^^^^^^^

Another timetable which accepts a cron expression. Can be selected by providing a string that is a valid
cron expression to the ``schedule_interval`` parameter of a DAG as described in the :doc:`/concepts/dags` documentation.


Note `CronTriggerTimetable`_ also accepts a cron expression. See `Differences between the two cron timetables`_.

.. code-block:: python
@dag(schedule_interval="0 1 * * 3") # At 01:00 on Wednesday.
def example_dag():
pass
EventsTimetable
^^^^^^^^^^^^^^^

Expand Down Expand Up @@ -109,3 +136,50 @@ first) event for the data interval, otherwise manual runs will run with a ``data
)
def example_dag():
pass
.. _Differences between the two cron timetables:

Differences between the two cron timetables
-------------------------------------------

There are two timetables `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ that accepts a cron expression.
There are some differences between the two:
- `CronTriggerTimetable`_ does not take care of *Data Interval*, while `CronDataIntervalTimetable`_ does.
- The time when a DAG run is triggered by `CronTriggerTimetable`_ is more intuitive and more similar to what people
expect cron to behave than that of `CronDataIntervalTimetable`_ (when ``catchup`` is ``False``).

Whether taking care of *Data Interval*
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

`CronTriggerTimetable`_ *does not* care the idea of *data interval*. It means the value of ``data_interval_start``,
``data_interval_end`` and legacy ``execution_date`` are the same - the time when a DAG run is triggered.

On the other hand, `CronDataIntervalTimetable`_ *does* care the idea of *data interval*. It means the value of
``data_interval_start`` and ``data_interval_end`` (and legacy ``execution_date``) are different. They are the start
and end of the interval respectively.

The time when a DAG run is triggered
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

There is no difference between the two when ``catchup`` is ``True``. :ref:`dag-catchup` tells you how DAG runs are
triggered when ``catchup`` is ``True``.

When ``catchup`` is ``False``, there is difference in how a new DAG run is triggered. `CronTriggerTimetable`_ triggers
a new DAG run *after* the current time, while `CronDataIntervalTimetable`_ does *before* the current time (assuming
the value of ``start_date`` is past time).

Here is an example showing how the first DAG run is triggered. Supposes there is a cron expression ``@daily`` or
``0 0 * * *``, which is aimed to run at 12AM every day. If you enable DAGs using the two timetables at 3PM on January
31st, `CronTriggerTimetable`_ will trigger a new DAG run at 12AM on February 1st. `CronDataIntervalTimetable`_, on the other
hand, will immediately trigger a new DAG run which is supposed to trigger at 12AM on January 31st if the DAG had been
enabled beforehand.

This is another example showing the difference in the case of skipping DAG runs. Suppose there are two running DAGs
using the two timetables with a cron expression ``@daily`` or ``0 0 * * *``. If you pause the DAGs at 3PM on January
31st and re-enable them at 3PM on February 2nd, `CronTriggerTimetable`_ skips the DAG runs which are supposed to
trigger on February 1st and 2nd. The next DAG run will be triggered at 12AM on February 3rd. `CronDataIntervalTimetable`_,
on the other hand, skips the DAG runs which are supposed to trigger on February 1st only. A DAG run for February 2nd
is immediately triggered after you re-enable the DAG.

By these examples, you see how `CronTriggerTimetable`_ triggers DAG runs is more intuitive and more similar to what
people expect cron to behave than how `CronDataIntervalTimetable`_ does.
Loading

0 comments on commit d004841

Please sign in to comment.