Skip to content

Commit

Permalink
changefeedccl: Encoding and cdc_prev handling fixes
Browse files Browse the repository at this point in the history
Fix multiple related handling and encoding issues
when using CDC queries.

Assuming the following changefeed:
`CREATE CHANGEFEED ... AS SELECT *, cdc_prev, event_op() FROM t`

* Newly inserted rows will contain NULL in the `cdc_prev` column.
* Deleted rows will emit all *primary key* columns; all other columns
  will be left `NULL`.  `event_op` column will contain appropriate even
  op description (`deleted`)
* `cdc_prev` tuple no longer contains cdc_mvcc_internal_timestamp column
  This was a bug that incorrectly made that column available in
  cdc_prev.  If the application needs to determine how much time
  exlapsed since the previous update, the application should update an
  explicit timestamp column, and use that instead of the system column.

The deleted row handling, when using CDC Queries, might be a bit awkward
in that the row is still emitted, albeit with all but the primary key
columns set to `NULL`.  Of course, there is an `event_op()` function
that can make the distinction between a new row with `NULL` values, or
a deleted row.  However, the customers may find `wrapped` envelope
simpler to use (`WITH envelope='wrapped', format='json', diff`) would produce
JSON object with `before` and `after` keys containing prior/current
state of the row).  This PR makes it possible to use `wrapped` envelope
with `diff` option when using CDC queries.  The `before` value in the
output will always be an entire row -- without any projections.

Fixes cockroachdb#101000

Release note (enterprise change): CDC queries now support wrapped
envelope with diff (`envelope='wrapped', diff`).
  • Loading branch information
