diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 2a230a77cd8b7..44c4df7b5e3fe 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -914,26 +914,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
                 creating_job_id=self.id,
             )
             active_runs_of_dags[dag.dag_id] += 1
-            self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
+                dag_model.calculate_dagrun_date_fields(dag, data_interval)
         # TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
         # memory for larger dags? or expunge_all()
 
-    def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
-        """
-        Update the next_dagrun, next_dagrun_data_interval_start/end
-        and next_dagrun_create_after for this dag.
-        """
-        if total_active_runs >= dag_model.max_active_runs:
+    def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> bool:
+        """Check if the dag's next_dagruns_create_after should be updated."""
+        if total_active_runs >= dag.max_active_runs:
             self.log.info(
                 "DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
                 dag_model.dag_id,
                 total_active_runs,
-                dag_model.max_active_runs,
+                dag.max_active_runs,
             )
             dag_model.next_dagrun_create_after = None
-        else:
-            data_interval = dag.get_next_data_interval(dag_model)
-            dag_model.calculate_dagrun_date_fields(dag, data_interval)
+            return False
+        return True
 
     def _start_queued_dagruns(
         self,
@@ -1016,7 +1013,8 @@ def _schedule_dag_run(
             self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
             active_runs = dag.get_num_active_runs(only_running=False, session=session)
             # Work out if we should allow creating a new DagRun now?
-            self._update_dag_next_dagruns(dag, dag_model, active_runs)
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
+                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
 
             callback_to_execute = DagCallbackRequest(
                 full_filepath=dag.fileloc,
@@ -1041,7 +1039,8 @@ def _schedule_dag_run(
         if dag_run.state in State.finished:
             active_runs = dag.get_num_active_runs(only_running=False, session=session)
             # Work out if we should allow creating a new DagRun now?
-            self._update_dag_next_dagruns(dag, dag_model, active_runs)
+            if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
+                dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))
 
         # This will do one query per dag run. We "could" build up a complex
         # query to update all the TIs across all the execution dates and dag
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index d1c947fd32342..4e6f6cdba215f 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -629,6 +629,7 @@ def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval
         data_interval = dag_model.next_dagrun_data_interval
         if data_interval is not None:
             return data_interval
+
         # Compatibility: A run was scheduled without an explicit data interval.
         # This means the run was scheduled before AIP-39 implementation. Try to
         # infer from the logical date.
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 53809603d6d09..db05e7d98b270 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -1274,8 +1274,8 @@ def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker):
         session.flush()
         session.refresh(dr)
         assert dr.state == State.FAILED
-        # check that next_dagrun has been updated by Schedulerjob._update_dag_next_dagruns
-        assert dag_maker.dag_model.next_dagrun == dr.execution_date + timedelta(days=1)
+        # check that next_dagrun_create_after has been updated by calculate_dagrun_date_fields
+        assert dag_maker.dag_model.next_dagrun_create_after == dr.execution_date + timedelta(days=1)
         # check that no running/queued runs yet
         assert (
             session.query(DagRun).filter(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])).count()
@@ -2871,6 +2871,65 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
             == 0
         )
 
+    def test_max_active_runs_creation_phasing(self, dag_maker, session):
+        """
+        Test that when creating runs once max_active_runs is reached that the runs come in the right order
+        without gaps
+        """
+
+        def complete_one_dagrun():
+            ti = (
+                session.query(TaskInstance)
+                .join(TaskInstance.dag_run)
+                .filter(TaskInstance.state != State.SUCCESS)
+                .order_by(DagRun.execution_date)
+                .first()
+            )
+            if ti:
+                ti.state = State.SUCCESS
+                session.flush()
+
+        with dag_maker(max_active_runs=3, session=session) as dag:
+            # Need to use something that doesn't immediately get marked as success by the scheduler
+            BashOperator(task_id='task', bash_command='true')
+
+        self.scheduler_job = SchedulerJob(subdir=os.devnull)
+        self.scheduler_job.executor = MockExecutor(do_update=True)
+        self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
+
+        DagModel.dags_needing_dagruns(session).all()
+        for _ in range(3):
+            self.scheduler_job._do_scheduling(session)
+
+        model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        # Pre-condition
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        assert model.next_dagrun == timezone.convert_to_utc(
+            timezone.DateTime(
+                2016,
+                1,
+                3,
+            )
+        )
+        assert model.next_dagrun_create_after is None
+
+        complete_one_dagrun()
+
+        assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}
+
+        for _ in range(5):
+            self.scheduler_job._do_scheduling(session)
+            complete_one_dagrun()
+            model: DagModel = session.query(DagModel).get(dag.dag_id)
+
+        expected_execution_dates = [datetime.datetime(2016, 1, d, tzinfo=timezone.utc) for d in range(1, 6)]
+        dagrun_execution_dates = [
+            dr.execution_date for dr in session.query(DagRun).order_by(DagRun.execution_date).all()
+        ]
+        assert dagrun_execution_dates == expected_execution_dates
+
     def test_do_schedule_max_active_runs_and_manual_trigger(self, dag_maker):
         """
         Make sure that when a DAG is already at max_active_runs, that manually triggered