Skip to content

Commit

Permalink
test(pubsub): add retry guards to publish/receive test (#6869)
Browse files Browse the repository at this point in the history
* test(pubsub): add retry guards to publish/receive test

* fix pullN arguments
  • Loading branch information
hongalex authored Jul 18, 2023
1 parent 715715c commit 5b288b5
Showing 1 changed file with 83 additions and 87 deletions.
170 changes: 83 additions & 87 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,102 +265,98 @@ func withGoogleClientInfo(ctx context.Context) context.Context {
func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) {
t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) {
t.Parallel()
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatalf("TopicExists error: %v", err)
}
if !exists {
t.Errorf("topic %v should exist, but it doesn't", topic)
}
testutil.Retry(t, 3, 10*time.Second, func(r *testutil.R) {
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
r.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
r.Errorf("TopicExists error: %v", err)
}
if !exists {
r.Errorf("topic %v should exist, but it doesn't", topic)
}

sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: exactlyOnceDelivery,
})
if err != nil {
t.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatalf("SubExists error: %v", err)
}
if !exists {
t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: exactlyOnceDelivery,
})
}

// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
r.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
r.Errorf("SubExists error: %v", err)
}
if !exists {
r.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
})
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}

sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), 2*time.Second, func(ctx context.Context, m *Message) {
if exactlyOnceDelivery {
if _, err := m.AckWithResult().Get(ctx); err != nil {
t.Fatalf("failed to ack message with exactly once delivery: %v", err)
// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
r.Errorf("r.Get: %v", err)
}
return
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= timeout {
t.Fatal("pullN took too long")

sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), 0, func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= timeout {
r.Errorf("pullN took longer than %v", timeout)
}
} else {
r.Errorf("Pull: %v", err)
}
} else {
t.Fatalf("Pull: %v", err)
}
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
r.Errorf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
}
})
})
}

Expand Down

0 comments on commit 5b288b5

Please sign in to comment.