Fix retries for tasks that receive SIGKILL and properly handle SIGTERM (#16301)

Currently, tasks are not retried when they receive SIGKILL or SIGTERM, even if the task has retries configured. This change fixes that and adds tests for both SIGTERM and SIGKILL so we don't regress.

Also, SIGTERM previously set the task to failed and raised AirflowException, which the heartbeat sometimes saw as an externally triggered failure and therefore skipped the failure callbacks. This commit also fixes that by calling handle_task_exit when a task receives SIGTERM.

Co-authored-by: Ash Berlin-Taylor <[email protected]>
(cherry picked from commit 4e2a94c)
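
To make the described flow concrete, here is a minimal sketch, assuming hypothetical stand-in names (MiniTaskInstance, MiniTaskRunner, run_callback) rather than Airflow's actual classes or API: a SIGTERM handler that routes the signal through the same exit handling as a normal task exit, so a task with retries remaining ends up in up_for_retry and its retry callback still fires.

import signal
from dataclasses import dataclass, field


@dataclass
class MiniTaskInstance:
    # Stand-in for Airflow's TaskInstance; field and method names are
    # illustrative only.
    try_number: int = 1
    max_tries: int = 1
    state: str = 'running'
    callbacks_fired: list = field(default_factory=list)

    def run_callback(self, name):
        self.callbacks_fired.append(name)


class MiniTaskRunner:
    def __init__(self, ti):
        self.ti = ti

    def start(self):
        # Observe SIGTERM in the supervising process instead of letting it
        # kill the task silently.
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        # The fix in words: rather than raising AirflowException and marking
        # the task failed (which the heartbeat could misread as an externally
        # set failure and skip the callbacks), route through the normal exit path.
        self.handle_task_exit(return_code=128 + signum)

    def handle_task_exit(self, return_code):
        # Decide between retry and terminal failure, then fire the matching
        # callback; return_code is informational here (143 == 128 + SIGTERM).
        if self.ti.try_number <= self.ti.max_tries:
            self.ti.state = 'up_for_retry'
            self.ti.run_callback('on_retry_callback')
        else:
            self.ti.state = 'failed'
            self.ti.run_callback('on_failure_callback')


if __name__ == '__main__':
    ti = MiniTaskInstance(try_number=1, max_tries=2)
    MiniTaskRunner(ti).handle_task_exit(return_code=143)  # as if SIGTERM arrived
    assert ti.state == 'up_for_retry'
    assert ti.callbacks_fired == ['on_retry_callback']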
ephraimbuddy authored and potiuk committed Aug 17, 2021
1 parent 90bbd21 commit 1a264a4
Showing 1 changed file with 50 additions and 0 deletions.
50 changes: 50 additions & 0 deletions tests/jobs/test_local_task_job.py
@@ -786,6 +786,56 @@ def task_function(ti):
        assert retry_callback_called.value == 1
        assert task_terminated_externally.value == 1

    def test_process_sigterm_works_with_retries(self, dag_maker):
        """
        Test that the task runner sets the task up for retry when it receives SIGTERM
        """
        # Use a shared memory value so we can properly track the value change
        # even if it's been updated across processes.
        retry_callback_called = Value('i', 0)
        task_terminated_externally = Value('i', 1)
        shared_mem_lock = Lock()

        def retry_callback(context):
            with shared_mem_lock:
                retry_callback_called.value += 1
            assert context['dag_run'].dag_id == 'test_mark_failure_2'

        def task_function(ti):
            time.sleep(60)
            # This should not happen -- the state change should be noticed and the task should get killed
            with shared_mem_lock:
                task_terminated_externally.value = 0

        with dag_maker(dag_id='test_mark_failure_2'):
            task = PythonOperator(
                task_id='test_on_failure',
                python_callable=task_function,
                retries=1,
                retry_delay=timedelta(seconds=2),
                on_retry_callback=retry_callback,
            )
        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
        ti.refresh_from_db()
        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
        job1.task_runner = StandardTaskRunner(job1)
        job1.task_runner.start()
        # Dispose of the engine so the forked job process does not reuse the
        # parent's database connections.
        settings.engine.dispose()
        process = multiprocessing.Process(target=job1.run)
        process.start()
        # Wait until the task is actually running in its own process before
        # sending the signal.
        for _ in range(0, 25):
            ti.refresh_from_db()
            if ti.state == State.RUNNING and ti.pid is not None:
                break
            time.sleep(0.2)
        os.kill(process.pid, signal.SIGTERM)
        process.join()
        ti.refresh_from_db()
        assert ti.state == State.UP_FOR_RETRY
        assert retry_callback_called.value == 1
        assert task_terminated_externally.value == 1

    def test_task_exit_should_update_state_of_finished_dagruns_with_dag_paused(self):
        """Test that with the DAG paused, the DagRun state updates when the task finishes the run"""
        dag = DAG(dag_id='test_dags', start_date=DEFAULT_DATE)
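
A side note on the new test's design: the retry and termination flags live in multiprocessing shared memory because the callbacks fire in the child process while the assertions run in the parent. A minimal, Airflow-independent sketch of that pattern:

import multiprocessing
from multiprocessing import Lock, Value


def child(counter, lock):
    # The increment is visible to the parent because Value('i', ...) lives in
    # shared memory; the lock guards the read-modify-write.
    with lock:
        counter.value += 1


if __name__ == '__main__':
    counter = Value('i', 0)
    lock = Lock()
    p = multiprocessing.Process(target=child, args=(counter, lock))
    p.start()
    p.join()
    assert counter.value == 1  # observed in the parent after join()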
