Skip to content

Commit

Permalink
feat: ftltest supports pubsub (#1736)
Browse files Browse the repository at this point in the history
closes #1598

Adds:
- By default, disables all subscribers during unit tests
- Subscribers can be manually added per unit test
- Subscribers can be actual sinks, or ad-hoc ones defined in the test
(just define a `func` in the argument)
- Check what events were published to a topic
- Check what events were consumed by a subscription, and whether an
error occured
- Allow the unit test to wait for all events to be consumed
- Unit tests can publish to external module's topics
- And conversely, added a runtime check to disallow publishing to
external modules when not in a unit test. fixes #1703
  • Loading branch information
matt2e authored Jun 12, 2024
1 parent 787aa4f commit ff66550
Show file tree
Hide file tree
Showing 19 changed files with 1,058 additions and 20 deletions.
14 changes: 14 additions & 0 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,17 @@ func TestRetry(t *testing.T) {
1+retriesPerCall),
)
}

func TestExternalPublishRuntimeCheck(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

in.ExpectError(
in.Call("subscriber", "publishToExternalModule", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
"can not publish to another module's topic",
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"ftl/publisher"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)
Expand All @@ -25,3 +26,10 @@ var _ = ftl.Subscription(publisher.Topic2, "doomed_subscription")
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
func PublishToExternalModule(ctx context.Context) error {
// Get around compile-time checks
var topic = publisher.Test_topic
return topic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()})
}
56 changes: 49 additions & 7 deletions go-runtime/ftl/ftltest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,72 @@ import (
"reflect"
"strings"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/go-runtime/internal"
"github.com/alecthomas/types/optional"
)

// pubSubEvent is a sum type for all events that can be published to the pubsub system.
// not to be confused with an event that gets published to a topic
//
//sumtype:decl
type pubSubEvent interface {
// cronJobEvent is a marker to ensure that all events implement the interface.
pubSubEvent()
}

// publishEvent holds an event to be published to a topic
type publishEvent struct {
topic *schema.Ref
content any
}

func (publishEvent) pubSubEvent() {}

// subscriptionDidConsumeEvent indicates that a call to a subscriber has completed
type subscriptionDidConsumeEvent struct {
subscription string
err error
}

func (subscriptionDidConsumeEvent) pubSubEvent() {}

type subscription struct {
name string
topic *schema.Ref
cursor optional.Option[int]
isExecuting bool
errors map[int]error
}

type subscriber func(context.Context, any) error

type fakeFTL struct {
fsm *fakeFSMManager
fsm *fakeFSMManager

mockMaps map[uintptr]mapImpl
allowMapCalls bool
configValues map[string][]byte
secretValues map[string][]byte
pubSub *fakePubSub
}

// mapImpl is a function that takes an object and returns an object of a potentially different
// type but is not constrained by input/output type like ftl.Map.
type mapImpl func(context.Context) (any, error)

func newFakeFTL() *fakeFTL {
return &fakeFTL{
func newFakeFTL(ctx context.Context) *fakeFTL {
fake := &fakeFTL{
fsm: newFakeFSMManager(),
mockMaps: map[uintptr]mapImpl{},
allowMapCalls: false,
configValues: map[string][]byte{},
secretValues: map[string][]byte{},
pubSub: newFakePubSub(ctx),
}

return fake
}

var _ internal.FTL = &fakeFTL{}
Expand Down Expand Up @@ -73,10 +115,6 @@ func (f *fakeFTL) FSMSend(ctx context.Context, fsm string, instance string, even
return f.fsm.SendEvent(ctx, fsm, instance, event)
}

func (f *fakeFTL) PublishEvent(ctx context.Context, topic string, event any) error {
panic("not implemented")
}

// addMapMock saves a new mock of ftl.Map to the internal map in fakeFTL.
//
// mockMap provides the whole mock implemention, so it gets called in place of both `fn`
Expand Down Expand Up @@ -121,3 +159,7 @@ func actuallyCallMap(ctx context.Context, impl mapImpl) any {
}
return out
}

func (f *fakeFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error {
return f.pubSub.publishEvent(topic, event)
}
60 changes: 59 additions & 1 deletion go-runtime/ftl/ftltest/ftltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Context(options ...Option) context.Context {
}

ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx = internal.WithContext(ctx, newFakeFTL())
ctx = internal.WithContext(ctx, newFakeFTL(ctx))
name := reflection.Module()

for _, option := range options {
Expand Down Expand Up @@ -371,3 +371,61 @@ func getDSNFromSecret(ftl internal.FTL, module, name string) (string, error) {
}
return dsn, nil
}

// WithSubscriber adds a subscriber during a test
//
// By default, all subscribers are disabled in unit tests, and must be manually enabled by calling WithSubscriber(…).
// This allows easy isolation for each unit test.
//
// WithSubscriber(…) can also be used to make an ad-hoc subscriber for your test by defining a new function as the sink.
//
// To be used when setting up a context for a test:
//
// ctx := ftltest.Context(
// ftltest.WithSubscriber(paymentTopic, ProcessPayment),
// // ... other options
// )
func WithSubscriber[E any](subscription ftl.SubscriptionHandle[E], sink ftl.Sink[E]) Option {
return func(ctx context.Context, state *OptionsState) error {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
addSubscriber(fftl.pubSub, subscription, sink)
return nil
}
}

// EventsForTopic returns all published events for a topic
func EventsForTopic[E any](ctx context.Context, topic ftl.TopicHandle[E]) []E {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
return eventsForTopic(ctx, fftl.pubSub, topic)
}

type SubscriptionResult[E any] struct {
Event E
Error ftl.Option[error]
}

// ResultsForSubscription returns all consumed events for a subscription, with any resulting errors
func ResultsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []SubscriptionResult[E] {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
return resultsForSubscription(ctx, fftl.pubSub, subscription)
}

// ErrorsForSubscription returns all errors encountered while consuming events for a subscription
func ErrorsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []error {
errs := []error{}
for _, result := range ResultsForSubscription(ctx, subscription) {
if err, ok := result.Error.Get(); ok {
errs = append(errs, err)
}
}
return errs
}

// WaitForSubscriptionsToComplete waits until all subscriptions have consumed all events
//
// Subscriptions with no manually activated subscribers are ignored.
// Make sure you have called WithSubscriber(…) for all subscriptions you want to wait for.
func WaitForSubscriptionsToComplete(ctx context.Context) {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
fftl.pubSub.waitForSubscriptionsToComplete(ctx)
}
6 changes: 5 additions & 1 deletion go-runtime/ftl/ftltest/ftltest_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ func TestModuleUnitTests(t *testing.T) {
in.CopyModule("time"),
in.CopyModule("wrapped"),
in.CopyModule("verbtypes"),
in.Build("time", "wrapped", "verbtypes"),
in.CopyModule("pubsub"),
in.CopyModule("subscriber"),
in.Build("time", "wrapped", "verbtypes", "pubsub", "subscriber"),
in.ExecModuleTest("wrapped"),
in.ExecModuleTest("verbtypes"),
in.ExecModuleTest("pubsub"),
in.ExecModuleTest("subscriber"),
)
}
Loading

0 comments on commit ff66550

Please sign in to comment.