From 871e2563126e29ce9ec7207c01a74ca8738572c5 Mon Sep 17 00:00:00 2001 From: Mark Phelps <209477+markphelps@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:24:48 -0500 Subject: [PATCH] feat(proposal): subscriber interface/basic impl Signed-off-by: Mark Phelps <209477+markphelps@users.noreply.github.com> --- pkg/phases/logger/logger.go | 61 +++++++++++++++++++++++++++++++++ pkg/phases/logger/subscriber.go | 36 +++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 pkg/phases/logger/subscriber.go diff --git a/pkg/phases/logger/logger.go b/pkg/phases/logger/logger.go index 91dcece..6469081 100644 --- a/pkg/phases/logger/logger.go +++ b/pkg/phases/logger/logger.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "log/slog" + "sync" "time" "github.com/get-glu/glu/pkg/core" @@ -32,6 +33,8 @@ type PhaseLogger[R core.Resource] struct { encoder func(any) ([]byte, error) decoder func([]byte, any) error last map[string]map[string]version + + subscribers sync.Map // map[uuid.UUID]Subscriber } func New[R core.Resource](db kv.DB) *PhaseLogger[R] { @@ -48,6 +51,29 @@ type version struct { Annotations map[string]string } +func (l *PhaseLogger[R]) Subscribe(ctx context.Context, subscriber Subscriber) (*Subscription, error) { + id, err := uuid.NewV7() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + + l.subscribers.Store(id, subscriber) + + go func() { + <-ctx.Done() + l.Unsubscribe(context.Background(), id) + }() + + return &Subscription{ID: id, Cancel: cancel}, nil +} + +func (l *PhaseLogger[R]) Unsubscribe(ctx context.Context, id uuid.UUID) error { + l.subscribers.Delete(id) + return nil +} + func (l *PhaseLogger[R]) CreateLog(ctx context.Context, phase core.Descriptor) error { return l.db.Update(func(tx kv.Tx) error { if _, err := createBucketPath(tx, versionBucket, refBucket, phase.Pipeline, phase.Metadata.Name); err != nil { @@ -123,10 +149,45 @@ func (l *PhaseLogger[R]) RecordLatest(ctx context.Context, phase core.Descriptor return err } + var ( + timestamp = time.Unix(id.Time().UnixTime()) + event = Event{ + Phase: phase, + State: core.State{ + Version: id, + Digest: string(digest), + Resource: resource, + Annotations: annotations, + RecordedAt: timestamp.UTC(), + }, + } + ) + + // TODO: determine if we should notify subscribers before or after the write since the write could fail + l.notifySubscribers(ctx, event) + return refs.Put(idBytes, encoded) }) } +func (l *PhaseLogger[R]) notifySubscribers(ctx context.Context, event Event) { + l.subscribers.Range(func(key, value any) bool { + subscriber := value.(Subscriber) + // notify subscriber in a new goroutine to avoid blocking the main thread + go func() { + select { + case <-ctx.Done(): + return + default: + if err := subscriber.OnEvent(ctx, event); err != nil { + slog.Error("subscriber error", "error", err, "subscriber", key.(uuid.UUID), "event", event) + } + } + }() + return true + }) +} + func (l *PhaseLogger[R]) isUpToDate(refs kv.Bucket, phase core.Descriptor, digest string) bool { slog := slog.With("pipeline", phase.Pipeline, "phase", phase.Metadata.Name) diff --git a/pkg/phases/logger/subscriber.go b/pkg/phases/logger/subscriber.go new file mode 100644 index 0000000..7ba15f4 --- /dev/null +++ b/pkg/phases/logger/subscriber.go @@ -0,0 +1,36 @@ +package logger + +import ( + "context" + + "github.com/get-glu/glu/pkg/core" + "github.com/google/uuid" +) + +type Event struct { + Phase core.Descriptor + State core.State + // TODO: event type +} + +// Subscriber is a type that can receive events from a phase logger. +type Subscriber interface { + // OnEvent is called when a new event is received. + OnEvent(ctx context.Context, event Event) error +} + +// Subscription represents an active subscription to a phase logger. +type Subscription struct { + ID uuid.UUID + Cancel context.CancelFunc +} + +// Subscribable extends PhaseLogger with subscription functionality. +type Subscribable[R core.Resource] interface { + PhaseLogger[R] + + // Subscribe returns a new subscription to the phase logger. + Subscribe(ctx context.Context, subscriber Subscriber) (*Subscription, error) + // Unsubscribe cancels a subscription. + Unsubscribe(ctx context.Context, id uuid.UUID) error +}