Yevgeniy Miretskiy committed Apr 12, 2023
1 parent 88d8256 commit 9ee4c32
Show file tree
Hide file tree
Showing 17 changed files with 254 additions and 101 deletions.
6 changes: 1 addition & 5 deletions pkg/ccl/changefeedccl/cdceval/cdc_prev.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -79,7 +78,7 @@ func newPrevColumnForDesc(desc *cdcevent.EventDescriptor) (catalog.Column, error
// cdcPrevType returns a types.T for the tuple corresponding to the
// event descriptor.
func cdcPrevType(desc *cdcevent.EventDescriptor) *types.T {
numCols := len(desc.ResultColumns()) + 1 /* crdb_internal_mvcc_timestamp */
numCols := len(desc.ResultColumns())
tupleTypes := make([]*types.T, 0, numCols)
tupleLabels := make([]string, 0, numCols)

Expand All @@ -91,9 +90,6 @@ func cdcPrevType(desc *cdcevent.EventDescriptor) *types.T {
tupleTypes = append(tupleTypes, c.Typ)
}

// Add system columns.
tupleLabels = append(tupleLabels, colinfo.MVCCTimestampColumnName)
tupleTypes = append(tupleTypes, colinfo.MVCCTimestampColumnType)
return types.MakeLabeledTuple(tupleTypes, tupleLabels)
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,14 @@ func (e *familyEvaluator) eval(

encDatums := updatedRow.EncDatums()
if havePrev {
if err := e.copyPrevRow(prevRow); err != nil {
return cdcevent.Row{}, err
if prevRow.IsDeleted() {
encDatums = append(encDatums, rowenc.EncDatum{Datum: tree.DNull})
} else {
if err := e.copyPrevRow(prevRow); err != nil {
return cdcevent.Row{}, err
}
encDatums = append(encDatums, rowenc.EncDatum{Datum: e.prevRowTuple})
}
encDatums = append(encDatums, rowenc.EncDatum{Datum: e.prevRowTuple})
}

// Push data into DistSQL.
Expand Down Expand Up @@ -465,10 +469,10 @@ func (e *familyEvaluator) setupContextForRow(
} else {
// Insert or update.
if e.rowEvalCtx.withDiff {
if prevRow.IsInitialized() {
e.rowEvalCtx.op = eventTypeUpdate
} else {
if prevRow.IsDeleted() || !prevRow.IsInitialized() {
e.rowEvalCtx.op = eventTypeInsert
} else {
e.rowEvalCtx.op = eventTypeUpdate
}
} else {
// Without diff option we can't tell insert from update; so, use upsert.
Expand Down
4 changes: 1 addition & 3 deletions pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,7 @@ $$`)
stmt: `SELECT
a, b, c,
(CASE WHEN (cdc_prev).c IS NULL THEN 'not there' ELSE (cdc_prev).c END) AS old_c
FROM foo
WHERE (cdc_prev).crdb_internal_mvcc_timestamp IS NULL OR
(cdc_prev).crdb_internal_mvcc_timestamp < crdb_internal_mvcc_timestamp`,
FROM foo`,
expectMainFamily: []decodeExpectation{
{
expectUnwatchedErr: true,
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/functions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
schemaTS := s.Clock().Now()
row := makeEventRow(t, desc, schemaTS, false, s.Clock().Now(), true)
deletedRow := makeEventRow(t, desc, schemaTS, true, s.Clock().Now(), true)
prevRow := makeEventRow(t, desc, schemaTS, false, s.Clock().Now(), false)
nilRow := cdcevent.Row{}

for _, tc := range []struct {
Expand All @@ -197,7 +198,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
{
op: "update",
row: row,
prevRow: row,
prevRow: prevRow,
withDiff: true,
expect: "update",
},
Expand All @@ -220,14 +221,14 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) {
{
op: "delete",
row: deletedRow,
prevRow: row,
prevRow: prevRow,
withDiff: true,
expect: "delete",
},
{
op: "delete",
row: deletedRow,
prevRow: row,
prevRow: prevRow,
withDiff: false,
expect: "delete",
},
Expand Down
16 changes: 5 additions & 11 deletions pkg/ccl/changefeedccl/cdceval/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,17 +189,11 @@ FAMILY extra (extra)
presentation: append(mainColumns, rc("cdc_prev", cdcPrevType(eventDesc))),
},
{
name: "full table with cdc_prev expanded",
desc: fooDesc,
stmt: "SELECT *, (cdc_prev).* FROM foo",
planSpans: roachpb.Spans{primarySpan},
presentation: append(mainColumns, append(
// It would be nice to hide "system" columns from cdc_prev -- just like they are
// hidden from the table, unless explicitly accessed.
// Alas, this is a bit difficult, since cdc_prev is not a table, but a function.
mainColumns,
rc(colinfo.MVCCTimestampColumnName, colinfo.MVCCTimestampColumnType),
)...),
name: "full table with cdc_prev expanded",
desc: fooDesc,
stmt: "SELECT *, (cdc_prev).* FROM foo",
planSpans: roachpb.Spans{primarySpan},
presentation: append(mainColumns, mainColumns...),
},
{
name: "full table with cdc_prev json",
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/cdcevent/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func NewEventDescriptor(

allCols := make([]int, len(sd.cols))
for i := 0; i < len(sd.cols); i++ {
allCols = append(allCols, i)
allCols[i] = i
}
sd.allCols = allCols

Expand Down Expand Up @@ -561,15 +561,15 @@ type fetcher struct {
*row.Fetcher
}

// nextRow returns the next row from the fetcher, but stips out
// tableoid system column if the row is the "previous" row.
// nextRow returns the next row from the fetcher, but strips out
// system columns.
func (f *fetcher) nextRow(ctx context.Context, isPrev bool) (rowenc.EncDatumRow, error) {
r, _, err := f.Fetcher.NextRow(ctx)
if err != nil {
return nil, err
}
if isPrev {
r = r[:len(r)-1]
r = r[:len(r)-len(systemColumns)]
}
return r, nil
}
Expand Down
12 changes: 0 additions & 12 deletions pkg/ccl/changefeedccl/cdcevent/version_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,3 @@ type CacheKey struct {
Version descpb.DescriptorVersion
FamilyID descpb.FamilyID
}

// GetCachedOrCreate returns cached object, or creates and caches new one.
func GetCachedOrCreate(
k CacheKey, c *cache.UnorderedCache, creator func() interface{},
) interface{} {
if v, ok := c.Get(k); ok {
return v
}
v := creator()
c.Add(k, v)
return v
}
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,10 @@ func newChangeFrontierProcessor(
return nil, err
}

if cf.encoder, err = getEncoder(encodingOpts, AllTargets(spec.Feed), makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics); err != nil {
if cf.encoder, err = getEncoder(
encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "",
makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics,
); err != nil {
return nil, err
}

Expand Down
19 changes: 14 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,18 @@ func createChangefeedJobRecord(
}
opts.ForceDiff()
} else if opts.IsSet(changefeedbase.OptDiff) {
opts.ClearDiff()
p.BufferClientNotice(ctx, pgnotice.Newf(
"turning off unused %s option (expression <%s> does not use cdc_prev)",
changefeedbase.OptDiff, tree.AsString(normalized)))
// Expression didn't reference cdc_prev, but the diff option was specified.
// This only makes sense if we have wrapped envelope.
encopts, err := opts.GetEncodingOptions()
if err != nil {
return nil, err
}
if encopts.Envelope != changefeedbase.OptEnvelopeWrapped {
opts.ClearDiff()
p.BufferClientNotice(ctx, pgnotice.Newf(
"turning off unused %s option (expression <%s> does not use cdc_prev)",
changefeedbase.OptDiff, tree.AsString(normalized)))
}
}

// TODO: Set the default envelope to row here when using a sink and format
Expand Down Expand Up @@ -569,7 +577,8 @@ func createChangefeedJobRecord(
if err != nil {
return nil, err
}
if _, err := getEncoder(encodingOpts, AllTargets(details), makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB), nil); err != nil {
if _, err := getEncoder(encodingOpts, AllTargets(details), details.Select != "",
makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB), nil); err != nil {
return nil, err
}

Expand Down
140 changes: 135 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,138 @@ func TestChangefeedBasics(t *testing.T) {
// cloudStorageTest is a regression test for #36994.
}

func TestChangefeedBasicQuery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
// Currently, parquet format (which may be injected by feed() call, doesn't
// know how to handle tuple types (cdc_prev); so, force JSON format.
foo := feed(t, f, `
CREATE CHANGEFEED WITH format='json'
AS SELECT *, event_op() AS op, cdc_prev FROM foo`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
// emitted by the initial scan.
assertPayloads(t, foo, []string{
`foo: [0]->{"a": 0, "b": "updated", "cdc_prev": null, "op": "insert"}`,
})

sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`)
assertPayloads(t, foo, []string{
`foo: [1]->{"a": 1, "b": "a", "cdc_prev": null, "op": "insert"}`,
`foo: [2]->{"a": 2, "b": "b", "cdc_prev": null, "op": "insert"}`,
})

sqlDB.Exec(t, `UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`)
assertPayloads(t, foo, []string{
`foo: [2]->{"a": 2, "b": "c", "cdc_prev": {"a": 2, "b": "b"}, "op": "update"}`,
`foo: [3]->{"a": 3, "b": "d", "cdc_prev": null, "op": "insert"}`,
})

// Deleted rows with bare envelope are emitted with only
// the key columns set.
sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`)
assertPayloads(t, foo, []string{
`foo: [1]->{"a": 1, "b": null, "cdc_prev": {"a": 1, "b": "a"}, "op": "delete"}`,
})
}

cdcTest(t, testFn)
}

// Same test as TestChangefeedBasicQuery, but using wrapped envelope with CDC query.
func TestChangefeedBasicQueryWrapped(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
// Currently, parquet format (which may be injected by feed() call), doesn't
// know how to handle tuple types (cdc_prev); so, force JSON format.
foo := feed(t, f, `
CREATE CHANGEFEED WITH envelope='wrapped', format='json', diff
AS SELECT b||a AS ba, event_op() AS op FROM foo`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
// emitted by the initial scan.
assertPayloads(t, foo, []string{
`foo: [0]->{"after": {"ba": "updated0", "op": "insert"}, "before": null}`,
})

sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"ba": "a1", "op": "insert"}, "before": null}`,
`foo: [2]->{"after": {"ba": "b2", "op": "insert"}, "before": null}`,
})

// Wrapped envelope results in "before" having entire previous row state -- *not* projection.
sqlDB.Exec(t, `UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`)
assertPayloads(t, foo, []string{
`foo: [2]->{"after": {"ba": "c2", "op": "update"}, "before": {"a": 2, "b": "b"}}`,
`foo: [3]->{"after": {"ba": "d3", "op": "insert"}, "before": null}`,
})

sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`)
assertPayloads(t, foo, []string{
`foo: [1]->{"after": null, "before": {"a": 1, "b": "a"}}`,
})
}

cdcTest(t, testFn, feedTestForceSink("webhook"))
}

// Same test as TestChangefeedBasicQueryWrapped, but this time using AVRO.
func TestChangefeedBasicQueryWrappedAvro(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`)
sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`)
foo := feed(t, f, `
CREATE CHANGEFEED WITH envelope='wrapped', format='avro', diff
AS SELECT *, event_op() AS op FROM foo`)
defer closeFeed(t, foo)

// 'initial' is skipped because only the latest value ('updated') is
// emitted by the initial scan.
assertPayloads(t, foo, []string{
`foo: {"a":{"long":0}}->{"after":{"foo":{"a":{"long":0},"b":{"string":"updated"},"op":{"string":"insert"}}},"before":null}`,
})

sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`)
assertPayloads(t, foo, []string{
`foo: {"a":{"long":1}}->{"after":{"foo":{"a":{"long":1},"b":{"string":"a"},"op":{"string":"insert"}}},"before":null}`,
`foo: {"a":{"long":2}}->{"after":{"foo":{"a":{"long":2},"b":{"string":"b"},"op":{"string":"insert"}}},"before":null}`,
})

sqlDB.Exec(t, `UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`)
assertPayloads(t, foo, []string{
`foo: {"a":{"long":2}}->{"after":{"foo":{"a":{"long":2},"b":{"string":"c"},"op":{"string":"update"}}},"before":{"foo_before":{"a":{"long":2},"b":{"string":"b"}}}}`,
`foo: {"a":{"long":3}}->{"after":{"foo":{"a":{"long":3},"b":{"string":"d"},"op":{"string":"insert"}}},"before":null}`,
})

sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`)
assertPayloads(t, foo, []string{
`foo: {"a":{"long":1}}->{"after":null,"before":{"foo_before":{"a":{"long":1},"b":{"string":"a"}}}}`,
})
}

cdcTest(t, testFn, feedTestForceSink("kafka"))
}

func TestToJSONAsChangefeed(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1237,14 +1369,12 @@ func TestChangefeedProjectionDelete(t *testing.T) {

sqlDB.Exec(t, `CREATE TABLE foo (id int primary key, a string)`)
sqlDB.Exec(t, `INSERT INTO foo values (0, 'a')`)
foo := feed(t, f, `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT * FROM foo`)
foo := feed(t, f, `CREATE CHANGEFEED WITH envelope='wrapped' AS SELECT * FROM foo`)
defer closeFeed(t, foo)
assertPayloads(t, foo, []string{
`foo: [0]->{"a": "a", "id": 0}`,
})
assertPayloads(t, foo, []string{`foo: [0]->{"after": {"a": "a", "id": 0}}`})
sqlDB.Exec(t, `DELETE FROM foo WHERE id = 0`)
assertPayloads(t, foo, []string{
`foo: [0]->{}`,
`foo: [0]->{"after": null}`,
})
}
cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ type Encoder interface {
func getEncoder(
opts changefeedbase.EncodingOptions,
targets changefeedbase.Targets,
encodeForQuery bool,
p externalConnectionProvider,
sliMetrics *sliMetrics,
) (Encoder, error) {
switch opts.Format {
case changefeedbase.OptFormatJSON:
return makeJSONEncoder(opts)
return makeJSONEncoder(jsonEncoderOptions{EncodingOptions: opts, encodeForQuery: encodeForQuery})
case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro:
return newConfluentAvroEncoder(opts, targets, p, sliMetrics)
case changefeedbase.OptFormatCSV:
Expand Down
Loading

0 comments on commit 9ee4c32

Please sign in to comment.