From 796d82f0edb177d66e016adb4e16b24bb472faa1 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Wed, 28 Aug 2024 21:41:00 +0000 Subject: [PATCH] changefeedccl: emit mvcc_timestamp for avro format This PR adds support for the mvcc_timestamp option with the avro format. Before this change, changefeeds using avro would not fail if mvcc_timestamp was specified, but would silently ignore the option. Now avro supports the mvcc_timestamp option by adding an mvcc_timestamp field to the schema and emitting the row's MVCC timestamp with the row data. Epic: none Fixes: #123078 Release note (enterprise change): Adds changefeed support for the mvcc_timestamp option with the avro format. If both mvcc_timestamp and format='avro' are specified, the avro schema includes an mvcc_timestamp metadata field and the changefeed emits the row's MVCC timestamp with the row data. --- pkg/ccl/changefeedccl/avro.go | 23 +++++++++++++++++++ pkg/ccl/changefeedccl/changefeed_test.go | 28 ++++++++++++++++++++++++ pkg/ccl/changefeedccl/encoder_avro.go | 15 ++++++++----- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/pkg/ccl/changefeedccl/avro.go b/pkg/ccl/changefeedccl/avro.go index 725b45205cfc..113549dca3f2 100644 --- a/pkg/ccl/changefeedccl/avro.go +++ b/pkg/ccl/changefeedccl/avro.go @@ -166,6 +166,7 @@ type avroMetadata map[string]interface{} type avroEnvelopeOpts struct { beforeField, afterField, recordField bool updatedField, resolvedField bool + mvccTimestampField bool } // avroEnvelopeRecord is an `avroRecord` that wraps a changed SQL row and some @@ -951,6 +952,14 @@ func envelopeToAvroSchema( } schema.Fields = append(schema.Fields, updatedField) } + if opts.mvccTimestampField { + mvccTimestampField := &avroSchemaField{ + SchemaType: []avroSchemaType{avroSchemaNull, avroSchemaString}, + Name: `mvcc_timestamp`, + Default: nil, + } + schema.Fields = append(schema.Fields, mvccTimestampField) + } if opts.resolvedField { resolvedField := &avroSchemaField{ SchemaType: []avroSchemaType{avroSchemaNull, avroSchemaString}, @@ -1034,6 +1043,20 @@ func (r 
*avroEnvelopeRecord) BinaryFromRow( native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) } } + + if r.opts.mvccTimestampField { + native[`mvcc_timestamp`] = nil + if u, ok := meta[`mvcc_timestamp`]; ok { + delete(meta, `mvcc_timestamp`) + ts, ok := u.(hlc.Timestamp) + if !ok { + return nil, changefeedbase.WithTerminalError( + errors.Errorf(`unknown metadata timestamp type: %T`, u)) + } + native[`mvcc_timestamp`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime()) + } + } + if r.opts.resolvedField { native[`resolved`] = nil if u, ok := meta[`resolved`]; ok { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 9bc63437e83c..79b7bab7b71d 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -996,6 +996,34 @@ func TestChangefeedMVCCTimestamps(t *testing.T) { cdcTest(t, testFn) } +func TestChangefeedMVCCTimestampsAvro(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE mvcc_timestamp_test_table (id UUID PRIMARY KEY DEFAULT gen_random_uuid())`) + + const rowCount = 5 + expectedPayloads := make([]string, rowCount) + for i := 0; i < rowCount; i++ { + row := sqlDB.QueryRow(t, `INSERT INTO mvcc_timestamp_test_table VALUES (DEFAULT) RETURNING id, cluster_logical_timestamp()`) + + var id string + var mvccTimestamp string + row.Scan(&id, &mvccTimestamp) + expectedPayloads[i] = fmt.Sprintf(`mvcc_timestamp_test_table: {"id":{"string":"%[1]s"}}->{"after":{"mvcc_timestamp_test_table":{"id":{"string":"%[1]s"}}},"mvcc_timestamp":{"string":"%[2]s"}}`, + id, mvccTimestamp) + } + + changeFeed := feed(t, f, `CREATE CHANGEFEED FOR mvcc_timestamp_test_table WITH mvcc_timestamp, format='avro'`) + defer closeFeed(t, changeFeed) + assertPayloads(t, changeFeed, 
expectedPayloads) + } + + cdcTest(t, testFn, feedTestForceSink(`kafka`)) +} + func TestChangefeedResolvedFrequency(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/encoder_avro.go b/pkg/ccl/changefeedccl/encoder_avro.go index 99942ab9b589..6e899aac1a12 100644 --- a/pkg/ccl/changefeedccl/encoder_avro.go +++ b/pkg/ccl/changefeedccl/encoder_avro.go @@ -31,6 +31,7 @@ type confluentAvroEncoder struct { schemaRegistry schemaRegistry schemaPrefix string updatedField, beforeField bool + mvccTimestampField bool virtualColumnVisibility changefeedbase.VirtualColumnVisibility targets changefeedbase.Targets envelopeType changefeedbase.EnvelopeType @@ -87,6 +88,7 @@ func newConfluentAvroEncoder( e.updatedField = opts.UpdatedTimestamps e.beforeField = opts.Diff e.customKeyColumn = opts.CustomKeyColumn + e.mvccTimestampField = opts.MVCCTimestamps // TODO: Implement this. if opts.KeyInValue { @@ -254,10 +256,10 @@ func (e *confluentAvroEncoder) EncodeValue( // This means metadata can safely go at the top level as there are never arbitrary column names // for it to conflict with. 
if e.envelopeType == changefeedbase.OptEnvelopeWrapped { - opts = avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField} + opts = avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField, mvccTimestampField: e.mvccTimestampField} afterDataSchema = currentSchema } else { - opts = avroEnvelopeOpts{recordField: true, updatedField: e.updatedField} + opts = avroEnvelopeOpts{recordField: true, updatedField: e.updatedField, mvccTimestampField: e.mvccTimestampField} recordDataSchema = currentSchema } @@ -281,11 +283,12 @@ func (e *confluentAvroEncoder) EncodeValue( e.valueCache.Add(cacheKey, registered) } - var meta avroMetadata + meta := avroMetadata{} if registered.schema.opts.updatedField { - meta = map[string]interface{}{ - `updated`: evCtx.updated, - } + meta[`updated`] = evCtx.updated + } + if registered.schema.opts.mvccTimestampField { + meta[`mvcc_timestamp`] = evCtx.mvcc } // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format