Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(pubsub): add retry guards to publish/receive test #6869

Merged
merged 5 commits into from
Jul 18, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 83 additions & 87 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,102 +265,98 @@ func withGoogleClientInfo(ctx context.Context) context.Context {
func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) {
t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) {
t.Parallel()
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatalf("TopicExists error: %v", err)
}
if !exists {
t.Errorf("topic %v should exist, but it doesn't", topic)
}
testutil.Retry(t, 3, 10*time.Second, func(r *testutil.R) {
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
r.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
r.Errorf("TopicExists error: %v", err)
}
if !exists {
r.Errorf("topic %v should exist, but it doesn't", topic)
}

sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: exactlyOnceDelivery,
})
if err != nil {
t.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatalf("SubExists error: %v", err)
}
if !exists {
t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableExactlyOnceDelivery: exactlyOnceDelivery,
})
}

// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
r.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
r.Errorf("SubExists error: %v", err)
}
if !exists {
r.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
})
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}

sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), 2*time.Second, func(ctx context.Context, m *Message) {
if exactlyOnceDelivery {
if _, err := m.AckWithResult().Get(ctx); err != nil {
t.Fatalf("failed to ack message with exactly once delivery: %v", err)
// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
r.Errorf("r.Get: %v", err)
}
return
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= timeout {
t.Fatal("pullN took too long")

sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeout := 3 * time.Minute
timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), 0, func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= timeout {
r.Errorf("pullN took longer than %v", timeout)
}
} else {
r.Errorf("Pull: %v", err)
}
} else {
t.Fatalf("Pull: %v", err)
}
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
r.Errorf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
}
})
})
}

Expand Down