Skip to content

Commit

Permalink
changefeedccl: fix pubsub v2 unit tests on AWS
Browse files Browse the repository at this point in the history
The pubsub V2 tests would fail with
```
failed to start feed for job 0: pq: opening client: google: could not find
default credentials. See
https://developers.google.com/accounts/docs/application-default-credentials for
more information
```
only on release-23.1 test runs because every other test was running on google
cloud machines, where you didn't need to even have `gcloud` installed for it to
work.  This happens only on the initial attempt to initialize a PubsubClient
when the GRPCConn is not overriden.

This PR fixes it by skipping initialization entirely so that we don't have to
deal with errors like this or having ensure the old connection is cleaned up
before setting the mock one.

Release note: None

<what was there before: Previously, ...>
<why it needed to change: This was inadequate because ...>
<what you did about it: To address this, this patch ...>
  • Loading branch information
samiskin committed Apr 10, 2023
1 parent 9ce0ac2 commit 368d2db
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 21 deletions.
31 changes: 16 additions & 15 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,16 @@ func makePubsubSinkClient(
return nil, errors.New("missing project name")
}

publisherClient, err := makePublisherClient(ctx, pubsubURL, unordered, knobs)
if err != nil {
return nil, err
var err error
var publisherClient *pubsub.PublisherClient

// In unit tests the publisherClient gets set immediately after initializing
// the sink object via knobs.WrapSink.
if knobs == nil || !knobs.PubsubClientSkipClientCreation {
publisherClient, err = makePublisherClient(ctx, pubsubURL, unordered)
if err != nil {
return nil, err
}
}

sinkClient := &pubsubSinkClient{
Expand Down Expand Up @@ -250,7 +257,7 @@ func (pe *pubsubSinkClient) Close() error {
}

func makePublisherClient(
ctx context.Context, url sinkURL, unordered bool, knobs *TestingKnobs,
ctx context.Context, url sinkURL, unordered bool,
) (*pubsub.PublisherClient, error) {
const regionParam = "region"
region := url.consumeParam(regionParam)
Expand All @@ -267,21 +274,15 @@ func makePublisherClient(
endpoint = gcpEndpointForRegion(region)
}

options := []option.ClientOption{
option.WithEndpoint(endpoint),
}

if knobs == nil || !knobs.PubsubClientSkipCredentialsCheck {
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}
options = append(options, creds)
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}

client, err := pubsub.NewPublisherClient(
ctx,
options...,
option.WithEndpoint(endpoint),
creds,
)
if err != nil {
return nil, errors.Wrap(err, "opening client")
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2359,11 +2359,11 @@ func makePubsubFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestF

switch t := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
case serverutils.TestClusterInterface:
servers := make([]feedInjectable, t.NumServers())
for i := range servers {
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
}
}

Expand Down Expand Up @@ -2404,8 +2404,6 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
defer mu.Unlock()
if batchingSink, ok := s.(*batchingSink); ok {
if sinkClient, ok := batchingSink.client.(*pubsubSinkClient); ok {
_ = sinkClient.client.Close()

conn, _ := mockServer.Dial()
mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn))
sinkClient.client = mockClient
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type TestingKnobs struct {
// It allows the tests to muck with the Sink, and even return altogether different
// implementation.
WrapSink func(s Sink, jobID jobspb.JobID) Sink
// PubsubClientSkipCredentialsCheck, if set, skips the gcp credentials checking
PubsubClientSkipCredentialsCheck bool
// PubsubClientSkipClientCreation, if set, skips creating a google cloud
// client as it is expected that the test manually sets a client.
PubsubClientSkipClientCreation bool
// FilterSpanWithMutation is a filter returning true if the resolved span event should
// be skipped. This method takes a pointer in case resolved spans need to be mutated.
FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool
Expand Down

0 comments on commit 368d2db

Please sign in to comment.