Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing statsd metric for failing SLA Callback notification #20924

Merged
merged 2 commits into from
Feb 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ def manage_slas(self, dag: DAG, session: Session = None) -> None:
dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis)
notification_sent = True
except Exception:
Stats.incr('sla_callback_notification_failure')
self.log.exception("Could not call sla_miss_callback for DAG %s", dag.dag_id)
email_content = f"""\
Here's a list of tasks that missed their SLAs:
Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/logging-monitoring/metrics.rst
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Name Description
``scheduler.critical_section_busy`` Count of times a scheduler process tried to get a lock on the critical
section (needed to send tasks to the executor) and found it locked by
another process.
``sla_callback_notification_failure`` Number of failed SLA miss callback notification attempts
``sla_email_notification_failure`` Number of failed SLA miss email notification attempts
``ti.start.<dag_id>.<task_id>`` Number of started task in a given dag. Similar to <job_name>_start but for task
``ti.finish.<dag_id>.<task_id>.<state>`` Number of completed task in a given dag. Similar to <job_name>_end but for task
Expand Down
5 changes: 4 additions & 1 deletion tests/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ def test_dag_file_processor_sla_miss_doesnot_raise_integrity_error(self, dag_mak
# ti is successful thereby trying to insert a duplicate record.
dag_file_processor.manage_slas(dag=dag, session=session)

def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag):
@mock.patch('airflow.dag_processing.processor.Stats.incr')
def test_dag_file_processor_sla_miss_callback_exception(self, mock_stats_incr, create_dummy_dag):
"""
Test that the dag file processor gracefully logs an exception if there is a problem
calling the sla_miss_callback
Expand All @@ -253,6 +254,7 @@ def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag):
sla_miss_callback=sla_callback,
default_args={'start_date': test_start_date, 'sla': datetime.timedelta(hours=1)},
)
mock_stats_incr.reset_mock()

session.merge(TaskInstance(task=task, execution_date=test_start_date, state='Success'))

Expand All @@ -267,6 +269,7 @@ def test_dag_file_processor_sla_miss_callback_exception(self, create_dummy_dag):
mock_log.exception.assert_called_once_with(
'Could not call sla_miss_callback for DAG %s', 'test_sla_miss'
)
mock_stats_incr.assert_called_once_with('sla_callback_notification_failure')

@mock.patch('airflow.dag_processing.processor.send_email')
def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(
Expand Down