Avoid deadlock when rescheduling task #21362

Merged 1 commit on Feb 7, 2022
15 changes: 14 additions & 1 deletion airflow/models/taskinstance.py
@@ -111,7 +111,7 @@
 from airflow.utils.platform import getuser
 from airflow.utils.retries import run_with_db_retries
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
-from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
+from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime, with_row_locks
 from airflow.utils.state import DagRunState, State, TaskInstanceState
 from airflow.utils.timeout import timeout

@@ -1671,11 +1671,24 @@ def _handle_reschedule(
         # Don't record reschedule request in test mode
         if test_mode:
             return

+        from airflow.models.dagrun import DagRun  # Avoid circular import

         self.refresh_from_db(session)

         self.end_date = timezone.utcnow()
         self.set_duration()

+        # Lock DAG run to be sure not to get into a deadlock situation when trying to insert
+        # TaskReschedule which apparently also creates lock on corresponding DagRun entity
+        with_row_locks(
+            session.query(DagRun).filter_by(
+                dag_id=self.dag_id,
+                run_id=self.run_id,
+            ),
+            session=session,
+        ).one()

         # Log reschedule request
         session.add(
             TaskReschedule(
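
The comment in the hunk above describes a lock-ordering fix: take the DagRun lock explicitly before the insert that would otherwise acquire it implicitly. The sketch below illustrates that general idea with plain `threading` locks. It is only an analogy, not Airflow code. `dag_run_lock`, `reschedule_lock`, `handle_reschedule`, and `run_workers` are hypothetical names, and real row locks are held by the database rather than by Python objects.

```python
import threading

# Hypothetical stand-ins for database row locks (assumption: not Airflow objects).
dag_run_lock = threading.Lock()      # plays the role of the DagRun row lock
reschedule_lock = threading.Lock()   # plays the role of the lock taken by the insert

completed = []  # worker ids that finished their critical section


def handle_reschedule(worker_id: int) -> None:
    # Every worker takes the "parent" lock first, then the "child" lock.
    # Because the order is the same everywhere, no two workers can each
    # hold one lock while waiting for the other.
    with dag_run_lock:
        with reschedule_lock:
            completed.append(worker_id)


def run_workers(count: int = 8, timeout: float = 5.0) -> bool:
    # Start `count` workers and report whether all of them finished in time.
    threads = [
        threading.Thread(target=handle_reschedule, args=(i,), daemon=True)
        for i in range(count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join(timeout)
    return not any(thread.is_alive() for thread in threads)


all_finished = run_workers()
```

If one worker took the locks in the opposite order, two workers could block each other indefinitely. Acquiring the DagRun lock up front is meant to rule out that kind of interleaving.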