From 0ddc90f12020cd7197189a80ea56a7ec0a81273b Mon Sep 17 00:00:00 2001 From: David Caron Date: Tue, 17 May 2022 08:18:29 -0400 Subject: [PATCH] Add 'reschedule' to the serialized fields for the BaseSensorOperator (#23674) fix #23411 (cherry picked from commit f9e2a3051cd3a5b6fcf33bca4c929d220cf5661e) --- airflow/sensors/base.py | 4 ++++ tests/serialization/test_dag_serialization.py | 1 + tests/ti_deps/deps/test_ready_to_reschedule_dep.py | 7 ++++++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index 7f1cd87c3dd65..f00b3a6761d5b 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -339,6 +339,10 @@ def reschedule(self): """Define mode rescheduled sensors.""" return self.mode == 'reschedule' + @classmethod + def get_serialized_fields(cls): + return super().get_serialized_fields() | {"reschedule"} + def poke_mode_only(cls): """ diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 0144501f1a13d..fe9fc7c7e5447 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1462,6 +1462,7 @@ def poke(self, context: Context): assert "deps" in blob serialized_op = SerializedBaseOperator.deserialize_operator(blob) + assert serialized_op.reschedule == (mode == "reschedule") assert op.deps == serialized_op.deps @pytest.mark.parametrize( diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py index 470166db21c8d..99416bbbc8927 100644 --- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py +++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py @@ -31,7 +31,7 @@ class TestNotInReschedulePeriodDep(unittest.TestCase): def _get_task_instance(self, state): dag = DAG('test_dag') - task = Mock(dag=dag) + task = Mock(dag=dag, reschedule=True) ti = TaskInstance(task=task, state=state, run_id=None) return ti @@ -52,6 +52,11 @@ def test_should_pass_if_ignore_in_reschedule_period_is_set(self): dep_context = DepContext(ignore_in_reschedule_period=True) assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context) + def test_should_pass_if_not_reschedule_mode(self): + ti = self._get_task_instance(State.UP_FOR_RESCHEDULE) + del ti.task.reschedule + assert ReadyToRescheduleDep().is_met(ti=ti) + def test_should_pass_if_not_in_none_state(self): ti = self._get_task_instance(State.UP_FOR_RETRY) assert ReadyToRescheduleDep().is_met(ti=ti)