diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index ff2224dd9fff..d5d160a205c2 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -92,6 +92,7 @@ go_library( "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", "//pkg/sql/sem/tree", + "//pkg/sql/sem/volatility", "//pkg/sql/sessiondatapb", "//pkg/sql/sqlutil", "//pkg/sql/types", diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel index c851d9b62c88..ffca7d1840f5 100644 --- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel @@ -12,6 +12,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent", visibility = ["//visibility:public"], deps = [ + "//pkg/build", "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/keys", "//pkg/kv", @@ -34,6 +35,7 @@ go_library( "//pkg/util/encoding", "//pkg/util/hlc", "//pkg/util/iterutil", + "//pkg/util/log", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", ], @@ -57,6 +59,7 @@ go_test( "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", + "//pkg/settings/cluster", "//pkg/sql/catalog", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descpb", diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index dcb4ba7c87f4..af82a284287c 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -12,6 +12,7 @@ import ( "context" "fmt" + "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -21,10 +22,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/row" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/cache" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) @@ -530,3 +533,29 @@ func TestingGetFamilyIDFromKey( _, familyID, err := decoder.(*eventDecoder).rfCache.tableDescForKey(context.Background(), key, ts) return familyID, err } + +// MakeRowFromTuple converts a SQL datum produced by, for example, SELECT ROW(foo.*), +// into the same kind of cdcevent.Row you'd get as a result of an insert, but without +// the primary key. +func MakeRowFromTuple(evalCtx *eval.Context, t *tree.DTuple) Row { + r := Projection{EventDescriptor: &EventDescriptor{}} + names := t.ResolvedType().TupleLabels() + for i, d := range t.D { + var name string + if names == nil { + name = fmt.Sprintf("col%d", i+1) + } else { + name = names[i] + } + r.AddValueColumn(name, d.ResolvedType()) + if err := r.SetValueDatumAt(evalCtx, i, d); err != nil { + if build.IsRelease() { + log.Warningf(context.Background(), "failed to set row value from tuple due to error %v", err) + _ = r.SetValueDatumAt(evalCtx, i, tree.DNull) + } else { + panic(err) + } + } + } + return Row(r) +} diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go index 401a7dd63ead..d077580ae310 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go @@ -18,11 +18,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/distsql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -397,3 +400,61 @@ func slurpDatums(t *testing.T, it Iterator) (res []string) { })) return res } + +func TestMakeRowFromTuple(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + i := tree.NewDInt(1234) + f := tree.NewDFloat(12.34) + s := tree.NewDString("testing") + typ := types.MakeTuple([]*types.T{types.Int, types.Float, types.String}) + unlabeledTuple := tree.NewDTuple(typ, i, f, s) + st := cluster.MakeTestingClusterSettings() + evalCtx := eval.MakeTestingEvalContext(st) + + rowFromUnlabeledTuple := MakeRowFromTuple(&evalCtx, unlabeledTuple) + expectedCols := []struct { + name string + typ *types.T + valAsString string + }{ + {name: "col1", typ: types.Int, valAsString: "1234"}, + {name: "col2", typ: types.Float, valAsString: "12.34"}, + {name: "col3", typ: types.String, valAsString: "testing"}, + } + + remainingCols := expectedCols + + require.NoError(t, rowFromUnlabeledTuple.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error { + current := remainingCols[0] + remainingCols = remainingCols[1:] + require.Equal(t, current.name, col.Name) + require.Equal(t, current.typ, col.Typ) + require.Equal(t, current.valAsString, tree.AsStringWithFlags(d, tree.FmtExport)) + return nil + })) + + require.Empty(t, remainingCols) + + typ.InternalType.TupleLabels = []string{"a", "b", "c"} + labeledTuple := tree.NewDTuple(typ, i, f, s) + + expectedCols[0].name = "a" + expectedCols[1].name = "b" + expectedCols[2].name = "c" + + remainingCols = expectedCols + + rowFromLabeledTuple := MakeRowFromTuple(&evalCtx, labeledTuple) + + require.NoError(t, rowFromLabeledTuple.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error { + current := remainingCols[0] + remainingCols = remainingCols[1:] + require.Equal(t, current.name, col.Name) + require.Equal(t, current.typ, col.Typ) + require.Equal(t, current.valAsString, tree.AsStringWithFlags(d, tree.FmtExport)) + return nil + })) + +} diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 3ee88fb62ad9..6f0f1c3ab261 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -252,6 +252,35 @@ func TestChangefeedBasics(t *testing.T) { // cloudStorageTest is a regression test for #36994. } +func TestToJSONAsChangefeed(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo values (1, 'hello')`) + sqlDB.CheckQueryResults(t, + `SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*) from foo`, + [][]string{{`{"after": {"a": 1, "b": "hello"}}`}}, + ) + sqlDB.CheckQueryResults(t, + `SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*, 'updated', 'diff') from foo`, + [][]string{{`{"after": {"a": 1, "b": "hello"}, "before": null, "updated": "0.0000000000"}`}}, + ) + + sqlDB.CheckQueryResults(t, + `SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*, 'updated', 'envelope=row') from foo`, + [][]string{{`{"__crdb__": {"updated": "0.0000000000"}, "a": 1, "b": "hello"}`}}, + ) + + sqlDB.ExpectErr(t, `unknown envelope: lobster`, + `SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*, 'updated', 'envelope=lobster') from foo`) + } + + cdcTest(t, testFn) +} + func TestChangefeedIdleness(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/encoder_json.go b/pkg/ccl/changefeedccl/encoder_json.go index 5bda71975651..39e16954255c 100644 --- a/pkg/ccl/changefeedccl/encoder_json.go +++ b/pkg/ccl/changefeedccl/encoder_json.go @@ -12,13 +12,19 @@ import ( "bytes" "context" gojson "encoding/json" + "strings" "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sem/volatility" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/errors" @@ -195,3 +201,62 @@ func (e *jsonEncoder) EncodeResolvedTimestamp( } return gojson.Marshal(jsonEntries) } + +var placeholderCtx = eventContext{topic: "topic"} + +// EncodeAsJSONChangefeedWithFlags implements the crdb_internal.to_json_as_changefeed_with_flags +// builtin. +func EncodeAsJSONChangefeedWithFlags(r cdcevent.Row, flags ...string) ([]byte, error) { + optsMap := make(map[string]string, len(flags)) + for _, f := range flags { + split := strings.SplitN(f, "=", 2) + k := split[0] + var v string + if len(split) == 2 { + v = split[1] + } + optsMap[k] = v + } + opts, err := changefeedbase.MakeStatementOptions(optsMap).GetEncodingOptions() + if err != nil { + return nil, err + } + // If this function ends up needing to be optimized, cache or pool these. + // Nontrivial to do as an encoder generally isn't safe to call on different + // rows in parallel. + e, err := makeJSONEncoder(opts, changefeedbase.Targets{}) + if err != nil { + return nil, err + } + return e.EncodeValue(context.TODO(), placeholderCtx, r, cdcevent.Row{}) + +} + +func init() { + + overload := tree.Overload{ + Types: tree.VariadicType{FixedTypes: []*types.T{types.AnyTuple}, VarType: types.String}, + ReturnType: tree.FixedReturnType(types.Bytes), + Fn: func(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + row := cdcevent.MakeRowFromTuple(evalCtx, tree.MustBeDTuple(args[0])) + flags := make([]string, len(args)-1) + for i, d := range args[1:] { + flags[i] = string(tree.MustBeDString(d)) + } + o, err := EncodeAsJSONChangefeedWithFlags(row, flags...) + if err != nil { + return nil, pgerror.Wrap(err, pgcode.InvalidParameterValue, ``) + } + return tree.NewDBytes(tree.DBytes(o)), nil + }, + Info: "Strings can be of the form 'resolved' or 'resolved=1s'.", + // Probably actually stable, but since this is tightly coupled to changefeed logic by design, + // best to be defensive. + Volatility: volatility.Volatile, + } + + utilccl.RegisterCCLBuiltin("crdb_internal.to_json_as_changefeed_with_flags", + `Encodes a tuple the way a changefeed would output it if it were inserted as a row or emitted by a changefeed expression, and returns the raw bytes. + Flags such as 'diff' modify the encoding as though specified in the WITH portion of a changefeed.`, + overload) +} diff --git a/pkg/ccl/utilccl/BUILD.bazel b/pkg/ccl/utilccl/BUILD.bazel index dd929a5f7579..526bd045f141 100644 --- a/pkg/ccl/utilccl/BUILD.bazel +++ b/pkg/ccl/utilccl/BUILD.bazel @@ -34,7 +34,6 @@ go_test( name = "utilccl_test", size = "small", srcs = [ - "builtins_test.go", "license_check_test.go", "license_test.go", ], @@ -45,15 +44,11 @@ go_test( "//pkg/ccl/utilccl/licenseccl", "//pkg/settings", "//pkg/settings/cluster", - "//pkg/sql/sem/eval", - "//pkg/sql/sem/tree", - "//pkg/sql/types", "//pkg/testutils", "//pkg/util/envutil", "//pkg/util/stop", "//pkg/util/timeutil", "//pkg/util/uuid", - "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/utilccl/builtins_test.go b/pkg/ccl/utilccl/builtins_test.go deleted file mode 100644 index 7ce26e0e1319..000000000000 --- a/pkg/ccl/utilccl/builtins_test.go +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2017 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package utilccl - -import ( - "testing" - - "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/types" - "github.com/cockroachdb/errors" - "github.com/stretchr/testify/require" -) - -func unexportedFunction(s string) string { - return "behold: " + s -} - -func TestRegister(t *testing.T) { - RegisterCCLBuiltin("test_builtin_from_unexported_function", "", unexportedFunction) - RegisterCCLBuiltin("test_builtin_with_error", "", func(s string) (string, error) { - if s == "dog" { - return "pet the dog", nil - } - return s, errors.New("please provide dog") - }) - resolver := func(fn string) (*tree.FunctionDefinition, error) { - name := tree.UnresolvedName{NumParts: 1, Parts: [4]string{fn}} - return name.ResolveFunction(nil) - } - wrapper, err := resolver("test_builtin_from_unexported_function") - require.NoError(t, err) - require.Equal(t, "(: string) -> string", wrapper.Definition[0].(*tree.Overload).Signature(true)) - - args := tree.Datums{tree.NewDString("dog")} - ctx := eval.MakeTestingEvalContext(nil) - o, err := wrapper.Definition[0].(*tree.Overload).Fn.(eval.FnOverload)(&ctx, args) - require.NoError(t, err) - require.Equal(t, o.ResolvedType(), types.String) - require.Equal(t, "'behold: dog'", o.String()) - - petter, err := resolver("test_builtin_with_error") - require.NoError(t, err) - require.Equal(t, "(: string) -> string", petter.Definition[0].(*tree.Overload).Signature(true)) - - _, err = petter.Definition[0].(*tree.Overload).Fn.(eval.FnOverload)(&ctx, args) - require.NoError(t, err) - - args[0] = tree.NewDString("no dog") - _, err = petter.Definition[0].(*tree.Overload).Fn.(eval.FnOverload)(&ctx, args) - require.Error(t, err) - -} diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index 8b4d4d79bea7..2c19204b3464 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -59,7 +59,6 @@ go_library( "//pkg/sql/lex", "//pkg/sql/lexbase", "//pkg/sql/memsize", - "//pkg/sql/oidext", "//pkg/sql/parser", "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", diff --git a/pkg/sql/sem/builtins/all_builtins.go b/pkg/sql/sem/builtins/all_builtins.go index 6a8815842d97..8bff6e4d2bed 100644 --- a/pkg/sql/sem/builtins/all_builtins.go +++ b/pkg/sql/sem/builtins/all_builtins.go @@ -22,35 +22,6 @@ import ( "github.com/cockroachdb/errors" ) -// orderedStrings sorts a slice of strings lazily -// for better performance. -type orderedStrings struct { - strings []string - sorted bool -} - -func (o *orderedStrings) add(s string) { - if o.sorted { - o.insert(s) - } else { - o.strings = append(o.strings, s) - } -} - -func (o *orderedStrings) sort() { - if !o.sorted { - sort.Strings(o.strings) - } - o.sorted = true -} - -func (o *orderedStrings) insert(s string) { - i := sort.SearchStrings(o.strings, s) - o.strings = append(o.strings, "") - copy(o.strings[i+1:], o.strings[i:]) - o.strings[i] = s -} - var allBuiltinNames orderedStrings // AllBuiltinNames returns a slice containing all the built-in function @@ -187,3 +158,36 @@ func collectOverloads( } return makeBuiltin(props, r...) } + +// orderedStrings sorts a slice of strings lazily +// for better performance. +type orderedStrings struct { + strings []string + sorted bool +} + +// add a string without changing whether or not +// the strings are sorted yet. +func (o *orderedStrings) add(s string) { + if o.sorted { + o.insert(s) + } else { + o.strings = append(o.strings, s) + } +} + +func (o *orderedStrings) sort() { + if !o.sorted { + sort.Strings(o.strings) + } + o.sorted = true +} + +// insert assumes the strings are already sorted +// and inserts s in the right place. +func (o *orderedStrings) insert(s string) { + i := sort.SearchStrings(o.strings, s) + o.strings = append(o.strings, "") + copy(o.strings[i+1:], o.strings[i:]) + o.strings[i] = s +} diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go index 0365c28ef934..84c7a2bac85d 100644 --- a/pkg/sql/sem/builtins/fixed_oids.go +++ b/pkg/sql/sem/builtins/fixed_oids.go @@ -1,3 +1,13 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + package builtins import ( @@ -471,6 +481,7 @@ var builtinOidsBySignature = map[string]oid.Oid{ `crdb_internal.table_span(table_id: int) -> bytes[]`: 1321, `crdb_internal.tenant_span(tenant_id: int) -> bytes[]`: 1320, `crdb_internal.testing_callback(name: string) -> int`: 321, + `crdb_internal.to_json_as_changefeed_with_flags(tuple, string...) -> bytes`: 470, `crdb_internal.trace_id() -> int`: 1290, `crdb_internal.trim_tenant_prefix(key: bytes) -> bytes`: 1318, `crdb_internal.trim_tenant_prefix(keys: bytes[]) -> bytes[]`: 1319,