Skip to content

Commit

Permalink
Merge #82877 #83226
Browse files Browse the repository at this point in the history
82877: sql: support JSONB subscripting for SELECT cases r=mgartner a=otan

I've kept the `ArraySubscript` class names and references as PG does
this too. We can choose to change this in later iterations.

Refs [#77434](#77434)

Release note (sql change): Added support for JSONB subscripting in
SELECT-style cases, e.g. SELECT json_field['a'] ... WHERE
json_field['b'] = ...

83226: changefeedccl: block testfeed closure on canceled status r=samiskin a=samiskin

Resolves #82794 

TestChangefeedNemeses would sometimes flake on cloudstorage sink likely
due to the feed writing files during the tempdirectory closing.  Stress
testing it on cloudstorage sink failed at 160 executions.

This small change blocks feed.Close() on actually seeing the "cancelled"
job status to ensure the feed is completely stopped prior to Close
completing.  Running cloudstorage Nemeses did not fail after 500+ executions.

Release note: None

Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Shiranka Miskin <[email protected]>
  • Loading branch information
3 people committed Jun 29, 2022
3 parents ac11a55 + 5a9ec7c + b91b729 commit d757e5f
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 62 deletions.
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3212,15 +3212,19 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) {
sqlDB.Exec(t, `INSERT INTO foo (a, b) VALUES (1, 1)`)

cf := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo")
defer closeFeed(t, cf)
jobID := cf.(cdctest.EnterpriseTestFeed).JobID()
defer func() {
// Manually update job status to avoid closeFeed waitng for the registry to cancel it
sqlDB.Exec(t, `UPDATE system.jobs SET status = $1 WHERE id = $2`, jobs.StatusFailed, jobID)
closeFeed(t, cf)
}()

assertPayloads(t, cf, []string{
`foo: [1]->{"after": {"a": 1, "b": 1}}`,
})

// Mimic the claim dying and being cleaned up by
// another node.
jobID := cf.(cdctest.EnterpriseTestFeed).JobID()
sqlDB.Exec(t, `UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`, jobID)

// Expect that the distflow fails since it can't
Expand Down
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,17 @@ func (f *jobFeed) Close() error {
close(f.shutdown)
return nil
}
if status == string(jobs.StatusFailed) {
f.mu.Lock()
defer f.mu.Unlock()
f.mu.terminalErr = errors.New("changefeed failed")
close(f.shutdown)
return nil
}
if _, err := f.db.Exec(`CANCEL JOB $1`, f.jobID); err != nil {
log.Infof(context.Background(), `could not cancel feed %d: %v`, f.jobID, err)
} else {
return f.WaitForStatus(func(s jobs.Status) bool { return s == jobs.StatusCanceled })
}
}

Expand Down
58 changes: 58 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/json
Original file line number Diff line number Diff line change
Expand Up @@ -884,3 +884,61 @@ query TBTB
SELECT j, j ? 'a', j-1, (j-1) ? 'a' FROM t81647
----
["a", "b"] true ["a"] true

#
# Test JSONB subscripting.
#

# Constant folding.
query TTT
SELECT
('{"a": {"b": {"c": 1}}}'::jsonb)['a'],
('{"a": {"b": {"c": 1}}}'::jsonb)['a']['b']['c'],
('[1, "2", null]'::jsonb)[1]
----
{"b": {"c": 1}} 1 "2"

# Referencing subscript which does not exist.
query TTTT
SELECT
('{"a": 1}'::jsonb)['b'],
('{"a": {"b": {"c": 1}}}'::jsonb)['c']['b']['c'],
('[1, "2", null]'::jsonb)[4],
('{"a": 1}'::jsonb)[NULL]
----
NULL NULL NULL NULL

# Error cases.
statement error unexpected JSON subscript type: TIMESTAMPTZ
SELECT ('{"a": 1}'::jsonb)[now()]

statement error jsonb subscript does not support slices
SELECT ('{"a": 1}'::jsonb)['a':'b']

# Check it works from a JSON table.
statement ok
CREATE TABLE json_subscript_test (
id SERIAL PRIMARY KEY,
j JSONB,
extract_field TEXT,
extract_int_field INT
);
INSERT INTO json_subscript_test (j, extract_field, extract_int_field) VALUES
('{"other_field": 2}', 'other_field', 1),
('{"field": {"field": 2}}', 'field', 0),
('[1, 2, 3]', 'nothing_to_fetch', 1)

