diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 58974c39be239..eeec4d5b099b2 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -540,7 +540,7 @@ def update_state( ) leaf_task_ids = {t.task_id for t in dag.leaves} - leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids] + leaf_tis = [ti for ti in tis if ti.task_id in leaf_task_ids if ti.state != TaskInstanceState.REMOVED] # if all roots finished and at least one failed, the run failed if not unfinished_tis and any(leaf_ti.state in State.failed_states for leaf_ti in leaf_tis): diff --git a/airflow/utils/state.py b/airflow/utils/state.py index 8415dd16667a8..a79169f86169f 100644 --- a/airflow/utils/state.py +++ b/airflow/utils/state.py @@ -154,6 +154,7 @@ def color_fg(cls, state): TaskInstanceState.FAILED, TaskInstanceState.SKIPPED, TaskInstanceState.UPSTREAM_FAILED, + TaskInstanceState.REMOVED, ] ) """ diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py index f73f5d1c45147..14f4b7f34b0ca 100644 --- a/tests/models/test_dagrun.py +++ b/tests/models/test_dagrun.py @@ -196,6 +196,29 @@ def test_dagrun_success_when_all_skipped(self, session): dag_run.update_state() assert DagRunState.SUCCESS == dag_run.state + def test_dagrun_not_stuck_in_running_when_all_tasks_instances_are_removed(self, session): + """ + Tests that a DAG run succeeds when all tasks are removed + """ + dag = DAG(dag_id='test_dagrun_success_when_all_skipped', start_date=timezone.datetime(2017, 1, 1)) + dag_task1 = ShortCircuitOperator( + task_id='test_short_circuit_false', dag=dag, python_callable=lambda: False + ) + dag_task2 = EmptyOperator(task_id='test_state_skipped1', dag=dag) + dag_task3 = EmptyOperator(task_id='test_state_skipped2', dag=dag) + dag_task1.set_downstream(dag_task2) + dag_task2.set_downstream(dag_task3) + + initial_task_states = { + 'test_short_circuit_false': TaskInstanceState.REMOVED, + 'test_state_skipped1': TaskInstanceState.REMOVED, + 'test_state_skipped2': TaskInstanceState.REMOVED, + } + + dag_run = self.create_dag_run(dag=dag, task_states=initial_task_states, session=session) + dag_run.update_state() + assert DagRunState.SUCCESS == dag_run.state + def test_dagrun_success_conditions(self, session): dag = DAG('test_dagrun_success_conditions', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'})