Skip to content

Commit

Permalink
changefeedccl: add a sql builtin to emulate changefeed encoding
Browse files Browse the repository at this point in the history
Informs #75730.

Release note (sql change): Added the crdb_internal.to_json_as_changefeed_with_flags function to help
debug json changefeeds.

Release justification: Additive change to help with debugging.
  • Loading branch information
HonoreDB committed Sep 7, 2022
1 parent d4a41be commit 2ea997c
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 94 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ go_library(
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/types",
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent",
visibility = ["//visibility:public"],
deps = [
"//pkg/build",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/keys",
"//pkg/kv",
Expand All @@ -34,6 +35,7 @@ go_library(
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
Expand All @@ -57,6 +59,7 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
Expand Down
29 changes: 29 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand All @@ -21,10 +22,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
Expand Down Expand Up @@ -530,3 +533,29 @@ func TestingGetFamilyIDFromKey(
_, familyID, err := decoder.(*eventDecoder).rfCache.tableDescForKey(context.Background(), key, ts)
return familyID, err
}

// MakeRowFromTuple converts a SQL datum produced by, for example, SELECT ROW(foo.*),
// into the same kind of cdcevent.Row you'd get as a result of an insert, but without
// the primary key.
func MakeRowFromTuple(evalCtx *eval.Context, t *tree.DTuple) Row {
r := Projection{EventDescriptor: &EventDescriptor{}}
names := t.ResolvedType().TupleLabels()
for i, d := range t.D {
var name string
if names == nil {
name = fmt.Sprintf("col%d", i+1)
} else {
name = names[i]
}
r.AddValueColumn(name, d.ResolvedType())
if err := r.SetValueDatumAt(evalCtx, i, d); err != nil {
if build.IsRelease() {
log.Warningf(context.Background(), "failed to set row value from tuple due to error %v", err)
_ = r.SetValueDatumAt(evalCtx, i, tree.DNull)
} else {
panic(err)
}
}
}
return Row(r)
}
61 changes: 61 additions & 0 deletions pkg/ccl/changefeedccl/cdcevent/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -397,3 +400,61 @@ func slurpDatums(t *testing.T, it Iterator) (res []string) {
}))
return res
}

func TestMakeRowFromTuple(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

i := tree.NewDInt(1234)
f := tree.NewDFloat(12.34)
s := tree.NewDString("testing")
typ := types.MakeTuple([]*types.T{types.Int, types.Float, types.String})
unlabeledTuple := tree.NewDTuple(typ, i, f, s)
st := cluster.MakeTestingClusterSettings()
evalCtx := eval.MakeTestingEvalContext(st)

rowFromUnlabeledTuple := MakeRowFromTuple(&evalCtx, unlabeledTuple)
expectedCols := []struct {
name string
typ *types.T
valAsString string
}{
{name: "col1", typ: types.Int, valAsString: "1234"},
{name: "col2", typ: types.Float, valAsString: "12.34"},
{name: "col3", typ: types.String, valAsString: "testing"},
}

remainingCols := expectedCols

require.NoError(t, rowFromUnlabeledTuple.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error {
current := remainingCols[0]
remainingCols = remainingCols[1:]
require.Equal(t, current.name, col.Name)
require.Equal(t, current.typ, col.Typ)
require.Equal(t, current.valAsString, tree.AsStringWithFlags(d, tree.FmtExport))
return nil
}))

require.Empty(t, remainingCols)

typ.InternalType.TupleLabels = []string{"a", "b", "c"}
labeledTuple := tree.NewDTuple(typ, i, f, s)

expectedCols[0].name = "a"
expectedCols[1].name = "b"
expectedCols[2].name = "c"

remainingCols = expectedCols

rowFromLabeledTuple := MakeRowFromTuple(&evalCtx, labeledTuple)

require.NoError(t, rowFromLabeledTuple.ForEachColumn().Datum(func(d tree.Datum, col ResultColumn) error {
current := remainingCols[0]
remainingCols = remainingCols[1:]
require.Equal(t, current.name, col.Name)
require.Equal(t, current.typ, col.Typ)
require.Equal(t, current.valAsString, tree.AsStringWithFlags(d, tree.FmtExport))
return nil
}))

}
29 changes: 29 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,35 @@ func TestChangefeedBasics(t *testing.T) {
// cloudStorageTest is a regression test for #36994.
}

func TestToJSONAsChangefeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo values (1, 'hello')`)
sqlDB.CheckQueryResults(t,
`SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*) from foo`,
[][]string{{`{"after": {"a": 1, "b": "hello"}}`}},
)
sqlDB.CheckQueryResults(t,
`SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*, 'updated', 'diff') from foo`,
[][]string{{`{"after": {"a": 1, "b": "hello"}, "before": null, "updated": "0.0000000000"}`}},
)

sqlDB.CheckQueryResults(t,
`SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*, 'updated', 'envelope=row') from foo`,
[][]string{{`{"__crdb__": {"updated": "0.0000000000"}, "a": 1, "b": "hello"}`}},
)

sqlDB.ExpectErr(t, `unknown envelope: lobster`,
`SELECT crdb_internal.to_json_as_changefeed_with_flags(foo.*, 'updated', 'envelope=lobster') from foo`)
}

cdcTest(t, testFn)
}

func TestChangefeedIdleness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
65 changes: 65 additions & 0 deletions pkg/ccl/changefeedccl/encoder_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (
"bytes"
"context"
gojson "encoding/json"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/volatility"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -195,3 +201,62 @@ func (e *jsonEncoder) EncodeResolvedTimestamp(
}
return gojson.Marshal(jsonEntries)
}

var placeholderCtx = eventContext{topic: "topic"}

// EncodeAsJSONChangefeedWithFlags implements the crdb_internal.to_json_as_changefeed_with_flags
// builtin.
func EncodeAsJSONChangefeedWithFlags(r cdcevent.Row, flags ...string) ([]byte, error) {
optsMap := make(map[string]string, len(flags))
for _, f := range flags {
split := strings.SplitN(f, "=", 2)
k := split[0]
var v string
if len(split) == 2 {
v = split[1]
}
optsMap[k] = v
}
opts, err := changefeedbase.MakeStatementOptions(optsMap).GetEncodingOptions()
if err != nil {
return nil, err
}
// If this function ends up needing to be optimized, cache or pool these.
// Nontrivial to do as an encoder generally isn't safe to call on different
// rows in parallel.
e, err := makeJSONEncoder(opts, changefeedbase.Targets{})
if err != nil {
return nil, err
}
return e.EncodeValue(context.TODO(), placeholderCtx, r, cdcevent.Row{})

}

func init() {

overload := tree.Overload{
Types: tree.VariadicType{FixedTypes: []*types.T{types.AnyTuple}, VarType: types.String},
ReturnType: tree.FixedReturnType(types.Bytes),
Fn: func(evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
row := cdcevent.MakeRowFromTuple(evalCtx, tree.MustBeDTuple(args[0]))
flags := make([]string, len(args)-1)
for i, d := range args[1:] {
flags[i] = string(tree.MustBeDString(d))
}
o, err := EncodeAsJSONChangefeedWithFlags(row, flags...)
if err != nil {
return nil, pgerror.Wrap(err, pgcode.InvalidParameterValue, ``)
}
return tree.NewDBytes(tree.DBytes(o)), nil
},
Info: "Strings can be of the form 'resolved' or 'resolved=1s'.",
// Probably actually stable, but since this is tightly coupled to changefeed logic by design,
// best to be defensive.
Volatility: volatility.Volatile,
}

utilccl.RegisterCCLBuiltin("crdb_internal.to_json_as_changefeed_with_flags",
`Encodes a tuple the way a changefeed would output it if it were inserted as a row or emitted by a changefeed expression, and returns the raw bytes.
Flags such as 'diff' modify the encoding as though specified in the WITH portion of a changefeed.`,
overload)
}
5 changes: 0 additions & 5 deletions pkg/ccl/utilccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ go_test(
name = "utilccl_test",
size = "small",
srcs = [
"builtins_test.go",
"license_check_test.go",
"license_test.go",
],
Expand All @@ -45,15 +44,11 @@ go_test(
"//pkg/ccl/utilccl/licenseccl",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/util/envutil",
"//pkg/util/stop",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Expand Down
59 changes: 0 additions & 59 deletions pkg/ccl/utilccl/builtins_test.go

This file was deleted.

1 change: 0 additions & 1 deletion pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ go_library(
"//pkg/sql/lex",
"//pkg/sql/lexbase",
"//pkg/sql/memsize",
"//pkg/sql/oidext",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
Expand Down
Loading

0 comments on commit 2ea997c

Please sign in to comment.