# Test subscripts with fields using other columns.
query TTITTTT
SELECT j, extract_field, extract_int_field, j['field'], j[extract_field], j[extract_field][extract_field], j[extract_int_field]
FROM json_subscript_test ORDER BY id
----
{"other_field": 2} other_field 1 NULL 2 NULL NULL
{"field": {"field": 2}} field 0 {"field": 2} {"field": 2} 2 NULL
[1, 2, 3] nothing_to_fetch 1 NULL NULL NULL 2

# Test use in a WHERE clause.
query T
SELECT j FROM json_subscript_test WHERE j['other_field'] = '2' ORDER BY id
----
{"other_field": 2}
13 changes: 11 additions & 2 deletions pkg/sql/opt/memo/typing.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,18 @@ func typeArrayAgg(e opt.ScalarExpr) *types.T {
return types.MakeArray(typ)
}

// typeIndirection returns the type of the element of the array.
// typeIndirection returns the type of the element after the indirection
// is applied.
func typeIndirection(e opt.ScalarExpr) *types.T {
return e.Child(0).(opt.ScalarExpr).DataType().ArrayContents()
t := e.Child(0).(opt.ScalarExpr).DataType()
switch t.Family() {
case types.JsonFamily:
return t
case types.ArrayFamily:
return t.ArrayContents()
default:
panic(errors.AssertionFailedf("unknown type indirection type %s", t.SQLString()))
}
}

// typeCollate returns the collated string typed with the given locale.
Expand Down
13 changes: 11 additions & 2 deletions pkg/sql/opt/norm/fold_constants_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,10 +498,19 @@ func (c *CustomFuncs) FoldIndirection(input, index opt.ScalarExpr) (_ opt.Scalar
return nil, false
}

