Rewrite example with new last_automated_dagrun
uranusjr committed Sep 13, 2021
1 parent 8897d1a commit 159b186
Showing 2 changed files with 32 additions and 47 deletions.
39 changes: 19 additions & 20 deletions airflow/example_dags/plugins/workday.py
@@ -48,35 +48,34 @@ def infer_data_interval(self, run_after: DateTime) -> DataInterval:
     # [START howto_timetable_next_dagrun_info]
     def next_dagrun_info(
         self,
-        last_automated_dagrun: Optional[DateTime],
+        last_automated_dagrun: Optional[DataInterval],
         restriction: TimeRestriction,
     ) -> Optional[DagRunInfo]:
-        if last_automated_dagrun is not None:
-            # There was a previous run on the regular schedule.
-            # last_automated_dagrun os the last run's logical date.
-            weekday = last_automated_dagrun.weekday()
-            if 0 <= weekday < 4:  # Monday through Thursday -- next is tomorrow.
+        if last_automated_dagrun is not None:  # There was a previous run on the regular schedule.
+            last_start_weekday = last_automated_dagrun.start.weekday()
+            if 0 <= last_start_weekday < 4:  # Last run on Monday through Thursday -- next is tomorrow.
                 delta = timedelta(days=1)
-            else:  # Week is ending -- skip to next Monday.
-                delta = timedelta(days=(7 - weekday))
-            start = DateTime.combine((last_automated_dagrun + delta).date(), Time.min)
+            else:  # Last run on Friday -- skip to next Monday.
+                delta = timedelta(days=(7 - last_start_weekday))
+            next_start = DateTime.combine((last_automated_dagrun.start + delta).date(), Time.min)
         else:  # This is the first ever run on the regular schedule.
             if restriction.earliest is None:  # No start_date. Don't schedule.
                 return None
-            start = restriction.earliest
-            if start.time() != Time.min:
-                # If earliest does not fall on midnight, skip to the next day.
-                start = DateTime.combine(start.date(), Time.min).replace(tzinfo=UTC) + timedelta(days=1)
+            next_start = restriction.earliest
             if not restriction.catchup:
                 # If the DAG has catchup=False, today is the earliest to consider.
-                start = max(start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
-            weekday = start.weekday()
-            if weekday in (5, 6):  # If 'start' is in the weekend, go to next Monday.
-                delta = timedelta(days=(7 - weekday))
-                start = start + delta
-        if start > restriction.latest:  # Over the DAG's scheduled end; don't schedule.
+                next_start = max(next_start, DateTime.combine(Date.today(), Time.min).replace(tzinfo=UTC))
+            elif next_start.time() != Time.min:
+                # If earliest does not fall on midnight, skip to the next day.
+                next_day = next_start.date() + timedelta(days=1)
+                next_start = DateTime.combine(next_day, Time.min).replace(tzinfo=UTC)
+            next_start_weekday = next_start.weekday()
+            if next_start_weekday in (5, 6):  # If next start is in the weekend, go to next Monday.
+                delta = timedelta(days=(7 - next_start_weekday))
+                next_start = next_start + delta
+        if next_start > restriction.latest:  # Over the DAG's scheduled end; don't schedule.
             return None
-        return DagRunInfo.interval(start=start, end=(start + timedelta(days=1)))
+        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
 
     # [END howto_timetable_next_dagrun_info]

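The new logic above keys only off the previous data interval's start weekday. For readers who want to sanity-check that arithmetic outside Airflow, here is a minimal standalone sketch using only the standard library; the function name ``next_workday_start`` and the sample dates are illustrative and not part of the Airflow API, and the sketch deliberately ignores ``catchup``, ``restriction.latest``, and time zones, which the real timetable handles.

    from datetime import datetime, time, timedelta
    from typing import Optional


    def next_workday_start(last_start: Optional[datetime], earliest: datetime) -> datetime:
        """Midnight of the next Monday-Friday, given the previous run's start (if any)."""
        if last_start is not None:
            # Monday-Thursday: next calendar day; Friday: jump to the following Monday.
            weekday = last_start.weekday()
            delta = timedelta(days=1) if weekday < 4 else timedelta(days=7 - weekday)
            candidate = datetime.combine((last_start + delta).date(), time.min)
        else:
            # First run ever: start from the earliest allowed date, at midnight.
            candidate = datetime.combine(earliest.date(), time.min)
        if candidate.weekday() in (5, 6):  # Landed on a weekend: push forward to Monday.
            candidate += timedelta(days=7 - candidate.weekday())
        return candidate


    # Friday 2021-09-10 -> Monday 2021-09-13; Tuesday 2021-09-07 -> Wednesday 2021-09-08.
    assert next_workday_start(datetime(2021, 9, 10), datetime(2021, 1, 1)) == datetime(2021, 9, 13)
    assert next_workday_start(datetime(2021, 9, 7), datetime(2021, 1, 1)) == datetime(2021, 9, 8)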
40 changes: 13 additions & 27 deletions docs/apache-airflow/howto/timetable.rst
@@ -137,10 +137,10 @@ Next is the implementation of ``next_dagrun_info``:
     :end-before: [END howto_timetable_next_dagrun_info]
 
 This method accepts two arguments. ``last_automated_dagrun`` is a
-``pendulum.DateTime`` object indicating the logical date of this DAG's previous
-non-manually-triggered run, or ``None`` if this is the first time ever the DAG
-is being scheduled. ``restriction`` encapsulates how the DAG and its tasks
-specify the schedule, and contains three attributes:
+:class:`~airflow.timetables.base.DataInterval` instance indicating the data
+interval of this DAG's previous non-manually-triggered run, or ``None`` if this
+is the first time ever the DAG is being scheduled. ``restriction`` encapsulates
+how the DAG and its tasks specify the schedule, and contains three attributes:
 
 * ``earliest``: The earliest time the DAG may be scheduled. This is a
   ``pendulum.DateTime`` calculated from all the ``start_date`` arguments from
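As a quick illustration of the new argument type, the sketch below constructs the two objects the method receives; it assumes the ``airflow.timetables.base`` classes referenced in the diff plus pendulum, and the dates are made up.

    import pendulum
    from airflow.timetables.base import DataInterval, TimeRestriction

    # The previous automated run covered Monday 2021-09-06, midnight to midnight UTC.
    last_automated_dagrun = DataInterval(
        start=pendulum.datetime(2021, 9, 6, tz="UTC"),
        end=pendulum.datetime(2021, 9, 7, tz="UTC"),
    )
    restriction = TimeRestriction(
        earliest=pendulum.datetime(2021, 1, 1, tz="UTC"),  # derived from start_date
        latest=None,  # no end_date set on the DAG
        catchup=True,
    )

    print(last_automated_dagrun.start.weekday())  # 0 (Monday), so the next run starts Tuesday.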
@@ -172,16 +172,10 @@ If we decide to schedule a run, we need to describe it with a
 attributes:
 
 * ``data_interval``: A :class:`~airflow.timetables.base.DataInterval` instance
-  like ``infer_data_interval``'s return value. This describes the next run's
-  data interval.
+  describing the next run's data interval.
 * ``run_after``: A ``pendulum.DateTime`` instance that tells the scheduler when
   the DAG run can be scheduled.
 
-.. note::
-
-    In case you're wondering---yes, the argument and return value of
-    ``infer_data_interval`` are also internally combined into a ``DagRunInfo``.
-
 A ``DagRunInfo`` can be created like this:
 
 .. code-block:: python
@@ -224,11 +218,6 @@ purpose, we'd want to do something like:
 
 .. code-block:: python
 
-    from datetime import timedelta
-
-    from pendulum import DateTime, Time
-
-
     class SometimeAfterWorkdayTimetable(Timetable):
         def __init__(self, schedule_at: Time) -> None:
             self._schedule_at = schedule_at
@@ -238,9 +227,7 @@ purpose, we'd want to do something like:
             end = start + timedelta(days=1)
             return DagRunInfo(
                 data_interval=DataInterval(start=start, end=end),
-                run_after=DateTime.combine(end.date(), self._schedule_at).replace(
-                    tzinfo=end.tzinfo
-                ),
+                run_after=DateTime.combine(end.date(), self._schedule_at),
             )
 
 However, since the timetable is a part of the DAG, we need to tell Airflow how
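A small sketch of what the simplified ``run_after`` expression above evaluates to, with a made-up interval end; pendulum is assumed, as in the documentation's own snippets.

    import pendulum
    from pendulum import DateTime, Time

    schedule_at = Time(8)  # the value passed to the timetable's __init__
    end = pendulum.datetime(2021, 9, 14, tz="UTC")  # the data interval ends Tuesday at midnight
    run_after = DateTime.combine(end.date(), schedule_at)  # 08:00 on the day the interval ends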
@@ -249,13 +236,15 @@ implementing two additional methods on our timetable class:
 
 .. code-block:: python
 
-    def serialize(self) -> Dict[str, Any]:
-        return {"schedule_at": self._schedule_at.isoformat()}
+    class SometimeAfterWorkdayTimetable(Timetable):
+        ...
+
+        def serialize(self) -> Dict[str, Any]:
+            return {"schedule_at": self._schedule_at.isoformat()}
 
-    @classmethod
-    def deserialize(cls, value: Dict[str, Any]) -> Timetable:
-        return cls(Time.fromisoformat(value["schedule_at"]))
+        @classmethod
+        def deserialize(cls, value: Dict[str, Any]) -> Timetable:
+            return cls(Time.fromisoformat(value["schedule_at"]))
 
 When the DAG is being serialized, ``serialize`` is called to obtain a
 JSON-serializable value. That value is passed to ``deserialize`` when the
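A round-trip of the two hooks, assuming the ``SometimeAfterWorkdayTimetable`` class shown above is importable; this sketch only shows what the serialized payload looks like and is not part of the documentation.

    from pendulum import Time

    timetable = SometimeAfterWorkdayTimetable(Time(8))  # assumes the class above is importable
    data = timetable.serialize()  # {'schedule_at': '08:00:00'}
    restored = SometimeAfterWorkdayTimetable.deserialize(data)
    assert restored.serialize() == data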
@@ -281,9 +270,6 @@ So for a DAG declared like this:
 
 .. code-block:: python
 
-    from pendulum import Time
-
-
     with DAG(
         timetable=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
         ...,
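For completeness, a self-contained sketch of what such a DAG declaration could look like once the timetable is importable; the ``dag_id``, ``start_date``, and module path below are hypothetical and not taken from the documentation.

    import pendulum
    from airflow import DAG
    from pendulum import Time

    from my_company.timetables import SometimeAfterWorkdayTimetable  # hypothetical module path

    with DAG(
        dag_id="example_sometime_after_workday",  # illustrative name
        start_date=pendulum.datetime(2021, 9, 1, tz="UTC"),
        timetable=SometimeAfterWorkdayTimetable(Time(8)),  # run at 08:00 after each workday
    ) as dag:
        ...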
