From f3e2fe8cc375e439b5a550bada566f3eff58bac5 Mon Sep 17 00:00:00 2001
From: Sam Wheating
Date: Wed, 1 Dec 2021 16:48:12 -0800
Subject: [PATCH 1/3] Add logging to tasks killed by a Dagrun timeout

---
 airflow/jobs/local_task_job.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 43eca4ae82cb5..775a878faca05 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -208,6 +208,14 @@ def heartbeat_callback(self, session=None):
                 )
                 raise AirflowException("PID of job runner does not match")
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
+            if ti.state == State.SKIPPED:
+                # A DagRun timeout will cause tasks to be externally marked as skipped.
+                dagrun = ti.dag_run
+                execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
+                dagrun_timeout = ti.task.dag.dagrun_timeout
+                self.log.debug(f"dagrun_timeout: {dagrun_timeout}")
+                if dagrun_timeout and execution_time > dagrun_timeout:
+                    self.log.warning("DagRun timed out after %s.", str(execution_time))
             self.log.warning(
                 "State of this instance has been externally set to %s. Terminating instance.", ti.state
             )
@@ -217,7 +225,7 @@ def heartbeat_callback(self, session=None):
             else:
                 # if ti.state is not set by taskinstance.handle_failure, then
                 # error file will not be populated and it must be updated by
-                # external source suck as web UI
+                # external source such as web UI
                 error = self.task_runner.deserialize_run_error() or "task marked as failed externally"
             ti._run_finished_callback(error=error)
             self.terminating = True

From d3be32a0ecd600435a6777713fff9c630e2f3640 Mon Sep 17 00:00:00 2001
From: Sam Wheating
Date: Wed, 8 Dec 2021 16:55:32 -0800
Subject: [PATCH 2/3] Adding test coverage

---
 airflow/jobs/local_task_job.py    |  3 +--
 tests/jobs/test_local_task_job.py | 41 +++++++++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 775a878faca05..c9ee5e2a90b41 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -210,10 +210,9 @@ def heartbeat_callback(self, session=None):
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
             if ti.state == State.SKIPPED:
                 # A DagRun timeout will cause tasks to be externally marked as skipped.
-                dagrun = ti.dag_run
+                dagrun = ti.get_dagrun()
                 execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
                 dagrun_timeout = ti.task.dag.dagrun_timeout
-                self.log.debug(f"dagrun_timeout: {dagrun_timeout}")
                 if dagrun_timeout and execution_time > dagrun_timeout:
                     self.log.warning("DagRun timed out after %s.", str(execution_time))
             self.log.warning(
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 3ad586cf7b6f9..cf508c4ff6e64 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -435,6 +435,47 @@ def task_function(ti):
         assert "State of this instance has been externally set to failed. " "Terminating instance." in caplog.text
 
+    def test_dagrun_timeout_logged_in_task_logs(self, caplog, dag_maker):
+        """
+        Test that ensures that if a running task is externally skipped (due to a dagrun timeout),
+        it is logged in the task logs.
+        """
+
+        session = settings.Session()
+
+        def task_function(ti):
+            assert State.RUNNING == ti.state
+            time.sleep(0.1)
+            ti.log.info("Marking TI as skipped externally")
+            ti.state = State.SKIPPED
+            session.merge(ti)
+            session.commit()
+
+            # This should not happen -- the state change should be noticed and the task should get killed
+            time.sleep(10)
+            assert False
+
+        with dag_maker(
+            "test_mark_failure", start_date=DEFAULT_DATE, dagrun_timeout=datetime.timedelta(microseconds=1)
+        ):
+            task = PythonOperator(
+                task_id='skipped_externally',
+                python_callable=task_function,
+            )
+        dag_maker.create_dagrun()
+        ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+
+        job1 = LocalTaskJob(task_instance=ti, ignore_ti_state=True, executor=SequentialExecutor())
+        with timeout(30):
+            # This should be _much_ shorter to run.
+            # If you change this limit, make the timeout in the callable above bigger
+            job1.run()
+
+        ti.refresh_from_db()
+        assert ti.state == State.SKIPPED
+        assert "DagRun timed out after " in caplog.text
+
     @patch('airflow.utils.process_utils.subprocess.check_call')
     @patch.object(StandardTaskRunner, 'return_code')
     def test_failure_callback_only_called_once(self, mock_return_code, _check_call, dag_maker):

From 5a0554d51d46f7abdc72830c0cd82d3017d3ba0e Mon Sep 17 00:00:00 2001
From: Sam Wheating
Date: Fri, 10 Dec 2021 10:44:52 -0800
Subject: [PATCH 3/3] Update airflow/jobs/local_task_job.py

Co-authored-by: Kaxil Naik
---
 airflow/jobs/local_task_job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index c9ee5e2a90b41..0acf46fbd87d9 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -210,7 +210,7 @@ def heartbeat_callback(self, session=None):
         elif self.task_runner.return_code() is None and hasattr(self.task_runner, 'process'):
             if ti.state == State.SKIPPED:
                 # A DagRun timeout will cause tasks to be externally marked as skipped.
-                dagrun = ti.get_dagrun()
+                dagrun = ti.get_dagrun(session=session)
                 execution_time = (dagrun.end_date or timezone.utcnow()) - dagrun.start_date
                 dagrun_timeout = ti.task.dag.dagrun_timeout
                 if dagrun_timeout and execution_time > dagrun_timeout: