Skip to content

Commit

Permalink
Fix max_active_runs=1 not scheduling runs when min_file_process_inter…
Browse files Browse the repository at this point in the history
…val is high (#21413)

The finished dagrun was still being counted as running when dag.get_num_active_runs was called,
because the session had not been flushed. This PR fixes that by flushing the session.

(cherry picked from commit feea143)
  • Loading branch information
ephraimbuddy committed Mar 16, 2022
1 parent a80213c commit 68e6e8f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,11 +604,13 @@ def update_state(
self.data_interval_end,
self.dag_hash,
)
session.flush()

self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks)
self._emit_duration_stats_for_finished_state()

session.merge(self)
# We do not flush here for performance reasons(It increases queries count by +20)

return schedulable_tis, callback

Expand Down
35 changes: 35 additions & 0 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,41 @@ def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_make
assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0
assert orm_dag.next_dagrun_create_after is None

def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
    """
    Check that a new run is still created after the run that filled max_active_runs finishes.
    """
    self.scheduler_job = SchedulerJob(subdir=os.devnull)
    job = self.scheduler_job
    job.executor = MockExecutor(do_update=True)
    job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

    def schedule_repeatedly():
        # Three consecutive scheduling passes over the same session.
        for _ in range(3):
            job._do_scheduling(session)

    with dag_maker(max_active_runs=1, session=session) as dag:
        # The task must not be something the scheduler marks as success right away.
        BashOperator(task_id='task', bash_command='true')

    first_run = dag_maker.create_dagrun(state=State.RUNNING, session=session)

    # With one RUNNING run the DAG is already at max_active_runs.
    schedule_repeatedly()

    # Finish the run. _do_scheduling does an expunge_all, so the run has to be
    # merged back into the session and refreshed before its task instance is touched.
    first_run = session.merge(first_run)
    session.refresh(first_run)
    task_instance = first_run.get_task_instance(task_id='task', session=session)
    task_instance.state = State.SUCCESS

    # Schedule again so a second run can be created.
    schedule_repeatedly()

    # The original run plus one newly created run.
    assert len(DagRun.find(dag_id=dag.dag_id, session=session)) == 2

def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
"""
Test if a a dagrun will not be scheduled if max_dag_runs
Expand Down

0 comments on commit 68e6e8f

Please sign in to comment.