Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Omit DAGs that are known to fail from alerts #643

Merged
merged 15 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging
from typing import Tuple

from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models import Variable
from common.github import GitHubAPI
from common.slack import send_alert
Expand All @@ -10,16 +10,40 @@
logger = logging.getLogger(__name__)


def get_issue_info(issue_url: str) -> Tuple[str, str, str]:
"""
Parses out the owner, repo, and issue_number from a GitHub issue url.
"""
url_split = issue_url.split("/")
if len(url_split) < 4:
raise AirflowException(f"Issue url {issue_url} could not be parsed.")
return url_split[-4], url_split[-3], url_split[-1]


def get_dags_with_closed_issues(github_pat, silenced_dags):
gh = GitHubAPI(github_pat)

dags_to_reenable = []
for dag_id, issue_url in silenced_dags.items():
owner, repo, issue_number = get_issue_info(issue_url)
github_issue = gh.get_issue(repo, issue_number, owner)

if github_issue.get("state") == "closed":
# If the associated issue has been closed, this DAG can have
# alerting reenabled.
dags_to_reenable.append((dag_id, issue_url))
return dags_to_reenable


def check_configuration(github_pat: str, airflow_variable: str):
stacimc marked this conversation as resolved.
Show resolved Hide resolved
silenced_dags = Variable.get(airflow_variable, {}, deserialize_json=True)
dags_to_reenable = get_dags_with_closed_issues(github_pat, silenced_dags)

if not dags_to_reenable:
logger.info(
raise AirflowSkipException(
"All DAGs configured to silence messages have work still in progress."
" No configuration updates needed."
)
return

message = (
"The following DAGs have Slack messages silenced, but the associated issue is"
Expand All @@ -30,28 +54,3 @@ def check_configuration(github_pat: str, airflow_variable: str):
message += f"\n - <{issue}|{dag}>"
send_alert(message, username="Silenced DAG Check")
return message


def get_dags_with_closed_issues(github_pat, silenced_dags):
gh = GitHubAPI(github_pat)

dags_to_reenable = []
for dag_id, issue_url in silenced_dags.items():
owner, repo, issue_number = get_issue_info(issue_url)
github_issue = gh.get_issue(repo, issue_number, owner)

if github_issue.get("state") == "closed":
# If the associated issue has been closed, this DAG can have
# alerting reenabled.
dags_to_reenable.append((dag_id, issue_url))
return dags_to_reenable


def get_issue_info(issue_url: str) -> Tuple[str, str, str]:
"""
Parses out the owner, repo, and issue_number from a GitHub issue url.
"""
url_split = issue_url.split("/")
if len(url_split) < 4:
raise AirflowException(f"Issue url {issue_url} could not be parsed.")
return url_split[-4], url_split[-3], url_split[-1]
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@


DAG_ID = "check_silenced_dags"
MAX_ACTIVE_TASKS = 1
MAX_ACTIVE = 1
GITHUB_PAT = Variable.get("GITHUB_API_KEY", default_var="not_set")


Expand All @@ -41,8 +41,8 @@
},
start_date=datetime(2022, 7, 29),
schedule_interval="@weekly",
max_active_tasks=MAX_ACTIVE_TASKS,
max_active_runs=MAX_ACTIVE_TASKS,
max_active_tasks=MAX_ACTIVE,
max_active_runs=MAX_ACTIVE,
catchup=False,
# Use the docstring at the top of the file as md docs in the UI
doc_md=__doc__,
Expand Down
33 changes: 21 additions & 12 deletions tests/dags/maintenance/test_check_silenced_dags.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest import mock

import pytest
from airflow.exceptions import AirflowException
from airflow.exceptions import AirflowException, AirflowSkipException
from maintenance.check_silenced_dags.check_silenced_dags import (
check_configuration,
get_dags_with_closed_issues,
Expand All @@ -14,8 +14,13 @@
@pytest.mark.parametrize(
"silenced_dags, dags_to_reenable, should_send_alert",
(
# No Dags to reenable, don't alert
({}, [], False),
# No Dags to reenable, task should skip
AetherUnbound marked this conversation as resolved.
Show resolved Hide resolved
pytest.param(
{},
[],
False,
marks=pytest.mark.raises(exception=AirflowSkipException),
),
# One DAG to reenable
(
{"dag_a_id": "https://github.com/WordPress/openverse/issues/1"},
Expand All @@ -39,15 +44,19 @@
),
)
def test_check_configuration(silenced_dags, dags_to_reenable, should_send_alert):
with mock.patch(
"maintenance.check_silenced_dags.check_silenced_dags.Variable",
return_value=silenced_dags,
), mock.patch(
"maintenance.check_silenced_dags.check_silenced_dags.get_dags_with_closed_issues",
return_value=dags_to_reenable,
) as get_dags_with_closed_issues_mock, mock.patch(
"maintenance.check_silenced_dags.check_silenced_dags.send_alert"
) as send_alert_mock:
with (
mock.patch(
"maintenance.check_silenced_dags.check_silenced_dags.Variable",
return_value=silenced_dags,
),
mock.patch(
"maintenance.check_silenced_dags.check_silenced_dags.get_dags_with_closed_issues",
return_value=dags_to_reenable,
) as get_dags_with_closed_issues_mock,
mock.patch(
"maintenance.check_silenced_dags.check_silenced_dags.send_alert"
) as send_alert_mock,
):
message = check_configuration("not_set", "silenced_slack_alerts")
assert send_alert_mock.called == should_send_alert
assert get_dags_with_closed_issues_mock.called_with("not_set", silenced_dags)
Expand Down