Skip to content

Commit

Permalink
fix(pubsub/pstest): prevent panic on undelievered messages
Browse files Browse the repository at this point in the history
If a running pstest Server has a subscription without a dead letter
topic configured, and there's a pending message in the subscription that
is not received after the retention duration (10 minutes), the deliver
loop for the subscription will trigger a panic.

This commit prevents the panic by not trying to deliver expired messages
to the dead letter topic, since that's what the live service does
either.
  • Loading branch information
adg committed Feb 8, 2023
1 parent 7d4b3be commit a2f274d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
3 changes: 1 addition & 2 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) (
return 0, false
}

var retentionDuration = 10 * time.Minute
const retentionDuration = 10 * time.Minute

// Must be called with the lock held.
func (s *subscription) maintainMessages(now time.Time) {
Expand All @@ -1133,7 +1133,6 @@ func (s *subscription) maintainMessages(now time.Time) {
pubTime := m.proto.Message.PublishTime.AsTime()
// Remove messages that have been undelivered for a long time.
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
s.publishToDeadLetter(m)
delete(s.msgs, id)
}
}
Expand Down
30 changes: 30 additions & 0 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1599,3 +1599,33 @@ func TestSubscriptionMessageOrdering(t *testing.T) {
ids = ids[len(pull.ReceivedMessages):]
}
}

func TestSubscriptionRetention(t *testing.T) {
// Check that subscriptions with undelivered messages past the
// retention deadline do not trigger a panic.

ctx := context.Background()
s := NewServer()
defer s.Close()

start := time.Now()
s.SetTimeNowFunc(func() time.Time { return start })

const topicName = "projects/p/topics/t"
top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: topicName})
if err != nil {
t.Fatal(err)
}
if _, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{
Name: "projects/p/subscriptions/s",
Topic: top.Name,
AckDeadlineSeconds: 30,
EnableMessageOrdering: true,
}); err != nil {
t.Fatal(err)
}
s.Publish(topicName, []byte("payload"), nil)

s.SetTimeNowFunc(func() time.Time { return start.Add(retentionDuration + 1) })
time.Sleep(1 * time.Second)
}

0 comments on commit a2f274d

Please sign in to comment.