From ca5d37288fdc19680cd087b62a89c5dbd5e68a69 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Sun, 6 Feb 2022 15:44:13 +0100 Subject: [PATCH] Avoid deadlock when rescheduling task The scheduler job performs scheduling after locking the "scheduled" DagRun row for writing. This should prevent from modifying DagRun and related task instances by another scheduler or "mini-scheduler" run after task is completed. However there is apparently one more case where the DagRun is being locked by "Task" processes - namely when task throws AirflowRescheduleException. In this case a new "TaskReschedule" entity is inserted into the database and it also performs lock on the DagRun (because TaskReschedule has "DagRun" relationship. This PR modifies handling the AirflowRescheduleException to obtain the very same DagRun lock before it attempts to insert TaskReschedule entity. Seems that TaskReschedule is the only one that has this relationship so likely all the misterious SchedulerJob deadlock cases we experienced might be explained (and fixed) by this one. It is likely that this one: * Fixes: #16982 * Fixes: #19957 --- airflow/models/taskinstance.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 7f151f4de3cd8..3532bf1fdc989 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -111,7 +111,7 @@ from airflow.utils.platform import getuser from airflow.utils.retries import run_with_db_retries from airflow.utils.session import NEW_SESSION, create_session, provide_session -from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime +from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks from airflow.utils.state import DagRunState, State, TaskInstanceState from airflow.utils.timeout import timeout @@ -1671,11 +1671,24 @@ def _handle_reschedule( # Don't record reschedule request in test mode if test_mode: return + + from airflow.models.dagrun import DagRun # Avoid circular import + self.refresh_from_db(session) self.end_date = timezone.utcnow() self.set_duration() + # Lock DAG run to be sure not to get into a deadlock situation when trying to insert + # TaskReschedule which apparently also creates lock on corresponding DagRun entity + with_row_locks( + session.query(DagRun).filter_by( + dag_id=self.dag_id, + run_id=self.run_id, + ), + session=session, + ).one() + # Log reschedule request session.add( TaskReschedule(