This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Omit DAGs that are known to fail from alerts #643

Merged · 15 commits · Aug 10, 2022
3 changes: 3 additions & 0 deletions openverse_catalog/dags/common/github.py
@@ -16,6 +16,9 @@ def _make_request(self, method: str, resource: str, **kwargs) -> requests.Response:
response.raise_for_status()
return response.json()

def get_issue(self, repo: str, issue_number: int, owner: str = "WordPress"):
return self._make_request("GET", f"repos/{owner}/{repo}/issues/{issue_number}")

def get_open_prs(self, repo: str, owner: str = "WordPress"):
return self._make_request(
"GET",
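As context for the new method, a minimal sketch of how `get_issue` might be called — the token and issue number below are placeholders, not values from this PR:

```python
from common.github import GitHubAPI

# Hypothetical usage; the token and issue number are placeholders.
gh = GitHubAPI("ghp_example_personal_access_token")
issue = gh.get_issue(repo="openverse", issue_number=1234)

# GitHub's REST API returns the issue as JSON; its "state" field
# ("open" or "closed") is what the new maintenance DAG checks below.
print(issue["state"])
```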
1 change: 0 additions & 1 deletion openverse_catalog/dags/common/loader/reporting.py
@@ -110,5 +110,4 @@ def report_completion(
*Number of records upserted per media type*:
{media_type_reports}"""
send_message(message, username="Airflow DAG Load Data Complete")
logger.info(message)
return message
33 changes: 28 additions & 5 deletions openverse_catalog/dags/common/slack.py
@@ -219,15 +219,22 @@ def send_message(
username: str = "Airflow",
icon_emoji: str = ":airflow:",
markdown: bool = True,
unfurl_links: bool = True,
unfurl_media: bool = True,
http_conn_id: str = SLACK_NOTIFICATIONS_CONN_ID,
) -> None:
"""Send a simple slack message, convenience message for short/simple messages."""
log.info(text)
if not should_send_message(http_conn_id):
return

environment = Variable.get("environment", default_var="dev")
s = SlackMessage(
f"{username} | {environment}", icon_emoji, http_conn_id=http_conn_id
f"{username} | {environment}",
icon_emoji,
unfurl_links,
unfurl_media,
http_conn_id=http_conn_id,
)
s.add_text(text, plain_text=not markdown)
s.send(text)
@@ -255,16 +262,33 @@ def should_send_message(http_conn_id=SLACK_NOTIFICATIONS_CONN_ID):

def send_alert(
text: str,
dag_id: str | None = None,
username: str = "Airflow Alert",
icon_emoji: str = ":airflow:",
markdown: bool = True,
unfurl_links: bool = True,
unfurl_media: bool = True,
):
"""
Wrapper for send_message that allows sending a message to the configured alerts
channel instead of the default notification channel.
"""

known_failures = Variable.get(
"silenced_slack_alerts", default_var={}, deserialize_json=True
)
if dag_id in known_failures:
log.info(f"Skipping Slack alert for {dag_id}: {text}")
return

send_message(
text, username, icon_emoji, markdown, http_conn_id=SLACK_ALERTS_CONN_ID
text,
username,
icon_emoji,
markdown,
unfurl_links,
unfurl_media,
http_conn_id=SLACK_ALERTS_CONN_ID,
)


@@ -274,6 +298,7 @@ def on_failure_callback(context: dict) -> None:
Errors are only sent out in production and if a Slack connection is defined.
"""
# Get relevant info
dag = context["dag"]
ti = context["task_instance"]
execution_date = context["execution_date"]
exception: Optional[Exception] = context.get("exception")
@@ -296,6 +321,4 @@ def on_failure_callback(context: dict) -> None:
*Log*: {ti.log_url}
{exception_message}
"""
send_message(
message, username="Airflow DAG Failure", http_conn_id=SLACK_ALERTS_CONN_ID
)
send_alert(message, dag_id=dag.dag_id, username="Airflow DAG Failure")
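To make the new silencing path concrete, here is a small sketch of the Airflow variable that `send_alert` now consults. The DAG id and issue URL are placeholders, assuming the JSON shape described in the new DAG's docstring further down:

```python
# Example value of the `silenced_slack_alerts` Airflow variable
# (stored JSON-serialized): dag_id -> GitHub issue tracking the known failure.
silenced_slack_alerts = {
    "some_failing_dag": "https://github.com/WordPress/openverse/issues/1234",
}

# With that variable set, a failure in `some_failing_dag` is logged but not
# posted to the alerts channel:
#   send_alert("Task failed ...", dag_id="some_failing_dag")   # skipped
# Any other dag_id still forwards to send_message on SLACK_ALERTS_CONN_ID.
```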
@@ -123,7 +123,6 @@ def report_actionable_records(
{media_type_reports}
"""

logger.info(message)
slack.send_alert(message, username="Reported Media Requires Review")


@@ -0,0 +1,56 @@
import logging
from typing import Tuple

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import Variable
from common.github import GitHubAPI
from common.slack import send_alert


logger = logging.getLogger(__name__)


def get_issue_info(issue_url: str) -> Tuple[str, str, str]:
"""
Parses out the owner, repo, and issue_number from a GitHub issue url.
"""
url_split = issue_url.split("/")
if len(url_split) < 4:
raise AirflowException(f"Issue url {issue_url} could not be parsed.")
return url_split[-4], url_split[-3], url_split[-1]


def get_dags_with_closed_issues(github_pat, silenced_dags):
gh = GitHubAPI(github_pat)

dags_to_reenable = []
for dag_id, issue_url in silenced_dags.items():
owner, repo, issue_number = get_issue_info(issue_url)
github_issue = gh.get_issue(repo, issue_number, owner)

if github_issue.get("state") == "closed":
# If the associated issue has been closed, this DAG can have
# alerting reenabled.
dags_to_reenable.append((dag_id, issue_url))
return dags_to_reenable


def check_configuration(github_pat: str, airflow_variable: str):
silenced_dags = Variable.get(airflow_variable, {}, deserialize_json=True)
dags_to_reenable = get_dags_with_closed_issues(github_pat, silenced_dags)

if not dags_to_reenable:
raise AirflowSkipException(
"All DAGs configured to silence messages have work still in progress."
" No configuration updates needed."
)

message = (
"The following DAGs have Slack messages silenced, but the associated issue is"
f" closed. Please remove them from the `{airflow_variable}` Airflow variable"
" or assign a new issue."
)
for (dag, issue) in dags_to_reenable:
message += f"\n - <{issue}|{dag}>"
send_alert(message, username="Silenced DAG Check", unfurl_links=False)
return message
Comment on lines +48 to +56
Contributor

It'd be nice if this created a GitHub issue or something that could actually be assigned and tracked or have the maintainers pinged. Or maybe even just left a comment on the issue itself like "Please un-silence the DAG errors". Or maybe both, a new issue and a ping on the old, with the new issue just tracking the work of actually updating the prod configuration.

Just worried a slack ping could easily get lost (especially if lots of people are on vacation or distracted by something else, for example) in a way that a GitHub issue won't, as it acts more like a formal "todo" item.

Contributor Author

Oh I love this idea! I'm going to create a follow-up issue and link back, this would be fantastic.
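As a quick illustration of the URL parsing in the `get_issue_info` helper above (a sketch; the URL is a placeholder):

```python
issue_url = "https://github.com/WordPress/openverse/issues/1234"

# Splitting on "/" yields
# ['https:', '', 'github.com', 'WordPress', 'openverse', 'issues', '1234'],
# so indices -4, -3, and -1 recover the owner, repo, and issue number
# (all returned as strings).
owner, repo, issue_number = get_issue_info(issue_url)
assert (owner, repo, issue_number) == ("WordPress", "openverse", "1234")
```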

@@ -0,0 +1,60 @@
"""
Checks for DAGs that have silenced Slack alerts which may need to be turned back
on.

When a DAG has known failures, it can be omitted from Slack error reporting by adding
an entry to the `silenced_slack_alerts` Airflow variable. This is a dictionary where the
key is the `dag_id` of the affected DAG, and the value is the URL of a GitHub issue
tracking the error.

The `check_silenced_dags` DAG iterates over the entries in the `silenced_slack_alerts`
configuration and verifies that the associated GitHub issues are still open. If an issue
has been closed, it is assumed that the DAG should have Slack reporting reenabled, and
an alert is sent to prompt manual update of the configuration. This prevents developers
from forgetting to reenable Slack reporting after the issue has been resolved.

The DAG runs weekly.
"""

import logging
from datetime import datetime, timedelta

from airflow.models import DAG, Variable
from airflow.operators.python import PythonOperator
from common.constants import DAG_DEFAULT_ARGS
from maintenance.check_silenced_dags import check_silenced_dags


logger = logging.getLogger(__name__)


DAG_ID = "check_silenced_dags"
MAX_ACTIVE = 1
GITHUB_PAT = Variable.get("GITHUB_API_KEY", default_var="not_set")


dag = DAG(
Contributor

I know I keep saying this so apologies if this comes across as pushy, but we could definitely use the TaskFlow API for this DAG! 😄

dag_id=DAG_ID,
default_args={
**DAG_DEFAULT_ARGS,
"retry_delay": timedelta(minutes=1),
},
start_date=datetime(2022, 7, 29),
schedule_interval="@weekly",
max_active_tasks=MAX_ACTIVE,
max_active_runs=MAX_ACTIVE,
catchup=False,
# Use the docstring at the top of the file as md docs in the UI
doc_md=__doc__,
tags=["maintenance"],
)

with dag:
PythonOperator(
task_id="check_silenced_alert_configuration",
python_callable=check_silenced_dags.check_configuration,
op_kwargs={
"github_pat": GITHUB_PAT,
"airflow_variable": "silenced_slack_alerts",
},
)
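For reference, a rough sketch of what the reviewer's TaskFlow suggestion might look like, reusing the constants and imports shown above (hypothetical; the merged PR keeps the `PythonOperator` form):

```python
from airflow.decorators import dag, task


@dag(
    dag_id=DAG_ID,
    default_args={**DAG_DEFAULT_ARGS, "retry_delay": timedelta(minutes=1)},
    start_date=datetime(2022, 7, 29),
    schedule_interval="@weekly",
    max_active_tasks=MAX_ACTIVE,
    max_active_runs=MAX_ACTIVE,
    catchup=False,
    doc_md=__doc__,
    tags=["maintenance"],
)
def check_silenced_dags_workflow():
    @task
    def check_silenced_alert_configuration():
        # Same callable the PythonOperator wraps above.
        return check_silenced_dags.check_configuration(
            github_pat=GITHUB_PAT,
            airflow_variable="silenced_slack_alerts",
        )

    check_silenced_alert_configuration()


check_silenced_dags_workflow()
```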
16 changes: 16 additions & 0 deletions tests/dags/common/test_slack.py
@@ -338,10 +338,24 @@ def test_send_alert():
"DifferentUser",
":airflow:",
True,
True,
True,
http_conn_id=SLACK_ALERTS_CONN_ID,
)


def test_send_alert_skips_when_silenced():
mock_silenced_dags = {
"silenced_dag_id": "https://github.com/WordPress/openverse/issues/1"
}
with mock.patch("common.slack.send_message") as send_message_mock, mock.patch(
"common.slack.Variable"
) as MockVariable:
MockVariable.get.side_effect = [mock_silenced_dags]
send_alert("Sample text", dag_id="silenced_dag_id")
send_message_mock.assert_not_called()


@pytest.mark.parametrize(
"exception, environment, slack_message_override, call_expected",
[
@@ -369,10 +383,12 @@ def test_on_failure_callback(
"task_instance": mock.Mock(),
"execution_date": datetime.now(),
"exception": exception,
"dag": mock.Mock(),
}
env_vars = {
"environment": environment,
"slack_message_override": slack_message_override,
"silenced_slack_alerts": [],
}

# Mock env variables