Skip to content

Commit

Permalink
Avoid deadlock when rescheduling task (#21362)
Browse files Browse the repository at this point in the history
The scheduler job performs scheduling after locking the "scheduled"
DagRun row for writing. This should prevent from modifying DagRun
and related task instances by another scheduler or "mini-scheduler"
run after task is completed.

However there is apparently one more case where the DagRun is being
locked by "Task" processes - namely when task throws
AirflowRescheduleException. In this case a new "TaskReschedule"
entity is inserted into the database and it also performs lock
on the DagRun (because TaskReschedule has "DagRun" relationship.

This PR modifies handling the AirflowRescheduleException to obtain the
very same DagRun lock before it attempts to insert TaskReschedule
entity.

Seems that TaskReschedule is the only one that has this relationship
so likely all the misterious SchedulerJob deadlock cases we
experienced might be explained (and fixed) by this one.

It is likely that this one:

* Fixes: #16982
* Fixes: #19957

(cherry picked from commit 6d110b5)
  • Loading branch information
potiuk authored and jedcunningham committed Feb 8, 2022
1 parent 8971e0f commit e2f3e94
Showing 1 changed file with 14 additions and 1 deletion.
15 changes: 14 additions & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@
from airflow.utils.platform import getuser
from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import create_session, provide_session
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
from airflow.utils.state import DagRunState, State
from airflow.utils.timeout import timeout

Expand Down Expand Up @@ -1657,11 +1657,24 @@ def _handle_reschedule(self, actual_start_date, reschedule_exception, test_mode=
# Don't record reschedule request in test mode
if test_mode:
return

from airflow.models.dagrun import DagRun # Avoid circular import

self.refresh_from_db(session)

self.end_date = timezone.utcnow()
self.set_duration()

# Lock DAG run to be sure not to get into a deadlock situation when trying to insert
# TaskReschedule which apparently also creates lock on corresponding DagRun entity
with_row_locks(
session.query(DagRun).filter_by(
dag_id=self.dag_id,
run_id=self.run_id,
),
session=session,
).one()

# Log reschedule request
session.add(
TaskReschedule(
Expand Down

0 comments on commit e2f3e94

Please sign in to comment.