Skip to content

Commit

Permalink
[AIRFLOW-5621] - Failure callback is not triggered when marked Failed…
Browse files Browse the repository at this point in the history
… on UI (#7025)
  • Loading branch information
bharathpalaksha authored and potiuk committed Jan 6, 2020
1 parent 9967486 commit 24fa938
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 0 deletions.
5 changes: 5 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,11 @@ The 'properties' and 'jars' properties for the Dataproc related operators (`Data
and `dataproc_jars`respectively.
Arguments for dataproc_properties dataproc_jars

### Failure callback will be called when task is marked failed
When task is marked failed by user or task fails due to system failures - on failure call back will be called as part of clean up

See [AIRFLOW-5621](https://jira.apache.org/jira/browse/AIRFLOW-5621) for details

## Airflow 1.10.5

No breaking changes.
Expand Down
3 changes: 3 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,8 @@ def heartbeat_callback(self, session=None):
"Taking the poison pill.",
ti.state
)
if ti.state == State.FAILED and ti.task.on_failure_callback:
context = ti.get_template_context()
ti.task.on_failure_callback(context)
self.task_runner.terminate()
self.terminating = True
54 changes: 54 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,3 +292,57 @@ def multi_return_code():
# We already make sure patched sleep call is only called once
self.assertLess(time_end - time_start, job1.heartrate)
session.close()

def test_mark_failure_on_failure_callback(self):
"""
Test that ensures that mark_failure in the UI fails
the task, and executes on_failure_callback
"""
data = {'called': False}

def check_failure(context):
self.assertEqual(context['dag_run'].dag_id,
'test_mark_failure')
data['called'] = True

dag = DAG(dag_id='test_mark_failure',
start_date=DEFAULT_DATE,
default_args={'owner': 'owner1'})

task = DummyOperator(
task_id='test_state_succeeded1',
dag=dag,
on_failure_callback=check_failure)

session = settings.Session()

dag.clear()
dag.create_dagrun(run_id="test",
state=State.RUNNING,
execution_date=DEFAULT_DATE,
start_date=DEFAULT_DATE,
session=session)
ti = TI(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()
job1 = LocalTaskJob(task_instance=ti,
ignore_ti_state=True,
executor=SequentialExecutor())
from airflow.task.task_runner.standard_task_runner import StandardTaskRunner
job1.task_runner = StandardTaskRunner(job1)
process = multiprocessing.Process(target=job1.run)
process.start()
ti.refresh_from_db()
for _ in range(0, 50):
if ti.state == State.RUNNING:
break
time.sleep(0.1)
ti.refresh_from_db()
self.assertEqual(State.RUNNING, ti.state)
ti.state = State.FAILED
session.merge(ti)
session.commit()

job1.heartbeat_callback(session=None)
self.assertTrue(data['called'])
process.join(timeout=10)
self.assertFalse(process.is_alive())

0 comments on commit 24fa938

Please sign in to comment.