From c67aa629371dc2c7965535ca23fc375b87d4c94e Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Thu, 9 Dec 2021 22:03:05 +0100 Subject: [PATCH] Only execute TIs of running DagRuns Since we can no longer have TIs without DagRun, we can also, stop executing TIs if the DagRun is not in a RUNNING state. Less work for the Scheduler --- airflow/jobs/scheduler_job.py | 2 +- tests/jobs/test_scheduler_job.py | 34 ++++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 9cad91f7f2a78..51d7b41294697 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -273,7 +273,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session = query = ( session.query(TI) .join(TI.dag_run) - .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED) + .filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING) .join(TI.dag_model) .filter(not_(DM.is_paused)) .filter(TI.state == State.SCHEDULED) diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 5f4d854878a29..5e72feebb0573 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -473,6 +473,40 @@ def test_find_executable_task_instances_pool(self, dag_maker): assert tis[3].key in res_keys session.rollback() + @pytest.mark.parametrize( + "state, total_executed_ti", + [ + (DagRunState.SUCCESS, 0), + (DagRunState.FAILED, 0), + (DagRunState.RUNNING, 2), + (DagRunState.QUEUED, 0), + ], + ) + def test_find_executable_task_instances_only_running_dagruns( + self, state, total_executed_ti, dag_maker, session + ): + """Test that only task instances of 'running' dagruns are executed""" + dag_id = 'SchedulerJobTest.test_find_executable_task_instances_only_running_dagruns' + task_id_1 = 'dummy' + task_id_2 = 'dummydummy' + + with dag_maker(dag_id=dag_id, session=session): + DummyOperator(task_id=task_id_1) + DummyOperator(task_id=task_id_2) + + self.scheduler_job = SchedulerJob(subdir=os.devnull) + + dr = dag_maker.create_dagrun(state=state) + + tis = dr.task_instances + for ti in tis: + ti.state = State.SCHEDULED + session.merge(ti) + session.flush() + res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session) + session.flush() + assert total_executed_ti == len(res) + def test_find_executable_task_instances_order_execution_date(self, dag_maker): """ Test that task instances follow execution_date order priority. If two dagruns with