Skip to content

Commit

Permalink
Fix missing dagruns when catchup=True (#19528)
Browse files Browse the repository at this point in the history
There's a bug that when the max_active_runs is reached, run dates could skip.
This PR fixes it

Closes: #19461
(cherry picked from commit 2bd4b55)
  • Loading branch information
ephraimbuddy authored and kaxil committed Nov 11, 2021
1 parent eaead7d commit 2aed7c1
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 15 deletions.
25 changes: 12 additions & 13 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,26 +914,23 @@ def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -
creating_job_id=self.id,
)
active_runs_of_dags[dag.dag_id] += 1
self._update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id])
if self._should_update_dag_next_dagruns(dag, dag_model, active_runs_of_dags[dag.dag_id]):
dag_model.calculate_dagrun_date_fields(dag, data_interval)
# TODO[HA]: Should we do a session.flush() so we don't have to keep lots of state/object in
# memory for larger dags? or expunge_all()

def _update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> None:
"""
Update the next_dagrun, next_dagrun_data_interval_start/end
and next_dagrun_create_after for this dag.
"""
if total_active_runs >= dag_model.max_active_runs:
def _should_update_dag_next_dagruns(self, dag, dag_model: DagModel, total_active_runs) -> bool:
"""Check if the dag's next_dagruns_create_after should be updated."""
if total_active_runs >= dag.max_active_runs:
self.log.info(
"DAG %s is at (or above) max_active_runs (%d of %d), not creating any more runs",
dag_model.dag_id,
total_active_runs,
dag_model.max_active_runs,
dag.max_active_runs,
)
dag_model.next_dagrun_create_after = None
else:
data_interval = dag.get_next_data_interval(dag_model)
dag_model.calculate_dagrun_date_fields(dag, data_interval)
return False
return True

def _start_queued_dagruns(
self,
Expand Down Expand Up @@ -1016,7 +1013,8 @@ def _schedule_dag_run(
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)
active_runs = dag.get_num_active_runs(only_running=False, session=session)
# Work out if we should allow creating a new DagRun now?
self._update_dag_next_dagruns(dag, dag_model, active_runs)
if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))

callback_to_execute = DagCallbackRequest(
full_filepath=dag.fileloc,
Expand All @@ -1041,7 +1039,8 @@ def _schedule_dag_run(
if dag_run.state in State.finished:
active_runs = dag.get_num_active_runs(only_running=False, session=session)
# Work out if we should allow creating a new DagRun now?
self._update_dag_next_dagruns(dag, dag_model, active_runs)
if self._should_update_dag_next_dagruns(dag, dag_model, active_runs):
dag_model.calculate_dagrun_date_fields(dag, dag.get_run_data_interval(dag_run))

# This will do one query per dag run. We "could" build up a complex
# query to update all the TIs across all the execution dates and dag
Expand Down
1 change: 1 addition & 0 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ def get_next_data_interval(self, dag_model: "DagModel") -> Optional[DataInterval
data_interval = dag_model.next_dagrun_data_interval
if data_interval is not None:
return data_interval

# Compatibility: A run was scheduled without an explicit data interval.
# This means the run was scheduled before AIP-39 implementation. Try to
# infer from the logical date.
Expand Down
63 changes: 61 additions & 2 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1274,8 +1274,8 @@ def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker):
session.flush()
session.refresh(dr)
assert dr.state == State.FAILED
# check that next_dagrun has been updated by Schedulerjob._update_dag_next_dagruns
assert dag_maker.dag_model.next_dagrun == dr.execution_date + timedelta(days=1)
# check that next_dagrun_create_after has been updated by calculate_dagrun_date_fields
assert dag_maker.dag_model.next_dagrun_create_after == dr.execution_date + timedelta(days=1)
# check that no running/queued runs yet
assert (
session.query(DagRun).filter(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])).count()
Expand Down Expand Up @@ -2871,6 +2871,65 @@ def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_mak
== 0
)

def test_max_active_runs_creation_phasing(self, dag_maker, session):
"""
Test that when creating runs once max_active_runs is reached that the runs come in the right order
without gaps
"""

def complete_one_dagrun():
ti = (
session.query(TaskInstance)
.join(TaskInstance.dag_run)
.filter(TaskInstance.state != State.SUCCESS)
.order_by(DagRun.execution_date)
.first()
)
if ti:
ti.state = State.SUCCESS
session.flush()

with dag_maker(max_active_runs=3, session=session) as dag:
# Need to use something that doesn't immediately get marked as success by the scheduler
BashOperator(task_id='task', bash_command='true')

self.scheduler_job = SchedulerJob(subdir=os.devnull)
self.scheduler_job.executor = MockExecutor(do_update=True)
self.scheduler_job.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)

DagModel.dags_needing_dagruns(session).all()
for _ in range(3):
self.scheduler_job._do_scheduling(session)

model: DagModel = session.query(DagModel).get(dag.dag_id)

# Pre-condition
assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}

assert model.next_dagrun == timezone.convert_to_utc(
timezone.DateTime(
2016,
1,
3,
)
)
assert model.next_dagrun_create_after is None

complete_one_dagrun()

assert DagRun.active_runs_of_dags(session=session) == {'test_dag': 3}

for _ in range(5):
self.scheduler_job._do_scheduling(session)
complete_one_dagrun()
model: DagModel = session.query(DagModel).get(dag.dag_id)

expected_execution_dates = [datetime.datetime(2016, 1, d, tzinfo=timezone.utc) for d in range(1, 6)]
dagrun_execution_dates = [
dr.execution_date for dr in session.query(DagRun).order_by(DagRun.execution_date).all()
]
assert dagrun_execution_dates == expected_execution_dates

def test_do_schedule_max_active_runs_and_manual_trigger(self, dag_maker):
"""
Make sure that when a DAG is already at max_active_runs, that manually triggered
Expand Down

0 comments on commit 2aed7c1

Please sign in to comment.