Skip to content

Commit

Permalink
fix(pubsub/pstest): prevent panic on undelievered messages
Browse files Browse the repository at this point in the history
If a running pstest Server has a subscription without a dead letter
topic configured, and there's a pending message in the subscription that
is not received after the retention duration (10 minutes), the deliver
loop for the subscription will trigger a panic.

This commit prevents the panic by checking not trying to deliver to the
dead letter topic unless one is configured.
  • Loading branch information
adg committed Feb 8, 2023
1 parent ef67835 commit 71d937e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
6 changes: 4 additions & 2 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) (
return 0, false
}

var retentionDuration = 10 * time.Minute
const retentionDuration = 10 * time.Minute

// Must be called with the lock held.
func (s *subscription) maintainMessages(now time.Time) {
Expand All @@ -1133,7 +1133,9 @@ func (s *subscription) maintainMessages(now time.Time) {
pubTime := m.proto.Message.PublishTime.AsTime()
// Remove messages that have been undelivered for a long time.
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
s.publishToDeadLetter(m)
if s.proto.DeadLetterPolicy != nil {
s.publishToDeadLetter(m)
}
delete(s.msgs, id)
}
}
Expand Down
28 changes: 28 additions & 0 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1599,3 +1599,31 @@ func TestSubscriptionMessageOrdering(t *testing.T) {
ids = ids[len(pull.ReceivedMessages):]
}
}

func TestSubscriptionRetention(t *testing.T) {
ctx := context.Background()

s := NewServer()
defer s.Close()

start := time.Now()
s.SetTimeNowFunc(func() time.Time { return start })

const topicName = "projects/p/topics/t"
top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: topicName})
if err != nil {
t.Fatal(err)
}
if _, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{
Name: "projects/p/subscriptions/s",
Topic: top.Name,
AckDeadlineSeconds: 30,
EnableMessageOrdering: true,
}); err != nil {
t.Fatal(err)
}
s.Publish(topicName, []byte("payload"), nil)

s.SetTimeNowFunc(func() time.Time { return start.Add(retentionDuration + 1) })
time.Sleep(1 * time.Second)
}

0 comments on commit 71d937e

Please sign in to comment.