From 9cfcaf6ea2a689e875b8400ca5872ccadcb17252 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 7 Sep 2023 12:39:28 -0700 Subject: [PATCH] test(pubsub): add retry to all integration test resource creation (#8533) * test(pubsub): add retry to all admin tests * revert comment in DetectProjectID test * make ctx first argument in helper funcs --- pubsub/integration_test.go | 165 ++++++++++++++++++++++--------------- 1 file changed, 98 insertions(+), 67 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index d78ba332fe91..960e8240da4d 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -123,13 +123,13 @@ func integrationTestSchemaClient(ctx context.Context, t *testing.T, opts ...opti return sc } -func TestIntegration_All(t *testing.T) { +func TestIntegration_Admin(t *testing.T) { t.Parallel() ctx := context.Background() client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Errorf("CreateTopic error: %v", err) } @@ -143,7 +143,7 @@ func TestIntegration_All(t *testing.T) { } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { t.Errorf("CreateSub error: %v", err) } exists, err = sub.Exists(ctx) @@ -230,7 +230,7 @@ func TestIntegration_All(t *testing.T) { } } -func TestPublishReceive(t *testing.T) { +func TestIntegration_PublishReceive(t *testing.T) { ctx := context.Background() client := integrationTestClient(ctx, t) @@ -267,10 +267,11 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou t.Parallel() testutil.Retry(t, 3, 10*time.Second, func(r *testutil.R) { ctx := context.Background() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { r.Errorf("CreateTopic error: %v", err) } + defer topic.Delete(ctx) defer topic.Stop() exists, err := topic.Exists(ctx) if err != nil { @@ -280,13 +281,14 @@ func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronou r.Errorf("topic %v should exist, but it doesn't", topic) } - sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ Topic: topic, EnableExactlyOnceDelivery: exactlyOnceDelivery, }) if err != nil { r.Errorf("CreateSub error: %v", err) } + defer sub.Delete(ctx) exists, err = sub.Exists(ctx) if err != nil { r.Errorf("SubExists error: %v", err) @@ -428,7 +430,7 @@ func TestIntegration_LargePublishSize(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -489,29 +491,24 @@ func TestIntegration_LargePublishSize(t *testing.T) { func TestIntegration_CancelReceive(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() client := integrationTestClient(ctx, t) defer client.Close() - var topic *Topic - var err error - testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) { - topic, err = client.CreateTopic(ctx, topicIDs.New()) - if err != nil { - r.Errorf("failed to create topic: %v", err) - } - }) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) + if err != nil { + t.Errorf("failed to create topic: %v", err) + } defer topic.Delete(ctx) defer topic.Stop() var sub *Subscription - testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) { - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { - r.Errorf("failed to create subscription: %v", err) - } - }) + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { + t.Fatalf("failed to create subscription: %v", err) + } defer sub.Delete(ctx) + ctx, cancel := context.WithCancel(context.Background()) sub.ReceiveSettings.MaxOutstandingMessages = -1 sub.ReceiveSettings.MaxOutstandingBytes = -1 sub.ReceiveSettings.NumGoroutines = 1 @@ -532,14 +529,11 @@ func TestIntegration_CancelReceive(t *testing.T) { }() go func() { - defer close(doneReceiving) err = sub.Receive(ctx, func(_ context.Context, msg *Message) { cancel() time.AfterFunc(5*time.Second, msg.Ack) }) - if err != nil { - t.Error(err) - } + close(doneReceiving) }() select { @@ -555,7 +549,7 @@ func TestIntegration_CreateSubscription_NeverExpire(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -567,7 +561,7 @@ func TestIntegration_CreateSubscription_NeverExpire(t *testing.T) { ExpirationPolicy: time.Duration(0), } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -600,6 +594,9 @@ func findServiceAccountEmail(ctx context.Context, t *testing.T) string { } jwtConf, err = google.JWTConfigFromJSON(creds.JSON) if err != nil { + if strings.Contains(err.Error(), "authorized_user") { + t.Skip("Found ADC user so can't get serviceAccountEmail") + } t.Fatalf("Failed to parse Google JWTConfig from JSON: %v", err) } return jwtConf.Email @@ -614,7 +611,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) { serviceAccountEmail := findServiceAccountEmail(ctx, t) - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -633,7 +630,7 @@ func TestIntegration_UpdateSubscription(t *testing.T) { }, }, } - if sub, err = client.CreateSubscription(ctx, subIDs.New(), sCfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), sCfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -752,7 +749,7 @@ func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -760,7 +757,7 @@ func TestIntegration_UpdateSubscription_ExpirationPolicy(t *testing.T) { defer topic.Stop() var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -827,7 +824,7 @@ func TestIntegration_UpdateTopicLabels(t *testing.T) { return testutil.Equal(got.Labels, wantLabels) } - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -866,7 +863,7 @@ func TestIntegration_PublicTopic(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - sub, err := client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ Topic: client.TopicInProject("taxirides-realtime", "pubsub-public-data"), }) if err != nil { @@ -882,7 +879,7 @@ func TestIntegration_Errors(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -926,7 +923,7 @@ func TestIntegration_Errors(t *testing.T) { } // Updating out-of-range retention duration. - sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}) + sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{Topic: topic}) if err != nil { t.Fatal(err) } @@ -943,7 +940,7 @@ func TestIntegration_MessageStoragePolicy_TopicLevel(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1007,7 +1004,7 @@ func TestIntegration_MessageStoragePolicy_ProjectLevel(t *testing.T) { if err != nil { t.Fatalf("Creating client error: %v", err) } - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1081,7 +1078,7 @@ func TestIntegration_CreateTopic_KMS(t *testing.T) { tc := TopicConfig{ KMSKeyName: key.GetName(), } - topic, err := client.CreateTopicWithConfig(ctx, topicIDs.New(), &tc) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), &tc) if err != nil { t.Fatalf("CreateTopicWithConfig error: %v", err) } @@ -1110,7 +1107,7 @@ func TestIntegration_CreateTopic_MessageStoragePolicy(t *testing.T) { AllowedPersistenceRegions: []string{"us-east1"}, }, } - topic, err := client.CreateTopicWithConfig(ctx, topicIDs.New(), &tc) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), &tc) if err != nil { t.Fatalf("CreateTopicWithConfig error: %v", err) } @@ -1132,10 +1129,11 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) { client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatal(err) } + defer topic.Delete(ctx) defer topic.Stop() exists, err := topic.Exists(ctx) if err != nil { @@ -1145,13 +1143,13 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) { t.Fatalf("topic %v should exist, but it doesn't", topic) } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ Topic: topic, EnableMessageOrdering: true, }); err != nil { t.Fatal(err) } - _ = sub + defer sub.Delete(ctx) exists, err = sub.Exists(ctx) if err != nil { t.Fatal(err) @@ -1211,10 +1209,11 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatal(err) } + defer topic.Delete(ctx) defer topic.Stop() exists, err := topic.Exists(ctx) if err != nil { @@ -1224,13 +1223,13 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) { t.Fatalf("topic %v should exist, but it doesn't", topic) } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ Topic: topic, EnableMessageOrdering: true, }); err != nil { t.Fatal(err) } - _ = sub + defer sub.Delete(ctx) exists, err = sub.Exists(ctx) if err != nil { t.Fatal(err) @@ -1317,7 +1316,7 @@ func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) { client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatal(err) } @@ -1373,7 +1372,7 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatal(err) } @@ -1394,7 +1393,7 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { Topic: topic, EnableMessageOrdering: enableMessageOrdering, } - sub, err := client.CreateSubscription(ctx, subIDs.New(), subCfg) + sub, err := createSubWithRetry(ctx, t, client, subIDs.New(), subCfg) if err != nil { t.Fatal(err) } @@ -1441,15 +1440,14 @@ func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } - defer topic.Delete(ctx) defer topic.Stop() - deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New()) + deadLetterTopic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1465,7 +1463,7 @@ func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) { }, } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -1515,7 +1513,7 @@ func TestIntegration_DeadLetterPolicy_DeliveryAttempt(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1526,7 +1524,7 @@ func TestIntegration_DeadLetterPolicy_DeliveryAttempt(t *testing.T) { Topic: topic, } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -1557,14 +1555,14 @@ func TestIntegration_DeadLetterPolicy_ClearDeadLetter(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } defer topic.Delete(ctx) defer topic.Stop() - deadLetterTopic, err := client.CreateTopic(ctx, topicIDs.New()) + deadLetterTopic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1578,7 +1576,7 @@ func TestIntegration_DeadLetterPolicy_ClearDeadLetter(t *testing.T) { }, } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -1616,7 +1614,7 @@ func TestIntegration_Filter_CreateSubscription(t *testing.T) { ctx := context.Background() client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1627,7 +1625,7 @@ func TestIntegration_Filter_CreateSubscription(t *testing.T) { Filter: "attributes.event_type = \"1\"", } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -1677,7 +1675,7 @@ func TestIntegration_RetryPolicy(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1692,7 +1690,7 @@ func TestIntegration_RetryPolicy(t *testing.T) { }, } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -1741,7 +1739,7 @@ func TestIntegration_DetachSubscription(t *testing.T) { client := integrationTestClient(ctx, t) defer client.Close() - topic, err := client.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) if err != nil { t.Fatalf("CreateTopic error: %v", err) } @@ -1752,7 +1750,7 @@ func TestIntegration_DetachSubscription(t *testing.T) { Topic: topic, } var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), cfg); err != nil { + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), cfg); err != nil { t.Fatalf("CreateSub error: %v", err) } defer sub.Delete(ctx) @@ -1963,7 +1961,7 @@ func TestIntegration_TopicRetention(t *testing.T) { tc := TopicConfig{ RetentionDuration: 50 * time.Minute, } - topic, err := c.CreateTopicWithConfig(ctx, topicIDs.New(), &tc) + topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), &tc) if err != nil { r.Errorf("failed to create topic: %v", err) } @@ -1982,12 +1980,13 @@ func TestIntegration_TopicRetention(t *testing.T) { } // Create a subscription on the topic and read TopicMessageRetentionDuration. - s, err := c.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{ + s, err := createSubWithRetry(ctx, t, c, subIDs.New(), SubscriptionConfig{ Topic: topic, }) if err != nil { r.Errorf("failed to create subscription: %v", err) } + defer s.Delete(ctx) sCfg, err := s.Config(ctx) if err != nil { r.Errorf("failed to get sub config: %v", err) @@ -2041,7 +2040,7 @@ func TestIntegration_TopicUpdateSchema(t *testing.T) { } defer sc.DeleteSchema(ctx, schemaID) - topic, err := c.CreateTopic(ctx, topicIDs.New()) + topic, err := createTopicWithRetry(ctx, t, c, topicIDs.New(), nil) if err != nil { t.Fatal(err) } @@ -2082,8 +2081,40 @@ func TestIntegration_DetectProjectID(t *testing.T) { } badTS := testutil.ErroringTokenSource{} - if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil { t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.projectID) } } + +// createTopicWithRetry creates a topic, wrapped with testutil.Retry and returns the created topic or an error. +func createTopicWithRetry(ctx context.Context, t *testing.T, c *Client, topicID string, cfg *TopicConfig) (*Topic, error) { + var topic *Topic + var err error + testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) { + if cfg != nil { + topic, err = c.CreateTopicWithConfig(ctx, topicID, cfg) + if err != nil { + r.Errorf("CreateTopic error: %v", err) + } + } else { + topic, err = c.CreateTopic(ctx, topicID) + if err != nil { + r.Errorf("CreateTopic error: %v", err) + } + } + }) + return topic, err +} + +// createSubWithRetry creates a subscription, wrapped with testutil.Retry and returns the created subscription or an error. +func createSubWithRetry(ctx context.Context, t *testing.T, c *Client, subID string, cfg SubscriptionConfig) (*Subscription, error) { + var sub *Subscription + var err error + testutil.Retry(t, 5, 1*time.Second, func(r *testutil.R) { + sub, err = c.CreateSubscription(ctx, subID, cfg) + if err != nil { + r.Errorf("CreateSub error: %v", err) + } + }) + return sub, err +}