Skip to content

Commit

Permalink
Handle occasional deadlocks in trigger with retries (#24071)
Browse files Browse the repository at this point in the history
Fixes: #23639
  • Loading branch information
potiuk authored Jun 1, 2022
1 parent 5087f96 commit d86ae09
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from airflow.models.taskinstance import TaskInstance
from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.state import State
Expand Down Expand Up @@ -88,9 +89,11 @@ def clean_unused(cls, session=None):
(triggers have a one-to-many relationship to both)
"""
# Update all task instances with trigger IDs that are not DEFERRED to remove them
session.query(TaskInstance).filter(
TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
).update({TaskInstance.trigger_id: None})
for attempt in run_with_db_retries():
with attempt:
session.query(TaskInstance).filter(
TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
).update({TaskInstance.trigger_id: None})
# Get all triggers that have no task instances depending on them...
ids = [
trigger_id
Expand Down

2 comments on commit d86ae09

@Greetlist
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this line need with attempt?

session.query(Trigger).filter(Trigger.id.in_(ids)).delete(synchronize_session=False)

image

@potiuk
Copy link
Member Author

@potiuk potiuk commented on d86ae09 Jun 19, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PLease add it in PR @Greetlist

Please sign in to comment.