Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
101059: sql/json: fix null's in array in inverted index support r=yuzefovich a=cucaroach

We assumed all the array elements were strings w/o checking for null,
now we use AsDString to check it.

Found internally with expanded sqlsmith testing.

Fixes: #101025
Epic: None
Release note: None


101130: changefeedccl: fix pubsub v2 unit tests on AWS r=samiskin a=samiskin

Resolves #100968
Resolves #100969
Resolves #100970
Resolves #100971
Resolves #100972
Resolves #100973
Resolves #100974
Resolves #100985
Resolves #100986
Resolves #100987
Resolves #100988
Resolves #101014
Resolves #101015
Resolves #101016
Resolves #101017
Resolves #101018
Resolves #101019
Resolves #101020
Resolves #101030
Resolves #101031
Resolves #101032
Resolves #101033
Resolves #101034
Resolves #101035
Resolves #101036
Resolves #101040
Resolves #101041
Resolves #101042
Resolves #101043
Resolves #101044
Resolves #101045
Resolves #101062
Resolves #101063
Resolves #101064
Resolves #101065
Resolves #101066
Resolves #101067
Resolves #101079
Resolves #101080
Resolves #101081
Resolves #101082
Resolves #101083
Resolves #101084
Resolves #101085
Resolves #101086
Resolves #101099
Resolves #101100
Resolves #101106
Resolves #101107
Resolves #101113
Resolves #101114
Resolves #101115


The pubsub V2 tests would fail with
```
failed to start feed for job 0: pq: opening client: google: could not find
default credentials. See
https://developers.google.com/accounts/docs/application-default-credentials for
more information
```
only on release-23.1 test runs because every other test was running on google cloud machines, where you didn't need to even have `gcloud` installed for it to work.  This happens only on the initial attempt to initialize a PubsubClient when the GRPCConn is not overriden.

This PR fixes it by skipping initialization entirely so that we don't have to deal with errors like this or having ensure the old connection is cleaned up before setting the mock one.

Release note: None

Co-authored-by: Tommy Reilly <[email protected]>
Co-authored-by: Shiranka Miskin <[email protected]>
  • Loading branch information
3 people committed Apr 10, 2023
3 parents 668ec8d + bc47c8b + 368d2db commit 9c55b61
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 23 deletions.
31 changes: 16 additions & 15 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,16 @@ func makePubsubSinkClient(
return nil, errors.New("missing project name")
}

publisherClient, err := makePublisherClient(ctx, pubsubURL, unordered, knobs)
if err != nil {
return nil, err
var err error
var publisherClient *pubsub.PublisherClient

// In unit tests the publisherClient gets set immediately after initializing
// the sink object via knobs.WrapSink.
if knobs == nil || !knobs.PubsubClientSkipClientCreation {
publisherClient, err = makePublisherClient(ctx, pubsubURL, unordered)
if err != nil {
return nil, err
}
}

sinkClient := &pubsubSinkClient{
Expand Down Expand Up @@ -250,7 +257,7 @@ func (pe *pubsubSinkClient) Close() error {
}

func makePublisherClient(
ctx context.Context, url sinkURL, unordered bool, knobs *TestingKnobs,
ctx context.Context, url sinkURL, unordered bool,
) (*pubsub.PublisherClient, error) {
const regionParam = "region"
region := url.consumeParam(regionParam)
Expand All @@ -267,21 +274,15 @@ func makePublisherClient(
endpoint = gcpEndpointForRegion(region)
}

options := []option.ClientOption{
option.WithEndpoint(endpoint),
}

if knobs == nil || !knobs.PubsubClientSkipCredentialsCheck {
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}
options = append(options, creds)
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}

client, err := pubsub.NewPublisherClient(
ctx,
options...,
option.WithEndpoint(endpoint),
creds,
)
if err != nil {
return nil, errors.Wrap(err, "opening client")
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2359,11 +2359,11 @@ func makePubsubFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestF

switch t := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
case serverutils.TestClusterInterface:
servers := make([]feedInjectable, t.NumServers())
for i := range servers {
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
}
}

Expand Down Expand Up @@ -2404,8 +2404,6 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
defer mu.Unlock()
if batchingSink, ok := s.(*batchingSink); ok {
if sinkClient, ok := batchingSink.client.(*pubsubSinkClient); ok {
_ = sinkClient.client.Close()

conn, _ := mockServer.Dial()
mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn))
sinkClient.client = mockClient
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type TestingKnobs struct {
// It allows the tests to muck with the Sink, and even return altogether different
// implementation.
WrapSink func(s Sink, jobID jobspb.JobID) Sink
// PubsubClientSkipCredentialsCheck, if set, skips the gcp credentials checking
PubsubClientSkipCredentialsCheck bool
// PubsubClientSkipClientCreation, if set, skips creating a google cloud
// client as it is expected that the test manually sets a client.
PubsubClientSkipClientCreation bool
// FilterSpanWithMutation is a filter returning true if the resolved span event should
// be skipped. This method takes a pointer in case resolved spans need to be mutated.
FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/json
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,15 @@ SELECT true FROM x WHERE j->'a' @> '2'::JSONB
----
true

query T
SELECT j FROM x WHERE j ?| ARRAY[NULL]
----

query T
SELECT j FROM x WHERE j ?& ARRAY[NULL::STRING]
----
{"a": [1, 2, 3]}

query T
SELECT '{"foo": {"bar": 1}}'::JSONB #- ARRAY['foo', 'bar']
----
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/rowenc/index_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,11 @@ func EncodeExistsInvertedIndexSpans(
}
var expr inverted.Expression
for _, d := range val.(*tree.DArray).Array {
s := string(*d.(*tree.DString))
newExpr, err := json.EncodeExistsInvertedIndexSpans(nil /* inKey */, s)
ds, ok := tree.AsDString(d)
if !ok {
continue
}
newExpr, err := json.EncodeExistsInvertedIndexSpans(nil /* inKey */, string(ds))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 9c55b61

Please sign in to comment.