Skip to content

Commit

Permalink
Add 'reschedule' to the serialized fields for the BaseSensorOperator (#…
Browse files Browse the repository at this point in the history
…23674)

fix #23411

(cherry picked from commit f9e2a30)
  • Loading branch information
davidcaron authored and ephraimbuddy committed May 19, 2022
1 parent 1c37a96 commit 0ddc90f
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
4 changes: 4 additions & 0 deletions airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ def reschedule(self):
"""Define mode rescheduled sensors."""
return self.mode == 'reschedule'

@classmethod
def get_serialized_fields(cls):
return super().get_serialized_fields() | {"reschedule"}


def poke_mode_only(cls):
"""
Expand Down
1 change: 1 addition & 0 deletions tests/serialization/test_dag_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,7 @@ def poke(self, context: Context):
assert "deps" in blob

serialized_op = SerializedBaseOperator.deserialize_operator(blob)
assert serialized_op.reschedule == (mode == "reschedule")
assert op.deps == serialized_op.deps

@pytest.mark.parametrize(
Expand Down
7 changes: 6 additions & 1 deletion tests/ti_deps/deps/test_ready_to_reschedule_dep.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
class TestNotInReschedulePeriodDep(unittest.TestCase):
def _get_task_instance(self, state):
dag = DAG('test_dag')
task = Mock(dag=dag)
task = Mock(dag=dag, reschedule=True)
ti = TaskInstance(task=task, state=state, run_id=None)
return ti

Expand All @@ -52,6 +52,11 @@ def test_should_pass_if_ignore_in_reschedule_period_is_set(self):
dep_context = DepContext(ignore_in_reschedule_period=True)
assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)

def test_should_pass_if_not_reschedule_mode(self):
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
del ti.task.reschedule
assert ReadyToRescheduleDep().is_met(ti=ti)

def test_should_pass_if_not_in_none_state(self):
ti = self._get_task_instance(State.UP_FOR_RETRY)
assert ReadyToRescheduleDep().is_met(ti=ti)
Expand Down

0 comments on commit 0ddc90f

Please sign in to comment.