From 8fb5b5fcbae3aaedff8ea3a09261eea252ace451 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Wed, 12 Jun 2024 13:39:10 +1000 Subject: [PATCH] fix up smaller issues --- go-runtime/ftl/ftltest/fake.go | 2 +- go-runtime/ftl/ftltest/ftltest.go | 10 +++++----- go-runtime/ftl/ftltest/pubsub.go | 24 +++++++++++------------- 3 files changed, 17 insertions(+), 19 deletions(-) diff --git a/go-runtime/ftl/ftltest/fake.go b/go-runtime/ftl/ftltest/fake.go index 274ba97a40..e3ebe1d5d0 100644 --- a/go-runtime/ftl/ftltest/fake.go +++ b/go-runtime/ftl/ftltest/fake.go @@ -161,5 +161,5 @@ func actuallyCallMap(ctx context.Context, impl mapImpl) any { } func (f *fakeFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error { - return f.pubSub.publishEvent(ctx, topic, event) + return f.pubSub.publishEvent(topic, event) } diff --git a/go-runtime/ftl/ftltest/ftltest.go b/go-runtime/ftl/ftltest/ftltest.go index e67c7c26c2..423069b774 100644 --- a/go-runtime/ftl/ftltest/ftltest.go +++ b/go-runtime/ftl/ftltest/ftltest.go @@ -396,26 +396,26 @@ func WithSubscriber[E any](subscription ftl.SubscriptionHandle[E], sink ftl.Sink // EventsForTopic returns all published events for a topic func EventsForTopic[E any](ctx context.Context, topic ftl.TopicHandle[E]) []E { fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert - return eventsForTopic(ctx, fftl.pubSub, ftl.TopicHandle[E](topic)) + return eventsForTopic(ctx, fftl.pubSub, topic) } type SubscriptionResult[E any] struct { Event E - Error error + Error ftl.Option[error] } // ResultsForSubscription returns all consumed events for a subscription, with any resulting errors func ResultsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []SubscriptionResult[E] { fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert - return resultsForSubscription(ctx, fftl.pubSub, ftl.SubscriptionHandle[E](subscription)) + return resultsForSubscription(ctx, fftl.pubSub, subscription) } // ErrorsForSubscription returns all errors encountered while consuming events for a subscription func ErrorsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []error { errs := []error{} for _, result := range ResultsForSubscription(ctx, subscription) { - if result.Error != nil { - errs = append(errs, result.Error) + if err, ok := result.Error.Get(); ok { + errs = append(errs, err) } } return errs diff --git a/go-runtime/ftl/ftltest/pubsub.go b/go-runtime/ftl/ftltest/pubsub.go index 754751fb38..817708ad3b 100644 --- a/go-runtime/ftl/ftltest/pubsub.go +++ b/go-runtime/ftl/ftltest/pubsub.go @@ -40,10 +40,9 @@ func newFakePubSub(ctx context.Context) *fakePubSub { return f } -func (f *fakePubSub) publishEvent(ctx context.Context, topic *schema.Ref, event any) error { +func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error { f.publishWaitGroup.Add(1) - f.globalTopic.PublishSync(publishEvent{topic: topic, content: event}) - return nil + return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event}) } // addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added @@ -59,11 +58,7 @@ func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl } } - subscribers, ok := f.subscribers[sub.Name] - if !ok { - subscribers = []subscriber{} - } - f.subscribers[sub.Name] = append(subscribers, func(ctx context.Context, event any) error { + f.subscribers[sub.Name] = append(f.subscribers[sub.Name], func(ctx context.Context, event any) error { if event, ok := event.(E); ok { return sink(ctx, event) } @@ -116,13 +111,16 @@ func resultsForSubscription[E any](ctx context.Context, f *fakePubSub, handle ft if !subscription.isExecuting { count++ } - for i := 0; i < count; i++ { + for i := range count { e := topic[i] if event, ok := e.(E); ok { - results = append(results, SubscriptionResult[E]{ + result := SubscriptionResult[E]{ Event: event, - Error: subscription.errors[i], - }) + } + if err, ok := subscription.errors[i]; ok { + result.Error = ftl.Some(err) + } + results = append(results, result) } else { logger.Warnf("unexpected event type %T for subscription %s", e, handle.Name) } @@ -266,7 +264,7 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr if shouldPrint { // print out what we are waiting on logger := log.FromContext(ctx).Scope("pubsub") - logger.Infof("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string { + logger.Debugf("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string { var suffix string if r.isExecuting { suffix = ", 1 executing"