Skip to content

Commit

Permalink
Only execute TIs of running DagRuns
Browse files Browse the repository at this point in the history
Since we can no longer have TIs without DagRun, we can
also, stop executing TIs if the DagRun is not in a RUNNING state.

Less work for the Scheduler
  • Loading branch information
ephraimbuddy committed Dec 10, 2021
1 parent 64bb592 commit c67aa62
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =
query = (
session.query(TI)
.join(TI.dag_run)
.filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state != DagRunState.QUEUED)
.filter(DR.run_type != DagRunType.BACKFILL_JOB, DR.state == DagRunState.RUNNING)
.join(TI.dag_model)
.filter(not_(DM.is_paused))
.filter(TI.state == State.SCHEDULED)
Expand Down
34 changes: 34 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,40 @@ def test_find_executable_task_instances_pool(self, dag_maker):
assert tis[3].key in res_keys
session.rollback()

@pytest.mark.parametrize(
"state, total_executed_ti",
[
(DagRunState.SUCCESS, 0),
(DagRunState.FAILED, 0),
(DagRunState.RUNNING, 2),
(DagRunState.QUEUED, 0),
],
)
def test_find_executable_task_instances_only_running_dagruns(
self, state, total_executed_ti, dag_maker, session
):
"""Test that only task instances of 'running' dagruns are executed"""
dag_id = 'SchedulerJobTest.test_find_executable_task_instances_only_running_dagruns'
task_id_1 = 'dummy'
task_id_2 = 'dummydummy'

with dag_maker(dag_id=dag_id, session=session):
DummyOperator(task_id=task_id_1)
DummyOperator(task_id=task_id_2)

self.scheduler_job = SchedulerJob(subdir=os.devnull)

dr = dag_maker.create_dagrun(state=state)

tis = dr.task_instances
for ti in tis:
ti.state = State.SCHEDULED
session.merge(ti)
session.flush()
res = self.scheduler_job._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert total_executed_ti == len(res)

def test_find_executable_task_instances_order_execution_date(self, dag_maker):
"""
Test that task instances follow execution_date order priority. If two dagruns with
Expand Down

0 comments on commit c67aa62

Please sign in to comment.