diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 366f132b3fc3..dc7e1dbd0665 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -265,102 +265,98 @@ func withGoogleClientInfo(ctx context.Context) context.Context { func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous, exactlyOnceDelivery bool, numMsgs, extraBytes int) { t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,exactlyOnceDelivery:%t,numMsgs:%d", maxMsgs, synchronous, exactlyOnceDelivery, numMsgs), func(t *testing.T) { t.Parallel() - ctx := context.Background() - topic, err := client.CreateTopic(ctx, topicIDs.New()) - if err != nil { - t.Errorf("CreateTopic error: %v", err) - } - defer topic.Stop() - exists, err := topic.Exists(ctx) - if err != nil { - t.Fatalf("TopicExists error: %v", err) - } - if !exists { - t.Errorf("topic %v should exist, but it doesn't", topic) - } + testutil.Retry(t, 3, 10*time.Second, func(r *testutil.R) { + ctx := context.Background() + topic, err := client.CreateTopic(ctx, topicIDs.New()) + if err != nil { + r.Errorf("CreateTopic error: %v", err) + } + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + r.Errorf("TopicExists error: %v", err) + } + if !exists { + r.Errorf("topic %v should exist, but it doesn't", topic) + } - sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ - Topic: topic, - EnableExactlyOnceDelivery: exactlyOnceDelivery, - }) - if err != nil { - t.Errorf("CreateSub error: %v", err) - } - exists, err = sub.Exists(ctx) - if err != nil { - t.Fatalf("SubExists error: %v", err) - } - if !exists { - t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) - } - var msgs []*Message - for i := 0; i < numMsgs; i++ { - text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) - attrs := make(map[string]string) - attrs["foo"] = "bar" - msgs = append(msgs, &Message{ - Data: []byte(text), - Attributes: attrs, + sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + Topic: topic, + EnableExactlyOnceDelivery: exactlyOnceDelivery, }) - } - - // Publish some messages. - type pubResult struct { - m *Message - r *PublishResult - } - var rs []pubResult - for _, m := range msgs { - r := topic.Publish(ctx, m) - rs = append(rs, pubResult{m, r}) - } - want := make(map[string]messageData) - for _, res := range rs { - id, err := res.r.Get(ctx) if err != nil { - t.Fatal(err) + r.Errorf("CreateSub error: %v", err) + } + exists, err = sub.Exists(ctx) + if err != nil { + r.Errorf("SubExists error: %v", err) + } + if !exists { + r.Errorf("subscription %s should exist, but it doesn't", sub.ID()) + } + var msgs []*Message + for i := 0; i < numMsgs; i++ { + text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) + attrs := make(map[string]string) + attrs["foo"] = "bar" + msgs = append(msgs, &Message{ + Data: []byte(text), + Attributes: attrs, + }) } - md := extractMessageData(res.m) - md.ID = id - want[md.ID] = md - } - - sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs - sub.ReceiveSettings.Synchronous = synchronous - // Use a timeout to ensure that Pull does not block indefinitely if there are - // unexpectedly few messages available. - now := time.Now() - timeout := 3 * time.Minute - timeoutCtx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - gotMsgs, err := pullN(timeoutCtx, sub, len(want), 2*time.Second, func(ctx context.Context, m *Message) { - if exactlyOnceDelivery { - if _, err := m.AckWithResult().Get(ctx); err != nil { - t.Fatalf("failed to ack message with exactly once delivery: %v", err) + // Publish some messages. + type pubResult struct { + m *Message + r *PublishResult + } + var rs []pubResult + for _, m := range msgs { + r := topic.Publish(ctx, m) + rs = append(rs, pubResult{m, r}) + } + want := make(map[string]messageData) + for _, res := range rs { + id, err := res.r.Get(ctx) + if err != nil { + r.Errorf("r.Get: %v", err) } - return + md := extractMessageData(res.m) + md.ID = id + want[md.ID] = md } - m.Ack() - }) - if err != nil { - if c := status.Convert(err); c.Code() == codes.Canceled { - if time.Since(now) >= timeout { - t.Fatal("pullN took too long") + + sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs + sub.ReceiveSettings.Synchronous = synchronous + + // Use a timeout to ensure that Pull does not block indefinitely if there are + // unexpectedly few messages available. + now := time.Now() + timeout := 3 * time.Minute + timeoutCtx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + gotMsgs, err := pullN(timeoutCtx, sub, len(want), 0, func(ctx context.Context, m *Message) { + m.Ack() + }) + if err != nil { + if c := status.Convert(err); c.Code() == codes.Canceled { + if time.Since(now) >= timeout { + r.Errorf("pullN took longer than %v", timeout) + } + } else { + r.Errorf("Pull: %v", err) } - } else { - t.Fatalf("Pull: %v", err) } - } - got := make(map[string]messageData) - for _, m := range gotMsgs { - md := extractMessageData(m) - got[md.ID] = md - } - if !testutil.Equal(got, want) { - t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", - maxMsgs, synchronous, got, want) - } + got := make(map[string]messageData) + for _, m := range gotMsgs { + md := extractMessageData(m) + got[md.ID] = md + } + if !testutil.Equal(got, want) { + r.Errorf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", + maxMsgs, synchronous, got, want) + } + }) }) }