diff --git a/hexa/pipelines/graphql/schema.graphql b/hexa/pipelines/graphql/schema.graphql index 436627c4c..6c761f5ed 100644 --- a/hexa/pipelines/graphql/schema.graphql +++ b/hexa/pipelines/graphql/schema.graphql @@ -192,12 +192,23 @@ type PipelineVersionPage { totalItems: Int! # The total number of pipeline versions. } +""" +Represents the notification level for a pipeline recipient. +""" +enum PipelineNotificationLevel { + ALL + ERROR +} + + """ Represents a recipient of a pipeline. """ type PipelineRecipient { + id: UUID! # The ID of the recipient. user: User! # The user who is a recipient of the pipeline. pipeline: Pipeline! # The pipeline associated with the recipient. + notificationLevel: PipelineNotificationLevel! # The event that triggers the notification. } """ @@ -278,6 +289,38 @@ type CreatePipelineResult { } + +enum PipelineRecipientError { + PIPELINE_NOT_FOUND + USER_NOT_FOUND + RECIPIENT_NOT_FOUND + PERMISSION_DENIED + ALREADY_EXISTS +} + +""" +Represents the input for adding a recipient to a pipeline. +""" +input CreatePipelineRecipientInput { + pipelineId: UUID! + userId: UUID! + notificationLevel: PipelineNotificationLevel! +} + +""" +Represents the input for updating a recipient. +""" +input UpdatePipelineRecipientInput { + recipientId: UUID! + notificationLevel: PipelineNotificationLevel! +} +""" +Represents the input for deleting a pipeline recipient. +""" +input DeletePipelineRecipientInput { + recipientId: UUID! +} + """ Represents the input for updating a pipeline. """ @@ -287,7 +330,6 @@ input UpdatePipelineInput { config: JSON # The new configuration for the pipeline. schedule: String # The new schedule for running the pipeline. description: String # The new description of the pipeline. - recipientIds: [UUID!] # The IDs of the recipients of the pipeline. webhookEnabled: Boolean # Indicates if the webhook should be enabled for the pipeline. } @@ -392,6 +434,23 @@ input UploadPipelineInput { timeout: Int # The timeout value for the pipeline. } +type AddPipelineRecipientResult { + success: Boolean! + errors: [PipelineRecipientError!]! + recipient: PipelineRecipient +} + +type UpdatePipelineRecipientResult { + success: Boolean! + errors: [PipelineRecipientError!]! + recipient: PipelineRecipient +} + +type DeletePipelineRecipientResult { + success: Boolean! + errors: [PipelineRecipientError!]! +} + """ Represents the result of uploading a pipeline. """ @@ -602,5 +661,23 @@ extend type Mutation { Deletes a pipeline version. """ deletePipelineVersion(input: DeletePipelineVersionInput!): DeletePipelineVersionResult! + + """ + Generates a webhook URL for a pipeline. + """ generatePipelineWebhookUrl(input: GeneratePipelineWebhookUrlInput!): GeneratePipelineWebhookUrlResult! + + """ + Adds a recipient to a pipeline. + """ + addPipelineRecipient(input: CreatePipelineRecipientInput!): AddPipelineRecipientResult! + """ + Updates a pipeline recipient. + """ + updatePipelineRecipient(input: UpdatePipelineRecipientInput!): UpdatePipelineRecipientResult! + + """ + Deletes a pipeline recipient. + """ + deletePipelineRecipient(input: DeletePipelineRecipientInput!): DeletePipelineRecipientResult! } diff --git a/hexa/pipelines/management/commands/pipelines_scheduler.py b/hexa/pipelines/management/commands/pipelines_scheduler.py index 677bd8ddf..b29eafe6b 100644 --- a/hexa/pipelines/management/commands/pipelines_scheduler.py +++ b/hexa/pipelines/management/commands/pipelines_scheduler.py @@ -52,19 +52,22 @@ def handle(self, *args, **options): user=None, pipeline_version=pipeline.last_version, trigger_mode=PipelineRunTrigger.SCHEDULED, - send_mail_notifications=pipeline.recipients.count() > 0, ) track( request=None, event="pipelines.pipeline_run", properties={ "pipeline_id": pipeline.code, - "version_name": pipeline.last_version.name - if pipeline.last_version - else None, - "version_id": str(pipeline.last_version.id) - if pipeline.last_version - else None, + "version_name": ( + pipeline.last_version.name + if pipeline.last_version + else None + ), + "version_id": ( + str(pipeline.last_version.id) + if pipeline.last_version + else None + ), "trigger": PipelineRunTrigger.SCHEDULED, "workspace": pipeline.workspace.slug, }, diff --git a/hexa/pipelines/migrations/0046_pipelinerecipient_notification_level_and_more.py b/hexa/pipelines/migrations/0046_pipelinerecipient_notification_level_and_more.py new file mode 100644 index 000000000..c28bd9e90 --- /dev/null +++ b/hexa/pipelines/migrations/0046_pipelinerecipient_notification_level_and_more.py @@ -0,0 +1,26 @@ +# Generated by Django 5.0.9 on 2024-11-12 08:55 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("pipelines", "0045_alter_pipelinerun_duration_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="pipelinerecipient", + name="notification_level", + field=models.CharField( + choices=[("ALL", "All"), ("ERROR", "Error")], + default="ALL", + max_length=200, + ), + ), + migrations.AlterField( + model_name="pipelinerun", + name="send_mail_notifications", + field=models.BooleanField(default=True), + ), + ] diff --git a/hexa/pipelines/models.py b/hexa/pipelines/models.py index b27eae1be..439544195 100644 --- a/hexa/pipelines/models.py +++ b/hexa/pipelines/models.py @@ -32,7 +32,7 @@ SoftDeleteQuerySet, ) from hexa.user_management.models import User -from hexa.workspaces.models import Workspace, WorkspaceMembership +from hexa.workspaces.models import Workspace class PipelineDoesNotSupportParametersError(Exception): @@ -243,7 +243,7 @@ def run( pipeline_version: PipelineVersion, trigger_mode: PipelineRunTrigger, config: typing.Mapping[typing.Dict, typing.Any] | None = None, - send_mail_notifications: bool = False, + send_mail_notifications: bool = True, ): timeout = settings.PIPELINE_RUN_DEFAULT_TIMEOUT if pipeline_version and pipeline_version.timeout: @@ -338,15 +338,6 @@ def update_if_has_perm(self, principal: User, **kwargs): if "webhook_enabled" in kwargs: self.set_webhook_state(kwargs["webhook_enabled"]) - if "recipient_ids" in kwargs: - PipelineRecipient.objects.filter( - Q(pipeline=self) & ~Q(user_id__in=kwargs["recipient_ids"]) - ).delete() - for member in WorkspaceMembership.objects.filter( - workspace=self.workspace, user_id__in=kwargs["recipient_ids"] - ): - PipelineRecipient.objects.get_or_create(user=member.user, pipeline=self) - return self.save() def delete_if_has_perm(self, *, principal: User): @@ -412,6 +403,25 @@ def __str__(self): return self.code +class PipelineNotificationLevel(models.TextChoices): + ALL = "ALL", _("All") + ERROR = "ERROR", _("Error") + + +class PipelineRecipientManager(models.Manager): + def create_if_has_perm( + self, + principal: User, + pipeline: Pipeline, + user: User, + level: PipelineNotificationLevel, + ): + if not principal.has_perm("pipelines.update_pipeline", pipeline): + raise PermissionDenied + + return self.create(pipeline=pipeline, user=user, notification_level=level) + + class PipelineRecipient(Base): class Meta: ordering = ("-updated_at",) @@ -427,6 +437,26 @@ class Meta: "user_management.User", null=False, on_delete=models.CASCADE ) pipeline = models.ForeignKey(Pipeline, null=False, on_delete=models.CASCADE) + notification_level = models.CharField( + max_length=200, + blank=False, + default=PipelineNotificationLevel.ALL, + choices=PipelineNotificationLevel.choices, + ) + objects = PipelineRecipientManager() + + def update_if_has_perm(self, *, principal: User, level: PipelineNotificationLevel): + if not principal.has_perm("pipelines.update_pipeline", self.pipeline): + raise PermissionDenied + + self.notification_level = level + return self.save() + + def delete_if_has_perm(self, *, principal: User): + if not principal.has_perm("pipelines.update_pipeline", self.pipeline): + raise PermissionDenied + + return self.delete() class PipelineRunQuerySet(BaseQuerySet): @@ -479,8 +509,8 @@ class Meta: outputs = models.JSONField(null=True, blank=True, default=list) run_logs = models.TextField(null=True, blank=True) current_progress = models.PositiveSmallIntegerField(default=0) - send_mail_notifications = models.BooleanField(default=False) timeout = models.IntegerField(null=True) + send_mail_notifications = models.BooleanField(default=True) stopped_by = models.ForeignKey( "user_management.User", null=True, diff --git a/hexa/pipelines/schema/mutations.py b/hexa/pipelines/schema/mutations.py index 0244244d9..d6ae5f6bb 100644 --- a/hexa/pipelines/schema/mutations.py +++ b/hexa/pipelines/schema/mutations.py @@ -15,12 +15,14 @@ MissingPipelineConfiguration, Pipeline, PipelineDoesNotSupportParametersError, + PipelineRecipient, PipelineRun, PipelineRunState, PipelineRunTrigger, PipelineType, PipelineVersion, ) +from hexa.user_management.models import User from hexa.workspaces.models import Workspace pipelines_mutations = MutationType() @@ -378,6 +380,104 @@ def resolve_delete_pipeline_version(_, info, **kwargs): } +@pipelines_mutations.field("addPipelineRecipient") +def resolve_add_pipeline_recipient(_, info, **kwargs): + request: HttpRequest = info.context["request"] + input = kwargs["input"] + try: + pipeline = Pipeline.objects.filter_for_user(request.user).get( + id=input.get("pipelineId") + ) + user = User.objects.get(id=input["userId"]) + + recipient = PipelineRecipient.objects.create_if_has_perm( + principal=request.user, + pipeline=pipeline, + user=user, + level=input["notificationLevel"], + ) + return { + "success": True, + "errors": [], + "recipient": recipient, + } + except Pipeline.DoesNotExist: + return { + "success": False, + "errors": ["PIPELINE_NOT_FOUND"], + } + except User.DoesNotExist: + return { + "success": False, + "errors": ["USER_NOT_FOUND"], + } + except PermissionDenied: + return { + "success": False, + "errors": ["PERMISSION_DENIED"], + } + except IntegrityError: + return { + "success": False, + "errors": ["ALREADY_EXISTS"], + } + + +@pipelines_mutations.field("updatePipelineRecipient") +def resolve_update_pipeline_recipient(_, info, **kwargs): + request: HttpRequest = info.context["request"] + input = kwargs["input"] + + try: + recipient = PipelineRecipient.objects.get( + id=input["recipientId"], + ) + recipient.update_if_has_perm( + principal=request.user, level=input["notificationLevel"] + ) + return { + "success": True, + "errors": [], + "recipient": recipient, + } + except PipelineRecipient.DoesNotExist: + return { + "success": False, + "errors": ["RECIPIENT_NOT_FOUND"], + } + except PermissionDenied: + return { + "success": False, + "errors": ["PERMISSION_DENIED"], + } + + +@pipelines_mutations.field("deletePipelineRecipient") +def resolve_delete_pipeline_recipient(_, info, **kwargs): + request: HttpRequest = info.context["request"] + input = kwargs["input"] + + try: + recipient = PipelineRecipient.objects.get( + id=input["recipientId"], + ) + recipient.delete_if_has_perm(principal=request.user) + return { + "success": True, + "errors": [], + } + except PipelineRecipient.DoesNotExist: + return { + "success": False, + "errors": ["RECIPIENT_NOT_FOUND"], + } + except PermissionDenied: + return { + "success": False, + "errors": ["PERMISSION_DENIED"], + } + + @pipelines_mutations.field("logPipelineMessage") def resolve_pipeline_log_message(_, info, **kwargs): request: HttpRequest = info.context["request"] diff --git a/hexa/pipelines/schema/types.py b/hexa/pipelines/schema/types.py index 6cd8f2304..36ed7a057 100644 --- a/hexa/pipelines/schema/types.py +++ b/hexa/pipelines/schema/types.py @@ -8,7 +8,12 @@ from hexa.databases.utils import get_table_definition from hexa.files import storage from hexa.files.backends.base import StorageObject -from hexa.pipelines.models import Pipeline, PipelineRun, PipelineVersion +from hexa.pipelines.models import ( + Pipeline, + PipelineNotificationLevel, + PipelineRun, + PipelineVersion, +) from hexa.workspaces.models import Workspace from hexa.workspaces.schema.types import workspace_permissions @@ -16,6 +21,9 @@ pipeline_version_permissions = ObjectType("PipelineVersionPermissions") pipeline_parameter = ObjectType("PipelineParameter") pipeline_run_status_enum = EnumType("PipelineRunStatus", PipelineRun.STATUS_MAPPINGS) +pipeline_notification_level_enum = EnumType( + "PipelineNotificationLevel", PipelineNotificationLevel +) pipeline_run_order_by_enum = EnumType( "PipelineRunOrderBy", { @@ -276,6 +284,7 @@ def resolve_pipeline_run_dataset_version(run: PipelineRun, info, **kwargs): pipeline_object, pipeline_run_object, pipeline_run_status_enum, + pipeline_notification_level_enum, pipeline_run_order_by_enum, pipeline_version_object, pipeline_version_permissions, diff --git a/hexa/pipelines/tests/test_models.py b/hexa/pipelines/tests/test_models.py new file mode 100644 index 000000000..6abb65790 --- /dev/null +++ b/hexa/pipelines/tests/test_models.py @@ -0,0 +1,158 @@ +from django.core import mail + +from hexa.core.test import TestCase +from hexa.pipelines.models import ( + Pipeline, + PipelineNotificationLevel, + PipelineRecipient, + PipelineRunState, + PipelineRunTrigger, +) +from hexa.pipelines.utils import mail_run_recipients +from hexa.user_management.models import User +from hexa.workspaces.models import ( + Workspace, +) + + +class PipelineTest(TestCase): + def create_recipient( + self, + user: User, + pipeline: Pipeline, + notification_level: PipelineNotificationLevel, + ): + return PipelineRecipient.objects.create( + pipeline=pipeline, + user=user, + notification_level=notification_level, + ) + + @classmethod + def setUpTestData(cls): + cls.USER_ADMIN = User.objects.create_user( + "admin@bluesquarehub.com", + "admin", + analytics_enabled=True, + is_superuser=True, + ) + cls.USER_FOO = User.objects.create_user( + "foo@bluesquarehub.com", + "foopassword", + ) + + cls.USER_BAR = User.objects.create_user("bar@bluesquarehub.com", "barpassword") + + cls.WORKSPACE = Workspace.objects.create_if_has_perm( + cls.USER_ADMIN, + name="Sandbox", + description="This is a sandbox workspace ", + countries=[{"code": "AL"}], + ) + + cls.PIPELINE = Pipeline.objects.create( + workspace=cls.WORKSPACE, + name="Test pipeline", + code="my-pipeline", + description="This is a test pipeline", + ) + cls.PIPELINE.upload_new_version( + cls.USER_ADMIN, zipfile=b"", parameters=[], name="Version" + ) + + def test_mail_run_recipients_mail_not_sent(self): + self.client.force_login(self.USER_ADMIN) + self.create_recipient( + pipeline=self.PIPELINE, + user=self.USER_BAR, + notification_level=PipelineNotificationLevel.ERROR, + ) + + run = self.PIPELINE.run( + user=self.USER_ADMIN, + pipeline_version=self.PIPELINE.last_version, + trigger_mode=PipelineRunTrigger.MANUAL, + config={}, + ) + + run.state = PipelineRunState.SUCCESS + run.save() + + mail_run_recipients(run) + + self.assertEqual(len(mail.outbox), 0) + + def test_mail_run_recipients_mail_success_only_recipients(self): + self.client.force_login(self.USER_ADMIN) + self.create_recipient( + pipeline=self.PIPELINE, + user=self.USER_BAR, + notification_level=PipelineNotificationLevel.ERROR, + ) + self.create_recipient( + pipeline=self.PIPELINE, + user=self.USER_FOO, + notification_level=PipelineNotificationLevel.ERROR, + ) + self.create_recipient( + pipeline=self.PIPELINE, + user=self.USER_ADMIN, + notification_level=PipelineNotificationLevel.ALL, + ) + + run = self.PIPELINE.run( + user=self.USER_ADMIN, + pipeline_version=self.PIPELINE.last_version, + trigger_mode=PipelineRunTrigger.MANUAL, + config={}, + ) + + run.state = PipelineRunState.SUCCESS + run.save() + + mail_run_recipients(run) + recipients = [email.recipients()[0] for email in mail.outbox] + + self.assertEqual(len(mail.outbox), 1) + self.assertTrue( + all( + [ + self.USER_FOO.email not in recipients, + self.USER_BAR.email not in recipients, + ] + ) + ) + self.assertTrue( + self.USER_ADMIN.email in recipients, + ) + + def test_mail_run_recipients_mail_all_recipients(self): + self.client.force_login(self.USER_ADMIN) + self.create_recipient( + pipeline=self.PIPELINE, + user=self.USER_BAR, + notification_level=PipelineNotificationLevel.ERROR, + ) + self.create_recipient( + pipeline=self.PIPELINE, + user=self.USER_FOO, + notification_level=PipelineNotificationLevel.ERROR, + ) + self.create_recipient( + pipeline=self.PIPELINE, + user=self.USER_ADMIN, + notification_level=PipelineNotificationLevel.ALL, + ) + + run = self.PIPELINE.run( + user=self.USER_ADMIN, + pipeline_version=self.PIPELINE.last_version, + trigger_mode=PipelineRunTrigger.MANUAL, + config={}, + ) + + run.state = PipelineRunState.FAILED + run.save() + + mail_run_recipients(run) + self.assertEqual(len(mail.outbox), 3) diff --git a/hexa/pipelines/tests/test_schema/test_pipelines.py b/hexa/pipelines/tests/test_schema/test_pipelines.py index 098d6dc7f..fddf1376d 100644 --- a/hexa/pipelines/tests/test_schema/test_pipelines.py +++ b/hexa/pipelines/tests/test_schema/test_pipelines.py @@ -15,6 +15,7 @@ from hexa.files.backends.exceptions import NotFound from hexa.pipelines.models import ( Pipeline, + PipelineNotificationLevel, PipelineRecipient, PipelineRun, PipelineRunState, @@ -1196,132 +1197,6 @@ def test_delete_pipeline_version(self): r["data"]["deletePipelineVersion"], ) - def test_add_pipeline_recipients(self): - self.test_create_pipeline_version() - pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() - - r = self.run_query( - """ - mutation updatePipeline($input: UpdatePipelineInput!) { - updatePipeline(input: $input) { - success - errors - pipeline { - recipients { - user { - id - } - } - } - } - } - """, - { - "input": { - "id": str(pipeline.id), - "recipientIds": [str(self.USER_ROOT.id)], - } - }, - ) - self.assertEqual( - { - "success": True, - "errors": [], - "pipeline": {"recipients": [{"user": {"id": str(self.USER_ROOT.id)}}]}, - }, - r["data"]["updatePipeline"], - ) - - def test_update_pipeline_recipients(self): - self.test_create_pipeline_version() - pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() - PipelineRecipient.objects.create(pipeline=pipeline, user=self.USER_ROOT) - PipelineRecipient.objects.create(pipeline=pipeline, user=self.USER_LAMBDA) - - self.assertEqual(pipeline.recipients.count(), 2) - - r = self.run_query( - """ - mutation updatePipeline($input: UpdatePipelineInput!) { - updatePipeline(input: $input) { - success - errors - pipeline { - recipients { - user { - id - } - } - } - } - } - """, - { - "input": { - "id": str(pipeline.id), - "recipientIds": [ - str(self.USER_LAMBDA.id), - str(self.USER_SABRINA.id), - ], - } - }, - ) - self.assertEqual( - { - "success": True, - "errors": [], - "pipeline": { - "recipients": [ - {"user": {"id": str(self.USER_SABRINA.id)}}, - {"user": {"id": str(self.USER_LAMBDA.id)}}, - ] - }, - }, - r["data"]["updatePipeline"], - ) - - def test_update_pipeline_recipients_no_workspace_members(self): - self.test_create_pipeline_version() - pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() - r = self.run_query( - """ - mutation updatePipeline($input: UpdatePipelineInput!) { - updatePipeline(input: $input) { - success - errors - pipeline { - recipients { - user { - id - } - } - } - } - } - """, - { - "input": { - "id": str(pipeline.id), - "recipientIds": [ - str(self.USER_LAMBDA.id), - str(self.USER_NOOB.id), - ], - } - }, - ) - self.assertEqual( - { - "success": True, - "errors": [], - "pipeline": { - "recipients": [ - {"user": {"id": str(self.USER_LAMBDA.id)}}, - ] - }, - }, - r["data"]["updatePipeline"], - ) - def test_update_pipeline_public_webhook(self): self.test_create_pipeline_version() pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() @@ -1407,12 +1282,18 @@ def test_mail_run_recipients_manual_trigger(self): pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() version = pipeline.last_version + + PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ALL, + ) + run = pipeline.run( user=self.USER_ROOT, pipeline_version=version, trigger_mode=PipelineRunTrigger.MANUAL, config={}, - send_mail_notifications=True, ) mail_run_recipients(run) @@ -1420,7 +1301,7 @@ def test_mail_run_recipients_manual_trigger(self): f"Run report of {pipeline.code} ({run.state.label})", mail.outbox[0].subject, ) - self.assertListEqual([self.USER_ROOT.email], mail.outbox[0].recipients()) + self.assertListEqual([self.USER_SABRINA.email], mail.outbox[0].recipients()) self.assertTrue( f"{settings.NEW_FRONTEND_DOMAIN}/workspaces/{pipeline.workspace.slug}/pipelines/{pipeline.code}/runs/{run.id}" in mail.outbox[0].body @@ -1433,15 +1314,23 @@ def test_mail_run_recipients_scheduled_trigger(self): pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() version = pipeline.last_version - PipelineRecipient.objects.create(pipeline=pipeline, user=self.USER_ROOT) - PipelineRecipient.objects.create(pipeline=pipeline, user=self.USER_LAMBDA) + + PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_ROOT, + notification_level=PipelineNotificationLevel.ALL, + ) + PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_LAMBDA, + notification_level=PipelineNotificationLevel.ALL, + ) run = pipeline.run( user=self.USER_ROOT, pipeline_version=version, trigger_mode=PipelineRunTrigger.SCHEDULED, config={}, - send_mail_notifications=True, ) mail_run_recipients(run) @@ -2314,3 +2203,361 @@ def test_generate_pipeline_webhook_url_feature_flag_permission_denied(self): {"success": False, "errors": ["PERMISSION_DENIED"]}, r["data"]["generatePipelineWebhookUrl"], ) + + def test_add_pipeline_recipient_pipeline_not_found(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + + r = self.run_query( + """ + mutation addPipelineRecipient($input: CreatePipelineRecipientInput!) { + addPipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "pipelineId": str(uuid.uuid4()), + "userId": str(self.USER_ROOT.id), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + + self.assertEqual( + {"success": False, "errors": ["PIPELINE_NOT_FOUND"]}, + r["data"]["addPipelineRecipient"], + ) + + def test_add_pipeline_recipient_user_not_found(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() + + r = self.run_query( + """ + mutation addPipelineRecipient($input: CreatePipelineRecipientInput!) { + addPipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "pipelineId": str(pipeline.id), + "userId": str(uuid.uuid4()), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + + self.assertEqual( + {"success": False, "errors": ["USER_NOT_FOUND"]}, + r["data"]["addPipelineRecipient"], + ) + + def test_add_pipeline_recipient_permission_denied(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_SABRINA) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + r = self.run_query( + """ + mutation addPipelineRecipient($input: CreatePipelineRecipientInput!) { + addPipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "pipelineId": str(pipeline.id), + "userId": str(self.USER_SABRINA.id), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + + self.assertEqual( + {"success": False, "errors": ["PERMISSION_DENIED"]}, + r["data"]["addPipelineRecipient"], + ) + + def test_add_pipeline_recipient_already_exists(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + + pipeline = Pipeline.objects.filter_for_user(user=self.USER_ROOT).first() + PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ERROR, + ) + + r = self.run_query( + """ + mutation addPipelineRecipient($input: CreatePipelineRecipientInput!) { + addPipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "pipelineId": str(pipeline.id), + "userId": str(self.USER_SABRINA.id), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + self.assertEqual( + {"success": False, "errors": ["ALREADY_EXISTS"]}, + r["data"]["addPipelineRecipient"], + ) + + def test_add_pipeline_recipient(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + r = self.run_query( + """ + mutation addPipelineRecipient($input: CreatePipelineRecipientInput!) { + addPipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "pipelineId": str(pipeline.id), + "userId": str(self.USER_SABRINA.id), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + self.assertEqual( + {"success": True, "errors": []}, + r["data"]["addPipelineRecipient"], + ) + + def test_update_pipeline_recipient(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + recipient = PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ERROR, + ) + + r = self.run_query( + """ + mutation updatePipelineRecipient($input: UpdatePipelineRecipientInput!) { + updatePipelineRecipient(input: $input) { + success + errors + recipient { + notificationLevel + } + } + } + """, + { + "input": { + "recipientId": str(recipient.id), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + self.assertEqual( + { + "success": True, + "errors": [], + "recipient": {"notificationLevel": PipelineNotificationLevel.ALL}, + }, + r["data"]["updatePipelineRecipient"], + ) + + def test_update_pipeline_recipient_not_found(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ERROR, + ) + + r = self.run_query( + """ + mutation updatePipelineRecipient($input: UpdatePipelineRecipientInput!) { + updatePipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "recipientId": str(uuid.uuid4()), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + + self.assertEqual( + { + "success": False, + "errors": ["RECIPIENT_NOT_FOUND"], + }, + r["data"]["updatePipelineRecipient"], + ) + + def test_update_pipeline_recipient_permission_denied(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_SABRINA) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + recipient = PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ERROR, + ) + + r = self.run_query( + """ + mutation updatePipelineRecipient($input: UpdatePipelineRecipientInput!) { + updatePipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "recipientId": str(recipient.id), + "notificationLevel": PipelineNotificationLevel.ALL, + } + }, + ) + self.assertEqual( + { + "success": False, + "errors": ["PERMISSION_DENIED"], + }, + r["data"]["updatePipelineRecipient"], + ) + + def test_delete_pipeline_recipient_not_found(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ERROR, + ) + + r = self.run_query( + """ + mutation deletePipelineRecipient($input: DeletePipelineRecipientInput!) { + deletePipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "recipientId": str(uuid.uuid4()), + } + }, + ) + + self.assertEqual( + { + "success": False, + "errors": ["RECIPIENT_NOT_FOUND"], + }, + r["data"]["deletePipelineRecipient"], + ) + + def test_delete_pipeline_recipient_permission_denied(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_SABRINA) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + recipient = PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ERROR, + ) + + r = self.run_query( + """ + mutation deletePipelineRecipient($input: DeletePipelineRecipientInput!) { + deletePipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "recipientId": str(recipient.id), + } + }, + ) + + self.assertEqual( + { + "success": False, + "errors": ["PERMISSION_DENIED"], + }, + r["data"]["deletePipelineRecipient"], + ) + + def test_delete_pipeline_recipient(self): + self.test_create_pipeline_version() + self.client.force_login(self.USER_ROOT) + pipeline = Pipeline.objects.filter_for_user(user=self.USER_SABRINA).first() + + recipient = PipelineRecipient.objects.create( + pipeline=pipeline, + user=self.USER_SABRINA, + notification_level=PipelineNotificationLevel.ERROR, + ) + + r = self.run_query( + """ + mutation deletePipelineRecipient($input: DeletePipelineRecipientInput!) { + deletePipelineRecipient(input: $input) { + success + errors + } + } + """, + { + "input": { + "recipientId": str(recipient.id), + } + }, + ) + + self.assertEqual( + { + "success": True, + "errors": [], + }, + r["data"]["deletePipelineRecipient"], + ) diff --git a/hexa/pipelines/utils.py b/hexa/pipelines/utils.py index e56fdb8af..0b1739010 100644 --- a/hexa/pipelines/utils.py +++ b/hexa/pipelines/utils.py @@ -8,19 +8,23 @@ from hexa.core.utils import send_mail -from .models import PipelineRun, PipelineRunTrigger +from .models import ( + PipelineNotificationLevel, + PipelineRun, + PipelineRunState, +) def mail_run_recipients(run: PipelineRun): - recipient_list = [] - if run.trigger_mode == PipelineRunTrigger.MANUAL and run.user is not None: - recipient_list = [run.user] - else: - recipient_list = run.pipeline.recipients.all() - workspace_slug = run.pipeline.workspace.slug - for recipient in recipient_list: - with override(recipient.language): + for recipient in run.pipeline.pipelinerecipient_set.all(): + if ( + run.state == PipelineRunState.SUCCESS + and recipient.notification_level == PipelineNotificationLevel.ERROR + ): + continue + + with override(recipient.user.language): send_mail( title=gettext_lazy("Run report of {code} ({state})").format( code=run.pipeline.code, state=run.state.label @@ -37,7 +41,7 @@ def mail_run_recipients(run: PipelineRun): ), "run_url": f"{settings.NEW_FRONTEND_DOMAIN}/workspaces/{workspace_slug}/pipelines/{run.pipeline.code}/runs/{run.id}", }, - recipient_list=[recipient.email], + recipient_list=[recipient.user.email], )