Skip to content

Commit

Permalink
fix up smaller issues
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 12, 2024
1 parent 0c6020c commit 8bdce7b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 19 deletions.
2 changes: 1 addition & 1 deletion go-runtime/ftl/ftltest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,5 @@ func actuallyCallMap(ctx context.Context, impl mapImpl) any {
}

func (f *fakeFTL) PublishEvent(ctx context.Context, topic *schema.Ref, event any) error {
return f.pubSub.publishEvent(ctx, topic, event)
return f.pubSub.publishEvent(topic, event)
}
10 changes: 5 additions & 5 deletions go-runtime/ftl/ftltest/ftltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,26 +396,26 @@ func WithSubscriber[E any](subscription ftl.SubscriptionHandle[E], sink ftl.Sink
// EventsForTopic returns all published events for a topic
func EventsForTopic[E any](ctx context.Context, topic ftl.TopicHandle[E]) []E {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
return eventsForTopic(ctx, fftl.pubSub, ftl.TopicHandle[E](topic))
return eventsForTopic(ctx, fftl.pubSub, topic)
}

type SubscriptionResult[E any] struct {
Event E
Error error
Error ftl.Option[error]
}

// ResultsForSubscription returns all consumed events for a subscription, with any resulting errors
func ResultsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []SubscriptionResult[E] {
fftl := internal.FromContext(ctx).(*fakeFTL) //nolint:forcetypeassert
return resultsForSubscription(ctx, fftl.pubSub, ftl.SubscriptionHandle[E](subscription))
return resultsForSubscription(ctx, fftl.pubSub, subscription)
}

// ErrorsForSubscription returns all errors encountered while consuming events for a subscription
func ErrorsForSubscription[E any](ctx context.Context, subscription ftl.SubscriptionHandle[E]) []error {
errs := []error{}
for _, result := range ResultsForSubscription(ctx, subscription) {
if result.Error != nil {
errs = append(errs, result.Error)
if err, ok := result.Error.Get(); ok {
errs = append(errs, err)
}
}
return errs
Expand Down
24 changes: 11 additions & 13 deletions go-runtime/ftl/ftltest/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ func newFakePubSub(ctx context.Context) *fakePubSub {
return f
}

func (f *fakePubSub) publishEvent(ctx context.Context, topic *schema.Ref, event any) error {
func (f *fakePubSub) publishEvent(topic *schema.Ref, event any) error {
f.publishWaitGroup.Add(1)
f.globalTopic.PublishSync(publishEvent{topic: topic, content: event})
return nil
return f.globalTopic.PublishSync(publishEvent{topic: topic, content: event})
}

// addSubscriber adds a subscriber to the fake FTL instance. Each subscriber included in the test must be manually added
Expand All @@ -59,11 +58,7 @@ func addSubscriber[E any](f *fakePubSub, sub ftl.SubscriptionHandle[E], sink ftl
}
}

subscribers, ok := f.subscribers[sub.Name]
if !ok {
subscribers = []subscriber{}
}
f.subscribers[sub.Name] = append(subscribers, func(ctx context.Context, event any) error {
f.subscribers[sub.Name] = append(f.subscribers[sub.Name], func(ctx context.Context, event any) error {
if event, ok := event.(E); ok {
return sink(ctx, event)
}
Expand Down Expand Up @@ -116,13 +111,16 @@ func resultsForSubscription[E any](ctx context.Context, f *fakePubSub, handle ft
if !subscription.isExecuting {
count++
}
for i := 0; i < count; i++ {
for i := range count {
e := topic[i]
if event, ok := e.(E); ok {
results = append(results, SubscriptionResult[E]{
result := SubscriptionResult[E]{
Event: event,
Error: subscription.errors[i],
})
}
if err, ok := subscription.errors[i]; ok {
result.Error = ftl.Some(err)
}
results = append(results, result)
} else {
logger.Warnf("unexpected event type %T for subscription %s", e, handle.Name)
}
Expand Down Expand Up @@ -266,7 +264,7 @@ func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPr
if shouldPrint {
// print out what we are waiting on
logger := log.FromContext(ctx).Scope("pubsub")
logger.Infof("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string {
logger.Debugf("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string {
var suffix string
if r.isExecuting {
suffix = ", 1 executing"
Expand Down

0 comments on commit 8bdce7b

Please sign in to comment.