diff --git a/pkg/ccl/changefeedccl/sink_pubsub_v2.go b/pkg/ccl/changefeedccl/sink_pubsub_v2.go index 0a42172d27a0..1b069b2ff16e 100644 --- a/pkg/ccl/changefeedccl/sink_pubsub_v2.go +++ b/pkg/ccl/changefeedccl/sink_pubsub_v2.go @@ -104,9 +104,16 @@ func makePubsubSinkClient( return nil, errors.New("missing project name") } - publisherClient, err := makePublisherClient(ctx, pubsubURL, unordered, knobs) - if err != nil { - return nil, err + var err error + var publisherClient *pubsub.PublisherClient + + // In unit tests the publisherClient gets set immediately after initializing + // the sink object via knobs.WrapSink. + if knobs == nil || !knobs.PubsubClientSkipClientCreation { + publisherClient, err = makePublisherClient(ctx, pubsubURL, unordered) + if err != nil { + return nil, err + } } sinkClient := &pubsubSinkClient{ @@ -250,7 +257,7 @@ func (pe *pubsubSinkClient) Close() error { } func makePublisherClient( - ctx context.Context, url sinkURL, unordered bool, knobs *TestingKnobs, + ctx context.Context, url sinkURL, unordered bool, ) (*pubsub.PublisherClient, error) { const regionParam = "region" region := url.consumeParam(regionParam) @@ -267,21 +274,15 @@ func makePublisherClient( endpoint = gcpEndpointForRegion(region) } - options := []option.ClientOption{ - option.WithEndpoint(endpoint), - } - - if knobs == nil || !knobs.PubsubClientSkipCredentialsCheck { - creds, err := getGCPCredentials(ctx, url) - if err != nil { - return nil, err - } - options = append(options, creds) + creds, err := getGCPCredentials(ctx, url) + if err != nil { + return nil, err } client, err := pubsub.NewPublisherClient( ctx, - options..., + option.WithEndpoint(endpoint), + creds, ) if err != nil { return nil, errors.Wrap(err, "opening client") diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index 2bc6c749dc95..435722e496aa 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -2359,11 +2359,11 @@ func makePubsubFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestF switch t := srvOrCluster.(type) { case serverutils.TestTenantInterface: - t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true + t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true case serverutils.TestClusterInterface: servers := make([]feedInjectable, t.NumServers()) for i := range servers { - t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true + t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true } } @@ -2404,8 +2404,6 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te defer mu.Unlock() if batchingSink, ok := s.(*batchingSink); ok { if sinkClient, ok := batchingSink.client.(*pubsubSinkClient); ok { - _ = sinkClient.client.Close() - conn, _ := mockServer.Dial() mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn)) sinkClient.client = mockClient diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index 56da319a08eb..3003ab1cda49 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -32,8 +32,9 @@ type TestingKnobs struct { // It allows the tests to muck with the Sink, and even return altogether different // implementation. WrapSink func(s Sink, jobID jobspb.JobID) Sink - // PubsubClientSkipCredentialsCheck, if set, skips the gcp credentials checking - PubsubClientSkipCredentialsCheck bool + // PubsubClientSkipClientCreation, if set, skips creating a google cloud + // client as it is expected that the test manually sets a client. + PubsubClientSkipClientCreation bool // FilterSpanWithMutation is a filter returning true if the resolved span event should // be skipped. This method takes a pointer in case resolved spans need to be mutated. FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool diff --git a/pkg/sql/logictest/testdata/logic_test/json b/pkg/sql/logictest/testdata/logic_test/json index da291dc6de93..6f645ebb22af 100644 --- a/pkg/sql/logictest/testdata/logic_test/json +++ b/pkg/sql/logictest/testdata/logic_test/json @@ -607,6 +607,15 @@ SELECT true FROM x WHERE j->'a' @> '2'::JSONB ---- true +query T +SELECT j FROM x WHERE j ?| ARRAY[NULL] +---- + +query T +SELECT j FROM x WHERE j ?& ARRAY[NULL::STRING] +---- +{"a": [1, 2, 3]} + query T SELECT '{"foo": {"bar": 1}}'::JSONB #- ARRAY['foo', 'bar'] ---- diff --git a/pkg/sql/rowenc/index_encoding.go b/pkg/sql/rowenc/index_encoding.go index 033ba11c6cfe..2abddbf8a37e 100644 --- a/pkg/sql/rowenc/index_encoding.go +++ b/pkg/sql/rowenc/index_encoding.go @@ -704,8 +704,11 @@ func EncodeExistsInvertedIndexSpans( } var expr inverted.Expression for _, d := range val.(*tree.DArray).Array { - s := string(*d.(*tree.DString)) - newExpr, err := json.EncodeExistsInvertedIndexSpans(nil /* inKey */, s) + ds, ok := tree.AsDString(d) + if !ok { + continue + } + newExpr, err := json.EncodeExistsInvertedIndexSpans(nil /* inKey */, string(ds)) if err != nil { return nil, err }