Skip to content

Commit

Permalink
Call success_callback before updating task state (apache#4082)
Browse files Browse the repository at this point in the history
In cases where the success callback takes variable
time, it's possible for it to interrupted by the heartbeat process.
This is because the heartbeat process looks for tasks that are no
longer in the "running" state but are still executing and reaps them.

This commit reverses the order of callback invocation and state
updating so that the "SUCCESS" state for the task isn't committed
to the database until after the success callback has finished.
  • Loading branch information
evizitei authored and Chris Fei committed Jan 23, 2019
1 parent 714984d commit ab1501d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 8 deletions.
14 changes: 6 additions & 8 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1728,14 +1728,6 @@ def signal_handler(signum, frame):
self.handle_failure(e, test_mode, context)
raise

# Recording SUCCESS
self.end_date = timezone.utcnow()
self.set_duration()
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
session.commit()

# Success callback
try:
if task.on_success_callback:
Expand All @@ -1744,6 +1736,12 @@ def signal_handler(signum, frame):
self.log.error("Failed when executing success callback")
self.log.exception(e3)

# Recording SUCCESS
self.end_date = timezone.utcnow()
self.set_duration()
if not test_mode:
session.add(Log(self.state, self))
session.merge(self)
session.commit()

@provide_session
Expand Down
34 changes: 34 additions & 0 deletions tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2425,6 +2425,40 @@ def test_overwrite_params_with_dag_run_conf_none(self):

self.assertEqual(False, params["override"])

def test_success_callbak_no_race_condition(self):
class CallbackWrapper(object):
def wrap_task_instance(self, ti):
self.task_id = ti.task_id
self.dag_id = ti.dag_id
self.execution_date = ti.execution_date
self.task_state_in_callback = ""
self.callback_ran = False

def success_handler(self, context):
self.callback_ran = True
session = settings.Session()
temp_instance = session.query(TI).filter(
TI.task_id == self.task_id).filter(
TI.dag_id == self.dag_id).filter(
TI.execution_date == self.execution_date).one()
self.task_state_in_callback = temp_instance.state
cw = CallbackWrapper()
dag = DAG('test_success_callbak_no_race_condition', start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=10))
task = DummyOperator(task_id='op', email='[email protected]',
on_success_callback=cw.success_handler, dag=dag)
ti = TI(task=task, execution_date=datetime.datetime.now())
ti.state = State.RUNNING
session = settings.Session()
session.merge(ti)
session.commit()
cw.wrap_task_instance(ti)
ti._run_raw_task()
self.assertTrue(cw.callback_ran)
self.assertEqual(cw.task_state_in_callback, State.RUNNING)
ti.refresh_from_db()
self.assertEqual(ti.state, State.SUCCESS)


class ClearTasksTest(unittest.TestCase):

Expand Down

0 comments on commit ab1501d

Please sign in to comment.