Skip to content

Commit

Permalink
feat(notify): Add conductor scheduled pipelines to the notification r…
Browse files Browse the repository at this point in the history
…ules (#32094)
  • Loading branch information
chouetz authored Dec 13, 2024
1 parent 3b788c9 commit 7c5ac02
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 30 deletions.
19 changes: 2 additions & 17 deletions .gitlab/notify/notify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,8 @@ notify:
- GITLAB_TOKEN=$($CI_PROJECT_DIR/tools/ci/fetch_secret.sh $GITLAB_TOKEN read_api) || exit $?; export GITLAB_TOKEN
- DD_API_KEY=$($CI_PROJECT_DIR/tools/ci/fetch_secret.sh $AGENT_API_KEY_ORG2 token) || exit $?; export DD_API_KEY
- python3 -m pip install -r requirements.txt -r tasks/libs/requirements-notifications.txt
- |
# Do not send notifications if this is a child pipeline of another repo
# The triggering repo should already have its own notification system
if [ "$CI_PIPELINE_SOURCE" != "pipeline" ]; then
if [ "$DEPLOY_AGENT" = "true" ]; then
invoke -e notify.send-message --notification-type "deploy"
elif [ "$CI_PIPELINE_SOURCE" != "push" ]; then
invoke -e notify.send-message --notification-type "trigger"
else
invoke -e notify.send-message --notification-type "merge"
fi
if [ "$CI_COMMIT_BRANCH" = "$CI_DEFAULT_BRANCH" ]; then
invoke notify.check-consistent-failures
fi
else
echo "This pipeline was triggered by another repository, skipping notification."
fi
- invoke -e notify.send-message
- invoke -e notify.check-consistent-failures

send_pipeline_stats:
stage: notify
Expand Down
28 changes: 28 additions & 0 deletions tasks/libs/notify/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import re
from typing import Any
from urllib.parse import quote
Expand Down Expand Up @@ -43,3 +44,30 @@ def get_ci_visibility_job_url(
extra_args = ''.join([f'&{key}={value}' for key, value in extra_args.items()])

return CI_VISIBILITY_JOB_URL.format(name=name, extra_flags=extra_flags, extra_args=extra_args)


def should_notify():
"""
Check if the pipeline should notify the channel: only for non-downstream pipelines, unless conductor triggered it
"""
from tasks.libs.ciproviders.gitlab_api import get_pipeline

CONDUCTOR_ID = 8278
pipeline = get_pipeline(PROJECT_NAME, os.environ['CI_PIPELINE_ID'])
return (
os.environ['CI_PIPELINE_SOURCE'] != 'pipeline'
or os.environ['CI_PIPELINE_SOURCE'] == 'pipeline'
and pipeline.user['id'] == CONDUCTOR_ID
)


def notification_type():
"""
Return the type of notification to send (related to the type of pipeline, amongst 'deploy', 'trigger' and 'merge')
"""
if os.environ['DEPLOY_AGENT'] == 'true':
return 'deploy'
elif os.environ['CI_PIPELINE_SOURCE'] != 'push':
return 'trigger'
else:
return 'merge'
12 changes: 9 additions & 3 deletions tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from tasks.libs.common.utils import gitlab_section
from tasks.libs.notify import alerts, failure_summary, pipeline_status
from tasks.libs.notify.jira_failing_tests import close_issue, get_failing_tests_names, get_jira
from tasks.libs.notify.utils import PROJECT_NAME
from tasks.libs.notify.utils import PROJECT_NAME, notification_type, should_notify
from tasks.libs.pipeline.notifications import (
check_for_missing_owners_slack_and_jira,
)
Expand All @@ -41,13 +41,16 @@ def check_teams(_):


@task
def send_message(ctx: Context, notification_type: str = "merge", dry_run: bool = False):
def send_message(ctx: Context, dry_run: bool = False):
"""
Send notifications for the current pipeline. CI-only task.
Use the --dry-run option to test this locally, without sending
real slack messages.
"""
pipeline_status.send_message(ctx, notification_type, dry_run)
if should_notify():
pipeline_status.send_message(ctx, notification_type(), dry_run)
else:
print("This pipeline is a non-conductor downstream pipeline, skipping notifications")


@task
Expand Down Expand Up @@ -87,6 +90,9 @@ def check_consistent_failures(ctx, job_failures_file="job_executions.v2.json"):
# The jobs dictionary contains the consecutive and cumulative failures for each job
# The consecutive failures are reset to 0 when the job is not failing, and are raising an alert when reaching the CONSECUTIVE_THRESHOLD (3)
# The cumulative failures list contains 1 for failures, 0 for succes. They contain only then CUMULATIVE_LENGTH(10) last executions and raise alert when 50% failure rate is reached
if not should_notify() or os.environ['CI_COMMIT_BRANCH'] != os.environ['CI_DEFAULT_BRANCH']:
print("Consistent failures check is only run on the not-downstream default branch")
return

job_executions = alerts.retrieve_job_executions(ctx, job_failures_file)

Expand Down
31 changes: 29 additions & 2 deletions tasks/unit_tests/libs/notify/alerts_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,17 @@ def test_job_executions(path="tasks/unit_tests/testdata/job_executions.json"):


class TestCheckConsistentFailures(unittest.TestCase):
@patch.dict(
'os.environ',
{
'CI_PIPELINE_ID': '456',
'CI_PIPELINE_SOURCE': 'push',
'CI_COMMIT_BRANCH': 'taylor-swift',
'CI_DEFAULT_BRANCH': 'taylor-swift',
},
)
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_nominal(self, api_mock):
os.environ["CI_PIPELINE_ID"] = "456"

repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list
Expand All @@ -47,9 +54,29 @@ def test_nominal(self, api_mock):
path,
)

repo_mock.jobs.get.assert_called()
trace_mock.assert_called()
list_mock.assert_called()

@patch.dict(
'os.environ',
{
'CI_PIPELINE_ID': '456',
'CI_PIPELINE_SOURCE': 'push',
'CI_COMMIT_BRANCH': 'taylor',
'CI_DEFAULT_BRANCH': 'swift',
},
)
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_dismiss(self, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
with test_job_executions() as path:
notify.check_consistent_failures(
MockContext(run=Result("test")),
path,
)
repo_mock.jobs.get.assert_not_called()


class TestAlertsRetrieveJobExecutionsCreated(unittest.TestCase):
job_executions = None
Expand Down
96 changes: 88 additions & 8 deletions tasks/unit_tests/notify_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,29 @@ def get_github_slack_map():


class TestSendMessage(unittest.TestCase):
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge(self, api_mock):
@patch('builtins.print')
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_merge(self, api_mock, print_mock):
repo_mock = api_mock.return_value.projects.get.return_value
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.jobs.get.return_value.trace.return_value = b"Log trace"
repo_mock.pipelines.get.return_value.ref = "test"
list_mock = repo_mock.pipelines.get.return_value.jobs.list
list_mock.side_effect = [get_fake_jobs(), []]
notify.send_message(MockContext(), notification_type="merge", dry_run=True)
notify.send_message(MockContext(), dry_run=True)
list_mock.assert_called()
repo_mock.pipelines.get.assert_called_with('42')
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('tasks.libs.notify.pipeline_status.get_failed_jobs')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge_without_get_failed_call(self, get_failed_jobs_mock, api_mock):
def test_merge_without_get_failed_call(self, print_mock, get_failed_jobs_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.jobs.get.return_value.trace.return_value = b"Log trace"
Expand Down Expand Up @@ -114,9 +121,10 @@ def test_merge_without_get_failed_call(self, get_failed_jobs_mock, api_mock):
)
)
get_failed_jobs_mock.return_value = failed
notify.send_message(MockContext(), notification_type="merge", dry_run=True)

notify.send_message(MockContext(), dry_run=True)
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
get_failed_jobs_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch("tasks.libs.owners.parsing.read_owners")
def test_route_e2e_internal_error(self, read_owners_mock):
Expand Down Expand Up @@ -191,9 +199,51 @@ def test_route_e2e_internal_error(self, read_owners_mock):
self.assertNotIn("@DataDog/agent-devx-loops", owners)
self.assertNotIn("@DataDog/agent-delivery", owners)

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge_with_get_failed_call(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list

trace_mock.return_value = b"no basic auth credentials"
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"

notify.send_message(MockContext(), dry_run=True)
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'true', 'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_deploy_with_get_failed_call(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list

trace_mock.return_value = b"no basic auth credentials"
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"

notify.send_message(MockContext(), dry_run=True)
self.assertTrue("rocket" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'api', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_merge_with_get_failed_call(self, api_mock):
def test_trigger_with_get_failed_call(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list
Expand All @@ -203,10 +253,40 @@ def test_merge_with_get_failed_call(self, api_mock):
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"

notify.send_message(MockContext(), notification_type="merge", dry_run=True)
notify.send_message(MockContext(), dry_run=True)
self.assertTrue("arrow_forward" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'pipeline', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
def test_trigger_with_get_failed_call_conductor(self, print_mock, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
trace_mock = repo_mock.jobs.get.return_value.trace
list_mock = repo_mock.pipelines.get.return_value.jobs.list

trace_mock.return_value = b"no basic auth credentials"
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.user.__getitem__.return_value = 8278

notify.send_message(MockContext(), dry_run=True)
self.assertTrue("arrow_forward" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'CI_PIPELINE_SOURCE': 'pipeline', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_dismiss_notification(self, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value

notify.send_message(MockContext(), dry_run=True)
repo_mock.jobs.get.assert_not_called()

def test_post_to_channel1(self):
self.assertFalse(pipeline_status.should_send_message_to_author("main", default_branch="main"))
Expand Down

0 comments on commit 7c5ac02

Please sign in to comment.