Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Pipelines): Pipeline notifications v2 #846

Merged
merged 7 commits into from
Nov 15, 2024
79 changes: 78 additions & 1 deletion hexa/pipelines/graphql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,23 @@ type PipelineVersionPage {
totalItems: Int! # The total number of pipeline versions.
}

"""
Represents the notification level for a pipeline recipient.
"""
enum PipelineNotificationLevel {
ALL
ERROR
}


"""
Represents a recipient of a pipeline.
"""
type PipelineRecipient {
id: UUID! # The ID of the recipient.
user: User! # The user who is a recipient of the pipeline.
pipeline: Pipeline! # The pipeline associated with the recipient.
notificationLevel: PipelineNotificationLevel! # The event that triggers the notification.
}

"""
Expand Down Expand Up @@ -278,6 +289,38 @@ type CreatePipelineResult {
}



enum PipelineRecipientError {
PIPELINE_NOT_FOUND
USER_NOT_FOUND
RECIPIENT_NOT_FOUND
PERMISSION_DENIED
ALREADY_EXISTS
}

"""
Represents the input for adding a recipient to a pipeline.
"""
input CreatePipelineRecipientInput {
pipelineId: UUID!
userId: UUID!
notificationLevel: PipelineNotificationLevel!
}

"""
Represents the input for updating a recipient.
"""
input UpdatePipelineRecipientInput {
recipientId: UUID!
notificationLevel: PipelineNotificationLevel!
}
"""
Represents the input for deleting a pipeline recipient.
"""
input DeletePipelineRecipientInput {
recipientId: UUID!
}

"""
Represents the input for updating a pipeline.
"""
Expand All @@ -287,7 +330,6 @@ input UpdatePipelineInput {
config: JSON # The new configuration for the pipeline.
schedule: String # The new schedule for running the pipeline.
description: String # The new description of the pipeline.
recipientIds: [UUID!] # The IDs of the recipients of the pipeline.
webhookEnabled: Boolean # Indicates if the webhook should be enabled for the pipeline.
}

Expand Down Expand Up @@ -392,6 +434,23 @@ input UploadPipelineInput {
timeout: Int # The timeout value for the pipeline.
}

type AddPipelineRecipientResult {
success: Boolean!
errors: [PipelineRecipientError!]!
recipient: PipelineRecipient
}

type UpdatePipelineRecipientResult {
success: Boolean!
errors: [PipelineRecipientError!]!
recipient: PipelineRecipient
}

type DeletePipelineRecipientResult {
success: Boolean!
errors: [PipelineRecipientError!]!
}

"""
Represents the result of uploading a pipeline.
"""
Expand Down Expand Up @@ -602,5 +661,23 @@ extend type Mutation {
Deletes a pipeline version.
"""
deletePipelineVersion(input: DeletePipelineVersionInput!): DeletePipelineVersionResult!

"""
Generates a webhook URL for a pipeline.
"""
generatePipelineWebhookUrl(input: GeneratePipelineWebhookUrlInput!): GeneratePipelineWebhookUrlResult!

"""
Adds a recipient to a pipeline.
"""
addPipelineRecipient(input: CreatePipelineRecipientInput!): AddPipelineRecipientResult!
"""
Updates a pipeline recipient.
"""
updatePipelineRecipient(input: UpdatePipelineRecipientInput!): UpdatePipelineRecipientResult!

"""
Deletes a pipeline recipient.
"""
deletePipelineRecipient(input: DeletePipelineRecipientInput!): DeletePipelineRecipientResult!
}
17 changes: 10 additions & 7 deletions hexa/pipelines/management/commands/pipelines_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,22 @@ def handle(self, *args, **options):
user=None,
pipeline_version=pipeline.last_version,
trigger_mode=PipelineRunTrigger.SCHEDULED,
send_mail_notifications=pipeline.recipients.count() > 0,
)
track(
request=None,
event="pipelines.pipeline_run",
properties={
"pipeline_id": pipeline.code,
"version_name": pipeline.last_version.name
if pipeline.last_version
else None,
"version_id": str(pipeline.last_version.id)
if pipeline.last_version
else None,
"version_name": (
pipeline.last_version.name
if pipeline.last_version
else None
),
"version_id": (
str(pipeline.last_version.id)
if pipeline.last_version
else None
),
"trigger": PipelineRunTrigger.SCHEDULED,
"workspace": pipeline.workspace.slug,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Generated by Django 5.0.9 on 2024-11-12 08:55

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("pipelines", "0045_alter_pipelinerun_duration_and_more"),
]

operations = [
migrations.AddField(
model_name="pipelinerecipient",
name="notification_level",
field=models.CharField(
choices=[("ALL", "All"), ("ERROR", "Error")],
default="ALL",
max_length=200,
),
),
migrations.AlterField(
model_name="pipelinerun",
name="send_mail_notifications",
field=models.BooleanField(default=True),
),
]
54 changes: 42 additions & 12 deletions hexa/pipelines/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
SoftDeleteQuerySet,
)
from hexa.user_management.models import User
from hexa.workspaces.models import Workspace, WorkspaceMembership
from hexa.workspaces.models import Workspace


class PipelineDoesNotSupportParametersError(Exception):
Expand Down Expand Up @@ -243,7 +243,7 @@ def run(
pipeline_version: PipelineVersion,
trigger_mode: PipelineRunTrigger,
config: typing.Mapping[typing.Dict, typing.Any] | None = None,
send_mail_notifications: bool = False,
send_mail_notifications: bool = True,
):
timeout = settings.PIPELINE_RUN_DEFAULT_TIMEOUT
if pipeline_version and pipeline_version.timeout:
Expand Down Expand Up @@ -338,15 +338,6 @@ def update_if_has_perm(self, principal: User, **kwargs):
if "webhook_enabled" in kwargs:
self.set_webhook_state(kwargs["webhook_enabled"])

