Skip to content

Commit

Permalink
[CP] Handle IntegrityErrors for trigger dagruns & add Stacktrace when…
Browse files Browse the repository at this point in the history
… DagFileProcessorManager gets killed (apache#57)

CP of faaf179 - from master
CP of 2102122 - from 1.10.12
  • Loading branch information
msumit authored Sep 7, 2020
1 parent 596e24f commit b3d7fb4
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 4 deletions.
14 changes: 11 additions & 3 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Column, Integer, String, Boolean, PickleType, Index, UniqueConstraint, func, DateTime, or_,
and_
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import synonym
from sqlalchemy.orm.session import Session
Expand Down Expand Up @@ -359,9 +360,9 @@ def verify_integrity(self, session=None):
tis = self.get_task_instances(session=session)

# check for removed or restored tasks
task_ids = []
task_ids = set()
for ti in tis:
task_ids.append(ti.task_id)
task_ids.add(ti.task_id)
task = None
try:
task = dag.get_task(ti.task_id)
Expand Down Expand Up @@ -395,7 +396,14 @@ def verify_integrity(self, session=None):
ti = TaskInstance(task, self.execution_date)
session.add(ti)

session.commit()
try:
session.commit()
except IntegrityError as err:
self.log.info(str(err))
self.log.info('Hit IntegrityError while creating the TIs for '
f'{dag.dag_id} - {self.execution_date}.')
self.log.info('Doing session rollback.')
session.rollback()

@staticmethod
def get_run(session, dag_id, execution_date):
Expand Down
2 changes: 2 additions & 0 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from __future__ import print_function
from __future__ import unicode_literals

import inspect
import logging
import multiprocessing
import os
Expand Down Expand Up @@ -788,6 +789,7 @@ def _exit_gracefully(self, signum, frame):
Helper method to clean up DAG file processors to avoid leaving orphan processes.
"""
self.log.info("Exiting gracefully upon receiving signal %s", signum)
self.log.debug("Current Stacktrace is: %s", '\n'.join(map(str, inspect.stack())))
self.terminate()
self.end()
self.log.debug("Finished terminating DAG processors.")
Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
# under the License.
#

version = '1.10.4+twtr16'
version = '1.10.4+twtr17'

19 changes: 19 additions & 0 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,22 @@ def with_all_tasks_removed(dag):
dagrun.verify_integrity()
flaky_ti.refresh_from_db()
self.assertEqual(State.NONE, flaky_ti.state)

def test_already_added_task_instances_can_be_ignored(self):
# Checks that DagRun.verify_integrity() can be called for a dag run whose
# task instance already exists in the DB, even when the lookup of existing
# TIs reports none, and that the existing TI keeps its State.NONE state.
dag = DAG('triggered_dag', start_date=DEFAULT_DATE)
dag.add_task(DummyOperator(task_id='first_task', owner='test'))

# Create the dag run and confirm that its only TI is 'first_task' with no state.
dagrun = self.create_dag_run(dag)
first_ti = dagrun.get_task_instances()[0]
self.assertEqual('first_task', first_ti.task_id)
self.assertEqual(State.NONE, first_ti.state)

# Let's assume that the above TI was added to the DB by the webserver. If the
# scheduler runs the same method at the same time, it would find 0 TIs for
# this dag and proceed to create them. Hence we mock the
# DagRun.get_task_instances method to return an empty list of TIs.
# NOTE(review): presumably the duplicate TI insert is what raises the
# IntegrityError that verify_integrity() handles -- confirm against the DB schema.
with mock.patch.object(DagRun, 'get_task_instances') as mock_gtis:
mock_gtis.return_value = []
dagrun.verify_integrity()
# Re-read the TI from the DB; its state is expected to still be State.NONE.
first_ti.refresh_from_db()
self.assertEqual(State.NONE, first_ti.state)

0 comments on commit b3d7fb4

Please sign in to comment.