Skip to content

Commit

Permalink
feat: ftltest supports pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 12, 2024
1 parent 787aa4f commit fde383f
Show file tree
Hide file tree
Showing 18 changed files with 1,033 additions and 20 deletions.
14 changes: 14 additions & 0 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,17 @@ func TestRetry(t *testing.T) {
1+retriesPerCall),
)
}

func TestExternalPublishRuntimeCheck(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

in.ExpectError(
in.Call("subscriber", "publishToExternalModule", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
"can not publish to another module's topic",
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"ftl/publisher"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)
Expand All @@ -25,3 +26,10 @@ var _ = ftl.Subscription(publisher.Topic2, "doomed_subscription")
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
func PublishToExternalModule(ctx context.Context) error {
// Get around compile-time checks
var topic = publisher.Test_topic
return topic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()})
}
307 changes: 300 additions & 7 deletions go-runtime/ftl/ftltest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,97 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"strings"
"sync"
"time"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/go-runtime/ftl"
"github.com/TBD54566975/ftl/go-runtime/internal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/slices"
"github.com/alecthomas/types/optional"
"github.com/alecthomas/types/pubsub"
)

// pubSubEvent is a sum type for all events that can be published to the pubsub system.
// not to be confused with an event that gets published to a topic
//
//sumtype:decl
type pubSubEvent interface {
// cronJobEvent is a marker to ensure that all events implement the interface.
pubSubEvent()
}

// publishEvent holds an event to be published to a topic
type publishEvent struct {
topic *schema.Ref
content any
}

func (publishEvent) pubSubEvent() {}

// subscriptionDidConsumeEvent indicates that a call to a subscriber has completed
type subscriptionDidConsumeEvent struct {
subscription string
err error
}

func (subscriptionDidConsumeEvent) pubSubEvent() {}

type subscription struct {
name string
topic *schema.Ref
cursor optional.Option[int]
isExecuting bool
errors map[int]error
}

type subscriber func(context.Context, any) error

type fakeFTL struct {
fsm *fakeFSMManager
fsm *fakeFSMManager

mockMaps map[uintptr]mapImpl
allowMapCalls bool
configValues map[string][]byte
secretValues map[string][]byte

// all pubsub events are processed through globalTopic
globalTopic *pubsub.Topic[pubSubEvent]
// publishWaitGroup can be used to wait for all events to be published
publishWaitGroup sync.WaitGroup

// pubSubLock required to access [topics, subscriptions, subscribers]
pubSubLock sync.Mutex
topics map[schema.RefKey][]any
subscriptions map[string]*subscription
subscribers map[string][]subscriber
}

// mapImpl is a function that takes an object and returns an object of a potentially different
// type but is not constrained by input/output type like ftl.Map.
type mapImpl func(context.Context) (any, error)

