diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 6be98c32925a..97ee75b2348e 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -443,8 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt // columns, so there is no reason to emit duplicate key datums. // // TODO(#103129): add support for some of these -var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, - OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue) +var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue) // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow // users to alter. diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index f4994e01cda1..a447cbcc250f 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -58,15 +58,15 @@ type frontier interface{ Frontier() hlc.Timestamp } type kvEventToRowConsumer struct { frontier - encoder Encoder - scratch bufalloc.ByteAllocator - sink EventSink - cursor hlc.Timestamp - knobs TestingKnobs - decoder cdcevent.Decoder - details ChangefeedConfig - evaluator *cdceval.Evaluator - encodingFormat changefeedbase.FormatType + encoder Encoder + scratch bufalloc.ByteAllocator + sink EventSink + cursor hlc.Timestamp + knobs TestingKnobs + decoder cdcevent.Decoder + details ChangefeedConfig + evaluator *cdceval.Evaluator + encodingOpts changefeedbase.EncodingOptions topicDescriptorCache map[TopicIdentifier]TopicDescriptor topicNamer *TopicNamer @@ -256,7 +256,7 @@ func newKVEventToRowConsumer( topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor), topicNamer: topicNamer, evaluator: evaluator, - encodingFormat: encodingOpts.Format, + encodingOpts: encodingOpts, metrics: metrics, pacer: pacer, }, nil @@ -429,9 +429,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit( } } - if c.encodingFormat == changefeedbase.OptFormatParquet { + if c.encodingOpts.Format == changefeedbase.OptFormatParquet { return c.encodeForParquet( - ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, alloc, + ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, + c.encodingOpts, alloc, ) } var keyCopy, valueCopy []byte @@ -478,6 +479,7 @@ func (c *kvEventToRowConsumer) encodeForParquet( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { sinkWithEncoder, ok := c.sink.(SinkWithEncoder) @@ -485,7 +487,7 @@ func (c *kvEventToRowConsumer) encodeForParquet( return errors.AssertionFailedf("Expected a SinkWithEncoder for parquet format, found %T", c.sink) } if err := sinkWithEncoder.EncodeAndEmitRow( - ctx, updatedRow, prevRow, topic, updated, mvcc, alloc, + ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc, ); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 243a8dfd7ddf..9e50c8360adb 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -15,9 +15,11 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/errors" ) @@ -27,13 +29,17 @@ import ( var includeParquestTestMetadata = buildutil.CrdbTestBuild type parquetWriter struct { - inner *parquet.Writer - datumAlloc []tree.Datum + inner *parquet.Writer + encodingOpts changefeedbase.EncodingOptions + schemaDef *parquet.SchemaDefinition + datumAlloc []tree.Datum } // newParquetSchemaDefintion returns a parquet schema definition based on the // cdcevent.Row and the number of cols in the schema. -func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) { +func newParquetSchemaDefintion( + row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, +) (*parquet.SchemaDefinition, error) { var columnNames []string var columnTypes []*types.T @@ -44,32 +50,54 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int numCols += 1 return nil }); err != nil { - return nil, 0, err + return nil, err } columnNames = append(columnNames, parquetCrdbEventTypeColName) columnTypes = append(columnTypes, types.String) numCols += 1 + columnNames, columnTypes = appendMetadataColsToSchema(columnNames, columnTypes, encodingOpts) + schemaDef, err := parquet.NewSchema(columnNames, columnTypes) if err != nil { - return nil, 0, err + return nil, err + } + return schemaDef, nil +} + +const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps +const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps + +func appendMetadataColsToSchema( + columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions, +) (updatedNames []string, updatedTypes []*types.T) { + if encodingOpts.UpdatedTimestamps { + columnNames = append(columnNames, parquetOptUpdatedTimestampColName) + columnTypes = append(columnTypes, types.String) + } + if encodingOpts.MVCCTimestamps { + columnNames = append(columnNames, parquetOptMVCCTimestampColName) + columnTypes = append(columnTypes, types.String) } - return schemaDef, numCols, nil + return columnNames, columnTypes } // newParquetWriterFromRow constructs a new parquet writer which outputs to // the given sink. This function interprets the schema from the supplied row. func newParquetWriterFromRow( - row cdcevent.Row, sink io.Writer, opts ...parquet.Option, + row cdcevent.Row, + sink io.Writer, + encodingOpts changefeedbase.EncodingOptions, + opts ...parquet.Option, ) (*parquetWriter, error) { - schemaDef, numCols, err := newParquetSchemaDefintion(row) + schemaDef, err := newParquetSchemaDefintion(row, encodingOpts) if err != nil { return nil, err } if includeParquestTestMetadata { - if opts, err = addParquetTestMetadata(row, opts); err != nil { + if opts, err = addParquetTestMetadata(row, encodingOpts, opts); err != nil { return nil, err } } @@ -77,13 +105,20 @@ func newParquetWriterFromRow( if err != nil { return nil, err } - return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil + return &parquetWriter{ + inner: writer, + encodingOpts: encodingOpts, + schemaDef: schemaDef, + datumAlloc: make([]tree.Datum, schemaDef.NumColumns()), + }, nil } // addData writes the updatedRow, adding the row's event type. There is no guarantee // that data will be flushed after this function returns. -func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error { - if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil { +func (w *parquetWriter) addData( + updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp, +) error { + if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil { return err } @@ -96,7 +131,13 @@ func (w *parquetWriter) close() error { } // populateDatums writes the appropriate datums into the datumAlloc slice. -func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error { +func populateDatums( + updatedRow cdcevent.Row, + prevRow cdcevent.Row, + encodingOpts changefeedbase.EncodingOptions, + updated, mvcc hlc.Timestamp, + datumAlloc []tree.Datum, +) error { datums := datumAlloc[:0] if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { @@ -106,6 +147,13 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc [] return err } datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString()) + + if encodingOpts.UpdatedTimestamps { + datums = append(datums, tree.NewDString(timestampToString(updated))) + } + if encodingOpts.MVCCTimestamps { + datums = append(datums, tree.NewDString(timestampToString(mvcc))) + } return nil } @@ -116,7 +164,9 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc [] // `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value // columns in a JSON object. The metadata generated by this function contains // key and value column names along with their offsets in the parquet file. -func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) { +func addParquetTestMetadata( + row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, parquetOpts []parquet.Option, +) ([]parquet.Option, error) { // NB: Order matters. When iterating using ForAllColumns, which is used when // writing datums and defining the schema, the order of columns usually // matches the underlying table. If a composite keys defined, the order in @@ -129,7 +179,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet. keysInOrder = append(keysInOrder, col.Name) return nil }); err != nil { - return opts, err + return parquetOpts, err } // NB: We do not use ForAllColumns here because it will always contain the @@ -143,7 +193,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet. valuesInOrder = append(valuesInOrder, col.Name) return nil }); err != nil { - return opts, err + return parquetOpts, err } // Iterate over ForAllColumns to determine the offets of each column @@ -162,15 +212,26 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet. idx += 1 return nil }); err != nil { - return opts, err + return parquetOpts, err } valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName) valueCols[parquetCrdbEventTypeColName] = idx idx += 1 - opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)})) - opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)})) - return opts, nil + if encodingOpts.UpdatedTimestamps { + valuesInOrder = append(valuesInOrder, parquetOptUpdatedTimestampColName) + valueCols[parquetOptUpdatedTimestampColName] = idx + idx += 1 + } + if encodingOpts.MVCCTimestamps { + valuesInOrder = append(valuesInOrder, parquetOptMVCCTimestampColName) + valueCols[parquetOptMVCCTimestampColName] = idx + idx += 1 + } + + parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)})) + parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)})) + return parquetOpts, nil } // serializeMap serializes a map to a string. For example, orderedKeys=["b", diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index c27db2d0a72d..007182be03a2 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -15,6 +15,7 @@ import ( "path/filepath" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" @@ -29,7 +30,7 @@ import ( // This is an extra column that will be added to every parquet file which tells // us about the type of event that generated a particular row. The types are // defined below. -const parquetCrdbEventTypeColName string = "__crdb__event_type__" +const parquetCrdbEventTypeColName string = metaSentinel + "event_type" type parquetEventType int @@ -183,6 +184,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { s := parquetSink.wrapped @@ -195,14 +197,14 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( if file.parquetCodec == nil { var err error file.parquetCodec, err = newParquetWriterFromRow( - updatedRow, &file.buf, + updatedRow, &file.buf, encodingOpts, parquet.WithCompressionCodec(parquetSink.compression)) if err != nil { return err } } - if err := file.parquetCodec.addData(updatedRow, prevRow); err != nil { + if err := file.parquetCodec.addData(updatedRow, prevRow, updated, mvcc); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index 9bc71934d9f5..a55fd3ca8839 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/parquet" @@ -120,8 +121,10 @@ func TestParquetRows(t *testing.T) { ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false) require.NoError(t, err) + encodingOpts := changefeedbase.EncodingOptions{} + if writer == nil { - writer, err = newParquetWriterFromRow(updatedRow, f, parquet.WithMaxRowGroupLength(maxRowGroupSize), + writer, err = newParquetWriterFromRow(updatedRow, f, encodingOpts, parquet.WithMaxRowGroupLength(maxRowGroupSize), parquet.WithCompressionCodec(parquet.CompressionGZIP)) if err != nil { t.Fatalf(err.Error()) @@ -129,12 +132,12 @@ func TestParquetRows(t *testing.T) { numCols = len(updatedRow.ResultColumns()) + 1 } - err = writer.addData(updatedRow, prevRow) + err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{}) require.NoError(t, err) // Save a copy of the datums we wrote. - datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1) - err = populateDatums(updatedRow, prevRow, datumRow) + datumRow := make([]tree.Datum, writer.schemaDef.NumColumns()) + err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow) require.NoError(t, err) datums[i] = datumRow } diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 5269d6c213f4..0d730a835f4a 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -450,10 +450,11 @@ func (s errorWrapperSink) EncodeAndEmitRow( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { if sinkWithEncoder, ok := s.wrapped.(SinkWithEncoder); ok { - return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc) + return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc) } return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.wrapped) } @@ -716,6 +717,7 @@ type SinkWithEncoder interface { prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index dc9c672bcc76..c5df6e32538d 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -623,10 +623,11 @@ func (s *notifyFlushSink) EncodeAndEmitRow( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { if sinkWithEncoder, ok := s.Sink.(SinkWithEncoder); ok { - return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc) + return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc) } return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.Sink) } @@ -1245,18 +1246,28 @@ func (c *cloudFeed) appendParquetTestFeedMessages( return err } - for _, row := range datums { - rowCopy := make([]string, len(valueColumnNamesOrdered)-1) - copy(rowCopy, valueColumnNamesOrdered[:len(valueColumnNamesOrdered)-1]) - rowJSONBuilder, err := json.NewFixedKeysObjectBuilder(rowCopy) - if err != nil { - return err + // Extract metadata columns into metaColumnNameSet. + extractMetaColumns := func(columnNameSet map[string]int) map[string]int { + metaColumnNameSet := make(map[string]int) + for colName, colIdx := range columnNameSet { + switch colName { + case parquetCrdbEventTypeColName: + metaColumnNameSet[colName] = colIdx + case parquetOptUpdatedTimestampColName: + metaColumnNameSet[colName] = colIdx + case parquetOptMVCCTimestampColName: + metaColumnNameSet[colName] = colIdx + default: + } } + return metaColumnNameSet + } + metaColumnNameSet := extractMetaColumns(columnNameSet) + for _, row := range datums { + rowJSONBuilder := json.NewObjectBuilder(len(valueColumnNamesOrdered) - len(metaColumnNameSet)) keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered)) - isDeleted := false - for _, primaryKeyColumnName := range primaryKeysNamesOrdered { datum := row[primaryKeyColumnSet[primaryKeyColumnName]] j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC) @@ -1267,28 +1278,26 @@ func (c *cloudFeed) appendParquetTestFeedMessages( } for _, valueColumnName := range valueColumnNamesOrdered { - if valueColumnName == parquetCrdbEventTypeColName { - if *(row[columnNameSet[valueColumnName]].(*tree.DString)) == *parquetEventDelete.DString() { - isDeleted = true - } - break + if _, isMeta := metaColumnNameSet[valueColumnName]; isMeta { + continue } + datum := row[columnNameSet[valueColumnName]] j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC) if err != nil { return err } - if err := rowJSONBuilder.Set(valueColumnName, j); err != nil { - return err - } + rowJSONBuilder.Add(valueColumnName, j) } - var valueWithAfter *json.FixedKeysObjectBuilder + var valueWithAfter *json.ObjectBuilder + + isDeleted := *(row[metaColumnNameSet[parquetCrdbEventTypeColName]].(*tree.DString)) == *parquetEventDelete.DString() if envelopeType == changefeedbase.OptEnvelopeBare { valueWithAfter = rowJSONBuilder } else { - valueWithAfter, err = json.NewFixedKeysObjectBuilder([]string{"after"}) + valueWithAfter = json.NewObjectBuilder(1) if err != nil { return err } @@ -1297,26 +1306,31 @@ func (c *cloudFeed) appendParquetTestFeedMessages( if err != nil { return err } - if err = valueWithAfter.Set("after", nullJSON); err != nil { - return err - } + valueWithAfter.Add("after", nullJSON) } else { - vbJson, err := rowJSONBuilder.Build() + vbJson := rowJSONBuilder.Build() + valueWithAfter.Add("after", vbJson) + } + + if updatedColIdx, updated := metaColumnNameSet[parquetOptUpdatedTimestampColName]; updated { + j, err := tree.AsJSON(row[updatedColIdx], sessiondatapb.DataConversionConfig{}, time.UTC) if err != nil { return err } - if err = valueWithAfter.Set("after", vbJson); err != nil { + valueWithAfter.Add(changefeedbase.OptUpdatedTimestamps, j) + } + if mvccColIdx, mvcc := metaColumnNameSet[parquetOptMVCCTimestampColName]; mvcc { + j, err := tree.AsJSON(row[mvccColIdx], sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { return err } + valueWithAfter.Add(changefeedbase.OptMVCCTimestamps, j) } } keyJSON := keyJSONBuilder.Build() - rowJSON, err := valueWithAfter.Build() - if err != nil { - return err - } + rowJSON := valueWithAfter.Build() var keyBuf bytes.Buffer keyJSON.Format(&keyBuf) diff --git a/pkg/util/parquet/schema.go b/pkg/util/parquet/schema.go index 5ffb4e650b5a..920550b9ee18 100644 --- a/pkg/util/parquet/schema.go +++ b/pkg/util/parquet/schema.go @@ -72,6 +72,10 @@ type SchemaDefinition struct { schema *schema.Schema } +func (sd *SchemaDefinition) NumColumns() int { + return len(sd.cols) +} + // NewSchema generates a SchemaDefinition. // // Columns in the returned SchemaDefinition will match the order they appear in