Skip to content

Commit

Permalink
codereview: Reduce dependency on environment variables, improve namin…
Browse files Browse the repository at this point in the history
…g and change the conductor origin detection
  • Loading branch information
chouetz committed Dec 19, 2024
1 parent f193de1 commit acddb29
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 42 deletions.
4 changes: 2 additions & 2 deletions .gitlab/notify/notify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ notify:
- GITLAB_TOKEN=$($CI_PROJECT_DIR/tools/ci/fetch_secret.sh $GITLAB_TOKEN read_api) || exit $?; export GITLAB_TOKEN
- DD_API_KEY=$($CI_PROJECT_DIR/tools/ci/fetch_secret.sh $AGENT_API_KEY_ORG2 token) || exit $?; export DD_API_KEY
- python3 -m pip install -r requirements.txt -r tasks/libs/requirements-notifications.txt
- invoke -e notify.send-message
- invoke -e notify.check-consistent-failures
- invoke -e notify.send-message -p $CI_PIPELINE_ID
- invoke -e notify.check-consistent-failures -p $CI_PIPELINE_ID

send_pipeline_stats:
stage: notify
Expand Down
16 changes: 8 additions & 8 deletions tasks/libs/notify/pipeline_status.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import os
import re

from tasks.libs.ciproviders.gitlab_api import get_commit, get_pipeline
from tasks.libs.common.git import get_default_branch
from tasks.libs.notify.utils import DEPLOY_PIPELINES_CHANNEL, PIPELINES_CHANNEL, PROJECT_NAME
from tasks.libs.notify.utils import DEPLOY_PIPELINES_CHANNEL, PIPELINES_CHANNEL, PROJECT_NAME, get_pipeline_type
from tasks.libs.pipeline.data import get_failed_jobs
from tasks.libs.pipeline.notifications import (
base_message,
Expand All @@ -23,8 +22,8 @@ def should_send_message_to_author(git_ref: str, default_branch: str) -> bool:
return not (git_ref == default_branch or release_ref_regex.match(git_ref) or release_ref_regex_rc.match(git_ref))


def send_message(ctx, notification_type, dry_run):
pipeline = get_pipeline(PROJECT_NAME, os.environ["CI_PIPELINE_ID"])
def send_message(ctx, pipeline_id, dry_run):
pipeline = get_pipeline(PROJECT_NAME, pipeline_id)
commit = get_commit(PROJECT_NAME, pipeline.sha)
failed_jobs = get_failed_jobs(pipeline)

Expand All @@ -40,15 +39,16 @@ def send_message(ctx, notification_type, dry_run):
# For deploy pipelines not on the main branch, send notifications in a
# dedicated channel.
slack_channel = PIPELINES_CHANNEL
if notification_type == "deploy" and pipeline.ref != get_default_branch():
pipeline_type = get_pipeline_type(pipeline)
if pipeline_type == "deploy" and pipeline.ref != get_default_branch():
slack_channel = DEPLOY_PIPELINES_CHANNEL

header = ""
if notification_type == "merge":
if pipeline_type == "merge":
header = f"{header_icon} :merged: datadog-agent merge"
elif notification_type == "deploy":
elif pipeline_type == "deploy":
header = f"{header_icon} :rocket: datadog-agent deploy"
elif notification_type == "trigger":
elif pipeline_type == "trigger":
header = f"{header_icon} :arrow_forward: datadog-agent triggered"

message = SlackMessage(jobs=failed_jobs)
Expand Down
15 changes: 7 additions & 8 deletions tasks/libs/notify/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,28 +46,27 @@ def get_ci_visibility_job_url(
return CI_VISIBILITY_JOB_URL.format(name=name, extra_flags=extra_flags, extra_args=extra_args)


def should_notify():
def should_notify(pipeline_id):
"""
Check if the pipeline should notify the channel: only for non-downstream pipelines, unless conductor triggered it
"""
from tasks.libs.ciproviders.gitlab_api import get_pipeline

CONDUCTOR_ID = 8278
pipeline = get_pipeline(PROJECT_NAME, os.environ['CI_PIPELINE_ID'])
pipeline = get_pipeline(PROJECT_NAME, pipeline_id)
return (
os.environ['CI_PIPELINE_SOURCE'] != 'pipeline'
or os.environ['CI_PIPELINE_SOURCE'] == 'pipeline'
and pipeline.user['id'] == CONDUCTOR_ID
pipeline.source != 'pipeline'
or pipeline.source == 'pipeline'
and all(var in os.environ for var in ['DDR', 'DDR_WORKFLOW_ID'])
)


def notification_type():
def get_pipeline_type(pipeline):
"""
Return the type of notification to send (related to the type of pipeline, amongst 'deploy', 'trigger' and 'merge')
"""
if os.environ.get('DEPLOY_AGENT', '') == 'true':
return 'deploy'
elif os.environ['CI_PIPELINE_SOURCE'] != 'push':
elif pipeline.source != 'push':
return 'trigger'
else:
return 'merge'
16 changes: 8 additions & 8 deletions tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from tasks.libs.common.utils import gitlab_section
from tasks.libs.notify import alerts, failure_summary, pipeline_status
from tasks.libs.notify.jira_failing_tests import close_issue, get_failing_tests_names, get_jira
from tasks.libs.notify.utils import PROJECT_NAME, notification_type, should_notify
from tasks.libs.notify.utils import PROJECT_NAME, should_notify
from tasks.libs.pipeline.notifications import (
check_for_missing_owners_slack_and_jira,
)
Expand All @@ -41,14 +41,14 @@ def check_teams(_):


@task
def send_message(ctx: Context, dry_run: bool = False):
def send_message(ctx: Context, pipeline_id: str, dry_run: bool = False):
"""
Send notifications for the current pipeline. CI-only task.
Use the --dry-run option to test this locally, without sending
real slack messages.
"""
if should_notify():
pipeline_status.send_message(ctx, notification_type(), dry_run)
if should_notify(pipeline_id):
pipeline_status.send_message(ctx, pipeline_id, dry_run)
else:
print("This pipeline is a non-conductor downstream pipeline, skipping notifications")

Expand All @@ -75,7 +75,7 @@ def send_stats(_, dry_run=False):


@task
def check_consistent_failures(ctx, job_failures_file="job_executions.v2.json"):
def check_consistent_failures(ctx, pipeline_id, job_failures_file="job_executions.v2.json"):
# Retrieve the stored document in aws s3. It has the following format:
# {
# "pipeline_id": 123,
Expand All @@ -90,16 +90,16 @@ def check_consistent_failures(ctx, job_failures_file="job_executions.v2.json"):
# The jobs dictionary contains the consecutive and cumulative failures for each job
# The consecutive failures are reset to 0 when the job is not failing, and raise an alert when reaching the CONSECUTIVE_THRESHOLD (3)
# The cumulative failures list contains 1 for failures, 0 for success. They contain only the CUMULATIVE_LENGTH (10) last executions and raise an alert when a 50% failure rate is reached
if not should_notify() or os.environ['CI_COMMIT_BRANCH'] != os.environ['CI_DEFAULT_BRANCH']:
if not should_notify(pipeline_id) or os.environ['CI_COMMIT_BRANCH'] != os.environ['CI_DEFAULT_BRANCH']:
print("Consistent failures check is only run on the not-downstream default branch")
return

job_executions = alerts.retrieve_job_executions(ctx, job_failures_file)

# By-pass if the pipeline chronological order is not respected
if job_executions.pipeline_id > int(os.environ["CI_PIPELINE_ID"]):
if job_executions.pipeline_id > int(pipeline_id):
return
job_executions.pipeline_id = int(os.environ["CI_PIPELINE_ID"])
job_executions.pipeline_id = int(pipeline_id)

alert_jobs, job_executions = alerts.update_statistics(job_executions)

Expand Down
1 change: 1 addition & 0 deletions tasks/unit_tests/libs/notify/alerts_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def test_nominal(self, api_mock):
with test_job_executions() as path:
notify.check_consistent_failures(
MockContext(run=Result("test")),
1979,
path,
)

Expand Down
33 changes: 17 additions & 16 deletions tasks/unit_tests/notify_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ def get_github_slack_map():


class TestSendMessage(unittest.TestCase):
@patch.dict('os.environ', {'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
@patch('builtins.print')
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
Expand All @@ -40,15 +39,15 @@ def test_merge(self, api_mock, print_mock):
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.jobs.get.return_value.trace.return_value = b"Log trace"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"
list_mock = repo_mock.pipelines.get.return_value.jobs.list
list_mock.side_effect = [get_fake_jobs(), []]
notify.send_message(MockContext(), dry_run=True)
notify.send_message(MockContext(), "42", dry_run=True)
list_mock.assert_called()
repo_mock.pipelines.get.assert_called_with('42')
repo_mock.pipelines.get.assert_called_with("42")
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('tasks.libs.notify.pipeline_status.get_failed_jobs')
@patch('builtins.print')
Expand All @@ -58,6 +57,7 @@ def test_merge_without_get_failed_call(self, print_mock, get_failed_jobs_mock, a
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.jobs.get.return_value.trace.return_value = b"Log trace"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"

failed = FailedJobs()
failed.add_failed_job(
Expand Down Expand Up @@ -121,7 +121,7 @@ def test_merge_without_get_failed_call(self, print_mock, get_failed_jobs_mock, a
)
)
get_failed_jobs_mock.return_value = failed
notify.send_message(MockContext(), dry_run=True)
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
get_failed_jobs_mock.assert_called()
repo_mock.jobs.get.assert_called()
Expand Down Expand Up @@ -199,7 +199,6 @@ def test_route_e2e_internal_error(self, read_owners_mock):
self.assertNotIn("@DataDog/agent-devx-loops", owners)
self.assertNotIn("@DataDog/agent-delivery", owners)

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
Expand All @@ -212,14 +211,15 @@ def test_merge_with_get_failed_call(self, print_mock, api_mock):
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"

notify.send_message(MockContext(), dry_run=True)
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("merge" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'true', 'CI_PIPELINE_SOURCE': 'push', 'CI_PIPELINE_ID': '42'})
@patch.dict('os.environ', {'DEPLOY_AGENT': 'true'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
Expand All @@ -232,14 +232,14 @@ def test_deploy_with_get_failed_call(self, print_mock, api_mock):
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "push"

notify.send_message(MockContext(), dry_run=True)
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("rocket" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'api', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
Expand All @@ -252,14 +252,15 @@ def test_trigger_with_get_failed_call(self, print_mock, api_mock):
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.source = "api"

notify.send_message(MockContext(), dry_run=True)
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("arrow_forward" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'DEPLOY_AGENT': 'false', 'CI_PIPELINE_SOURCE': 'pipeline', 'CI_PIPELINE_ID': '42'})
@patch.dict('os.environ', {'DDR': 'true', 'DDR_WORKFLOW_ID': '1337'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
@patch('builtins.print')
@patch('tasks.libs.pipeline.notifications.get_pr_from_commit', new=MagicMock(return_value=""))
Expand All @@ -272,20 +273,20 @@ def test_trigger_with_get_failed_call_conductor(self, print_mock, api_mock):
list_mock.return_value = get_fake_jobs()
repo_mock.jobs.get.return_value.artifact.return_value = b"{}"
repo_mock.pipelines.get.return_value.ref = "test"
repo_mock.pipelines.get.return_value.user.__getitem__.return_value = 8278
repo_mock.pipelines.get.return_value.source = "pipeline"

notify.send_message(MockContext(), dry_run=True)
notify.send_message(MockContext(), "42", dry_run=True)
self.assertTrue("arrow_forward" in print_mock.mock_calls[0].args[0])
trace_mock.assert_called()
list_mock.assert_called()
repo_mock.jobs.get.assert_called()

@patch.dict('os.environ', {'CI_PIPELINE_SOURCE': 'pipeline', 'CI_PIPELINE_ID': '42'})
@patch('tasks.libs.ciproviders.gitlab_api.get_gitlab_api')
def test_dismiss_notification(self, api_mock):
repo_mock = api_mock.return_value.projects.get.return_value
repo_mock.pipelines.get.return_value.source = "pipeline"

notify.send_message(MockContext(), dry_run=True)
notify.send_message(MockContext(), "42", dry_run=True)
repo_mock.jobs.get.assert_not_called()

def test_post_to_channel1(self):
Expand Down

0 comments on commit acddb29

Please sign in to comment.