Skip to content

Commit

Permalink
Fix Serialization whenrelativedelta is passed as ``schedule_inter…
Browse files Browse the repository at this point in the history
…val`` (#19418)

Also add relativedelta to timetable test cases to ensure this does not
regress.

Fix #19416

(cherry picked from commit b590cc8)
  • Loading branch information
uranusjr authored and jedcunningham committed Nov 5, 2021
1 parent d478f93 commit 3e8782a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 5 deletions.
5 changes: 3 additions & 2 deletions airflow/timetables/interval.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ def serialize(self) -> Dict[str, Any]:
return {"delta": delta}

def validate(self) -> None:
if self._delta.total_seconds() <= 0:
raise AirflowTimetableInvalid("schedule interval must be positive")
now = datetime.datetime.now()
if (now + self._delta) <= now:
raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}")

def _get_next(self, current: DateTime) -> DateTime:
return convert_to_utc(current + self._delta)
Expand Down
66 changes: 63 additions & 3 deletions tests/timetables/test_interval_timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
import datetime
from typing import Optional

import dateutil.relativedelta
import freezegun
import pendulum
import pytest

from airflow.exceptions import AirflowTimetableInvalid
from airflow.settings import TIMEZONE
from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
Expand All @@ -35,12 +37,17 @@
CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE)

HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE)
HOURLY_DELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1))
HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1))


@pytest.mark.parametrize(
"timetable",
[pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")],
[
pytest.param(HOURLY_CRON_TIMETABLE, id="cron"),
pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
],
)
@pytest.mark.parametrize(
"last_automated_data_interval",
Expand All @@ -62,7 +69,11 @@ def test_no_catchup_next_info_starts_at_current_time(

@pytest.mark.parametrize(
"timetable",
[pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")],
[
pytest.param(HOURLY_CRON_TIMETABLE, id="cron"),
pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
],
)
def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable) -> None:
"""If ``catchup=True``, the next interval starts at the previous's end."""
Expand All @@ -72,3 +83,52 @@ def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable)
)
expected_end = PREV_DATA_INTERVAL_END + datetime.timedelta(hours=1)
assert next_info == DagRunInfo.interval(start=PREV_DATA_INTERVAL_END, end=expected_end)


@pytest.mark.parametrize(
"timetable",
[
pytest.param(HOURLY_CRON_TIMETABLE, id="cron"),
pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"),
pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"),
],
)
def test_validate_success(timetable: Timetable) -> None:
timetable.validate()


@pytest.mark.parametrize(
"timetable, error_message",
[
pytest.param(
CronDataIntervalTimetable("0 0 1 13 0", TIMEZONE),
"[0 0 1 13 0] is not acceptable, out of range",
id="invalid-cron",
),
pytest.param(
DeltaDataIntervalTimetable(datetime.timedelta()),
"schedule interval must be positive, not datetime.timedelta(0)",
id="zero-timedelta",
),
pytest.param(
DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta()),
"schedule interval must be positive, not relativedelta()",
id="zero-relativedelta",
),
pytest.param(
DeltaDataIntervalTimetable(datetime.timedelta(days=-1)),
# Dynamically formatted since different Python versions display timedelta differently.
f"schedule interval must be positive, not {datetime.timedelta(days=-1)!r}",
id="negative-timedelta",
),
pytest.param(
DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(days=-1)),
"schedule interval must be positive, not relativedelta(days=-1)",
id="negative-relativedelta",
),
],
)
def test_validate_failure(timetable: Timetable, error_message: str) -> None:
with pytest.raises(AirflowTimetableInvalid) as ctx:
timetable.validate()
assert str(ctx.value) == error_message

0 comments on commit 3e8782a

Please sign in to comment.