Skip to content

Commit

Permalink
Adding test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
SamWheating committed Dec 9, 2021
1 parent f3e2fe8 commit d3be32a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
3 changes: 1 addition & 2 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,9 @@ def heartbeat_callback(self, session=None):
elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
if ti.state == State.SKIPPED:
# A DagRun timeout will cause tasks to be externally marked as skipped.
dagrun = ti.dag_run
dagrun = ti.get_dagrun()
execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
dagrun_timeout = ti.task.dag.dagrun_timeout
self.log.debug(f"dagrun_timeout: {dagrun_timeout}")
if dagrun_timeout and execution_time > dagrun_timeout:
self.log.warning("DagRun timed out after %s.", str(execution_time))
self.log.warning(
Expand Down
41 changes: 41 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,47 @@ def task_function(ti):
assert "State of this instance has been externally set to failed. "
"Terminating instance." in caplog.text

def test_dagrun_timeout_logged_in_task_logs(self, caplog, dag_maker):
"""
Test that ensures that if a running task is externally skipped (due to a dagrun timeout)
It is logged in the task logs.
"""

session = settings.Session()

def task_function(ti):
assert State.RUNNING == ti.state
time.sleep(0.1)
ti.log.info("Marking TI as skipped externally")
ti.state = State.SKIPPED
session.merge(ti)
session.commit()

# This should not happen -- the state change should be noticed and the task should get killed
time.sleep(10)
assert False

with dag_maker(
"test_mark_failure", start_date=DEFAULT_DATE, dagrun_timeout=datetime.timedelta(microseconds=1)
):
task = PythonOperator(
task_id='skipped_externally',
python_callable=task_function,
)
dag_maker.create_dagrun()
ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.refresh_from_db()

job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
with timeout(30):
# This should be _much_ shorter to run.
# If you change this limit, make the timeout in the callable above bigger
job1.run()

ti.refresh_from_db()
assert ti.state == State.SKIPPED
assert "DagRun timed out after " in caplog.text

@patch('airflow.utils.process_utils.subprocess.check_call')
@patch.object(StandardTaskRunner, 'return_code')
def test_failure_callback_only_called_once(self, mock_return_code, _check_call, dag_maker):
Expand Down

0 comments on commit d3be32a

Please sign in to comment.