Skip to content
This repository has been archived by the owner on Mar 15, 2023. It is now read-only.

Commit

Permalink
[AIRFLOW-2865] Call success_callback before updating task state (apac…
Browse files Browse the repository at this point in the history
…he#4082)

In cases where the success callback takes variable
time, it's possible for it to interrupted by the heartbeat process.
This is because the heartbeat process looks for tasks that are no
longer in the "running" state but are still executing and reaps them.

This commit reverses the order of callback invocation and state
updating so that the "SUCCESS" state for the task isn't committed
to the database until after the success callback has finished.
  • Loading branch information
evizitei authored and wyndhblb committed Nov 9, 2018
1 parent f4cb395 commit c5322f4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
14 changes: 6 additions & 8 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1697,14 +1697,6 @@ def signal_handler(signum, frame):
self.handle_failure(e, test_mode, context)
raise

# Recording SUCCESS
self.end_date = timezone.utcnow()
self.set_duration()
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
session.commit()

# Success callback
try:
if task.on_success_callback:
Expand All @@ -1713,6 +1705,12 @@ def signal_handler(signum, frame):
self.log.error("Failed when executing success callback")
self.log.exception(e3)

# Recording SUCCESS
self.end_date = timezone.utcnow()
self.set_duration()
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
session.commit()

@provide_session
Expand Down
34 changes: 34 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,40 @@ def test_set_duration_empty_dates(self):
ti.set_duration()
self.assertIsNone(ti.duration)

def test_success_callbak_no_race_condition(self):
class CallbackWrapper(object):
def wrap_task_instance(self, ti):
self.task_id = ti.task_id
self.dag_id = ti.dag_id
self.execution_date = ti.execution_date
self.task_state_in_callback = ""
self.callback_ran = False

def success_handler(self, context):
self.callback_ran = True
session = settings.Session()
temp_instance = session.query(TI).filter(
TI.task_id == self.task_id).filter(
TI.dag_id == self.dag_id).filter(
TI.execution_date == self.execution_date).one()
self.task_state_in_callback = temp_instance.state
cw = CallbackWrapper()
dag = DAG('test_success_callbak_no_race_condition', start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
task = DummyOperator(task_id='op', email='[email protected]',
on_success_callback=cw.success_handler, dag=dag)
ti = TI(task=task, execution_date=datetime.datetime.now())
ti.state = State.RUNNING
session = settings.Session()
session.merge(ti)
session.commit()
cw.wrap_task_instance(ti)
ti._run_raw_task()
self.assertTrue(cw.callback_ran)
self.assertEqual(cw.task_state_in_callback, State.RUNNING)
ti.refresh_from_db()
self.assertEqual(ti.state, State.SUCCESS)


class ClearTasksTest(unittest.TestCase):

Expand Down

0 comments on commit c5322f4

Please sign in to comment.