Skip to content

Commit

Permalink
feat(proposal): subscriber interface/basic impl
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Phelps <[email protected]>
  • Loading branch information
markphelps committed Dec 9, 2024
1 parent cdc7e07 commit 871e256
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 0 deletions.
61 changes: 61 additions & 0 deletions pkg/phases/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"log/slog"
"sync"
"time"

"github.com/get-glu/glu/pkg/core"
Expand All @@ -32,6 +33,8 @@ type PhaseLogger[R core.Resource] struct {
encoder func(any) ([]byte, error)
decoder func([]byte, any) error
last map[string]map[string]version

subscribers sync.Map // map[uuid.UUID]Subscriber
}

func New[R core.Resource](db kv.DB) *PhaseLogger[R] {
Expand All @@ -48,6 +51,29 @@ type version struct {
Annotations map[string]string
}

func (l *PhaseLogger[R]) Subscribe(ctx context.Context, subscriber Subscriber) (*Subscription, error) {
id, err := uuid.NewV7()
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(ctx)

l.subscribers.Store(id, subscriber)

go func() {
<-ctx.Done()
l.Unsubscribe(context.Background(), id)
}()

return &Subscription{ID: id, Cancel: cancel}, nil
}

func (l *PhaseLogger[R]) Unsubscribe(ctx context.Context, id uuid.UUID) error {
l.subscribers.Delete(id)
return nil
}

func (l *PhaseLogger[R]) CreateLog(ctx context.Context, phase core.Descriptor) error {
return l.db.Update(func(tx kv.Tx) error {
if _, err := createBucketPath(tx, versionBucket, refBucket, phase.Pipeline, phase.Metadata.Name); err != nil {
Expand Down Expand Up @@ -123,10 +149,45 @@ func (l *PhaseLogger[R]) RecordLatest(ctx context.Context, phase core.Descriptor
return err
}

var (
timestamp = time.Unix(id.Time().UnixTime())
event = Event{
Phase: phase,
State: core.State{
Version: id,
Digest: string(digest),
Resource: resource,
Annotations: annotations,
RecordedAt: timestamp.UTC(),
},
}
)

// TODO: determine if we should notify subscribers before or after the write since the write could fail
l.notifySubscribers(ctx, event)

return refs.Put(idBytes, encoded)
})
}

func (l *PhaseLogger[R]) notifySubscribers(ctx context.Context, event Event) {
l.subscribers.Range(func(key, value any) bool {
subscriber := value.(Subscriber)
// notify subscriber in a new goroutine to avoid blocking the main thread
go func() {
select {
case <-ctx.Done():
return
default:
if err := subscriber.OnEvent(ctx, event); err != nil {
slog.Error("subscriber error", "error", err, "subscriber", key.(uuid.UUID), "event", event)
}
}
}()
return true
})
}

func (l *PhaseLogger[R]) isUpToDate(refs kv.Bucket, phase core.Descriptor, digest string) bool {
slog := slog.With("pipeline", phase.Pipeline, "phase", phase.Metadata.Name)

Expand Down
36 changes: 36 additions & 0 deletions pkg/phases/logger/subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package logger

import (
"context"

"github.com/get-glu/glu/pkg/core"
"github.com/google/uuid"
)

type Event struct {
Phase core.Descriptor
State core.State
// TODO: event type
}

// Subscriber is a type that can receive events from a phase logger.
type Subscriber interface {
// OnEvent is called when a new event is received.
OnEvent(ctx context.Context, event Event) error
}

// Subscription represents an active subscription to a phase logger.
type Subscription struct {
ID uuid.UUID
Cancel context.CancelFunc
}

// Subscribable extends PhaseLogger with subscription functionality.
type Subscribable[R core.Resource] interface {
PhaseLogger[R]

// Subscribe returns a new subscription to the phase logger.
Subscribe(ctx context.Context, subscriber Subscriber) (*Subscription, error)
// Unsubscribe cancels a subscription.
Unsubscribe(ctx context.Context, id uuid.UUID) error
}

0 comments on commit 871e256

Please sign in to comment.