Skip to content

Commit

Permalink
Handle occasional deadlocks in trigger with retries (#24071)
Browse files Browse the repository at this point in the history
Fixes: #23639
(cherry picked from commit d86ae09)
  • Loading branch information
potiuk authored and ephraimbuddy committed Jul 5, 2022
1 parent 9c5a07d commit c76b324
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions airflow/models/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from airflow.models.taskinstance import TaskInstance
from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
from airflow.utils.retries import run_with_db_retries
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.state import State
Expand Down Expand Up @@ -88,9 +89,11 @@ def clean_unused(cls, session=None):
(triggers have a one-to-many relationship to both)
"""
# Update all task instances with trigger IDs that are not DEFERRED to remove them
session.query(TaskInstance).filter(
TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
).update({TaskInstance.trigger_id: None})
for attempt in run_with_db_retries():
with attempt:
session.query(TaskInstance).filter(
TaskInstance.state != State.DEFERRED, TaskInstance.trigger_id.isnot(None)
).update({TaskInstance.trigger_id: None})
# Get all triggers that have no task instances depending on them...
ids = [
trigger_id
Expand Down

0 comments on commit c76b324

Please sign in to comment.