From 12a0b3d457e73e7f44b35bea300293039207f4aa Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Mon, 10 Apr 2023 19:03:03 -0400 Subject: [PATCH] changefeedccl: Encoding and cdc_prev handling fixes Fix multiple related handling and encoding issues when using CDC queries. Assuming the following changefeed: `CREATE CHANGEFEED ... AS SELECT *, cdc_prev, event_op() FROM t` * Newly inserted rows will contain NULL in the `cdc_prev` column. * Deleted rows will emit all *primary key* columns; all other columns will be left `NULL`. The `event_op` column will contain the appropriate event op description (`deleted`). * The `cdc_prev` tuple no longer contains the crdb_internal_mvcc_timestamp column. This was a bug that incorrectly made that column available in cdc_prev. If the application needs to determine how much time elapsed since the previous update, the application should maintain an explicit timestamp column, and use that instead of the system column. The deleted row handling, when using CDC queries, might be a bit awkward in that the row is still emitted, albeit with all but the primary key columns set to `NULL`. Of course, there is an `event_op()` function that can make the distinction between a new row with `NULL` values and a deleted row. However, customers may find the `wrapped` envelope simpler to use (`WITH envelope='wrapped', format='json', diff` would produce a JSON object with `before` and `after` keys containing the prior/current state of the row). This PR makes it possible to use the `wrapped` envelope with the `diff` option when using CDC queries. The `before` value in the output will always be an entire row -- without any projections. Fixes #101000 Release note (enterprise change): CDC queries now support wrapped envelope with diff (`envelope='wrapped', diff`). 
--- pkg/ccl/changefeedccl/cdceval/cdc_prev.go | 6 +- pkg/ccl/changefeedccl/cdceval/expr_eval.go | 16 +- .../changefeedccl/cdceval/expr_eval_test.go | 4 +- .../changefeedccl/cdceval/functions_test.go | 7 +- pkg/ccl/changefeedccl/cdceval/plan_test.go | 16 +- pkg/ccl/changefeedccl/cdcevent/event.go | 8 +- .../changefeedccl/cdcevent/version_cache.go | 12 -- .../changefeedccl/changefeed_processors.go | 5 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 19 ++- pkg/ccl/changefeedccl/changefeed_test.go | 140 +++++++++++++++++- pkg/ccl/changefeedccl/encoder.go | 3 +- pkg/ccl/changefeedccl/encoder_json.go | 78 ++++++---- pkg/ccl/changefeedccl/encoder_test.go | 10 +- pkg/ccl/changefeedccl/event_processing.go | 11 +- .../changefeedccl/sink_cloudstorage_test.go | 2 +- pkg/ccl/changefeedccl/sink_webhook_test.go | 2 +- pkg/ccl/changefeedccl/testfeed_test.go | 16 +- 17 files changed, 254 insertions(+), 101 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdceval/cdc_prev.go b/pkg/ccl/changefeedccl/cdceval/cdc_prev.go index 15b223622927..abb4bf656f14 100644 --- a/pkg/ccl/changefeedccl/cdceval/cdc_prev.go +++ b/pkg/ccl/changefeedccl/cdceval/cdc_prev.go @@ -12,7 +12,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" @@ -79,7 +78,7 @@ func newPrevColumnForDesc(desc *cdcevent.EventDescriptor) (catalog.Column, error // cdcPrevType returns a types.T for the tuple corresponding to the // event descriptor. 
func cdcPrevType(desc *cdcevent.EventDescriptor) *types.T { - numCols := len(desc.ResultColumns()) + 1 /* crdb_internal_mvcc_timestamp */ + numCols := len(desc.ResultColumns()) tupleTypes := make([]*types.T, 0, numCols) tupleLabels := make([]string, 0, numCols) @@ -91,9 +90,6 @@ func cdcPrevType(desc *cdcevent.EventDescriptor) *types.T { tupleTypes = append(tupleTypes, c.Typ) } - // Add system columns. - tupleLabels = append(tupleLabels, colinfo.MVCCTimestampColumnName) - tupleTypes = append(tupleTypes, colinfo.MVCCTimestampColumnType) return types.MakeLabeledTuple(tupleTypes, tupleLabels) } diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 9027e2f0ba0a..32bff947ef11 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -196,10 +196,14 @@ func (e *familyEvaluator) eval( encDatums := updatedRow.EncDatums() if havePrev { - if err := e.copyPrevRow(prevRow); err != nil { - return cdcevent.Row{}, err + if prevRow.IsDeleted() { + encDatums = append(encDatums, rowenc.EncDatum{Datum: tree.DNull}) + } else { + if err := e.copyPrevRow(prevRow); err != nil { + return cdcevent.Row{}, err + } + encDatums = append(encDatums, rowenc.EncDatum{Datum: e.prevRowTuple}) } - encDatums = append(encDatums, rowenc.EncDatum{Datum: e.prevRowTuple}) } // Push data into DistSQL. @@ -465,10 +469,10 @@ func (e *familyEvaluator) setupContextForRow( } else { // Insert or update. if e.rowEvalCtx.withDiff { - if prevRow.IsInitialized() { - e.rowEvalCtx.op = eventTypeUpdate - } else { + if prevRow.IsDeleted() || !prevRow.IsInitialized() { e.rowEvalCtx.op = eventTypeInsert + } else { + e.rowEvalCtx.op = eventTypeUpdate } } else { // Without diff option we can't tell insert from update; so, use upsert. 
diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index ca2cbf90935b..a4c4f7b3d106 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -331,9 +331,7 @@ $$`) stmt: `SELECT a, b, c, (CASE WHEN (cdc_prev).c IS NULL THEN 'not there' ELSE (cdc_prev).c END) AS old_c - FROM foo - WHERE (cdc_prev).crdb_internal_mvcc_timestamp IS NULL OR - (cdc_prev).crdb_internal_mvcc_timestamp < crdb_internal_mvcc_timestamp`, + FROM foo`, expectMainFamily: []decodeExpectation{ { expectUnwatchedErr: true, diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go index 840c091a5199..5dfe4d00ae21 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions_test.go +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -178,6 +178,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { schemaTS := s.Clock().Now() row := makeEventRow(t, desc, schemaTS, false, s.Clock().Now(), true) deletedRow := makeEventRow(t, desc, schemaTS, true, s.Clock().Now(), true) + prevRow := makeEventRow(t, desc, schemaTS, false, s.Clock().Now(), false) nilRow := cdcevent.Row{} for _, tc := range []struct { @@ -197,7 +198,7 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { { op: "update", row: row, - prevRow: row, + prevRow: prevRow, withDiff: true, expect: "update", }, @@ -220,14 +221,14 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { { op: "delete", row: deletedRow, - prevRow: row, + prevRow: prevRow, withDiff: true, expect: "delete", }, { op: "delete", row: deletedRow, - prevRow: row, + prevRow: prevRow, withDiff: false, expect: "delete", }, diff --git a/pkg/ccl/changefeedccl/cdceval/plan_test.go b/pkg/ccl/changefeedccl/cdceval/plan_test.go index 044c92266511..3123f3763f46 100644 --- a/pkg/ccl/changefeedccl/cdceval/plan_test.go +++ b/pkg/ccl/changefeedccl/cdceval/plan_test.go @@ -189,17 +189,11 @@ FAMILY extra (extra) 
presentation: append(mainColumns, rc("cdc_prev", cdcPrevType(eventDesc))), }, { - name: "full table with cdc_prev expanded", - desc: fooDesc, - stmt: "SELECT *, (cdc_prev).* FROM foo", - planSpans: roachpb.Spans{primarySpan}, - presentation: append(mainColumns, append( - // It would be nice to hide "system" columns from cdc_prev -- just like they are - // hidden from the table, unless explicitly accessed. - // Alas, this is a bit difficult, since cdc_prev is not a table, but a function. - mainColumns, - rc(colinfo.MVCCTimestampColumnName, colinfo.MVCCTimestampColumnType), - )...), + name: "full table with cdc_prev expanded", + desc: fooDesc, + stmt: "SELECT *, (cdc_prev).* FROM foo", + planSpans: roachpb.Spans{primarySpan}, + presentation: append(mainColumns, mainColumns...), }, { name: "full table with cdc_prev json", diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index 5c2a9c9ccb64..21e74d8db164 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -351,7 +351,7 @@ func NewEventDescriptor( allCols := make([]int, len(sd.cols)) for i := 0; i < len(sd.cols); i++ { - allCols = append(allCols, i) + allCols[i] = i } sd.allCols = allCols @@ -561,15 +561,15 @@ type fetcher struct { *row.Fetcher } -// nextRow returns the next row from the fetcher, but stips out -// tableoid system column if the row is the "previous" row. +// nextRow returns the next row from the fetcher, but strips out +// system columns. 
func (f *fetcher) nextRow(ctx context.Context, isPrev bool) (rowenc.EncDatumRow, error) { r, _, err := f.Fetcher.NextRow(ctx) if err != nil { return nil, err } if isPrev { - r = r[:len(r)-1] + r = r[:len(r)-len(systemColumns)] } return r, nil } diff --git a/pkg/ccl/changefeedccl/cdcevent/version_cache.go b/pkg/ccl/changefeedccl/cdcevent/version_cache.go index fabcb5ee68b3..a87528859353 100644 --- a/pkg/ccl/changefeedccl/cdcevent/version_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/version_cache.go @@ -28,15 +28,3 @@ type CacheKey struct { Version descpb.DescriptorVersion FamilyID descpb.FamilyID } - -// GetCachedOrCreate returns cached object, or creates and caches new one. -func GetCachedOrCreate( - k CacheKey, c *cache.UnorderedCache, creator func() interface{}, -) interface{} { - if v, ok := c.Get(k); ok { - return v - } - v := creator() - c.Add(k, v) - return v -} diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 76032968ded5..b2475ec17229 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -940,7 +940,10 @@ func newChangeFrontierProcessor( return nil, err } - if cf.encoder, err = getEncoder(encodingOpts, AllTargets(spec.Feed), makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics); err != nil { + if cf.encoder, err = getEncoder( + encodingOpts, AllTargets(spec.Feed), spec.Feed.Select != "", + makeExternalConnectionProvider(ctx, flowCtx.Cfg.DB), sliMertics, + ); err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index f7360ea5c990..5b3077966eeb 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -508,10 +508,18 @@ func createChangefeedJobRecord( } opts.ForceDiff() } else if opts.IsSet(changefeedbase.OptDiff) { - opts.ClearDiff() - p.BufferClientNotice(ctx, pgnotice.Newf( - "turning off unused %s 
option (expression <%s> does not use cdc_prev)", - changefeedbase.OptDiff, tree.AsString(normalized))) + // Expression didn't reference cdc_prev, but the diff option was specified. + // This only makes sense if we have wrapped envelope. + encopts, err := opts.GetEncodingOptions() + if err != nil { + return nil, err + } + if encopts.Envelope != changefeedbase.OptEnvelopeWrapped { + opts.ClearDiff() + p.BufferClientNotice(ctx, pgnotice.Newf( + "turning off unused %s option (expression <%s> does not use cdc_prev)", + changefeedbase.OptDiff, tree.AsString(normalized))) + } } // TODO: Set the default envelope to row here when using a sink and format @@ -569,7 +577,8 @@ func createChangefeedJobRecord( if err != nil { return nil, err } - if _, err := getEncoder(encodingOpts, AllTargets(details), makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB), nil); err != nil { + if _, err := getEncoder(encodingOpts, AllTargets(details), details.Select != "", + makeExternalConnectionProvider(ctx, p.ExecCfg().InternalDB), nil); err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 3e979d35924f..1008f96c10ad 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -264,6 +264,138 @@ func TestChangefeedBasics(t *testing.T) { // cloudStorageTest is a regression test for #36994. } +func TestChangefeedBasicQuery(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) + // Currently, parquet format (which may be injected by feed() call, doesn't + // know how to handle tuple types (cdc_prev); so, force JSON format. 
+ foo := feed(t, f, ` +CREATE CHANGEFEED WITH format='json' +AS SELECT *, event_op() AS op, cdc_prev FROM foo`) + defer closeFeed(t, foo) + + // 'initial' is skipped because only the latest value ('updated') is + // emitted by the initial scan. + assertPayloads(t, foo, []string{ + `foo: [0]->{"a": 0, "b": "updated", "cdc_prev": null, "op": "insert"}`, + }) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) + assertPayloads(t, foo, []string{ + `foo: [1]->{"a": 1, "b": "a", "cdc_prev": null, "op": "insert"}`, + `foo: [2]->{"a": 2, "b": "b", "cdc_prev": null, "op": "insert"}`, + }) + + sqlDB.Exec(t, `UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`) + assertPayloads(t, foo, []string{ + `foo: [2]->{"a": 2, "b": "c", "cdc_prev": {"a": 2, "b": "b"}, "op": "update"}`, + `foo: [3]->{"a": 3, "b": "d", "cdc_prev": null, "op": "insert"}`, + }) + + // Deleted rows with bare envelope are emitted with only + // the key columns set. + sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`) + assertPayloads(t, foo, []string{ + `foo: [1]->{"a": 1, "b": null, "cdc_prev": {"a": 1, "b": "a"}, "op": "delete"}`, + }) + } + + cdcTest(t, testFn) +} + +// Same test as TestChangefeedBasicQuery, but using wrapped envelope with CDC query. +func TestChangefeedBasicQueryWrapped(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) + // Currently, parquet format (which may be injected by feed() call), doesn't + // know how to handle tuple types (cdc_prev); so, force JSON format. 
+ foo := feed(t, f, ` +CREATE CHANGEFEED WITH envelope='wrapped', format='json', diff +AS SELECT b||a AS ba, event_op() AS op FROM foo`) + defer closeFeed(t, foo) + + // 'initial' is skipped because only the latest value ('updated') is + // emitted by the initial scan. + assertPayloads(t, foo, []string{ + `foo: [0]->{"after": {"ba": "updated0", "op": "insert"}, "before": null}`, + }) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) + assertPayloads(t, foo, []string{ + `foo: [1]->{"after": {"ba": "a1", "op": "insert"}, "before": null}`, + `foo: [2]->{"after": {"ba": "b2", "op": "insert"}, "before": null}`, + }) + + // Wrapped envelope results in "before" having entire previous row state -- *not* projection. + sqlDB.Exec(t, `UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`) + assertPayloads(t, foo, []string{ + `foo: [2]->{"after": {"ba": "c2", "op": "update"}, "before": {"a": 2, "b": "b"}}`, + `foo: [3]->{"after": {"ba": "d3", "op": "insert"}, "before": null}`, + }) + + sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`) + assertPayloads(t, foo, []string{ + `foo: [1]->{"after": null, "before": {"a": 1, "b": "a"}}`, + }) + } + + cdcTest(t, testFn, feedTestForceSink("webhook")) +} + +// Same test as TestChangefeedBasicQueryWrapped, but this time using AVRO. +func TestChangefeedBasicQueryWrappedAvro(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 'initial')`) + sqlDB.Exec(t, `UPSERT INTO foo VALUES (0, 'updated')`) + foo := feed(t, f, ` +CREATE CHANGEFEED WITH envelope='wrapped', format='avro', diff +AS SELECT *, event_op() AS op FROM foo`) + defer closeFeed(t, foo) + + // 'initial' is skipped because only the latest value ('updated') is + // emitted by the initial scan. 
+ assertPayloads(t, foo, []string{ + `foo: {"a":{"long":0}}->{"after":{"foo":{"a":{"long":0},"b":{"string":"updated"},"op":{"string":"insert"}}},"before":null}`, + }) + + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a'), (2, 'b')`) + assertPayloads(t, foo, []string{ + `foo: {"a":{"long":1}}->{"after":{"foo":{"a":{"long":1},"b":{"string":"a"},"op":{"string":"insert"}}},"before":null}`, + `foo: {"a":{"long":2}}->{"after":{"foo":{"a":{"long":2},"b":{"string":"b"},"op":{"string":"insert"}}},"before":null}`, + }) + + sqlDB.Exec(t, `UPSERT INTO foo VALUES (2, 'c'), (3, 'd')`) + assertPayloads(t, foo, []string{ + `foo: {"a":{"long":2}}->{"after":{"foo":{"a":{"long":2},"b":{"string":"c"},"op":{"string":"update"}}},"before":{"foo_before":{"a":{"long":2},"b":{"string":"b"}}}}`, + `foo: {"a":{"long":3}}->{"after":{"foo":{"a":{"long":3},"b":{"string":"d"},"op":{"string":"insert"}}},"before":null}`, + }) + + sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`) + assertPayloads(t, foo, []string{ + `foo: {"a":{"long":1}}->{"after":null,"before":{"foo_before":{"a":{"long":1},"b":{"string":"a"}}}}`, + }) + } + + cdcTest(t, testFn, feedTestForceSink("kafka")) +} + func TestToJSONAsChangefeed(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1237,14 +1369,12 @@ func TestChangefeedProjectionDelete(t *testing.T) { sqlDB.Exec(t, `CREATE TABLE foo (id int primary key, a string)`) sqlDB.Exec(t, `INSERT INTO foo values (0, 'a')`) - foo := feed(t, f, `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT * FROM foo`) + foo := feed(t, f, `CREATE CHANGEFEED WITH envelope='wrapped' AS SELECT * FROM foo`) defer closeFeed(t, foo) - assertPayloads(t, foo, []string{ - `foo: [0]->{"a": "a", "id": 0}`, - }) + assertPayloads(t, foo, []string{`foo: [0]->{"after": {"a": "a", "id": 0}}`}) sqlDB.Exec(t, `DELETE FROM foo WHERE id = 0`) assertPayloads(t, foo, []string{ - `foo: [0]->{}`, + `foo: [0]->{"after": null}`, }) } cdcTest(t, testFn, feedTestForceSink("cloudstorage")) diff 
--git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 1ebfb62ae4ce..195cf289776d 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -44,12 +44,13 @@ type Encoder interface { func getEncoder( opts changefeedbase.EncodingOptions, targets changefeedbase.Targets, + encodeForQuery bool, p externalConnectionProvider, sliMetrics *sliMetrics, ) (Encoder, error) { switch opts.Format { case changefeedbase.OptFormatJSON: - return makeJSONEncoder(opts) + return makeJSONEncoder(jsonEncoderOptions{EncodingOptions: opts, encodeForQuery: encodeForQuery}) case changefeedbase.OptFormatAvro, changefeedbase.DeprecatedOptFormatAvro: return newConfluentAvroEncoder(opts, targets, p, sliMetrics) case changefeedbase.OptFormatCSV: diff --git a/pkg/ccl/changefeedccl/encoder_json.go b/pkg/ccl/changefeedccl/encoder_json.go index 23de608c8fbf..8d181cb86766 100644 --- a/pkg/ccl/changefeedccl/encoder_json.go +++ b/pkg/ccl/changefeedccl/encoder_json.go @@ -40,7 +40,7 @@ type jsonEncoder struct { envelopeType changefeedbase.EnvelopeType buf bytes.Buffer - versionEncoder func(ed *cdcevent.EventDescriptor) *versionEncoder + versionEncoder func(ed *cdcevent.EventDescriptor, isPrev bool) *versionEncoder envelopeEncoder func(evCtx eventContext, updated, prev cdcevent.Row) (json.JSON, error) customKeyColumn string } @@ -54,7 +54,28 @@ func canJSONEncodeMetadata(e changefeedbase.EnvelopeType) bool { return e == changefeedbase.OptEnvelopeBare || e == changefeedbase.OptEnvelopeWrapped } -func makeJSONEncoder(opts changefeedbase.EncodingOptions) (*jsonEncoder, error) { +// getCachedOrCreate returns cached object, or creates and caches new one. 
+func getCachedOrCreate( + k jsonEncoderVersionKey, c *cache.UnorderedCache, creator func() interface{}, +) interface{} { + if v, ok := c.Get(k); ok { + return v + } + v := creator() + c.Add(k, v) + return v +} + +type jsonEncoderVersionKey struct { + cdcevent.CacheKey + splitPrevRowVersion bool // indicate that previous row encoding requires separate version. +} +type jsonEncoderOptions struct { + changefeedbase.EncodingOptions + encodeForQuery bool +} + +func makeJSONEncoder(opts jsonEncoderOptions) (*jsonEncoder, error) { versionCache := cache.NewUnorderedCache(cdcevent.DefaultCacheConfig) e := &jsonEncoder{ envelopeType: opts.Envelope, @@ -66,13 +87,19 @@ func makeJSONEncoder(opts changefeedbase.EncodingOptions) (*jsonEncoder, error) beforeField: opts.Diff && opts.Envelope != changefeedbase.OptEnvelopeBare, keyInValue: opts.KeyInValue, topicInValue: opts.TopicInValue, - versionEncoder: func(ed *cdcevent.EventDescriptor) *versionEncoder { - key := cdcevent.CacheKey{ - ID: ed.TableID, - Version: ed.Version, - FamilyID: ed.FamilyID, + versionEncoder: func(ed *cdcevent.EventDescriptor, isPrev bool) *versionEncoder { + key := jsonEncoderVersionKey{ + CacheKey: cdcevent.CacheKey{ + ID: ed.TableID, + Version: ed.Version, + FamilyID: ed.FamilyID, + }, + // When encoding for CDC query, if we are not using bare envelope. + // When using wrapped envelope, `before` field will always be an entire row + // instead of projection, and thus we must use a new version of the encoder. 
+ splitPrevRowVersion: isPrev && opts.encodeForQuery && opts.Envelope != changefeedbase.OptEnvelopeBare, } - return cdcevent.GetCachedOrCreate(key, versionCache, func() interface{} { + return getCachedOrCreate(key, versionCache, func() interface{} { return &versionEncoder{} }).(*versionEncoder) }, @@ -118,7 +145,7 @@ func (e *jsonEncoder) EncodeKey(_ context.Context, row cdcevent.Row) (enc []byte return nil, err } } - j, err := e.versionEncoder(row.EventDescriptor).encodeKeyRaw(keys) + j, err := e.versionEncoder(row.EventDescriptor, false).encodeKeyRaw(keys) if err != nil { return nil, err } @@ -153,16 +180,10 @@ func (e *versionEncoder) encodeKeyInValue( return b.Set("key", keyEntries) } -var emptyJSONValue = func() json.JSON { - j, err := json.MakeJSON(map[string]interface{}{}) - if err != nil { - panic(err) - } - return j -}() - -func (e *versionEncoder) rowAsGoNative(row cdcevent.Row, meta json.JSON) (json.JSON, error) { - if !row.HasValues() || row.IsDeleted() { +func (e *versionEncoder) rowAsGoNative( + row cdcevent.Row, emitDeletedRowAsNull bool, meta json.JSON, +) (json.JSON, error) { + if !row.HasValues() || (emitDeletedRowAsNull && row.IsDeleted()) { if meta != nil { b := json.NewObjectBuilder(1) b.Add(jsonMetaSentinel, meta) @@ -232,13 +253,11 @@ func (e *jsonEncoder) initRawEnvelope() error { metaBuilder = b } + const emitDeletedRowAsNull = false e.envelopeEncoder = func(evCtx eventContext, updated, _ cdcevent.Row) (_ json.JSON, err error) { - ve := e.versionEncoder(updated.EventDescriptor) + ve := e.versionEncoder(updated.EventDescriptor, false) if len(metaKeys) == 0 { - if updated.IsDeleted() { - return emptyJSONValue, nil - } - return ve.rowAsGoNative(updated, nil) + return ve.rowAsGoNative(updated, emitDeletedRowAsNull, nil) } if e.updatedField { @@ -269,7 +288,7 @@ func (e *jsonEncoder) initRawEnvelope() error { if err != nil { return nil, err } - return ve.rowAsGoNative(updated, meta) + return ve.rowAsGoNative(updated, emitDeletedRowAsNull, meta) 
} return nil } @@ -296,9 +315,10 @@ func (e *jsonEncoder) initWrappedEnvelope() error { return err } + const emitDeletedRowAsNull = true e.envelopeEncoder = func(evCtx eventContext, updated, prev cdcevent.Row) (json.JSON, error) { - ve := e.versionEncoder(updated.EventDescriptor) - after, err := ve.rowAsGoNative(updated, nil) + ve := e.versionEncoder(updated.EventDescriptor, false) + after, err := ve.rowAsGoNative(updated, emitDeletedRowAsNull, nil) if err != nil { return nil, err } @@ -309,7 +329,7 @@ func (e *jsonEncoder) initWrappedEnvelope() error { if e.beforeField { var before json.JSON if prev.IsInitialized() && !prev.IsDeleted() { - before, err = e.versionEncoder(prev.EventDescriptor).rowAsGoNative(prev, nil) + before, err = e.versionEncoder(prev.EventDescriptor, true).rowAsGoNative(prev, emitDeletedRowAsNull, nil) if err != nil { return nil, err } @@ -412,7 +432,7 @@ func EncodeAsJSONChangefeedWithFlags(r cdcevent.Row, flags ...string) ([]byte, e // If this function ends up needing to be optimized, cache or pool these. // Nontrivial to do as an encoder generally isn't safe to call on different // rows in parallel. 
- e, err := makeJSONEncoder(opts) + e, err := makeJSONEncoder(jsonEncoderOptions{EncodingOptions: opts}) if err != nil { return nil, err } diff --git a/pkg/ccl/changefeedccl/encoder_test.go b/pkg/ccl/changefeedccl/encoder_test.go index e79a0a027785..a3f8ec18a416 100644 --- a/pkg/ccl/changefeedccl/encoder_test.go +++ b/pkg/ccl/changefeedccl/encoder_test.go @@ -236,7 +236,7 @@ func TestEncoders(t *testing.T) { return } require.NoError(t, o.Validate()) - e, err := getEncoder(o, targets, nil, nil) + e, err := getEncoder(o, targets, false, nil, nil) require.NoError(t, err) rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) @@ -382,7 +382,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { StatementTimeName: changefeedbase.StatementTimeName(tableDesc.GetName()), }) - e, err := getEncoder(opts, targets, nil, nil) + e, err := getEncoder(opts, targets, false, nil, nil) require.NoError(t, err) rowInsert := cdcevent.TestingMakeEventRow(tableDesc, 0, row, false) @@ -414,7 +414,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { defer noCertReg.Close() opts.SchemaRegistryURI = noCertReg.URL() - enc, err := getEncoder(opts, targets, nil, nil) + enc, err := getEncoder(opts, targets, false, nil, nil) require.NoError(t, err) _, err = enc.EncodeKey(context.Background(), rowInsert) require.Regexp(t, "x509", err) @@ -427,7 +427,7 @@ func TestAvroEncoderWithTLS(t *testing.T) { defer wrongCertReg.Close() opts.SchemaRegistryURI = wrongCertReg.URL() - enc, err = getEncoder(opts, targets, nil, nil) + enc, err = getEncoder(opts, targets, false, nil, nil) require.NoError(t, err) _, err = enc.EncodeKey(context.Background(), rowInsert) require.Regexp(t, `contacting confluent schema registry.*: x509`, err) @@ -917,7 +917,7 @@ func BenchmarkEncoders(b *testing.B) { b.ReportAllocs() b.StopTimer() - encoder, err := getEncoder(opts, targets, nil, nil) + encoder, err := getEncoder(opts, targets, false, nil, nil) if err != nil { b.Fatal(err) } diff --git 
a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 759e6a207f77..b968ba481eaa 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -108,7 +108,8 @@ func newEventConsumer( makeConsumer := func(s EventSink, frontier frontier) (eventConsumer, error) { var err error - encoder, err := getEncoder(encodingOpts, feed.Targets, makeExternalConnectionProvider(ctx, cfg.DB), sliMetrics) + encoder, err := getEncoder(encodingOpts, feed.Targets, spec.Select.Expr != "", + makeExternalConnectionProvider(ctx, cfg.DB), sliMetrics) if err != nil { return nil, err } @@ -369,22 +370,18 @@ func (c *kvEventToRowConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Even } if c.evaluator != nil { - projection, err := c.evaluator.Eval(ctx, updatedRow, prevRow) + updatedRow, err = c.evaluator.Eval(ctx, updatedRow, prevRow) if err != nil { return err } - if !projection.IsInitialized() { + if !updatedRow.IsInitialized() { // Filter did not match. c.metrics.FilteredMessages.Inc(1) a := ev.DetachAlloc() a.Release(ctx) return nil } - - // Clear out prevRow. Projection can already emit previous row; thus - // it would be superfluous to also encode prevRow. - updatedRow, prevRow = projection, cdcevent.Row{} } return c.encodeAndEmit(ctx, updatedRow, prevRow, schemaTimestamp, ev.DetachAlloc()) diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index b64f49ecb4f5..160e648efa30 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -166,7 +166,7 @@ func TestCloudStorageSink(t *testing.T) { // NB: compression added in single-node subtest. 
} ts := func(i int64) hlc.Timestamp { return hlc.Timestamp{WallTime: i} } - e, err := makeJSONEncoder(opts) + e, err := makeJSONEncoder(jsonEncoderOptions{EncodingOptions: opts}) require.NoError(t, err) clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir) diff --git a/pkg/ccl/changefeedccl/sink_webhook_test.go b/pkg/ccl/changefeedccl/sink_webhook_test.go index 65356260c7e6..570eecf56153 100644 --- a/pkg/ccl/changefeedccl/sink_webhook_test.go +++ b/pkg/ccl/changefeedccl/sink_webhook_test.go @@ -123,7 +123,7 @@ func testSendAndReceiveRows(t *testing.T, sinkSrc Sink, sinkDest *cdctest.MockWe opts, err := getGenericWebhookSinkOptions().GetEncodingOptions() require.NoError(t, err) - enc, err := makeJSONEncoder(opts) + enc, err := makeJSONEncoder(jsonEncoderOptions{EncodingOptions: opts}) require.NoError(t, err) // test a resolved timestamp entry diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index f27df2aaf496..ba7f483c9c80 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1063,7 +1063,12 @@ func (f *cloudFeedFactory) Feed( // parquet format with a probability of 0.4. 
The rest of the time json is used parquetPossible := true + explicitEnvelope := false for _, opt := range createStmt.Options { + if string(opt.Key) == changefeedbase.OptEnvelope { + explicitEnvelope = true + } + if string(opt.Key) == changefeedbase.OptFormat { parquetPossible = false break @@ -1117,7 +1122,7 @@ func (f *cloudFeedFactory) Feed( ss: ss, seenTrackerMap: make(map[string]struct{}), dir: feedDir, - isBare: createStmt.Select != nil, + isBare: createStmt.Select != nil && !explicitEnvelope, } if err := f.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil { return nil, err @@ -2033,11 +2038,18 @@ func (f *webhookFeedFactory) Feed(create string, args ...interface{}) (cdctest.T return ¬ifyFlushSink{Sink: s, sync: ss} } + explicitEnvelope := false + for _, opt := range createStmt.Options { + if string(opt.Key) == changefeedbase.OptEnvelope { + explicitEnvelope = true + } + } + c := &webhookFeed{ jobFeed: newJobFeed(f.jobsTableConn(), wrapSink), seenTrackerMap: make(map[string]struct{}), ss: ss, - isBare: createStmt.Select != nil, + isBare: createStmt.Select != nil && !explicitEnvelope, mockSink: sinkDest, } if err := f.startFeedJob(c.jobFeed, createStmt.String(), args...); err != nil {