Skip to content

Commit

Permalink
refactor(pubsub/pstest): replace global now atomic with struct field
Browse files Browse the repository at this point in the history
It is counterintuitive to plumb through a time.Now replacement function
only to have it refer to a global variable. Instead, make the Server
timeNowFunc an atomic.Value field and add a now method accessor that
loads the function value atomically and invokes it. Use that in tests,
and also in the exported SetTimeNowFunc variable.
  • Loading branch information
adg committed Feb 8, 2023
1 parent 220f8a5 commit 7d4b3be
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 39 deletions.
30 changes: 10 additions & 20 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,21 +66,6 @@ type publishResponse struct {
err error
}

// For testing. Note that even though changes to the now variable are atomic, a call
// to the stored function can race with a change to that function. This could be a
// problem if tests are run in parallel, or even if concurrent parts of the same test
// change the value of the variable.
var now atomic.Value

func init() {
now.Store(time.Now)
ResetMinAckDeadline()
}

func timeNow() time.Time {
return now.Load().(func() time.Time)()
}

// Server is a fake Pub/Sub server.
type Server struct {
srv *testutil.Server
Expand All @@ -95,6 +80,8 @@ type GServer struct {
pb.UnimplementedSubscriberServer
pb.UnimplementedSchemaServiceServer

timeNowFunc atomic.Value

mu sync.Mutex
topics map[string]*topic
subs map[string]*subscription
Expand All @@ -103,7 +90,6 @@ type GServer struct {
wg sync.WaitGroup
nextID int
streamTimeout time.Duration
timeNowFunc func() time.Time
reactorOptions ReactorOptions
// schemas is a map of schemaIDs to a slice of schema revisions.
// the last element in the slice is the most recent schema.
Expand Down Expand Up @@ -139,13 +125,13 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
topics: map[string]*topic{},
subs: map[string]*subscription{},
msgsByID: map[string]*Message{},
timeNowFunc: timeNow,
reactorOptions: reactorOptions,
publishResponses: make(chan *publishResponse, 100),
autoPublishResponse: true,
schemas: map[string][]*pb.Schema{},
},
}
s.GServer.timeNowFunc.Store(time.Now)
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
pb.RegisterSubscriberServer(srv.Gsrv, &s.GServer)
pb.RegisterSchemaServiceServer(srv.Gsrv, &s.GServer)
Expand All @@ -156,7 +142,11 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
// SetTimeNowFunc registers f as a function to
// be used instead of time.Now for this server.
func (s *Server) SetTimeNowFunc(f func() time.Time) {
s.GServer.timeNowFunc = f
s.GServer.timeNowFunc.Store(f)
}

func (s *GServer) now() time.Time {
return s.timeNowFunc.Load().(func() time.Time)()
}

// Publish behaves as if the Publish RPC was called with a message with the given
Expand Down Expand Up @@ -501,7 +491,7 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
deadLetterTopic = dlTopic
}

sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps)
sub := newSubscription(top, &s.mu, s.now, deadLetterTopic, ps)
top.subs[ps.Name] = sub
s.subs[ps.Name] = sub
sub.start(&s.wg)
Expand Down Expand Up @@ -737,7 +727,7 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis
id := fmt.Sprintf("m%d", s.nextID)
s.nextID++
pm.MessageId = id
pubTime := s.timeNowFunc()
pubTime := s.now()
tsPubTime := timestamppb.New(pubTime)
pm.PublishTime = tsPubTime
m := &Message{
Expand Down
38 changes: 19 additions & 19 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,10 +491,10 @@ func TestClearMessages(t *testing.T) {
}

// Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now".
func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
func publish(t *testing.T, srv *Server, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
pubTime := time.Now()
now.Store(func() time.Time { return pubTime })
defer func() { now.Store(time.Now) }()
srv.SetTimeNowFunc(func() time.Time { return pubTime })
defer srv.SetTimeNowFunc(time.Now)

res, err := pclient.Publish(context.Background(), &pb.PublishRequest{
Topic: topic.Name,
Expand All @@ -517,7 +517,7 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages
}

func TestPull(t *testing.T) {
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -527,7 +527,7 @@ func TestPull(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -548,7 +548,7 @@ func TestPull(t *testing.T) {

func TestStreamingPull(t *testing.T) {
// A simple test of streaming pull.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -558,7 +558,7 @@ func TestStreamingPull(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -572,7 +572,7 @@ func TestStreamingPull(t *testing.T) {
// This test acks each message as it arrives and makes sure we don't see dups.
func TestStreamingPullAck(t *testing.T) {
minAckDeadlineSecs = 1
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -582,7 +582,7 @@ func TestStreamingPullAck(t *testing.T) {
AckDeadlineSeconds: 1,
})

_ = publish(t, pclient, top, []*pb.PubsubMessage{
_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -633,7 +633,7 @@ func TestAcknowledge(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestAcknowledge(t *testing.T) {

func TestModAck(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -672,7 +672,7 @@ func TestModAck(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -698,7 +698,7 @@ func TestModAck(t *testing.T) {

func TestAckDeadline(t *testing.T) {
// Messages should be resent after they expire.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

minAckDeadlineSecs = 2
Expand All @@ -709,7 +709,7 @@ func TestAckDeadline(t *testing.T) {
AckDeadlineSeconds: minAckDeadlineSecs,
})

_ = publish(t, pclient, top, []*pb.PubsubMessage{
_ = publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down Expand Up @@ -745,7 +745,7 @@ func TestAckDeadline(t *testing.T) {

func TestMultiSubs(t *testing.T) {
// Each subscription gets every message.
pclient, sclient, _, cleanup := newFake(context.TODO(), t)
pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand All @@ -760,7 +760,7 @@ func TestMultiSubs(t *testing.T) {
AckDeadlineSeconds: 10,
})

want := publish(t, pclient, top, []*pb.PubsubMessage{
want := publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand All @@ -782,7 +782,7 @@ func TestMultiSubs(t *testing.T) {
func TestMultiStreams(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pclient, sclient, _, cleanup := newFake(ctx, t)
pclient, sclient, srv, cleanup := newFake(ctx, t)
defer cleanup()

top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
Expand Down Expand Up @@ -813,7 +813,7 @@ func TestMultiStreams(t *testing.T) {
close(st2Received)
}()

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, srv, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
})
Expand Down Expand Up @@ -941,7 +941,7 @@ func TestModAck_Race(t *testing.T) {
AckDeadlineSeconds: 10,
})

publish(t, pclient, top, []*pb.PubsubMessage{
publish(t, server, pclient, top, []*pb.PubsubMessage{
{Data: []byte("d1")},
{Data: []byte("d2")},
{Data: []byte("d3")},
Expand Down

0 comments on commit 7d4b3be

Please sign in to comment.