diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py index 20ec7cd3b2651..c42604deb59d9 100644 --- a/airflow/models/dagrun.py +++ b/airflow/models/dagrun.py @@ -604,11 +604,13 @@ def update_state( self.data_interval_end, self.dag_hash, ) + session.flush() self._emit_true_scheduling_delay_stats_for_finished_state(finished_tasks) self._emit_duration_stats_for_finished_state() session.merge(self) + # We do not flush here for performance reasons(It increases queries count by +20) return schedulable_tis, callback diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 718572004016e..168d452d6ab60 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -1180,6 +1180,41 @@ def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_make assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0 assert orm_dag.next_dagrun_create_after is None + def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session): + """ + Test that when creating runs once max_active_runs is reached the runs does not stick + """ + self.scheduler_job = SchedulerJob(subdir=os.devnull) + self.scheduler_job.executor = MockExecutor(do_update=True) + self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent) + + with dag_maker(max_active_runs=1, session=session) as dag: + # Need to use something that doesn't immediately get marked as success by the scheduler + BashOperator(task_id='task', bash_command='true') + + dag_run = dag_maker.create_dagrun( + state=State.RUNNING, + session=session, + ) + + # Reach max_active_runs + for _ in range(3): + self.scheduler_job._do_scheduling(session) + + # Complete dagrun + # Add dag_run back in to the session (_do_scheduling does an expunge_all) + dag_run = session.merge(dag_run) + session.refresh(dag_run) + dag_run.get_task_instance(task_id='task', session=session).state = State.SUCCESS + + # create new run + for _ in range(3): + self.scheduler_job._do_scheduling(session) + + # Assert that new runs has created + dag_runs = DagRun.find(dag_id=dag.dag_id, session=session) + assert len(dag_runs) == 2 + def test_dagrun_timeout_verify_max_active_runs(self, dag_maker): """ Test if a a dagrun will not be scheduled if max_dag_runs