From d3be32a0ecd600435a6777713fff9c630e2f3640 Mon Sep 17 00:00:00 2001 From: Sam Wheating Date: Wed, 8 Dec 2021 16:55:32 -0800 Subject: [PATCH] Adding test coverage --- airflow/jobs/local_task_job.py | 3 +-- tests/jobs/test_local_task_job.py | 41 +++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 775a878faca05..c9ee5e2a90b41 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -210,10 +210,9 @@ def heartbeat_callback(self, session=None): elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'): if ti.state == State.SKIPPED: # A DagRun timeout will cause tasks to be externally marked as skipped. - dagrun = ti.dag_run + dagrun = ti.get_dagrun() execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date dagrun_timeout = ti.task.dag.dagrun_timeout - self.log.debug(f"dagrun_timeout: {dagrun_timeout}") if dagrun_timeout and execution_time > dagrun_timeout: self.log.warning("DagRun timed out after %s.", str(execution_time)) self.log.warning( diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index 3ad586cf7b6f9..cf508c4ff6e64 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -435,6 +435,47 @@ def task_function(ti): assert "State of this instance has been externally set to failed. " "Terminating instance." in caplog.text + def test_dagrun_timeout_logged_in_task_logs(self, caplog, dag_maker): + """ + Test that ensures that if a running task is externally skipped (due to a dagrun timeout) + It is logged in the task logs. + """ + + session = settings.Session() + + def task_function(ti): + assert State.RUNNING == ti.state + time.sleep(0.1) + ti.log.info("Marking TI as skipped externally") + ti.state = State.SKIPPED + session.merge(ti) + session.commit() + + # This should not happen -- the state change should be noticed and the task should get killed + time.sleep(10) + assert False + + with dag_maker( + "test_mark_failure", start_date=DEFAULT_DATE, dagrun_timeout=datetime.timedelta(microseconds=1) + ): + task = PythonOperator( + task_id='skipped_externally', + python_callable=task_function, + ) + dag_maker.create_dagrun() + ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) + ti.refresh_from_db() + + job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor()) + with timeout(30): + # This should be _much_ shorter to run. + # If you change this limit, make the timeout in the callable above bigger + job1.run() + + ti.refresh_from_db() + assert ti.state == State.SKIPPED + assert "DagRun timed out after " in caplog.text + @patch('airflow.utils.process_utils.subprocess.check_call') @patch.object(StandardTaskRunner, 'return_code') def test_failure_callback_only_called_once(self, mock_return_code, _check_call, dag_maker):