Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub/pstest): fix panic on undelivered message #7377

Merged
merged 5 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 11 additions & 22 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,6 @@ type publishResponse struct {
err error
}

// For testing. Note that even though changes to the now variable are atomic, a call
// to the stored function can race with a change to that function. This could be a
// problem if tests are run in parallel, or even if concurrent parts of the same test
// change the value of the variable.
var now atomic.Value

func init() {
now.Store(time.Now)
ResetMinAckDeadline()
}

func timeNow() time.Time {
return now.Load().(func() time.Time)()
}

// Server is a fake Pub/Sub server.
type Server struct {
srv *testutil.Server
Expand All @@ -95,6 +80,8 @@ type GServer struct {
pb.UnimplementedSubscriberServer
pb.UnimplementedSchemaServiceServer

timeNowFunc atomic.Value

mu sync.Mutex
topics map[string]*topic
subs map[string]*subscription
Expand All @@ -103,7 +90,6 @@ type GServer struct {
wg sync.WaitGroup
nextID int
streamTimeout time.Duration
timeNowFunc func() time.Time
reactorOptions ReactorOptions
// schemas is a map of schemaIDs to a slice of schema revisions.
// the last element in the slice is the most recent schema.
Expand Down Expand Up @@ -139,13 +125,13 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
topics: map[string]*topic{},
subs: map[string]*subscription{},
msgsByID: map[string]*Message{},
timeNowFunc: timeNow,
reactorOptions: reactorOptions,
publishResponses: make(chan *publishResponse, 100),
autoPublishResponse: true,
schemas: map[string][]*pb.Schema{},
},
}
s.GServer.timeNowFunc.Store(time.Now)
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer)
pb.RegisterSchemaServiceServer(srv.Gsrv, &s.GServer)
Expand All @@ -156,7 +142,11 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
// SetTimeNowFunc registers f as a function to
// be used instead of time.Now for this server.
func (s *Server) SetTimeNowFunc(f func() time.Time) {
s.GServer.timeNowFunc = f
s.GServer.timeNowFunc.Store(f)
}

func (s *GServer) now() time.Time {
return s.timeNowFunc.Load().(func() time.Time)()
}

// Publish behaves as if the Publish RPC was called with a message with the given
Expand Down Expand Up @@ -501,7 +491,7 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
deadLetterTopic = dlTopic
}

sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps)
sub := newSubscription(top, &s.mu, s.now, deadLetterTopic, ps)
top.subs[ps.Name] = sub
s.subs[ps.Name] = sub
sub.start(&s.wg)
Expand Down Expand Up @@ -737,7 +727,7 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis
id := fmt.Sprintf("m%d", s.nextID)
s.nextID++
pm.MessageId = id
pubTime := s.timeNowFunc()
pubTime := s.now()
tsPubTime := timestamppb.New(pubTime)
pm.PublishTime = tsPubTime
m := &Message{
Expand Down Expand Up @@ -1131,7 +1121,7 @@ func (s *subscription) tryDeliverMessage(m *message, start int, now time.Time) (
return 0, false
}

var retentionDuration = 10 * time.Minute
const retentionDuration = 10 * time.Minute

// Must be called with the lock held.
func (s *subscription) maintainMessages(now time.Time) {
Expand All @@ -1143,7 +1133,6 @@ func (s *subscription) maintainMessages(now time.Time) {
pubTime := m.proto.Message.PublishTime.AsTime()
// Remove messages that have been undelivered for a long time.
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
s.publishToDeadLetter(m)
delete(s.msgs, id)
}
}
Expand Down
68 changes: 49 additions & 19 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,10 @@ func TestClearMessages(t *testing.T) {
}

// Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now".
func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
func publish(t *testing.T, srv *Server, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
pubTime := time.Now()
now.Store(func() time.Time { return pubTime })
defer func() { now.Store(time.Now) }()
srv.SetTimeNowFunc(func() time.Time { return pubTime })
defer srv.SetTimeNowFunc(time.Now)

res, err := pclient.Publish(context.Background(), &pb.PublishRequest{
Topic: topic.Name,
Expand All @@ -517,7 +517,7 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages
}

func TestPull(t *testing.T) {
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -527,7 +527,7 @@ func TestPull(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -548,7 +548,7 @@ func TestPull(t *testing.T) {

func TestStreamingPull(t *testing.T) {
// A simple test of streaming pull.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -558,7 +558,7 @@ func TestStreamingPull(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -572,7 +572,7 @@ func TestStreamingPull(t *testing.T) {
// This test acks each message as it arrives and makes sure we don't see dups.
func TestStreamingPullAck(t *testing.T) {
minAckDeadlineSecs = 1
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -582,7 +582,7 @@ func TestStreamingPullAck(t *testing.T) {
AckDeadlineSeconds: 1,
})

_ = publish(t, pclient, top, []*pb.PubsubMessage{
_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestAcknowledge(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestAcknowledge(t *testing.T) {

func TestModAck(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -672,7 +672,7 @@ func TestModAck(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -698,7 +698,7 @@ func TestModAck(t *testing.T) {

func TestAckDeadline(t *testing.T) {
// Messages should be resent after they expire.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

minAckDeadlineSecs = 2
Expand All @@ -709,7 +709,7 @@ func TestAckDeadline(t *testing.T) {
AckDeadlineSeconds: minAckDeadlineSecs,
})

_ = publish(t, pclient, top, []*pb.PubsubMessage{
_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -745,7 +745,7 @@ func TestAckDeadline(t *testing.T) {

func TestMultiSubs(t *testing.T) {
// Each subscription gets every message.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -760,7 +760,7 @@ func TestMultiSubs(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -782,7 +782,7 @@ func TestMultiSubs(t *testing.T) {
func TestMultiStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pclient, sclient, _, cleanup := newFake(ctx, t)
pclient, sclient, srv, cleanup := newFake(ctx, t)
defer cleanup()

top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand Down Expand Up @@ -813,7 +813,7 @@ func TestMultiStreams(t *testing.T) {
close(st2Received)
}()

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
})
Expand Down Expand Up @@ -941,7 +941,7 @@ func TestModAck_Race(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, server, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -1599,3 +1599,33 @@ func TestSubscriptionMessageOrdering(t *testing.T) {
ids = ids[len(pull.ReceivedMessages):]
}
}

func TestSubscriptionRetention(t *testing.T) {
// Check that subscriptions with undelivered messages past the
// retention deadline do not trigger a panic.

ctx := context.Background()
s := NewServer()
defer s.Close()

start := time.Now()
s.SetTimeNowFunc(func() time.Time { return start })

const topicName = "projects/p/topics/t"
top, err := s.GServer.CreateTopic(ctx, &pb.Topic{Name: topicName})
if err != nil {
t.Fatal(err)
}
if _, err := s.GServer.CreateSubscription(ctx, &pb.Subscription{
Name: "projects/p/subscriptions/s",
Topic: top.Name,
AckDeadlineSeconds: 30,
EnableMessageOrdering: true,
}); err != nil {
t.Fatal(err)
}
s.Publish(topicName, []byte("payload"), nil)

s.SetTimeNowFunc(func() time.Time { return start.Add(retentionDuration + 1) })
time.Sleep(1 * time.Second)
}