Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
101059: sql/json: fix null's in array in inverted index support r=yuzefovich a=cucaroach

We assumed all the array elements were strings w/o checking for null,
now we use AsDString to check it.

Found internally with expanded sqlsmith testing.

Fixes: cockroachdb#101025
Epic: None
Release note: None


101130: changefeedccl: fix pubsub v2 unit tests on AWS r=samiskin a=samiskin

Resolves cockroachdb#100968
Resolves cockroachdb#100969
Resolves cockroachdb#100970
Resolves cockroachdb#100971
Resolves cockroachdb#100972
Resolves cockroachdb#100973
Resolves cockroachdb#100974
Resolves cockroachdb#100985
Resolves cockroachdb#100986
Resolves cockroachdb#100987
Resolves cockroachdb#100988
Resolves cockroachdb#101014
Resolves cockroachdb#101015
Resolves cockroachdb#101016
Resolves cockroachdb#101017
Resolves cockroachdb#101018
Resolves cockroachdb#101019
Resolves cockroachdb#101020
Resolves cockroachdb#101030
Resolves cockroachdb#101031
Resolves cockroachdb#101032
Resolves cockroachdb#101033
Resolves cockroachdb#101034
Resolves cockroachdb#101035
Resolves cockroachdb#101036
Resolves cockroachdb#101040
Resolves cockroachdb#101041
Resolves cockroachdb#101042
Resolves cockroachdb#101043
Resolves cockroachdb#101044
Resolves cockroachdb#101045
Resolves cockroachdb#101062
Resolves cockroachdb#101063
Resolves cockroachdb#101064
Resolves cockroachdb#101065
Resolves cockroachdb#101066
Resolves cockroachdb#101067
Resolves cockroachdb#101079
Resolves cockroachdb#101080
Resolves cockroachdb#101081
Resolves cockroachdb#101082
Resolves cockroachdb#101083
Resolves cockroachdb#101084
Resolves cockroachdb#101085
Resolves cockroachdb#101086
Resolves cockroachdb#101099
Resolves cockroachdb#101100
Resolves cockroachdb#101106
Resolves cockroachdb#101107
Resolves cockroachdb#101113
Resolves cockroachdb#101114
Resolves cockroachdb#101115


The pubsub V2 tests would fail with
```
failed to start feed for job 0: pq: opening client: google: could not find
default credentials. See
https://developers.google.com/accounts/docs/application-default-credentials for
more information
```
only on release-23.1 test runs because every other test was running on google cloud machines, where you didn't need to even have `gcloud` installed for it to work.  This happens only on the initial attempt to initialize a PubsubClient when the GRPCConn is not overriden.

This PR fixes it by skipping initialization entirely so that we don't have to deal with errors like this or having ensure the old connection is cleaned up before setting the mock one.

Release note: None

Co-authored-by: Tommy Reilly <[email protected]>
Co-authored-by: Shiranka Miskin <[email protected]>
  • Loading branch information
3 people committed Apr 10, 2023
3 parents 668ec8d + bc47c8b + 368d2db commit 9c55b61
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 23 deletions.
31 changes: 16 additions & 15 deletions pkg/ccl/changefeedccl/sink_pubsub_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,16 @@ func makePubsubSinkClient(
return nil, errors.New("missing project name")
}

publisherClient, err := makePublisherClient(ctx, pubsubURL, unordered, knobs)
if err != nil {
return nil, err
var err error
var publisherClient *pubsub.PublisherClient

// In unit tests the publisherClient gets set immediately after initializing
// the sink object via knobs.WrapSink.
if knobs == nil || !knobs.PubsubClientSkipClientCreation {
publisherClient, err = makePublisherClient(ctx, pubsubURL, unordered)
if err != nil {
return nil, err
}
}

sinkClient := &pubsubSinkClient{
Expand Down Expand Up @@ -250,7 +257,7 @@ func (pe *pubsubSinkClient) Close() error {
}

func makePublisherClient(
ctx context.Context, url sinkURL, unordered bool, knobs *TestingKnobs,
ctx context.Context, url sinkURL, unordered bool,
) (*pubsub.PublisherClient, error) {
const regionParam = "region"
region := url.consumeParam(regionParam)
Expand All @@ -267,21 +274,15 @@ func makePublisherClient(
endpoint = gcpEndpointForRegion(region)
}

options := []option.ClientOption{
option.WithEndpoint(endpoint),
}

if knobs == nil || !knobs.PubsubClientSkipCredentialsCheck {
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}
options = append(options, creds)
creds, err := getGCPCredentials(ctx, url)
if err != nil {
return nil, err
}

client, err := pubsub.NewPublisherClient(
ctx,
options...,
option.WithEndpoint(endpoint),
creds,
)
if err != nil {
return nil, errors.Wrap(err, "opening client")
Expand Down
6 changes: 2 additions & 4 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2359,11 +2359,11 @@ func makePubsubFeedFactory(srvOrCluster interface{}, db *gosql.DB) cdctest.TestF

switch t := srvOrCluster.(type) {
case serverutils.TestTenantInterface:
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
case serverutils.TestClusterInterface:
servers := make([]feedInjectable, t.NumServers())
for i := range servers {
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipCredentialsCheck = true
t.Server(i).DistSQLServer().(*distsql.ServerImpl).TestingKnobs.Changefeed.(*TestingKnobs).PubsubClientSkipClientCreation = true
}
}

Expand Down Expand Up @@ -2404,8 +2404,6 @@ func (p *pubsubFeedFactory) Feed(create string, args ...interface{}) (cdctest.Te
defer mu.Unlock()
if batchingSink, ok := s.(*batchingSink); ok {
if sinkClient, ok := batchingSink.client.(*pubsubSinkClient); ok {
_ = sinkClient.client.Close()

conn, _ := mockServer.Dial()
mockClient, _ := pubsubv1.NewPublisherClient(context.Background(), option.WithGRPCConn(conn))
sinkClient.client = mockClient
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ type TestingKnobs struct {
// It allows the tests to muck with the Sink, and even return altogether different
// implementation.
WrapSink func(s Sink, jobID jobspb.JobID) Sink
// PubsubClientSkipCredentialsCheck, if set, skips the gcp credentials checking
PubsubClientSkipCredentialsCheck bool
// PubsubClientSkipClientCreation, if set, skips creating a google cloud
// client as it is expected that the test manually sets a client.
PubsubClientSkipClientCreation bool
// FilterSpanWithMutation is a filter returning true if the resolved span event should
// be skipped. This method takes a pointer in case resolved spans need to be mutated.
FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/json
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,15 @@ SELECT true FROM x WHERE j->'a' @> '2'::JSONB
----
true

query T
SELECT j FROM x WHERE j ?| ARRAY[NULL]
----

query T
SELECT j FROM x WHERE j ?& ARRAY[NULL::STRING]
----
{"a": [1, 2, 3]}

query T
SELECT '{"foo": {"bar": 1}}'::JSONB #- ARRAY['foo', 'bar']
----
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/rowenc/index_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,8 +704,11 @@ func EncodeExistsInvertedIndexSpans(
}
var expr inverted.Expression
for _, d := range val.(*tree.DArray).Array {
s := string(*d.(*tree.DString))
newExpr, err := json.EncodeExistsInvertedIndexSpans(nil /* inKey */, s)
ds, ok := tree.AsDString(d)
if !ok {
continue
}
newExpr, err := json.EncodeExistsInvertedIndexSpans(nil /* inKey */, string(ds))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 9c55b61

Please sign in to comment.