diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go
index d407675aa26d..0afda6948fd4 100644
--- a/pubsub/pstest/fake.go
+++ b/pubsub/pstest/fake.go
@@ -66,21 +66,6 @@ type publishResponse struct {
 	err error
 }
 
-// For testing. Note that even though changes to the now variable are atomic, a call
-// to the stored function can race with a change to that function. This could be a
-// problem if tests are run in parallel, or even if concurrent parts of the same test
-// change the value of the variable.
-var now atomic.Value
-
-func init() {
-	now.Store(time.Now)
-	ResetMinAckDeadline()
-}
-
-func timeNow() time.Time {
-	return now.Load().(func() time.Time)()
-}
-
 // Server is a fake Pub/Sub server.
 type Server struct {
 	srv *testutil.Server
@@ -95,6 +80,8 @@ type GServer struct {
 	pb.UnimplementedSubscriberServer
 	pb.UnimplementedSchemaServiceServer
 
+	timeNowFunc atomic.Value
+
 	mu             sync.Mutex
 	topics         map[string]*topic
 	subs           map[string]*subscription
@@ -103,7 +90,6 @@ type GServer struct {
 	wg             sync.WaitGroup
 	nextID         int
 	streamTimeout  time.Duration
-	timeNowFunc    func() time.Time
 	reactorOptions ReactorOptions
 	// schemas is a map of schemaIDs to a slice of schema revisions.
 	// the last element in the slice is the most recent schema.
@@ -139,13 +125,13 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
 			topics:              map[string]*topic{},
 			subs:                map[string]*subscription{},
 			msgsByID:            map[string]*Message{},
-			timeNowFunc:         timeNow,
 			reactorOptions:      reactorOptions,
 			publishResponses:    make(chan *publishResponse, 100),
 			autoPublishResponse: true,
 			schemas:             map[string][]*pb.Schema{},
 		},
 	}
+	s.GServer.timeNowFunc.Store(time.Now)
 	pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
 	pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer)
 	pb.RegisterSchemaServiceServer(srv.Gsrv, &s.GServer)
@@ -156,7 +142,11 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
 // SetTimeNowFunc registers f as a function to
 // be used instead of time.Now for this server.
 func (s *Server) SetTimeNowFunc(f func() time.Time) {
-	s.GServer.timeNowFunc = f
+	s.GServer.timeNowFunc.Store(f)
+}
+
+func (s *GServer) now() time.Time {
+	return s.timeNowFunc.Load().(func() time.Time)()
 }
 
 // Publish behaves as if the Publish RPC was called with a message with the given
@@ -501,7 +491,7 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
 		deadLetterTopic = dlTopic
 	}
 
-	sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps)
+	sub := newSubscription(top, &s.mu, s.now, deadLetterTopic, ps)
 	top.subs[ps.Name] = sub
 	s.subs[ps.Name] = sub
 	sub.start(&s.wg)
@@ -737,7 +727,7 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis
 		id := fmt.Sprintf("m%d", s.nextID)
 		s.nextID++
 		pm.MessageId = id
-		pubTime := s.timeNowFunc()
+		pubTime := s.now()
 		tsPubTime := timestamppb.New(pubTime)
 		pm.PublishTime = tsPubTime
 		m := &Message{
@@ -1131,7 +1121,7 @@ func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) (
 	return 0, false
 }
 
-var retentionDuration = 10 * time.Minute
+const retentionDuration = 10 * time.Minute
 
 // Must be called with the lock held.
 func (s *subscription) maintainMessages(now time.Time) {
@@ -1143,7 +1133,6 @@ func (s *subscription) maintainMessages(now time.Time) {
 		pubTime := m.proto.Message.PublishTime.AsTime()
 		// Remove messages that have been undelivered for a long time.
 		if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
-			s.publishToDeadLetter(m)
 			delete(s.msgs, id)
 		}
 	}
diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go
index 7e806cfb5fa8..8757254d696c 100644
--- a/pubsub/pstest/fake_test.go
+++ b/pubsub/pstest/fake_test.go
@@ -491,10 +491,10 @@ func TestClearMessages(t *testing.T) {
 }
 
 // Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now".
-func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
+func publish(t *testing.T, srv *Server, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
 	pubTime := time.Now()
-	now.Store(func() time.Time { return pubTime })
-	defer func() { now.Store(time.Now) }()
+	srv.SetTimeNowFunc(func() time.Time { return pubTime })
+	defer srv.SetTimeNowFunc(time.Now)
 
 	res, err := pclient.Publish(context.Background(), &pb.PublishRequest{
 		Topic:    topic.Name,
@@ -517,7 +517,7 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages
 }
 
 func TestPull(t *testing.T) {
-	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
+	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
 	defer cleanup()
 
 	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
@@ -527,7 +527,7 @@ func TestPull(t *testing.T) {
 		AckDeadlineSeconds: 10,
 	})
 
-	want := publish(t, pclient, top, []*pb.PubsubMessage{
+	want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -548,7 +548,7 @@ func TestPull(t *testing.T) {
 
 func TestStreamingPull(t *testing.T) {
 	// A simple test of streaming pull.
-	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
+	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
 	defer cleanup()
 
 	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
@@ -558,7 +558,7 @@ func TestStreamingPull(t *testing.T) {
 		AckDeadlineSeconds: 10,
 	})
 
-	want := publish(t, pclient, top, []*pb.PubsubMessage{
+	want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -572,7 +572,7 @@ func TestStreamingPull(t *testing.T) {
 // This test acks each message as it arrives and makes sure we don't see dups.
 func TestStreamingPullAck(t *testing.T) {
 	minAckDeadlineSecs = 1
-	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
+	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
 	defer cleanup()
 
 	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
@@ -582,7 +582,7 @@ func TestStreamingPullAck(t *testing.T) {
 		AckDeadlineSeconds: 1,
 	})
 
-	_ = publish(t, pclient, top, []*pb.PubsubMessage{
+	_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -633,7 +633,7 @@ func TestAcknowledge(t *testing.T) {
 		AckDeadlineSeconds: 10,
 	})
 
-	publish(t, pclient, top, []*pb.PubsubMessage{
+	publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -662,7 +662,7 @@ func TestAcknowledge(t *testing.T) {
 
 func TestModAck(t *testing.T) {
 	ctx := context.Background()
-	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
+	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
 	defer cleanup()
 
 	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
@@ -672,7 +672,7 @@ func TestModAck(t *testing.T) {
 		AckDeadlineSeconds: 10,
 	})
 
-	publish(t, pclient, top, []*pb.PubsubMessage{
+	publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -698,7 +698,7 @@ func TestModAck(t *testing.T) {
 
 func TestAckDeadline(t *testing.T) {
 	// Messages should be resent after they expire.
-	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
+	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
 	defer cleanup()
 
 	minAckDeadlineSecs = 2
@@ -709,7 +709,7 @@ func TestAckDeadline(t *testing.T) {
 		AckDeadlineSeconds: minAckDeadlineSecs,
 	})
 
-	_ = publish(t, pclient, top, []*pb.PubsubMessage{
+	_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -745,7 +745,7 @@ func TestAckDeadline(t *testing.T) {
 
 func TestMultiSubs(t *testing.T) {
 	// Each subscription gets every message.
-	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
+	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
 	defer cleanup()
 
 	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
@@ -760,7 +760,7 @@ func TestMultiSubs(t *testing.T) {
 		AckDeadlineSeconds: 10,
 	})
 
-	want := publish(t, pclient, top, []*pb.PubsubMessage{
+	want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -782,7 +782,7 @@ func TestMultiSubs(t *testing.T) {
 func TestMultiStreams(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	pclient, sclient, _, cleanup := newFake(ctx, t)
+	pclient, sclient, srv, cleanup := newFake(ctx, t)
 	defer cleanup()
 
 	top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
@@ -813,7 +813,7 @@ func TestMultiStreams(t *testing.T) {
 		close(st2Received)
 	}()
 
-	publish(t, pclient, top, []*pb.PubsubMessage{
+	publish(t, srv, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 	})
@@ -941,7 +941,7 @@ func TestModAck_Race(t *testing.T) {
 		AckDeadlineSeconds: 10,
 	})
 
-	publish(t, pclient, top, []*pb.PubsubMessage{
+	publish(t, server, pclient, top, []*pb.PubsubMessage{
 		{Data: []byte("d1")},
 		{Data: []byte("d2")},
 		{Data: []byte("d3")},
@@ -1599,3 +1599,33 @@ func TestSubscriptionMessageOrdering(t *testing.T) {
 		ids = ids[len(pull.ReceivedMessages):]
 	}
 }
+
+func TestSubscriptionRetention(t *testing.T) {
+	// Check that subscriptions with undelivered messages past the
+	// retention deadline do not trigger a panic.
+
+	ctx := context.Background()
+	s := NewServer()
+	defer s.Close()
+
+	start := time.Now()
+	s.SetTimeNowFunc(func() time.Time { return start })
+
+	const topicName = "projects/p/topics/t"
+	top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: topicName})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{
+		Name:                  "projects/p/subscriptions/s",
+		Topic:                 top.Name,
+		AckDeadlineSeconds:    30,
+		EnableMessageOrdering: true,
+	}); err != nil {
+		t.Fatal(err)
+	}
+	s.Publish(topicName, []byte("payload"), nil)
+
+	s.SetTimeNowFunc(func() time.Time { return start.Add(retentionDuration + 1) })
+	time.Sleep(1 * time.Second)
+}
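Reviewer note: the patch moves the fake's clock from a package-global atomic.Value into a per-server field read through (*GServer).now(), so parallel tests with separate servers no longer share a clock. Because atomic.Value is untyped, Load returns an interface value that must be type-asserted back to func() time.Time before the call, and only that one concrete type should ever be stored. The sketch below is illustrative only, not part of the patch; it assumes pstest's exported Server.Publish (which creates the topic if needed) and Server.Message helpers, and shows how an external test can drive the clock through the public SetTimeNowFunc API.

package pstest_test

import (
	"testing"
	"time"

	"cloud.google.com/go/pubsub/pstest"
)

// TestFrozenPublishTime is a hypothetical usage example: it freezes the
// fake server's clock and checks that a published message is stamped
// with the frozen time rather than the wall clock.
func TestFrozenPublishTime(t *testing.T) {
	srv := pstest.NewServer()
	defer srv.Close()

	// Freeze this server's clock. Unlike the old package-level "now"
	// variable, this cannot race with other servers in parallel tests.
	frozen := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
	srv.SetTimeNowFunc(func() time.Time { return frozen })

	// Publish one message and look it up by the returned ID.
	id := srv.Publish("projects/p/topics/t", []byte("payload"), nil)

	if got := srv.Message(id).PublishTime; !got.Equal(frozen) {
		t.Errorf("PublishTime = %v, want %v", got, frozen)
	}
}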