if "recipient_ids" in kwargs:
PipelineRecipient.objects.filter(
Q(pipeline=self) & ~Q(user_id__in=kwargs["recipient_ids"])
).delete()
for member in WorkspaceMembership.objects.filter(
workspace=self.workspace, user_id__in=kwargs["recipient_ids"]
):
PipelineRecipient.objects.get_or_create(user=member.user, pipeline=self)

return self.save()

def delete_if_has_perm(self, *, principal: User):
Expand Down Expand Up @@ -412,6 +403,25 @@ def __str__(self):
return self.code


class PipelineNotificationLevel(models.TextChoices):
ALL = "ALL", _("All")
ERROR = "ERROR", _("Error")


class PipelineRecipientManager(models.Manager):
def create_if_has_perm(
self,
principal: User,
pipeline: Pipeline,
user: User,
level: PipelineNotificationLevel,
):
if not principal.has_perm("pipelines.update_pipeline", pipeline):
raise PermissionDenied

return self.create(pipeline=pipeline, user=user, notification_level=level)


class PipelineRecipient(Base):
class Meta:
ordering = ("-updated_at",)
Expand All @@ -427,6 +437,26 @@ class Meta:
"user_management.User", null=False, on_delete=models.CASCADE
)
pipeline = models.ForeignKey(Pipeline, null=False, on_delete=models.CASCADE)
notification_level = models.CharField(
max_length=200,
blank=False,
default=PipelineNotificationLevel.ALL,
choices=PipelineNotificationLevel.choices,
)
objects = PipelineRecipientManager()

def update_if_has_perm(self, *, principal: User, level: PipelineNotificationLevel):
if not principal.has_perm("pipelines.update_pipeline", self.pipeline):
raise PermissionDenied

self.notification_level = level
return self.save()

def delete_if_has_perm(self, *, principal: User):
if not principal.has_perm("pipelines.update_pipeline", self.pipeline):
raise PermissionDenied

return self.delete()


class PipelineRunQuerySet(BaseQuerySet):
Expand Down Expand Up @@ -479,8 +509,8 @@ class Meta:
outputs = models.JSONField(null=True, blank=True, default=list)
run_logs = models.TextField(null=True, blank=True)
current_progress = models.PositiveSmallIntegerField(default=0)
send_mail_notifications = models.BooleanField(default=False)
timeout = models.IntegerField(null=True)
send_mail_notifications = models.BooleanField(default=True)
stopped_by = models.ForeignKey(
"user_management.User",
null=True,
Expand Down
100 changes: 100 additions & 0 deletions hexa/pipelines/schema/mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
MissingPipelineConfiguration,
Pipeline,
PipelineDoesNotSupportParametersError,
PipelineRecipient,
PipelineRun,
PipelineRunState,
PipelineRunTrigger,
PipelineType,
PipelineVersion,
)
from hexa.user_management.models import User
from hexa.workspaces.models import Workspace

pipelines_mutations = MutationType()
Expand Down Expand Up @@ -378,6 +380,104 @@ def resolve_delete_pipeline_version(_, info, **kwargs):
}


@pipelines_mutations.field("addPipelineRecipient")
def resolve_add_pipeline_recipient(_, info, **kwargs):
request: HttpRequest = info.context["request"]
input = kwargs["input"]
try:
pipeline = Pipeline.objects.filter_for_user(request.user).get(
id=input.get("pipelineId")
)
user = User.objects.get(id=input["userId"])

recipient = PipelineRecipient.objects.create_if_has_perm(
principal=request.user,
pipeline=pipeline,
user=user,
level=input["notificationLevel"],
)
return {
"success": True,
"errors": [],
"recipient": recipient,
}
except Pipeline.DoesNotExist:
return {
"success": False,
"errors": ["PIPELINE_NOT_FOUND"],
}
except User.DoesNotExist:
return {
"success": False,
"errors": ["USER_NOT_FOUND"],
}
except PermissionDenied:
return {
"success": False,
"errors": ["PERMISSION_DENIED"],
}
except IntegrityError:
return {
"success": False,
"errors": ["ALREADY_EXISTS"],
}


@pipelines_mutations.field("updatePipelineRecipient")
def resolve_update_pipeline_recipient(_, info, **kwargs):
request: HttpRequest = info.context["request"]
input = kwargs["input"]

try:
recipient = PipelineRecipient.objects.get(
id=input["recipientId"],
)
recipient.update_if_has_perm(
principal=request.user, level=input["notificationLevel"]
)
return {
"success": True,
"errors": [],
"recipient": recipient,
}
except PipelineRecipient.DoesNotExist:
return {
"success": False,
"errors": ["RECIPIENT_NOT_FOUND"],
}
except PermissionDenied:
return {
"success": False,
"errors": ["PERMISSION_DENIED"],
}


@pipelines_mutations.field("deletePipelineRecipient")
def resolve_delete_pipeline_recipient(_, info, **kwargs):
request: HttpRequest = info.context["request"]
input = kwargs["input"]

try:
recipient = PipelineRecipient.objects.get(
id=input["recipientId"],
)
recipient.delete_if_has_perm(principal=request.user)
return {
"success": True,
"errors": [],
}
except PipelineRecipient.DoesNotExist:
return {
"success": False,
"errors": ["RECIPIENT_NOT_FOUND"],
}
except PermissionDenied:
return {
"success": False,
"errors": ["PERMISSION_DENIED"],
}


@pipelines_mutations.field("logPipelineMessage")
def resolve_pipeline_log_message(_, info, **kwargs):
request: HttpRequest = info.context["request"]
Expand Down
Loading
Loading