Skip to content

Commit

Permalink
fix(notify): Mock environ in all situations (#32607)
Browse files Browse the repository at this point in the history
  • Loading branch information
chouetz authored Jan 3, 2025
1 parent 3162916 commit cb3783d
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 41 deletions.
19 changes: 2 additions & 17 deletions .gitlab/notify/notify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,8 @@ notify:
- GITLAB_TOKEN=$($CI_PROJECT_DIR/tools/ci/fetch_secret.sh $GITLAB_TOKEN read_api) || exit $?; export GITLAB_TOKEN
- DD_API_KEY=$($CI_PROJECT_DIR/tools/ci/fetch_secret.sh $AGENT_API_KEY_ORG2 token) || exit $?; export DD_API_KEY
- python3 -m pip install -r requirements.txt -r tasks/libs/requirements-notifications.txt
- |
# Do not send notifications if this is a child pipeline of another repo
# The triggering repo should already have its own notification system
if [ "$CI_PIPELINE_SOURCE" != "pipeline" ]; then
if [ "$DEPLOY_AGENT" = "true" ]; then
invoke -e notify.send-message --notification-type "deploy"
elif [ "$CI_PIPELINE_SOURCE" != "push" ]; then
invoke -e notify.send-message --notification-type "trigger"
else
invoke -e notify.send-message --notification-type "merge"
fi
if [ "$CI_COMMIT_BRANCH" = "$CI_DEFAULT_BRANCH" ]; then
invoke notify.check-consistent-failures
fi
else
echo "This pipeline was triggered by another repository, skipping notification."
fi
- invoke -e notify.send-message -p $CI_PIPELINE_ID
- invoke -e notify.check-consistent-failures -p $CI_PIPELINE_ID

send_pipeline_stats:
stage: notify
Expand Down
16 changes: 8 additions & 8 deletions tasks/libs/notify/pipeline_status.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import os
import re

from tasks.libs.ciproviders.gitlab_api import get_commit, get_pipeline
from tasks.libs.common.git import get_default_branch
from tasks.libs.notify.utils import DEPLOY_PIPELINES_CHANNEL, PIPELINES_CHANNEL, PROJECT_NAME
from tasks.libs.notify.utils import DEPLOY_PIPELINES_CHANNEL, PIPELINES_CHANNEL, PROJECT_NAME, get_pipeline_type
from tasks.libs.pipeline.data import get_failed_jobs
from tasks.libs.pipeline.notifications import (
base_message,
Expand All @@ -23,8 +22,8 @@ def should_send_message_to_author(git_ref: str, default_branch: str) -> bool:
return not (git_ref == default_branch or release_ref_regex.match(git_ref) or release_ref_regex_rc.match(git_ref))


def send_message(ctx, notification_type, dry_run):
pipeline = get_pipeline(PROJECT_NAME, os.environ["CI_PIPELINE_ID"])
def send_message(ctx, pipeline_id, dry_run):
pipeline = get_pipeline(PROJECT_NAME, pipeline_id)
commit = get_commit(PROJECT_NAME, pipeline.sha)
failed_jobs = get_failed_jobs(pipeline)

Expand All @@ -40,15 +39,16 @@ def send_message(ctx, notification_type, dry_run):
# For deploy pipelines not on the main branch, send notifications in a
# dedicated channel.
slack_channel = PIPELINES_CHANNEL
if notification_type == "deploy" and pipeline.ref != get_default_branch():
pipeline_type = get_pipeline_type(pipeline)
if pipeline_type == "deploy" and pipeline.ref != get_default_branch():
slack_channel = DEPLOY_PIPELINES_CHANNEL

header = ""
if notification_type == "merge":
if pipeline_type == "merge":
header = f"{header_icon} :merged: datadog-agent merge"
elif notification_type == "deploy":
elif pipeline_type == "deploy":
header = f"{header_icon} :rocket: datadog-agent deploy"
elif notification_type == "trigger":
elif pipeline_type == "trigger":
header = f"{header_icon} :arrow_forward: datadog-agent triggered"

message = SlackMessage(jobs=failed_jobs)
Expand Down
27 changes: 27 additions & 0 deletions tasks/libs/notify/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import re
from typing import Any
from urllib.parse import quote
Expand Down Expand Up @@ -43,3 +44,29 @@ def get_ci_visibility_job_url(
extra_args = ''.join([f'&{key}={value}' for key, value in extra_args.items()])

return CI_VISIBILITY_JOB_URL.format(name=name, extra_flags=extra_flags, extra_args=extra_args)


def should_notify(pipeline_id):
"""
Check if the pipeline should notify the channel: only for non-downstream pipelines, unless conductor triggered it
"""
from tasks.libs.ciproviders.gitlab_api import get_pipeline

pipeline = get_pipeline(PROJECT_NAME, pipeline_id)
return (
pipeline.source != 'pipeline'
or pipeline.source == 'pipeline'
and all(var in os.environ for var in ['DDR', 'DDR_WORKFLOW_ID'])
)


def get_pipeline_type(pipeline):
"""
Return the type of notification to send (related to the type of pipeline, amongst 'deploy', 'trigger' and 'merge')
"""
if os.environ.get('DEPLOY_AGENT', '') == 'true':
return 'deploy'
elif pipeline.source != 'push':
return 'trigger'
else:
return 'merge'
18 changes: 12 additions & 6 deletions tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from tasks.libs.common.utils import gitlab_section
from tasks.libs.notify import alerts, failure_summary, pipeline_status
from tasks.libs.notify.jira_failing_tests import close_issue, get_failing_tests_names, get_jira
from tasks.libs.notify.utils import PROJECT_NAME
from tasks.libs.notify.utils import PROJECT_NAME, should_notify
from tasks.libs.pipeline.notifications import (
check_for_missing_owners_slack_and_jira,
)
Expand All @@ -41,13 +41,16 @@ def check_teams(_):


@task
def send_message(ctx: Context, notification_type: str = "merge", dry_run: bool = False):
def send_message(ctx: Context, pipeline_id: str, dry_run: bool = False):
"""
Send notifications for the current pipeline. CI-only task.
Use the --dry-run option to test this locally, without sending
real slack messages.
"""
pipeline_status.send_message(ctx, notification_type, dry_run)
if should_notify(pipeline_id):
pipeline_status.send_message(ctx, pipeline_id, dry_run)
else:
print("This pipeline is a non-conductor downstream pipeline, skipping notifications")


@task
Expand All @@ -72,7 +75,7 @@ def send_stats(_, dry_run=False):


@task
def check_consistent_failures(ctx, job_failures_file="job_executions.v2.json"):
def check_consistent_failures(ctx, pipeline_id, job_failures_file="job_executions.v2.json"):
# Retrieve the stored document in aws s3. It has the following format:
# {
# "pipeline_id": 123,
Expand All @@ -87,13 +90,16 @@ def check_consistent_failures(ctx, job_failures_file="job_executions.v2.json"):
# The jobs dictionary contains the consecutive and cumulative failures for each job
# The consecutive failures are reset to 0 when the job is not failing, and are raising an alert when reaching the CONSECUTIVE_THRESHOLD (3)
# The cumulative failures list contains 1 for failures, 0 for succes. They contain only then CUMULATIVE_LENGTH(10) last executions and raise alert when 50% failure rate is reached
if not should_notify(pipeline_id) or os.environ['CI_COMMIT_BRANCH'] != os.environ['CI_DEFAULT_BRANCH']:
print("Consistent failures check is only run on the not-downstream default branch")
return

job_executions = alerts.retrieve_job_executions(ctx, job_failures_file)

# By-pass if the pipeline chronological order is not respected
if job_executions.pipeline_id > int(os.environ["CI_PIPELINE_ID"]):
if job_executions.pipeline_id > int(pipeline_id):
return
job_executions.pipeline_id = int(os.environ["CI_PIPELINE_ID"])
job_executions.pipeline_id = int(pipeline_id)

alert_jobs, job_executions = alerts.update_statistics(job_executions)

Expand Down
32 changes: 30 additions & 2 deletions tasks/unit_tests/libs/notify/alerts_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ def test_job_executions(path="tasks/unit_tests/testdata/job_executions.json"):


class TestCheckConsistentFailures(unittest.TestCase):
@patch.dict(
'os.environ',
{
'CI_PIPELINE_ID': '456',
'CI_PIPELINE_SOURCE': 'push',
'CI_COMMIT_BRANCH': 'taylor-swift',
'CI_DEFAULT_BRANCH': 'taylor-swift',
},
)
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_nominal(self, api_mock):
os.environ["CI_PIPELINE_ID"] = "456"

repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list
Expand All @@ -44,12 +51,33 @@ def test_nominal(self, api_mock):
with test_job_executions() as path:
notify.check_consistent_failures(
MockContext(run=Result("test")),
1979,
path,
)

repo_mock.jobs.get.assert_called()
trace_mock.assert_called()
list_mock.assert_called()

@patch.dict(
'os.environ',
{
'CI_PIPELINE_ID': '456',
'CI_PIPELINE_SOURCE': 'push',
'CI_COMMIT_BRANCH': 'taylor',
'CI_DEFAULT_BRANCH': 'swift',
},
)
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_dismiss(self, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
with test_job_executions() as path:
notify.check_consistent_failures(
MockContext(run=Result("test")),
path,
)
repo_mock.jobs.get.assert_not_called()


class TestAlertsRetrieveJobExecutionsCreated(unittest.TestCase):
job_executions = None
Expand Down
104 changes: 96 additions & 8 deletions tasks/unit_tests/notify_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,34 @@ def get_github_slack_map():


class TestSendMessage(unittest.TestCase):
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge(self, api_mock):
@patch('builtins.print')
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_merge(self, api_mock, print_mock):
repo_mock = api_mock.return_value.projects.get.return_value
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.jobs.get.return_value.trace.return_value = b"Log trace"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"
list_mock = repo_mock.pipelines.get.return_value.jobs.list
list_mock.side_effect = [get_fake_jobs(), []]
notify.send_message(MockContext(), notification_type="merge", dry_run=True)
with patch.dict('os.environ', {}, clear=True):
notify.send_message(MockContext(), "42", dry_run=True)
list_mock.assert_called()
repo_mock.pipelines.get.assert_called_with("42")
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
repo_mock.jobs.get.assert_called()

@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('tasks.libs.notify.pipeline_status.get_failed_jobs')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge_without_get_failed_call(self, get_failed_jobs_mock, api_mock):
def test_merge_without_get_failed_call(self, print_mock, get_failed_jobs_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.jobs.get.return_value.trace.return_value = b"Log trace"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"

failed = FailedJobs()
failed.add_failed_job(
Expand Down Expand Up @@ -114,9 +122,11 @@ def test_merge_without_get_failed_call(self, get_failed_jobs_mock, api_mock):
)
)
get_failed_jobs_mock.return_value = failed
notify.send_message(MockContext(), notification_type="merge", dry_run=True)

with patch.dict('os.environ', {}, clear=True):
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
get_failed_jobs_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch("tasks.libs.owners.parsing.read_owners")
def test_route_e2e_internal_error(self, read_owners_mock):
Expand Down Expand Up @@ -192,8 +202,51 @@ def test_route_e2e_internal_error(self, read_owners_mock):
self.assertNotIn("@DataDog/agent-delivery", owners)

@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge_with_get_failed_call(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list

trace_mock.return_value = b"no basic auth credentials"
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"

with patch.dict('os.environ', {}, clear=True):
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'true'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_deploy_with_get_failed_call(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list

trace_mock.return_value = b"no basic auth credentials"
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"

notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("rocket" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge_with_get_failed_call(self, api_mock):
def test_trigger_with_get_failed_call(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list
Expand All @@ -202,11 +255,46 @@ def test_merge_with_get_failed_call(self, api_mock):
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "api"

with patch.dict('os.environ', {}, clear=True):
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("arrow_forward" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DDR': 'true', 'DDR_WORKFLOW_ID': '1337', 'DEPLOY_AGENT': 'false'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_trigger_with_get_failed_call_conductor(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list

notify.send_message(MockContext(), notification_type="merge", dry_run=True)
trace_mock.return_value = b"no basic auth credentials"
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "pipeline"

notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("arrow_forward" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
def test_dismiss_notification(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
repo_mock.pipelines.get.return_value.source = "pipeline"

with patch.dict('os.environ', {}, clear=True):
notify.send_message(MockContext(), "42", dry_run=True)
repo_mock.jobs.get.assert_not_called()
print_mock.assert_called_with("This pipeline is a non-conductor downstream pipeline, skipping notifications")

def test_post_to_channel1(self):
self.assertFalse(pipeline_status.should_send_message_to_author("main", default_branch="main"))
Expand Down

0 comments on commit cb3783d

Please sign in to comment.