From 357d1e2a0396d9d6ee3d3344715a4914eb70d55b Mon Sep 17 00:00:00 2001 From: jorricks Date: Tue, 18 Jan 2022 15:40:42 +0100 Subject: [PATCH 1/2] Add missing statsd metric for failing SLA Callback notification --- airflow/dag_processing/processor.py | 1 + docs/apache-airflow/logging-monitoring/metrics.rst | 1 + tests/dag_processing/test_processor.py | 3 ++- 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 4bb338380d5c9..a097606a51c3c 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -480,6 +480,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None: dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis) notification_sent = True except Exception: + Stats.incr('sla_callback_notification_failure') self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id) email_content = f"""\ Here's a list of tasks that missed their SLAs: diff --git a/docs/apache-airflow/logging-monitoring/metrics.rst b/docs/apache-airflow/logging-monitoring/metrics.rst index b5a0ac3a757f2..85f41dd337071 100644 --- a/docs/apache-airflow/logging-monitoring/metrics.rst +++ b/docs/apache-airflow/logging-monitoring/metrics.rst @@ -100,6 +100,7 @@ Name Description ``scheduler.critical_section_busy`` Count of times a scheduler process tried to get a lock on the critical section (needed to send tasks to the executor) and found it locked by another process. +``sla_callback_notification_failure`` Number of failed SLA miss callback notification attempts ``sla_email_notification_failure`` Number of failed SLA miss email notification attempts ``ti.start..`` Number of started task in a given dag. Similar to _start but for task ``ti.finish...`` Number of completed task in a given dag. Similar to _end but for task diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index fde5962bf4acb..78b98b0545912 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -237,7 +237,7 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, dag_mak # ti is successful thereby trying to insert a duplicate record. dag_file_processor.manage_slas(dag=dag, session=session) - def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag): + def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag, mock_stats_incr): """ Test that the dag file processor gracefully logs an exception if there is a problem calling the sla_miss_callback @@ -267,6 +267,7 @@ def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag): mock_log.exception.assert_called_once_with( 'Could not call sla_miss_callback for DAG %s', 'test_sla_miss' ) + mock_stats_incr.assert_called_once_with('sla_callback_notification_failure') @mock.patch('airflow.dag_processing.processor.send_email') def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks( From 5776f1cfae2438a208c841e609b4e18036fe97a2 Mon Sep 17 00:00:00 2001 From: jorricks Date: Tue, 18 Jan 2022 19:26:02 +0100 Subject: [PATCH 2/2] FIX Failing test --- tests/dag_processing/test_processor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/dag_processing/test_processor.py b/tests/dag_processing/test_processor.py index 78b98b0545912..50cada119554e 100644 --- a/tests/dag_processing/test_processor.py +++ b/tests/dag_processing/test_processor.py @@ -237,7 +237,8 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, dag_mak # ti is successful thereby trying to insert a duplicate record. dag_file_processor.manage_slas(dag=dag, session=session) - def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag, mock_stats_incr): + @mock.patch('airflow.dag_processing.processor.Stats.incr') + def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, create_dummy_dag): """ Test that the dag file processor gracefully logs an exception if there is a problem calling the sla_miss_callback @@ -253,6 +254,7 @@ def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag, sla_miss_callback=sla_callback, default_args={'start_date': test_start_date, 'sla': datetime.timedelta(hours=1)}, ) + mock_stats_incr.reset_mock() session.merge(TaskInstance(task=task, execution_date=test_start_date, state='Success'))