Skip to content

Commit

Permalink
separate out idle check into its own func
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Jun 12, 2024
1 parent ce57810 commit 0c6020c
Showing 1 changed file with 55 additions and 51 deletions.
106 changes: 55 additions & 51 deletions go-runtime/ftl/ftltest/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,66 +209,70 @@ func (f *fakePubSub) handlePubSubEvent(ctx context.Context, e pubSubEvent) {
// subscriptions with no subscribers are ignored.
// logs what which subscriptions are blocking every 2s.
func (f *fakePubSub) waitForSubscriptionsToComplete(ctx context.Context) {
logger := log.FromContext(ctx).Scope("pubsub")
startTime := time.Now()
nextLogTime := startTime.Add(2 * time.Second)

// Make sure all published events make it into our pubsub state
f.publishWaitGroup.Wait()

for {
if func() bool {
f.pubSubLock.Lock()
defer f.pubSubLock.Unlock()

type remainingState struct {
name string
isExecuting bool
pendingEvents int
}
remaining := []remainingState{}
for _, sub := range f.subscriptions {
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
continue
}
var cursor = sub.cursor.Default(-1)
if !sub.isExecuting && len(topicEvents) <= cursor+1 {
// all events have been consumed
continue
}
subscribers, ok := f.subscribers[sub.name]
if !ok || len(subscribers) == 0 {
// no subscribers
continue
}
remaining = append(remaining, remainingState{
name: sub.name,
isExecuting: sub.isExecuting,
pendingEvents: len(topicEvents) - cursor - 1,
})
}
if len(remaining) == 0 {
// not waiting on any more subscriptions
return true
}
if time.Now().After(nextLogTime) {
// print out what we are waiting on
nextLogTime = time.Now().Add(2 * time.Second)
logger.Infof("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string {
var suffix string
if r.isExecuting {
suffix = ", 1 executing"
}
return fmt.Sprintf(" %s: %d events pending%s", r.name, r.pendingEvents, suffix)
}), "\n"))
}
return false
}() {
// reached idle state
shouldPrint := time.Now().After(nextLogTime)
if f.checkSubscriptionsAreComplete(ctx, shouldPrint, startTime) {
return
}
if shouldPrint {
nextLogTime = time.Now().Add(2 * time.Second)
}
time.Sleep(200 * time.Millisecond)
}
}

func (f *fakePubSub) checkSubscriptionsAreComplete(ctx context.Context, shouldPrint bool, startTime time.Time) bool {
f.pubSubLock.Lock()
defer f.pubSubLock.Unlock()

type remainingState struct {
name string
isExecuting bool
pendingEvents int
}
remaining := []remainingState{}
for _, sub := range f.subscriptions {
topicEvents, ok := f.topics[sub.topic.ToRefKey()]
if !ok {
// no events publshed yet
continue
}
var cursor = sub.cursor.Default(-1)
if !sub.isExecuting && len(topicEvents) <= cursor+1 {
// all events have been consumed
continue
}
subscribers, ok := f.subscribers[sub.name]
if !ok || len(subscribers) == 0 {
// no subscribers
continue
}
remaining = append(remaining, remainingState{
name: sub.name,
isExecuting: sub.isExecuting,
pendingEvents: len(topicEvents) - cursor - 1,
})
}
if len(remaining) == 0 {
// not waiting on any more subscriptions
return true
}
if shouldPrint {
// print out what we are waiting on
logger := log.FromContext(ctx).Scope("pubsub")
logger.Infof("waiting on subscriptions to complete after %ds:\n%s", int(time.Until(startTime).Seconds()*-1), strings.Join(slices.Map(remaining, func(r remainingState) string {
var suffix string
if r.isExecuting {
suffix = ", 1 executing"
}
return fmt.Sprintf(" %s: %d events pending%s", r.name, r.pendingEvents, suffix)
}), "\n"))
}
return false
}

0 comments on commit 0c6020c

Please sign in to comment.