From bc1d7b03ab0ba903718b4098196de24977ee1261 Mon Sep 17 00:00:00 2001 From: doiken <6147573+doiken@users.noreply.github.com> Date: Tue, 8 Aug 2023 21:22:13 +0900 Subject: [PATCH] Fix future DagRun rarely triggered by race conditions when max_active_runs reached its upper limit. (#31414) * feat: select dag_model with row lock * fix: logging that scheduling was skipped * fix: remove unused get_dagmodel * fix: correct log message to more generic word --------- Co-authored-by: doiken Co-authored-by: Tzu-ping Chung Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com> (cherry picked from commit b53e2aeefc1714d306f93e58d211ad9d52356470) --- airflow/jobs/scheduler_job_runner.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py index 3b399d75b58902..e47fc7bd86558e 100644 --- a/airflow/jobs/scheduler_job_runner.py +++ b/airflow/jobs/scheduler_job_runner.py @@ -35,7 +35,7 @@ from sqlalchemy import and_, delete, func, not_, or_, select, text, update from sqlalchemy.engine import Result from sqlalchemy.exc import OperationalError -from sqlalchemy.orm import Query, Session, load_only, make_transient, selectinload +from sqlalchemy.orm import Query, Session, joinedload, load_only, make_transient, selectinload from sqlalchemy.sql import expression from airflow import settings @@ -1397,11 +1397,24 @@ def _schedule_dag_run( callback: DagCallbackRequest | None = None dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session) - dag_model = DM.get_dagmodel(dag_run.dag_id, session) + # Adopt row locking to account for inconsistencies when next_dagrun_create_after = None + query = ( + session.query(DagModel) + .filter(DagModel.dag_id == dag_run.dag_id) + .options(joinedload(DagModel.parent_dag)) + ) + dag_model = with_row_locks( + query, of=DagModel, session=session, **skip_locked(session=session) + ).one_or_none() - if not dag or not dag_model: + if not dag: self.log.error("Couldn't find DAG %s in DAG bag or database!", dag_run.dag_id) return callback + if not dag_model: + self.log.info( + "DAG %s scheduling was skipped, probably because the DAG record was locked", dag_run.dag_id + ) + return callback if ( dag_run.start_date