This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Use Airflow variable to omit DAGs from any Slack notification (#644)
* Add DAG to check for DAGs that need alerts reenabled

* Add tests

* Add test that send_alert skips

* Update message format to prevent links unfurling

* Rename files and small refactor to make it easier to add silenced notifications as well as alerts

* Update send_message to conditionally omit dags from slack notifications

* Update reporting to pass dag_id to send_message

* Update tests

* Add notifications configuration check to DAG

* Make dag_id required argument to ensure all Slack messages are skipped

* Make single variable for silencing notifs of all types, allow different issues

* Update record reporting

* Only allow one predicate per github issue

* Add types for clarity

* Update check_silenced_dags DAG

* Change order of functions

* Correct default for slack_message_override

* Consolidate DAG to one file, update docstring, fix send_alert

* Quote predicate in slack message

* Update dag docs

* Fix typo

Co-authored-by: Zack Krida <[email protected]>

* Update DAG docs with fixed typo

Co-authored-by: Zack Krida <[email protected]>
stacimc and zackkrida authored Sep 26, 2022
1 parent 4a9c008 commit cec6893
Showing 13 changed files with 411 additions and 264 deletions.
20 changes: 11 additions & 9 deletions DAGs.md
@@ -204,15 +204,17 @@ Checks for DAGs that have silenced Slack alerts which may need to be turned back
on.

When a DAG has known failures, it can be omitted from Slack error reporting by adding
an entry to the `silenced_slack_alerts` Airflow variable. This is a dictionary where the
key is the `dag_id` of the affected DAG, and the value is the URL of a GitHub issue
tracking the error.

The `check_silenced_alert` DAG iterates over the entries in the `silenced_slack_alerts`
configuration and verifies that the associated GitHub issues are still open. If an issue
has been closed, it is assumed that the DAG should have Slack reporting reenabled, and
an alert is sent to prompt manual update of the configuration. This prevents developers
from forgetting to reenable Slack reporting after the issue has been resolved.
an entry to the `silenced_slack_notifications` Airflow variable. This is a dictionary
where the key is the `dag_id` of the affected DAG, and the value is a list of
SilencedSlackNotifications (which map silenced notifications to GitHub URLs) for that
DAG.

The `check_silenced_dags` DAG iterates over the entries in the
`silenced_slack_notifications` configuration and verifies that the associated GitHub
issues are still open. If an issue has been closed, it is assumed that the DAG should
have Slack reporting reenabled, and an alert is sent to prompt manual update of the
configuration. This prevents developers from forgetting to reenable Slack reporting
after the issue has been resolved.

The DAG runs weekly.
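
As an illustration of the configuration described above, the following is a minimal sketch of what a `silenced_slack_notifications` value might look like. The DAG id and issue URLs are hypothetical placeholders, and the `TypedDict` is re-declared here only so the snippet runs on its own.

```python
import json
from typing import TypedDict


class SilencedSlackNotification(TypedDict):
    issue: str  # URL of the GitHub issue tracking the problem
    predicate: str  # substring matched against the message text or username


# Hypothetical DAG id and issue URLs, for illustration only.
silenced_slack_notifications: dict[str, list[SilencedSlackNotification]] = {
    "example_provider_workflow": [
        {
            "issue": "https://github.com/example-org/example-repo/issues/1",
            "predicate": "KeyError",
        },
        {
            "issue": "https://github.com/example-org/example-repo/issues/2",
            "predicate": "Load Data Complete",
        },
    ],
}

# The Airflow variable is read with deserialize_json=True, so it is stored as JSON.
variable_value = json.dumps(silenced_slack_notifications, indent=2)
print(variable_value)
```

Each entry pairs one predicate with one issue, so a DAG can have several notification types silenced for different reasons.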

2 changes: 1 addition & 1 deletion openverse_catalog/dags/common/loader/reporting.py
@@ -163,5 +163,5 @@ def report_completion(
" pulls that may happen concurrently."
)

send_message(message, username="Airflow DAG Load Data Complete")
send_message(message, dag_id=dag_id, username="Airflow DAG Load Data Complete")
return message
100 changes: 67 additions & 33 deletions openverse_catalog/dags/common/slack.py
@@ -51,7 +51,7 @@
import json
import logging
from os.path import basename
from typing import Any, Callable, Optional
from typing import Any, Callable, Optional, TypedDict

from airflow.exceptions import AirflowNotFoundException
from airflow.models import Variable
@@ -65,6 +65,20 @@
log = logging.getLogger(__name__)


class SilencedSlackNotification(TypedDict):
    """
    Configuration for a silenced Slack notification.

    issue: A link to a GitHub issue which describes why the notification
        is silenced and tracks resolving the problem.
    predicate: Slack notifications whose text or username contain the
        predicate will be silenced. Matching is case-insensitive.
    """

    issue: str
    predicate: str


class SlackMessage:
"""Slack Block Message Builder"""

@@ -214,8 +228,57 @@ def send(self, notification_text: str = "Airflow notification") -> Response:
return response


def should_silence_message(text, username, dag_id):
    """
    Checks the `silenced_slack_notifications` Airflow variable to see if the message
    should be silenced for this DAG.
    """
    # Match on message text and username
    message = username + text

    # Get the configuration for silenced messages for this DAG
    silenced_notifications: list[SilencedSlackNotification] = Variable.get(
        "silenced_slack_notifications", default_var={}, deserialize_json=True
    ).get(dag_id, [])

    return bool(silenced_notifications) and any(
        notification["predicate"].lower() in message.lower()
        for notification in silenced_notifications
    )


def should_send_message(
    text, username, dag_id, http_conn_id=SLACK_NOTIFICATIONS_CONN_ID
):
    """
    Returns True if:
    * A Slack connection is defined
    * The DAG is not configured to silence messages of this type
    * We are in the prod env OR the message override is set.
    """
    # Exit early if no slack connection exists
    hook = HttpHook(http_conn_id=http_conn_id)
    try:
        hook.get_conn()
    except AirflowNotFoundException:
        return False

    # Exit early if this DAG is configured to skip Slack messaging
    if should_silence_message(text, username, dag_id):
        log.info(f"Skipping Slack notification for {dag_id}.")
        return False

    # Exit early if we aren't on production or if force alert is not set
    environment = Variable.get("environment", default_var="dev")
    force_message = Variable.get(
        "slack_message_override", default_var=False, deserialize_json=True
    )
    return environment == "prod" or force_message


def send_message(
text: str,
dag_id: str,
username: str = "Airflow",
icon_emoji: str = ":airflow:",
markdown: bool = True,
@@ -225,7 +288,7 @@ def send_message(
) -> None:
"""Send a simple slack message, convenience message for short/simple messages."""
log.info(text)
if not should_send_message(http_conn_id):
if not should_send_message(text, username, dag_id, http_conn_id):
return

environment = Variable.get("environment", default_var="dev")
@@ -240,29 +303,9 @@ def send_message(
s.send(text)


def should_send_message(http_conn_id=SLACK_NOTIFICATIONS_CONN_ID):
    """
    Returns true if a Slack connection is defined and we are in production (or
    the message override is set).
    """
    # Exit early if no slack connection exists
    hook = HttpHook(http_conn_id=http_conn_id)
    try:
        hook.get_conn()
    except AirflowNotFoundException:
        return False

    # Exit early if we aren't on production or if force alert is not set
    environment = Variable.get("environment", default_var="dev")
    force_message = Variable.get(
        "slack_message_override", default_var=False, deserialize_json=True
    )
    return environment == "prod" or force_message


def send_alert(
text: str,
dag_id: str | None = None,
dag_id: str,
username: str = "Airflow Alert",
icon_emoji: str = ":airflow:",
markdown: bool = True,
@@ -273,18 +316,9 @@ def send_alert(
Wrapper for send_message that allows sending a message to the configured alerts
channel instead of the default notification channel.
"""

known_failures = Variable.get(
"silenced_slack_alerts", default_var={}, deserialize_json=True
)
if dag_id in known_failures and any(
error.lower() in text.lower() for error in known_failures[dag_id]["errors"]
):
log.info(f"Skipping Slack alert for {dag_id}: {text}")
return

send_message(
text,
dag_id,
username,
icon_emoji,
markdown,
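The predicate matching added to `slack.py` above can be sketched without Airflow. This is a simplified illustration, not the committed implementation: the hypothetical `is_silenced` helper takes the configuration as a plain dictionary argument instead of reading it from the `silenced_slack_notifications` Airflow variable, and the DAG ids and issue URL are placeholders.

```python
def is_silenced(text: str, username: str, dag_id: str, config: dict) -> bool:
    """Return True if any configured predicate for dag_id matches the message."""
    # The username and text are concatenated, so a predicate can match either one.
    message = (username + text).lower()
    notifications = config.get(dag_id, [])
    return any(n["predicate"].lower() in message for n in notifications)


# Hypothetical configuration, for illustration only.
example_config = {
    "example_dag": [
        {
            "issue": "https://github.com/example-org/example-repo/issues/1",
            "predicate": "Load Data Complete",
        }
    ]
}

# Matches on the username, case-insensitively.
completion_silenced = is_silenced(
    "3 records loaded", "Airflow DAG Load Data Complete", "example_dag", example_config
)
# No predicate matches this alert, so it would still be sent.
alert_silenced = is_silenced(
    "Task failed with KeyError", "Airflow Alert", "example_dag", example_config
)
# DAGs without an entry in the configuration are never silenced.
other_dag_silenced = is_silenced(
    "3 records loaded", "Airflow DAG Load Data Complete", "other_dag", example_config
)
print(completion_silenced, alert_silenced, other_dag_silenced)
```

Because matching is a substring test, a short predicate silences every message for that DAG that happens to contain it, so more specific predicates are safer.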
1 change: 1 addition & 0 deletions openverse_catalog/dags/data_refresh/dag_factory.py
@@ -233,6 +233,7 @@ def create_data_refresh_dag(data_refresh: DataRefresh, external_dag_ids: Sequenc
after_record_count.task_id, "return_value"
),
"media_type": data_refresh.media_type,
"dag_id": data_refresh.dag_id,
},
)

6 changes: 4 additions & 2 deletions openverse_catalog/dags/data_refresh/record_reporting.py
@@ -6,7 +6,7 @@
log = logging.getLogger(__name__)


def report_record_difference(before: str, after: str, media_type: str):
def report_record_difference(before: str, after: str, media_type: str, dag_id: str):
before = int(before)
after = int(after)
count_diff = after - before
@@ -20,5 +20,7 @@ def report_record_difference(before: str, after: str, media_type: str):
*Record count difference for `{media_type}`*: {before:,}{after:,}
*Change*: {count_diff:+,} ({percent_diff:+}% Δ)
"""
slack.send_message(text=message, username="Data refresh record difference")
slack.send_message(
text=message, dag_id=dag_id, username="Data refresh record difference"
)
return message
Original file line number Diff line number Diff line change
@@ -127,7 +127,7 @@ def report_actionable_records(
{media_type_reports}
"""

slack.send_alert(message, username="Reported Media Requires Review")
slack.send_alert(message, dag_id=DAG_ID, username="Reported Media Requires Review")


def create_dag():
118 changes: 118 additions & 0 deletions openverse_catalog/dags/maintenance/check_silenced_dags.py
@@ -0,0 +1,118 @@
"""
Checks for DAGs that have silenced Slack alerts which may need to be turned back
on.

When a DAG has known failures, it can be omitted from Slack error reporting by adding
an entry to the `silenced_slack_notifications` Airflow variable. This is a dictionary
where the key is the `dag_id` of the affected DAG, and the value is a list of
SilencedSlackNotifications (which map silenced notifications to GitHub URLs) for that
DAG.

The `check_silenced_dags` DAG iterates over the entries in the
`silenced_slack_notifications` configuration and verifies that the associated GitHub
issues are still open. If an issue has been closed, it is assumed that the DAG should
have Slack reporting reenabled, and an alert is sent to prompt manual update of the
configuration. This prevents developers from forgetting to reenable Slack reporting
after the issue has been resolved.

The DAG runs weekly.
"""

import logging
from datetime import datetime, timedelta
from typing import Tuple

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import DAG, Variable
from airflow.operators.python import PythonOperator
from common.constants import DAG_DEFAULT_ARGS
from common.github import GitHubAPI
from common.slack import SilencedSlackNotification, send_alert


logger = logging.getLogger(__name__)


DAG_ID = "check_silenced_dags"
MAX_ACTIVE = 1
GITHUB_PAT = Variable.get("GITHUB_API_KEY", default_var="not_set")


def get_issue_info(issue_url: str) -> Tuple[str, str, str]:
    """
    Parses out the owner, repo, and issue_number from a GitHub issue url.
    """
    url_split = issue_url.split("/")
    if len(url_split) < 4:
        raise AirflowException(f"Issue url {issue_url} could not be parsed.")
    return url_split[-4], url_split[-3], url_split[-1]


def get_dags_with_closed_issues(
    github_pat: str, silenced_dags: dict[str, list[SilencedSlackNotification]]
):
    gh = GitHubAPI(github_pat)

    dags_to_reenable = []
    for dag_id, silenced_notifications in silenced_dags.items():
        for notification in silenced_notifications:
            issue_url = notification["issue"]
            owner, repo, issue_number = get_issue_info(issue_url)
            github_issue = gh.get_issue(repo, issue_number, owner)

            if github_issue.get("state") == "closed":
                # If the associated issue has been closed, this DAG can have
                # alerting reenabled for this predicate.
                dags_to_reenable.append((dag_id, issue_url, notification["predicate"]))
    return dags_to_reenable


def check_configuration(github_pat: str):
    silenced_dags = Variable.get(
        "silenced_slack_notifications", default_var={}, deserialize_json=True
    )
    dags_to_reenable = get_dags_with_closed_issues(github_pat, silenced_dags)

    if not dags_to_reenable:
        raise AirflowSkipException(
            "All DAGs configured to silence messages have work still in progress."
            " No configuration updates needed."
        )

    message = (
        "The following DAGs have Slack messages silenced, but the associated issue is"
        " closed. Please remove them from the silenced_slack_notifications Airflow"
        " variable or assign a new issue."
    )
    for (dag, issue, predicate) in dags_to_reenable:
        message += f"\n - <{issue}|{dag}: '{predicate}'>"
    send_alert(
        message, dag_id=DAG_ID, username="Silenced DAG Check", unfurl_links=False
    )
    return message


dag = DAG(
    dag_id=DAG_ID,
    default_args={
        **DAG_DEFAULT_ARGS,
        "retry_delay": timedelta(minutes=1),
    },
    start_date=datetime(2022, 7, 29),
    schedule_interval="@weekly",
    max_active_tasks=MAX_ACTIVE,
    max_active_runs=MAX_ACTIVE,
    catchup=False,
    # Use the docstring at the top of the file as md docs in the UI
    doc_md=__doc__,
    tags=["maintenance"],
)

with dag:
    PythonOperator(
        task_id="check_silenced_dags_configuration",
        python_callable=check_configuration,
        op_kwargs={
            "github_pat": GITHUB_PAT,
        },
    )
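
To show how the URL parsing in `get_issue_info` behaves, here is a standalone sketch of the same splitting logic. The `parse_issue_url` name and the example URL are hypothetical, and `ValueError` stands in for `AirflowException` so the snippet runs without Airflow installed.

```python
def parse_issue_url(issue_url: str) -> tuple[str, str, str]:
    """Split a GitHub issue URL into (owner, repo, issue_number)."""
    parts = issue_url.split("/")
    if len(parts) < 4:
        # The DAG raises AirflowException here; ValueError is a stand-in.
        raise ValueError(f"Issue url {issue_url} could not be parsed.")
    # Counting from the end: .../<owner>/<repo>/issues/<number>
    return parts[-4], parts[-3], parts[-1]


# Hypothetical issue URL, for illustration only.
owner, repo, issue_number = parse_issue_url(
    "https://github.com/example-org/example-repo/issues/123"
)
print(owner, repo, issue_number)
```

Since the fields are read by position from the end of the URL, this approach assumes the configured URL has no trailing slash; with one, the last element of the split would be an empty string.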

This file was deleted.
