Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
129840: changefeedccl: emit mvcc_timestamp for avro format r=andyyang890 a=rharding6373

This PR adds support for the mvcc_timestamp option with the avro format. Before this change, changefeeds using avro would not fail if mvcc_timestamp was specified, but would ignore the option. Now avro supports the mvcc_timestamp by adding mvcc_timestamp to the schema and emitting the mvcc value with the row data.

Epic: none
Fixes: cockroachdb#123078

Release note (enterprise change): Adds changefeed support for the mvcc_timestamp option with the avro format. If both options are specified, the avro schema includes an mvcc_timestamp metadata field and emits the row's mvcc timestamp with the row data.

Co-authored-by: rharding6373 <[email protected]>
  • Loading branch information
craig[bot] and rharding6373 committed Aug 31, 2024
2 parents a304b34 + 597bf39 commit 8551145
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 6 deletions.
23 changes: 23 additions & 0 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ type avroMetadata map[string]interface{}
type avroEnvelopeOpts struct {
beforeField, afterField, recordField bool
updatedField, resolvedField bool
mvccTimestampField bool
}

// avroEnvelopeRecord is an `avroRecord` that wraps a changed SQL row and some
Expand Down Expand Up @@ -954,6 +955,14 @@ func envelopeToAvroSchema(
}
schema.Fields = append(schema.Fields, updatedField)
}
if opts.mvccTimestampField {
mvccTimestampField := &avroSchemaField{
SchemaType: []avroSchemaType{avroSchemaNull, avroSchemaString},
Name: `mvcc_timestamp`,
Default: nil,
}
schema.Fields = append(schema.Fields, mvccTimestampField)
}
if opts.resolvedField {
resolvedField := &avroSchemaField{
SchemaType: []avroSchemaType{avroSchemaNull, avroSchemaString},
Expand Down Expand Up @@ -1037,6 +1046,20 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
native[`updated`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}

if r.opts.mvccTimestampField {
native[`mvcc_timestamp`] = nil
if u, ok := meta[`mvcc_timestamp`]; ok {
delete(meta, `mvcc_timestamp`)
ts, ok := u.(hlc.Timestamp)
if !ok {
return nil, changefeedbase.WithTerminalError(
errors.Errorf(`unknown metadata timestamp type: %T`, u))
}
native[`mvcc_timestamp`] = goavro.Union(avroUnionKey(avroSchemaString), ts.AsOfSystemTime())
}
}

if r.opts.resolvedField {
native[`resolved`] = nil
if u, ok := meta[`resolved`]; ok {
Expand Down
28 changes: 28 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,34 @@ func TestChangefeedMVCCTimestamps(t *testing.T) {
cdcTest(t, testFn)
}

func TestChangefeedMVCCTimestampsAvro(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE mvcc_timestamp_test_table (id UUID PRIMARY KEY DEFAULT gen_random_uuid())`)

const rowCount = 5
expectedPayloads := make([]string, rowCount)
for i := 0; i < rowCount; i++ {
row := sqlDB.QueryRow(t, `INSERT INTO mvcc_timestamp_test_table VALUES (DEFAULT) RETURNING id, cluster_logical_timestamp()`)

var id string
var mvccTimestamp string
row.Scan(&id, &mvccTimestamp)
expectedPayloads[i] = fmt.Sprintf(`mvcc_timestamp_test_table: {"id":{"string":"%[1]s"}}->{"after":{"mvcc_timestamp_test_table":{"id":{"string":"%[1]s"}}},"mvcc_timestamp":{"string":"%[2]s"}}`,
id, mvccTimestamp)
}

changeFeed := feed(t, f, `CREATE CHANGEFEED FOR mvcc_timestamp_test_table WITH mvcc_timestamp, format='avro'`)
defer closeFeed(t, changeFeed)
assertPayloads(t, changeFeed, expectedPayloads)
}

cdcTest(t, testFn, feedTestForceSink(`kafka`))
}

func TestChangefeedResolvedFrequency(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
15 changes: 9 additions & 6 deletions pkg/ccl/changefeedccl/encoder_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type confluentAvroEncoder struct {
schemaRegistry schemaRegistry
schemaPrefix string
updatedField, beforeField bool
mvccTimestampField bool
virtualColumnVisibility changefeedbase.VirtualColumnVisibility
targets changefeedbase.Targets
envelopeType changefeedbase.EnvelopeType
Expand Down Expand Up @@ -90,6 +91,7 @@ func newConfluentAvroEncoder(
e.updatedField = opts.UpdatedTimestamps
e.beforeField = opts.Diff
e.customKeyColumn = opts.CustomKeyColumn
e.mvccTimestampField = opts.MVCCTimestamps

// TODO: Implement this.
if opts.KeyInValue {
Expand Down Expand Up @@ -257,10 +259,10 @@ func (e *confluentAvroEncoder) EncodeValue(
// This means metadata can safely go at the top level as there are never arbitrary column names
// for it to conflict with.
if e.envelopeType == changefeedbase.OptEnvelopeWrapped {
opts = avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField}
opts = avroEnvelopeOpts{afterField: true, beforeField: e.beforeField, updatedField: e.updatedField, mvccTimestampField: e.mvccTimestampField}
afterDataSchema = currentSchema
} else {
opts = avroEnvelopeOpts{recordField: true, updatedField: e.updatedField}
opts = avroEnvelopeOpts{recordField: true, updatedField: e.updatedField, mvccTimestampField: e.mvccTimestampField}
recordDataSchema = currentSchema
}

Expand All @@ -284,11 +286,12 @@ func (e *confluentAvroEncoder) EncodeValue(
e.valueCache.Add(cacheKey, registered)
}

var meta avroMetadata
meta := avroMetadata{}
if registered.schema.opts.updatedField {
meta = map[string]interface{}{
`updated`: evCtx.updated,
}
meta[`updated`] = evCtx.updated
}
if registered.schema.opts.mvccTimestampField {
meta[`mvcc_timestamp`] = evCtx.mvcc
}

// https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
Expand Down

0 comments on commit 8551145

Please sign in to comment.