func newFakeFTL() *fakeFTL {
return &fakeFTL{
func newFakeFTL(ctx context.Context) *fakeFTL {
fake := &fakeFTL{
fsm: newFakeFSMManager(),
mockMaps: map[uintptr]mapImpl{},
allowMapCalls: false,
configValues: map[string][]byte{},
secretValues: map[string][]byte{},
globalTopic: pubsub.New[pubSubEvent](),
topics: map[schema.RefKey][]any{},
subscriptions: map[string]*subscription{},
subscribers: map[string][]subscriber{},
}

fake.watchPubSub(ctx)

return fake
}

var _ internal.FTL = &fakeFTL{}
Expand Down Expand Up @@ -73,10 +137,6 @@ func (f *fakeFTL) FSMSend(ctx context.Context, fsm string, instance string, even
return f.fsm.SendEvent(ctx, fsm, instance, event)
}

func (f *fakeFTL) PublishEvent(ctx context.Context, topic string, event any) error {
panic("not implemented")
}

// addMapMock saves a new mock of ftl.Map to the internal map in fakeFTL.
//
// mockMap provides the whole mock implemention, so it gets called in place of both `fn`
Expand Down Expand Up @@ -121,3 +181,236 @@ func actuallyCallMap(ctx context.Context, impl mapImpl) any {
}
return out
}

func (f *fakeFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error {
f.publishWaitGroup.Add(1)
f.globalTopic.PublishSync(publishEvent{topic: topic, content: event})
return nil
}

// addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added
func addSubscriber[E any](f *fakeFTL, sub ftl.SubscriptionHandle[E], sink ftl.Sink[E]) {
f.pubSubLock.Lock()
defer f.pubSubLock.Unlock()

if _, ok := f.subscriptions[sub.Name]; !ok {
f.subscriptions[sub.Name] = &subscription{
name: sub.Name,
topic: sub.Topic,
errors: map[int]error{},
}
}

subscribers, ok := f.subscribers[sub.Name]
if !ok {
subscribers = []subscriber{}
}
f.subscribers[sub.Name] = append(subscribers, func(ctx context.Context, event any) error {
if event, ok := event.(E); ok {
return sink(ctx, event)
}
return fmt.Errorf("unexpected event type %T for subscription %s", event, sub.Name)
})
}

// eventsForTopic returns all events published to a topic
func eventsForTopic[E any](ctx context.Context, f *fakeFTL, topic ftl.TopicHandle[E]) []E {
// Make sure all published events make it into our pubsub state
f.publishWaitGroup.Wait()

f.pubSubLock.Lock()
defer f.pubSubLock.Unlock()

logger := log.FromContext(ctx).Scope("pubsub")
var events = []E{}
raw, ok := f.topics[topic.Ref.ToRefKey()]
if !ok {
return events
}
for _, e := range raw {
if event, ok := e.(E); ok {
events = append(events, event)
} else {
logger.Warnf("unexpected event type %T for topic %s", e, topic.Ref)
}
}
return events
}

// resultsForSubscription returns all consumed events and whether an error was returned
func resultsForSubscription[E any](ctx context.Context, f *fakeFTL, handle ftl.SubscriptionHandle[E]) []SubscriptionResult[E] {
f.pubSubLock.Lock()
defer f.pubSubLock.Unlock()

logger := log.FromContext(ctx).Scope("pubsub")
results := []SubscriptionResult[E]{}

subscription, ok := f.subscriptions[handle.Name]
if !ok {
return results
}
topic, ok := f.topics[handle.Topic.ToRefKey()]
if !ok {
return results
}

count := subscription.cursor.Default(-1)
if !subscription.isExecuting {
count++
}
for i := 0; i < count; i++ {
e := topic[i]
if event, ok := e.(E); ok {
results = append(results, SubscriptionResult[E]{
Event: event,
Error: subscription.errors[i],
})
} else {
logger.Warnf("unexpected event type %T for subscription %s", e, handle.Name)
}

}
return results
}

func (f *fakeFTL) watchPubSub(ctx context.Context) {
events := make(chan pubSubEvent, 128)
f.globalTopic.Subscribe(events)
go func() {
defer f.globalTopic.Unsubscribe(events)
for {
select {
case e := <-events:
f.handlePubSubEvent(ctx, e)
case <-ctx.Done():
return
}
}
}()
}

func (f *fakeFTL) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
f.pubSubLock.Lock()
defer f.pubSubLock.Unlock()

logger := log.FromContext(ctx).Scope("pubsub")

switch event := e.(type) {
case publishEvent:
logger.Debugf("publishing to %s: %v", event.topic.Name, event.content)
if _, ok := f.topics[event.topic.ToRefKey()]; !ok {
f.topics[event.topic.ToRefKey()] = []any{event.content}
} else {
f.topics[event.topic.ToRefKey()] = append(f.topics[event.topic.ToRefKey()], event.content)
}
f.publishWaitGroup.Done()
case subscriptionDidConsumeEvent:
sub, ok := f.subscriptions[event.subscription]
if !ok {
panic(fmt.Sprintf("subscription %q not found", event.subscription))
}
if event.err != nil {
sub.errors[sub.cursor.MustGet()] = event.err
}
sub.isExecuting = false
}

for _, sub := range f.subscriptions {
if sub.isExecuting {
// already executing
continue
}
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
continue
}
var cursor = sub.cursor.Default(-1)
if len(topicEvents) <= cursor+1 {
// no new events
continue
}
subscribers, ok := f.subscribers[sub.name]
if !ok || len(subscribers) == 0 {
// no subscribers
continue
}
chosenSubscriber := subscribers[rand.Intn(len(subscribers))] //nolint:gosec

sub.cursor = optional.Some(cursor + 1)
sub.isExecuting = true

go func(sub string, chosenSubscriber subscriber, event any) {
err := chosenSubscriber(ctx, event)
f.globalTopic.Publish(subscriptionDidConsumeEvent{subscription: sub, err: err})
}(sub.name, chosenSubscriber, topicEvents[sub.cursor.MustGet()])
}
}

// waitForSubscriptionsToComplete waits for all subscriptions to consume all events.
// subscriptions with no subscribers are ignored.
// logs what which subscriptions are blocking every 2s.
func (f *fakeFTL) waitForSubscriptionsToComplete(ctx context.Context) {
logger := log.FromContext(ctx).Scope("pubsub")
startTime := time.Now()
nextLogTime := startTime.Add(2 * time.Second)

// Make sure all published events make it into our pubsub state
f.publishWaitGroup.Wait()

for {
if func() bool {
f.pubSubLock.Lock()
defer f.pubSubLock.Unlock()

type remainingState struct {
name string
isExecuting bool
pendingEvents int
}
remaining := []remainingState{}
for _, sub := range f.subscriptions {
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
continue
}
var cursor = sub.cursor.Default(-1)
if !sub.isExecuting && len(topicEvents) <= cursor+1 {
// all events have been consumed
continue
}
subscribers, ok := f.subscribers[sub.name]
if !ok || len(subscribers) == 0 {
// no subscribers
continue
}
remaining = append(remaining, remainingState{
name: sub.name,
isExecuting: sub.isExecuting,
pendingEvents: len(topicEvents) - cursor - 1,
})
}
if len(remaining) == 0 {
// not waiting on any more subscriptions
return true
}
if time.Now().After(nextLogTime) {
// print out what we are waiting on
nextLogTime = time.Now().Add(2 * time.Second)
logger.Infof("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string {
var suffix string
if r.isExecuting {
suffix = ", 1 executing"
}
return fmt.Sprintf(" %s: %d events pending%s", r.name, r.pendingEvents, suffix)
}), "\n"))
}
return false
}() {
// reached idle state
return
}
time.Sleep(200 * time.Millisecond)
}
}
Loading

0 comments on commit fde383f

Please sign in to comment.