From 71d937e96fb03be9bccd1fbf04a782355c1b1f9d Mon Sep 17 00:00:00 2001 From: Andrew Gerrand Date: Wed, 8 Feb 2023 15:34:16 +1100 Subject: [PATCH] fix(pubsub/pstest): prevent panic on undelievered messages If a running pstest Server has a subscription without a dead letter topic configured, and there's a pending message in the subscription that is not received after the retention duration (10 minutes), the deliver loop for the subscription will trigger a panic. This commit prevents the panic by checking not trying to deliver to the dead letter topic unless one is configured. --- pubsub/pstest/fake.go | 6 ++++-- pubsub/pstest/fake_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index e9c431e706b7..53e7eb02ed14 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -1121,7 +1121,7 @@ func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) ( return 0, false } -var retentionDuration = 10 * time.Minute +const retentionDuration = 10 * time.Minute // Must be called with the lock held. func (s *subscription) maintainMessages(now time.Time) { @@ -1133,7 +1133,9 @@ func (s *subscription) maintainMessages(now time.Time) { pubTime := m.proto.Message.PublishTime.AsTime() // Remove messages that have been undelivered for a long time. if !m.outstanding() && now.Sub(pubTime) > retentionDuration { - s.publishToDeadLetter(m) + if s.proto.DeadLetterPolicy != nil { + s.publishToDeadLetter(m) + } delete(s.msgs, id) } } diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 6a82ed3cf4ab..0c5bab5742c6 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1599,3 +1599,31 @@ func TestSubscriptionMessageOrdering(t *testing.T) { ids = ids[len(pull.ReceivedMessages):] } } + +func TestSubscriptionRetention(t *testing.T) { + ctx := context.Background() + + s := NewServer() + defer s.Close() + + start := time.Now() + s.SetTimeNowFunc(func() time.Time { return start }) + + const topicName = "projects/p/topics/t" + top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: topicName}) + if err != nil { + t.Fatal(err) + } + if _, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{ + Name: "projects/p/subscriptions/s", + Topic: top.Name, + AckDeadlineSeconds: 30, + EnableMessageOrdering: true, + }); err != nil { + t.Fatal(err) + } + s.Publish(topicName, []byte("payload"), nil) + + s.SetTimeNowFunc(func() time.Time { return start.Add(retentionDuration + 1) }) + time.Sleep(1 * time.Second) +}