From 3e8782a485d647886642cfeac0f10c6f294d1bce Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Sat, 6 Nov 2021 06:48:15 +0800 Subject: [PATCH] Fix Serialization when``relativedelta`` is passed as ``schedule_interval`` (#19418) Also add relativedelta to timetable test cases to ensure this does not regress. Fix #19416 (cherry picked from commit b590cc8a976da9fa7fe8c5850bd16d3dc856c52c) --- airflow/timetables/interval.py | 5 +- tests/timetables/test_interval_timetable.py | 66 ++++++++++++++++++++- 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/airflow/timetables/interval.py b/airflow/timetables/interval.py index a095565b54be5..d669cb652d153 100644 --- a/airflow/timetables/interval.py +++ b/airflow/timetables/interval.py @@ -271,8 +271,9 @@ def serialize(self) -> Dict[str, Any]: return {"delta": delta} def validate(self) -> None: - if self._delta.total_seconds() <= 0: - raise AirflowTimetableInvalid("schedule interval must be positive") + now = datetime.datetime.now() + if (now + self._delta) <= now: + raise AirflowTimetableInvalid(f"schedule interval must be positive, not {self._delta!r}") def _get_next(self, current: DateTime) -> DateTime: return convert_to_utc(current + self._delta) diff --git a/tests/timetables/test_interval_timetable.py b/tests/timetables/test_interval_timetable.py index 53f5aebebcf21..842cc1f234f3c 100644 --- a/tests/timetables/test_interval_timetable.py +++ b/tests/timetables/test_interval_timetable.py @@ -18,10 +18,12 @@ import datetime from typing import Optional +import dateutil.relativedelta import freezegun import pendulum import pytest +from airflow.exceptions import AirflowTimetableInvalid from airflow.settings import TIMEZONE from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable @@ -35,12 +37,17 @@ CURRENT_TIME = pendulum.DateTime(2021, 9, 7, tzinfo=TIMEZONE) HOURLY_CRON_TIMETABLE = CronDataIntervalTimetable("@hourly", TIMEZONE) -HOURLY_DELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) +HOURLY_TIMEDELTA_TIMETABLE = DeltaDataIntervalTimetable(datetime.timedelta(hours=1)) +HOURLY_RELATIVEDELTA_TIMETABLE = DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(hours=1)) @pytest.mark.parametrize( "timetable", - [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")], + [ + pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], ) @pytest.mark.parametrize( "last_automated_data_interval", @@ -62,7 +69,11 @@ def test_no_catchup_next_info_starts_at_current_time( @pytest.mark.parametrize( "timetable", - [pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), pytest.param(HOURLY_DELTA_TIMETABLE, id="delta")], + [ + pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], ) def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable) -> None: """If ``catchup=True``, the next interval starts at the previous's end.""" @@ -72,3 +83,52 @@ def test_catchup_next_info_starts_at_previous_interval_end(timetable: Timetable) ) expected_end = PREV_DATA_INTERVAL_END + datetime.timedelta(hours=1) assert next_info == DagRunInfo.interval(start=PREV_DATA_INTERVAL_END, end=expected_end) + + +@pytest.mark.parametrize( + "timetable", + [ + pytest.param(HOURLY_CRON_TIMETABLE, id="cron"), + pytest.param(HOURLY_TIMEDELTA_TIMETABLE, id="timedelta"), + pytest.param(HOURLY_RELATIVEDELTA_TIMETABLE, id="relativedelta"), + ], +) +def test_validate_success(timetable: Timetable) -> None: + timetable.validate() + + +@pytest.mark.parametrize( + "timetable, error_message", + [ + pytest.param( + CronDataIntervalTimetable("0 0 1 13 0", TIMEZONE), + "[0 0 1 13 0] is not acceptable, out of range", + id="invalid-cron", + ), + pytest.param( + DeltaDataIntervalTimetable(datetime.timedelta()), + "schedule interval must be positive, not datetime.timedelta(0)", + id="zero-timedelta", + ), + pytest.param( + DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta()), + "schedule interval must be positive, not relativedelta()", + id="zero-relativedelta", + ), + pytest.param( + DeltaDataIntervalTimetable(datetime.timedelta(days=-1)), + # Dynamically formatted since different Python versions display timedelta differently. + f"schedule interval must be positive, not {datetime.timedelta(days=-1)!r}", + id="negative-timedelta", + ), + pytest.param( + DeltaDataIntervalTimetable(dateutil.relativedelta.relativedelta(days=-1)), + "schedule interval must be positive, not relativedelta(days=-1)", + id="negative-relativedelta", + ), + ], +) +def test_validate_failure(timetable: Timetable, error_message: str) -> None: + with pytest.raises(AirflowTimetableInvalid) as ctx: + timetable.validate() + assert str(ctx.value) == error_message