From ad317659bc2c4737529c4587c11b351a81e73fa4 Mon Sep 17 00:00:00 2001 From: Tyler Auerbeck Date: Fri, 21 Jul 2023 09:23:36 -0400 Subject: [PATCH] Add helper functions for publishing AuthRelationshipRequests and AuthRelationshipResponses Signed-off-by: Tyler Auerbeck --- events/publisher.go | 94 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/events/publisher.go b/events/publisher.go index 5d92e2e..f5fe8c5 100644 --- a/events/publisher.go +++ b/events/publisher.go @@ -41,6 +41,9 @@ var ErrUnsupportedPubsub = errors.New("unsupported pubsub provider") // ErrMissingEventType is returned when attempting to publish an event without an event type specified var ErrMissingEventType = errors.New("event type missing") +// InvalidAuthRelationshipAction is returned when attempting to publish an AuthRelationshipAction that isn't write or delete +var ErrInvalidAuthRelationshipAction = errors.New("invalid auth relationship action") + // Publisher provides a pubsub publisher that uses the watermill pubsub package type Publisher struct { prefix string @@ -81,6 +84,97 @@ func NewPublisher(cfg PublisherConfig) (*Publisher, error) { return NewPublisherWithLogger(cfg, zap.NewNop().Sugar()) } +func (p *Publisher) PublishAuthRelationshipRequest(ctx context.Context, msg AuthRelationshipRequest) error { + ctx, span := p.tracer.Start( + ctx, + "events.publishAuthRelationshipRequest", + trace.WithAttributes( + attribute.String( + "events.action", + string(msg.Action), + ), + attribute.String( + "events.subject_id", + msg.SubjectID.String(), + ), + attribute.String( + "events.object_id", + msg.ObjectID.String(), + ), + attribute.String( + "events.relationship_name", + msg.RelationshipName, + ), + ), + ) + defer span.End() + + var mapCarrier propagation.MapCarrier = make(map[string]string) + + otel.GetTextMapPropagator().Inject(ctx, mapCarrier) + + msg.TraceContext = mapCarrier + + if strings.ToLower(string(msg.Action)) != string(WriteAuthRelationshipAction) || strings.ToLower(string(msg.Action)) != string(DeleteAuthRelationshipAction) { + span.RecordError(ErrInvalidAuthRelationshipAction) + span.SetStatus(codes.Error, ErrInvalidAuthRelationshipAction.Error()) + + return ErrInvalidAuthRelationshipAction + } + + topic := strings.Join([]string{p.prefix, "permissions.relationship", string(msg.Action)}, ".") + + span.SetAttributes( + attribute.String( + "events.topic", + topic, + ), + ) + + v, err := json.Marshal(msg) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + m := message.NewMessage(watermill.NewUUID(), v) + + span.SetAttributes( + attribute.String( + "events.message_id", + m.UUID, + ), + ) + + if err := p.publisher.Publish(topic, m); err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + + return err + } + + return nil +} + +func (p *Publisher) PublishAuthRelationshipResponse(ctx context.Context, msg AuthRelationshipResponse) AuthRelationshipResponse { + ctx, span := p.tracer.Start( + ctx, + "events.publishAuthRelationshipResponse", + ) + defer span.End() + + // Propagate trace context into the message for the subscriber + var mapCarrier propagation.MapCarrier = make(map[string]string) + + otel.GetTextMapPropagator().Inject(ctx, mapCarrier) + + msg.TraceContext = mapCarrier + + return msg +} + // PublishChange will publish a ChangeMessage to the topic for the change func (p *Publisher) PublishChange(ctx context.Context, subjectType string, change ChangeMessage) error { ctx, span := p.tracer.Start(