// Case 2: The input is a constant DArray.
// Case 2: The input is a constant DArray or DJSON.
if memo.CanExtractConstDatum(input) {
var resolvedType *types.T
switch input.DataType().Family() {
case types.JsonFamily:
resolvedType = input.DataType()
case types.ArrayFamily:
resolvedType = input.DataType().ArrayContents()
default:
panic(errors.AssertionFailedf("expected array or json; found %s", input.DataType().SQLString()))
}
inputD := memo.ExtractConstDatum(input)
texpr := tree.NewTypedIndirectionExpr(inputD, indexD, input.DataType().ArrayContents())
texpr := tree.NewTypedIndirectionExpr(inputD, indexD, resolvedType)
result, err := eval.Expr(c.f.evalCtx, texpr)
if err == nil {
return c.f.ConstructConstVal(result, texpr.ResolvedType()), true
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/opt/norm/testdata/rules/fold_constants
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,28 @@ project
└── projections
└── a.arr:6[0] [as=arr:9, outer=(6)]

# Fold JSONB constants.
norm expect=FoldIndirection
SELECT ('{"a": 1}'::jsonb)['a'] AS other_col FROM a
----
project
├── columns: other_col:9!null
├── fd: ()-->(9)
├── scan a
└── projections
└── '1' [as=other_col:9]

# JSONB is dynamically constructured.
norm expect-not=FoldIndirection
SELECT j['field'] FROM a
----
project
├── columns: j:9
├── scan a
│ └── columns: a.j:5
└── projections
└── a.j:5['field'] [as=j:9, outer=(5)]

# Regression test for #40404.
norm expect=FoldIndirection
SELECT (SELECT x[1]) FROM (VALUES(null::oid[])) v(x)
Expand Down
23 changes: 12 additions & 11 deletions pkg/sql/opt/optbuilder/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,22 @@ func (b *Builder) buildScalar(
out = b.factory.ConstructArrayFlatten(s.node, &subqueryPrivate)

case *tree.IndirectionExpr:
expr := b.buildScalar(t.Expr.(tree.TypedExpr), inScope, nil, nil, colRefs)

if len(t.Indirection) != 1 {
if len(t.Indirection) != 1 && t.Expr.(tree.TypedExpr).ResolvedType().Family() == types.ArrayFamily {
panic(unimplementedWithIssueDetailf(32552, "ind", "multidimensional indexing is not supported"))
}

subscript := t.Indirection[0]
if subscript.Slice {
panic(unimplementedWithIssueDetailf(32551, "", "array slicing is not supported"))
}
out = b.buildScalar(t.Expr.(tree.TypedExpr), inScope, nil, nil, colRefs)

out = b.factory.ConstructIndirection(
expr,
b.buildScalar(subscript.Begin.(tree.TypedExpr), inScope, nil, nil, colRefs),
)
for _, subscript := range t.Indirection {
if subscript.Slice {
panic(unimplementedWithIssueDetailf(32551, "", "array slicing is not supported"))
}

out = b.factory.ConstructIndirection(
out,
b.buildScalar(subscript.Begin.(tree.TypedExpr), inScope, nil, nil, colRefs),
)
}

case *tree.IfErrExpr:
cond := b.buildScalar(t.Cond.(tree.TypedExpr), inScope, nil, nil, colRefs)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/optbuilder/testdata/select
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ error (42P01): no data source matches pattern: bar.kv.*
build
SELECT kv.*[1] FROM kv
----
error (42804): cannot subscript type tuple{char AS k, char AS v} because it is not an array
error (42804): cannot subscript type tuple{char AS k, char AS v} because it is not an array or json object

build
SELECT ARRAY[]
Expand Down
84 changes: 61 additions & 23 deletions pkg/sql/sem/eval/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treecmp"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -275,20 +276,6 @@ func (e *evaluator) EvalIndexedVar(iv *tree.IndexedVar) (tree.Datum, error) {

func (e *evaluator) EvalIndirectionExpr(expr *tree.IndirectionExpr) (tree.Datum, error) {
var subscriptIdx int
for i, t := range expr.Indirection {
if t.Slice || i > 0 {
return nil, errors.AssertionFailedf("unsupported feature should have been rejected during planning")
}

d, err := t.Begin.(tree.TypedExpr).Eval(e)
if err != nil {
return nil, err
}
if d == tree.DNull {
return d, nil
}
subscriptIdx = int(tree.MustBeDInt(d))
}

d, err := expr.Expr.(tree.TypedExpr).Eval(e)
if err != nil {
Expand All @@ -298,17 +285,68 @@ func (e *evaluator) EvalIndirectionExpr(expr *tree.IndirectionExpr) (tree.Datum,
return d, nil
}

// Index into the DArray, using 1-indexing.
arr := tree.MustBeDArray(d)
switch d.ResolvedType().Family() {
case types.ArrayFamily:
for i, t := range expr.Indirection {
if t.Slice || i > 0 {
return nil, errors.AssertionFailedf("unsupported feature should have been rejected during planning")
}

// VECTOR types use 0-indexing.
if arr.FirstIndex() == 0 {
subscriptIdx++
}
if subscriptIdx < 1 || subscriptIdx > arr.Len() {
return tree.DNull, nil
beginDatum, err := t.Begin.(tree.TypedExpr).Eval(e)
if err != nil {
return nil, err
}
if beginDatum == tree.DNull {
return tree.DNull, nil
}
subscriptIdx = int(tree.MustBeDInt(beginDatum))
}

// Index into the DArray, using 1-indexing.
arr := tree.MustBeDArray(d)

// VECTOR types use 0-indexing.
if arr.FirstIndex() == 0 {
subscriptIdx++
}
if subscriptIdx < 1 || subscriptIdx > arr.Len() {
return tree.DNull, nil
}
return arr.Array[subscriptIdx-1], nil
case types.JsonFamily:
j := tree.MustBeDJSON(d)
curr := j.JSON
for _, t := range expr.Indirection {
if t.Slice {
return nil, errors.AssertionFailedf("unsupported feature should have been rejected during planning")
}

field, err := t.Begin.(tree.TypedExpr).Eval(e)
if err != nil {
return nil, err
}
if field == tree.DNull {
return tree.DNull, nil
}
switch field.ResolvedType().Family() {
case types.StringFamily:
if curr, err = curr.FetchValKeyOrIdx(string(tree.MustBeDString(field))); err != nil {
return nil, err
}
case types.IntFamily:
if curr, err = curr.FetchValIdx(int(tree.MustBeDInt(field))); err != nil {
return nil, err
}
default:
return nil, errors.AssertionFailedf("unsupported feature should have been rejected during planning")
}
if curr == nil {
return tree.DNull, nil
}
}
return tree.NewDJSON(curr), nil
}
return arr.Array[subscriptIdx-1], nil
return nil, errors.AssertionFailedf("unsupported feature should have been rejected during planning")
}

func (e *evaluator) EvalDefaultVal(expr *tree.DefaultVal) (tree.Datum, error) {
Expand Down
Loading

0 comments on commit d757e5f

Please sign in to comment.