From 32d3429b8aef442640d722c3367a448fa90c5b94 Mon Sep 17 00:00:00 2001 From: David Caron Date: Thu, 12 May 2022 10:38:18 -0400 Subject: [PATCH 1/2] add 'reschedule' to the serialized fields... for BaseSensorOperator fix #23411 --- airflow/sensors/base.py | 4 ++++ tests/ti_deps/deps/test_ready_to_reschedule_dep.py | 7 ++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py index 5cbe009c82b95..590b48f04bc73 100644 --- a/airflow/sensors/base.py +++ b/airflow/sensors/base.py @@ -339,6 +339,10 @@ def reschedule(self): """Define mode rescheduled sensors.""" return self.mode == 'reschedule' + @classmethod + def get_serialized_fields(cls): + return super().get_serialized_fields() | {"reschedule"} + def poke_mode_only(cls): """ diff --git a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py index 470166db21c8d..99416bbbc8927 100644 --- a/tests/ti_deps/deps/test_ready_to_reschedule_dep.py +++ b/tests/ti_deps/deps/test_ready_to_reschedule_dep.py @@ -31,7 +31,7 @@ class TestNotInReschedulePeriodDep(unittest.TestCase): def _get_task_instance(self, state): dag = DAG('test_dag') - task = Mock(dag=dag) + task = Mock(dag=dag, reschedule=True) ti = TaskInstance(task=task, state=state, run_id=None) return ti @@ -52,6 +52,11 @@ def test_should_pass_if_ignore_in_reschedule_period_is_set(self): dep_context = DepContext(ignore_in_reschedule_period=True) assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context) + def test_should_pass_if_not_reschedule_mode(self): + ti = self._get_task_instance(State.UP_FOR_RESCHEDULE) + del ti.task.reschedule + assert ReadyToRescheduleDep().is_met(ti=ti) + def test_should_pass_if_not_in_none_state(self): ti = self._get_task_instance(State.UP_FOR_RETRY) assert ReadyToRescheduleDep().is_met(ti=ti) From eee4b5c6895bcbc15ccea73ceb4baec6fd789202 Mon Sep 17 00:00:00 2001 From: David Caron Date: Thu, 12 May 2022 11:12:41 -0400 Subject: [PATCH 2/2] add 'reschedule' assert in test_serialize_sensor --- tests/serialization/test_dag_serialization.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 0144501f1a13d..fe9fc7c7e5447 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -1462,6 +1462,7 @@ def poke(self, context: Context): assert "deps" in blob serialized_op = SerializedBaseOperator.deserialize_operator(blob) + assert serialized_op.reschedule == (mode == "reschedule") assert op.deps == serialized_op.deps @pytest.mark.parametrize(