Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add clear logging to tasks killed due to a Dagrun timeout #19950

Merged
merged 3 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ def heartbeat_callback(self, session=None):
)
raise AirflowException("PID of job runner does not match")
elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
if ti.state == State.SKIPPED:
# A DagRun timeout will cause tasks to be externally marked as skipped.
dagrun = ti.get_dagrun(session=session)
execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
dagrun_timeout = ti.task.dag.dagrun_timeout
SamWheating marked this conversation as resolved.
Show resolved Hide resolved
if dagrun_timeout and execution_time > dagrun_timeout:
self.log.warning("DagRun timed out after %s.", str(execution_time))
self.log.warning(
"State of this instance has been externally set to %s. Terminating instance.", ti.state
)
Expand All @@ -217,7 +224,7 @@ def heartbeat_callback(self, session=None):
else:
# if ti.state is not set by taskinstance.handle_failure, then
# error file will not be populated and it must be updated by
# external source suck as web UI
# external source such as web UI
error = self.task_runner.deserialize_run_error() or "task marked as failed externally"
ti._run_finished_callback(error=error)
self.terminating = True
Expand Down
41 changes: 41 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,47 @@ def task_function(ti):
assert "State of this instance has been externally set to failed. "
"Terminating instance." in caplog.text

def test_dagrun_timeout_logged_in_task_logs(self, caplog, dag_maker):
"""
Test that ensures that if a running task is externally skipped (due to a dagrun timeout)
It is logged in the task logs.
"""

session = settings.Session()

def task_function(ti):
assert State.RUNNING == ti.state
time.sleep(0.1)
ti.log.info("Marking TI as skipped externally")
ti.state = State.SKIPPED
session.merge(ti)
session.commit()

# This should not happen -- the state change should be noticed and the task should get killed
time.sleep(10)
assert False

with dag_maker(
"test_mark_failure", start_date=DEFAULT_DATE, dagrun_timeout=datetime.timedelta(microseconds=1)
):
task = PythonOperator(
task_id='skipped_externally',
python_callable=task_function,
)
dag_maker.create_dagrun()
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()

job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
with timeout(30):
# This should be _much_ shorter to run.
# If you change this limit, make the timeout in the callable above bigger
job1.run()

ti.refresh_from_db()
assert ti.state == State.SKIPPED
assert "DagRun timed out after " in caplog.text

@patch('airflow.utils.process_utils.subprocess.check_call')
@patch.object(StandardTaskRunner, 'return_code')
def test_failure_callback_only_called_once(self, mock_return_code, _check_call, dag_maker):
Expand Down