Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ftltest supports pubsub #1736

Merged
merged 4 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions backend/controller/pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,17 @@ func TestRetry(t *testing.T) {
1+retriesPerCall),
)
}

func TestExternalPublishRuntimeCheck(t *testing.T) {
in.Run(t, "",
in.CopyModule("publisher"),
in.CopyModule("subscriber"),
in.Deploy("publisher"),
in.Deploy("subscriber"),

in.ExpectError(
in.Call("subscriber", "publishToExternalModule", in.Obj{}, func(t testing.TB, resp in.Obj) {}),
"can not publish to another module's topic",
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"ftl/publisher"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl" // Import the FTL SDK.
)
Expand All @@ -25,3 +26,10 @@ var _ = ftl.Subscription(publisher.Topic2, "doomed_subscription")
func ConsumeButFailAndRetry(ctx context.Context, req publisher.PubSubEvent) error {
return fmt.Errorf("always error: event %v", req.Time)
}

//ftl:verb
func PublishToExternalModule(ctx context.Context) error {
// Get around compile-time checks
var topic = publisher.Test_topic
return topic.Publish(ctx, publisher.PubSubEvent{Time: time.Now()})
}
56 changes: 49 additions & 7 deletions go-runtime/ftl/ftltest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,72 @@ import (
"reflect"
"strings"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/go-runtime/internal"
"github.com/alecthomas/types/optional"
)

// pubSubEvent is a sum type for all events that can be published to the pubsub system.
// not to be confused with an event that gets published to a topic
//
//sumtype:decl
type pubSubEvent interface {
// cronJobEvent is a marker to ensure that all events implement the interface.
pubSubEvent()
}

// publishEvent holds an event to be published to a topic
type publishEvent struct {
topic *schema.Ref
content any
}

func (publishEvent) pubSubEvent() {}

// subscriptionDidConsumeEvent indicates that a call to a subscriber has completed
type subscriptionDidConsumeEvent struct {
subscription string
err error
}

func (subscriptionDidConsumeEvent) pubSubEvent() {}

type subscription struct {
name string
topic *schema.Ref
cursor optional.Option[int]
isExecuting bool
errors map[int]error
}

type subscriber func(context.Context, any) error

type fakeFTL struct {
fsm *fakeFSMManager
fsm *fakeFSMManager

mockMaps map[uintptr]mapImpl
allowMapCalls bool
configValues map[string][]byte
secretValues map[string][]byte
pubSub *fakePubSub
}

// mapImpl is a function that takes an object and returns an object of a potentially different
// type but is not constrained by input/output type like ftl.Map.
type mapImpl func(context.Context) (any, error)

func newFakeFTL() *fakeFTL {
return &fakeFTL{
func newFakeFTL(ctx context.Context) *fakeFTL {
fake := &fakeFTL{
fsm: newFakeFSMManager(),
mockMaps: map[uintptr]mapImpl{},
allowMapCalls: false,
configValues: map[string][]byte{},
secretValues: map[string][]byte{},
pubSub: newFakePubSub(ctx),
}

return fake
}

var _ internal.FTL = &fakeFTL{}
Expand Down Expand Up @@ -73,10 +115,6 @@ func (f *fakeFTL) FSMSend(ctx context.Context, fsm string, instance string, even
return f.fsm.SendEvent(ctx, fsm, instance, event)
}

func (f *fakeFTL) PublishEvent(ctx context.Context, topic string, event any) error {
panic("not implemented")
}

// addMapMock saves a new mock of ftl.Map to the internal map in fakeFTL.
//
// mockMap provides the whole mock implemention, so it gets called in place of both `fn`
Expand Down Expand Up @@ -121,3 +159,7 @@ func actuallyCallMap(ctx context.Context, impl mapImpl) any {
}
return out
}

func (f *fakeFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error {
return f.pubSub.publishEvent(topic, event)
}
60 changes: 59 additions & 1 deletion go-runtime/ftl/ftltest/ftltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Context(options ...Option) context.Context {
}

ctx := log.ContextWithNewDefaultLogger(context.Background())
ctx = internal.WithContext(ctx, newFakeFTL())
ctx = internal.WithContext(ctx, newFakeFTL(ctx))
name := reflection.Module()

for _, option := range options {
Expand Down Expand Up @@ -371,3 +371,61 @@ func getDSNFromSecret(ftl internal.FTL, module, name string) (string, error) {
}
return dsn, nil
}

// WithSubscriber adds a subscriber during a test
//
// By default, all subscribers are disabled in unit tests, and must be manually enabled by calling WithSubscriber(…).
// This allows easy isolation for each unit test.
//
// WithSubscriber(…) can also be used to make an ad-hoc subscriber for your test by defining a new function as the sink.
//
// To be used when setting up a context for a test:
//
// ctx := ftltest.Context(
// ftltest.WithSubscriber(paymentTopic, ProcessPayment),
// // ... other options
// )
func WithSubscriber[E any](subscription ftl.SubscriptionHandle[E], sink ftl.Sink[E]) Option {
return func(ctx context.Context, state *OptionsState) error {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
addSubscriber(fftl.pubSub, subscription, sink)
return nil
}
}

// EventsForTopic returns all published events for a topic
func EventsForTopic[E any](ctx context.Context, topic ftl.TopicHandle[E]) []E {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
return eventsForTopic(ctx, fftl.pubSub, topic)
}

type SubscriptionResult[E any] struct {
Event E
Error ftl.Option[error]
}

// ResultsForSubscription returns all consumed events for a subscription, with any resulting errors
func ResultsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []SubscriptionResult[E] {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
return resultsForSubscription(ctx, fftl.pubSub, subscription)
}

// ErrorsForSubscription returns all errors encountered while consuming events for a subscription
func ErrorsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []error {
errs := []error{}
for _, result := range ResultsForSubscription(ctx, subscription) {
if err, ok := result.Error.Get(); ok {
errs = append(errs, err)
}
}
return errs
}

// WaitForSubscriptionsToComplete waits until all subscriptions have consumed all events
//
// Subscriptions with no manually activated subscribers are ignored.
// Make sure you have called WithSubscriber(…) for all subscriptions you want to wait for.
func WaitForSubscriptionsToComplete(ctx context.Context) {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
fftl.pubSub.waitForSubscriptionsToComplete(ctx)
}
6 changes: 5 additions & 1 deletion go-runtime/ftl/ftltest/ftltest_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ func TestModuleUnitTests(t *testing.T) {
in.CopyModule("time"),
in.CopyModule("wrapped"),
in.CopyModule("verbtypes"),
in.Build("time", "wrapped", "verbtypes"),
in.CopyModule("pubsub"),
in.CopyModule("subscriber"),
in.Build("time", "wrapped", "verbtypes", "pubsub", "subscriber"),
in.ExecModuleTest("wrapped"),
in.ExecModuleTest("verbtypes"),
in.ExecModuleTest("pubsub"),
in.ExecModuleTest("subscriber"),
)
}
Loading
Loading