Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: fix pubsub v2 unit tests on AWS #101130

Merged
merged 1 commit into from
Apr 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,16 @@ func makePubsubSinkClient(
return nil, errors.New("missing project name")
}

publisherClient, err := makePublisherClient(ctx, pubsubURL, unordered, knobs)
if err != nil {
return nil, err
var err error
var publisherClient *pubsub.PublisherClient

// In unit tests the publisherClient gets set immediately after initializing
// the sink object via knobs.WrapSink.
if knobs == nil || !knobs.PubsubClientSkipClientCreation {
publisherClient, err = makePublisherClient(ctx, pubsubURL, unordered)
if err != nil {
return nil, err
}
}

sinkClient := &pubsubSinkClient{
Expand Down Expand Up @@ -250,7 +257,7 @@ func (pe *pubsubSinkClient) Close() error {
}

func makePublisherClient(
ctx context.Context, url sinkURL, unordered bool, knobs *TestingKnobs,
ctx context.Context, url sinkURL, unordered bool,
) (*pubsub.PublisherClient, error) {
const regionParam = "region"
region := url.consumeParam(regionParam)
Expand All @@ -267,21 +274,15 @@ func makePublisherClient(
endpoint = gcpEndpointForRegion(region)
}

options := []option.ClientOption{
option.WithEndpoint(endpoint),
}

if knobs == nil || !knobs.PubsubClientSkipCredentialsCheck {
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}
options = append(options, creds)
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}

client, err := pubsub.NewPublisherClient(
ctx,
options...,
option.WithEndpoint(endpoint),
creds,
)
if err != nil {
return nil, errors.Wrap(err, "opening client")
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2359,11 +2359,11 @@ func makePubsubFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestF

switch t := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
case serverutils.TestClusterInterface:
servers := make([]feedInjectable, t.NumServers())
for i := range servers {
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
}
}

Expand Down Expand Up @@ -2404,8 +2404,6 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
defer mu.Unlock()
if batchingSink, ok := s.(*batchingSink); ok {
if sinkClient, ok := batchingSink.client.(*pubsubSinkClient); ok {
_ = sinkClient.client.Close()

conn, _ := mockServer.Dial()
mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn))
sinkClient.client = mockClient
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type TestingKnobs struct {
// It allows the tests to muck with the Sink, and even return altogether different
// implementation.
WrapSink func(s Sink, jobID jobspb.JobID) Sink
// PubsubClientSkipCredentialsCheck, if set, skips the gcp credentials checking
PubsubClientSkipCredentialsCheck bool
// PubsubClientSkipClientCreation, if set, skips creating a google cloud
// client as it is expected that the test manually sets a client.
PubsubClientSkipClientCreation bool
// FilterSpanWithMutation is a filter returning true if the resolved span event should
// be skipped. This method takes a pointer in case resolved spans need to be mutated.
FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool
Expand Down