Skip to content

Commit

Permalink
Add TaskInstance State 'REMOVED' to finished states and success states (
Browse files Browse the repository at this point in the history
#23797)

Now that we support dynamic task mapping, we should have the 'REMOVED'
state of task instances as a finished state because
for dynamic tasks with a removed task instance, the dagrun would be stuck in
running state if 'REMOVED' state is not in finished states.
  • Loading branch information
ephraimbuddy authored May 28, 2022
1 parent 33eef7b commit 73446f2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ def update_state(
)

leaf_task_ids = {t.task_id for t in dag.leaves}
leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids]
leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids if ti.state != TaskInstanceState.REMOVED]

# if all roots finished and at least one failed, the run failed
if not unfinished_tis and any(leaf_ti.state in State.failed_states for leaf_ti in leaf_tis):
Expand Down
1 change: 1 addition & 0 deletions airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ def color_fg(cls, state):
TaskInstanceState.FAILED,
TaskInstanceState.SKIPPED,
TaskInstanceState.UPSTREAM_FAILED,
TaskInstanceState.REMOVED,
]
)
"""
Expand Down
23 changes: 23 additions & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,29 @@ def test_dagrun_success_when_all_skipped(self, session):
dag_run.update_state()
assert DagRunState.SUCCESS == dag_run.state

def test_dagrun_not_stuck_in_running_when_all_tasks_instances_are_removed(self, session):
"""
Tests that a DAG run succeeds when all tasks are removed
"""
dag = DAG(dag_id='test_dagrun_success_when_all_skipped', start_date=timezone.datetime(2017, 1, 1))
dag_task1 = ShortCircuitOperator(
task_id='test_short_circuit_false', dag=dag, python_callable=lambda: False
)
dag_task2 = EmptyOperator(task_id='test_state_skipped1', dag=dag)
dag_task3 = EmptyOperator(task_id='test_state_skipped2', dag=dag)
dag_task1.set_downstream(dag_task2)
dag_task2.set_downstream(dag_task3)

initial_task_states = {
'test_short_circuit_false': TaskInstanceState.REMOVED,
'test_state_skipped1': TaskInstanceState.REMOVED,
'test_state_skipped2': TaskInstanceState.REMOVED,
}

dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session)
dag_run.update_state()
assert DagRunState.SUCCESS == dag_run.state

def test_dagrun_success_conditions(self, session):
dag = DAG('test_dagrun_success_conditions', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})

Expand Down

0 comments on commit 73446f2

Please sign in to comment.