From b050218d53a37b2da3d9b365167002634b0602a4 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 5 Jun 2023 16:42:21 -0400 Subject: [PATCH 1/2] changefeedccl: use buildutil.CrdbTestBuild for parquet testing Parquet testing requires that extra metadata be written to parquet files so tests can create CDC rows from the raw data. Previously, the production parquet code relied on a testing knob to be passed to write this extra metadata. This is problematic as not all tests would pass the testing knob, making it so that we could not randomly use parquet in those tests for metamorphic testing purposes. With this change, the parquet production code uses `buildutil.CrdbTestBuild`, which is a global flag enabled in tests. Now, metamorphic parquet testing can be applied to more tests. Epic: None Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/changefeed_test.go | 7 ++----- pkg/ccl/changefeedccl/helpers_test.go | 5 ----- pkg/ccl/changefeedccl/parquet.go | 21 +++++++++---------- .../parquet_sink_cloudstorage.go | 4 ++-- pkg/ccl/changefeedccl/parquet_test.go | 2 +- pkg/ccl/changefeedccl/testfeed_test.go | 7 +------ pkg/ccl/changefeedccl/testing_knobs.go | 4 ---- 8 files changed, 17 insertions(+), 34 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 7671a1c4f881..c180f3344f40 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -122,6 +122,7 @@ go_library( "//pkg/util/admission/admissionpb", "//pkg/util/bitarray", "//pkg/util/bufalloc", + "//pkg/util/buildutil", "//pkg/util/cache", "//pkg/util/ctxgroup", "//pkg/util/duration", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e20522641e10..ef87dadc34c7 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6091,7 +6091,6 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) { DistSQL: &execinfra.TestingKnobs{ DrainFast: true, Changefeed: &TestingKnobs{ - EnableParquetMetadata: true, // Filter out draining nodes; normally we rely on dist sql planner // to do that for us. FilterDrainingNodes: func( @@ -6282,10 +6281,8 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) { DefaultTestTenant: base.TestTenantDisabled, Knobs: base.TestingKnobs{ DistSQL: &execinfra.TestingKnobs{ - DrainFast: true, - Changefeed: &TestingKnobs{ - EnableParquetMetadata: true, - }, + DrainFast: true, + Changefeed: &TestingKnobs{}, }, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), }, diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index dfba5ca8dfb0..194f4011e234 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -1085,11 +1085,6 @@ func cdcTestNamedWithSystem( // Even if the parquet format is not being used, enable metadata // in all tests for simplicity. testServer, cleanupServer := makeServerWithOptions(t, options) - knobs := testServer.TestingKnobs. - DistSQL.(*execinfra.TestingKnobs). 
- Changefeed.(*TestingKnobs) - knobs.EnableParquetMetadata = true - feedFactory, cleanupSink := makeFeedFactoryWithOptions(t, sinkType, testServer.Server, testServer.DB, options) feedFactory = maybeUseExternalConnection(feedFactory, testServer.DB, sinkType, options, t) defer cleanupServer() diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index d3a8b1726321..243a8dfd7ddf 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -17,10 +17,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/errors" ) +// includeParquestTestMetadata configures the parquet writer to write +// metadata required for reading parquet files in tests. +var includeParquestTestMetadata = buildutil.CrdbTestBuild + type parquetWriter struct { inner *parquet.Writer datumAlloc []tree.Datum @@ -56,22 +61,19 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int // newParquetWriterFromRow constructs a new parquet writer which outputs to // the given sink. This function interprets the schema from the supplied row. func newParquetWriterFromRow( - row cdcevent.Row, - sink io.Writer, - knobs *TestingKnobs, /* may be nil */ - opts ...parquet.Option, + row cdcevent.Row, sink io.Writer, opts ...parquet.Option, ) (*parquetWriter, error) { schemaDef, numCols, err := newParquetSchemaDefintion(row) if err != nil { return nil, err } - if knobs != nil && knobs.EnableParquetMetadata { + if includeParquestTestMetadata { if opts, err = addParquetTestMetadata(row, opts); err != nil { return nil, err } } - writer, err := newParquetWriter(schemaDef, sink, knobs, opts...) + writer, err := newParquetWriter(schemaDef, sink, opts...) if err != nil { return nil, err } @@ -213,12 +215,9 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error // newParquetWriter allocates a new parquet writer using the provided // schema definition. func newParquetWriter( - sch *parquet.SchemaDefinition, - sink io.Writer, - knobs *TestingKnobs, /* may be nil */ - opts ...parquet.Option, + sch *parquet.SchemaDefinition, sink io.Writer, opts ...parquet.Option, ) (*parquet.Writer, error) { - if knobs != nil && knobs.EnableParquetMetadata { + if includeParquestTestMetadata { // To use parquet test utils for reading datums, the writer needs to be // configured with additional metadata. return parquet.NewWriterWithReaderMeta(sch, sink, opts...) diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index 1ca96cccab84..c27db2d0a72d 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -147,7 +147,7 @@ func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp( // TODO: Ideally, we do not create a new schema and writer every time // we emit a resolved timestamp. Currently, util/parquet does not support it. 
- writer, err := newParquetWriter(sch, &buf, parquetSink.wrapped.testingKnobs) + writer, err := newParquetWriter(sch, &buf) if err != nil { return err } @@ -195,7 +195,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( if file.parquetCodec == nil { var err error file.parquetCodec, err = newParquetWriterFromRow( - updatedRow, &file.buf, parquetSink.wrapped.testingKnobs, + updatedRow, &file.buf, parquet.WithCompressionCodec(parquetSink.compression)) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index 0f1772ea4a5d..9bc71934d9f5 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -121,7 +121,7 @@ func TestParquetRows(t *testing.T) { require.NoError(t, err) if writer == nil { - writer, err = newParquetWriterFromRow(updatedRow, f, &TestingKnobs{EnableParquetMetadata: true}, parquet.WithMaxRowGroupLength(maxRowGroupSize), + writer, err = newParquetWriterFromRow(updatedRow, f, parquet.WithMaxRowGroupLength(maxRowGroupSize), parquet.WithCompressionCodec(parquet.CompressionGZIP)) if err != nil { t.Fatalf(err.Error()) diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index de6662574316..dc9c672bcc76 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/distsql" - "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -1053,11 +1052,7 @@ func (f *cloudFeedFactory) Feed( // Determine if we can enable the parquet format if the changefeed is not // being created with incompatible options. If it can be enabled, we will use // parquet format with a probability of 0.4. - // - // TODO: Consider making this knob a global flag so tests that don't - // initialize testing knobs can use parquet metamorphically. - knobs := f.s.TestingKnobs().DistSQL.(*execinfra.TestingKnobs).Changefeed - parquetPossible := knobs != nil && knobs.(*TestingKnobs).EnableParquetMetadata + parquetPossible := includeParquestTestMetadata explicitEnvelope := false for _, opt := range createStmt.Options { if string(opt.Key) == changefeedbase.OptEnvelope { diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index d49815132ae1..f50f775864db 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -74,10 +74,6 @@ type TestingKnobs struct { // OnDrain returns the channel to select on to detect node drain OnDrain func() <-chan struct{} - - // EnableParquetMetadata configures the parquet format writer to write - // metadata which is required for testing. - EnableParquetMetadata bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. From 3045e41823cc0f15d6b0def9df9ff1858fa0f3ee Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 5 Jun 2023 15:56:31 -0400 Subject: [PATCH 2/2] changefeedccl: support the updated and mvcc timestamp options with parquet format Previously, using the parquet format in changefeeds did not support using the `mvcc` or `updated` options. This change adds support for using these options with parquet. 
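To sketch the approach concretely (a simplified, self-contained Go program with stand-in types, not the production code; the real option fields, column names, and datum plumbing are in the diff below), enabling either option appends a sentinel-prefixed string column to the parquet schema and the matching timestamp string to every row, in the same fixed order, so schema offsets and datums always line up:

    package main

    import "fmt"

    // metaSentinel mirrors the "__crdb__" prefix used for non-user
    // columns so they cannot collide with table column names (an
    // assumption of this sketch; see the sink code for the real const).
    const metaSentinel = "__crdb__"

    // encodingOptions stands in for changefeedbase.EncodingOptions;
    // only the two option flags relevant here are modeled.
    type encodingOptions struct {
        UpdatedTimestamps bool
        MVCCTimestamps    bool
    }

    // appendMetadataCols mirrors appendMetadataColsToSchema below:
    // metadata columns are appended in a fixed order.
    func appendMetadataCols(cols []string, opts encodingOptions) []string {
        if opts.UpdatedTimestamps {
            cols = append(cols, metaSentinel+"updated")
        }
        if opts.MVCCTimestamps {
            cols = append(cols, metaSentinel+"mvcc_timestamp")
        }
        return cols
    }

    // appendMetadataDatums mirrors the tail of populateDatums below:
    // the stringified timestamps are appended in the same fixed order.
    func appendMetadataDatums(row []string, updated, mvcc string, opts encodingOptions) []string {
        if opts.UpdatedTimestamps {
            row = append(row, updated)
        }
        if opts.MVCCTimestamps {
            row = append(row, mvcc)
        }
        return row
    }

    func main() {
        opts := encodingOptions{UpdatedTimestamps: true, MVCCTimestamps: true}
        cols := appendMetadataCols([]string{"a", "b", metaSentinel + "event_type"}, opts)
        row := appendMetadataDatums([]string{"1", "x", "c"}, "1686000000.0", "1686000000.0", opts)
        fmt.Println(cols) // [a b __crdb__event_type __crdb__updated __crdb__mvcc_timestamp]
        fmt.Println(row)  // [1 x c 1686000000.0 1686000000.0]
    }

Readers, including the test feed reader updated in this patch, can then find the metadata columns by their sentinel-prefixed names and strip them from the user-visible row.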
Epic: None Release note: None --- .../changefeedccl/changefeedbase/options.go | 3 +- pkg/ccl/changefeedccl/event_processing.go | 28 ++--- pkg/ccl/changefeedccl/parquet.go | 101 ++++++++++++++---- .../parquet_sink_cloudstorage.go | 8 +- pkg/ccl/changefeedccl/parquet_test.go | 11 +- pkg/ccl/changefeedccl/sink.go | 4 +- pkg/ccl/changefeedccl/testfeed_test.go | 70 +++++++----- pkg/util/parquet/schema.go | 4 + 8 files changed, 158 insertions(+), 71 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/options.go b/pkg/ccl/changefeedccl/changefeedbase/options.go index 6be98c32925a..97ee75b2348e 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/options.go +++ b/pkg/ccl/changefeedccl/changefeedbase/options.go @@ -443,8 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt // columns, so there is no reason to emit duplicate key datums. // // TODO(#103129): add support for some of these -var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, - OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue) +var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue) // AlterChangefeedUnsupportedOptions are changefeed options that we do not allow // users to alter. diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index f4994e01cda1..a447cbcc250f 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -58,15 +58,15 @@ type frontier interface{ Frontier() hlc.Timestamp } type kvEventToRowConsumer struct { frontier - encoder Encoder - scratch bufalloc.ByteAllocator - sink EventSink - cursor hlc.Timestamp - knobs TestingKnobs - decoder cdcevent.Decoder - details ChangefeedConfig - evaluator *cdceval.Evaluator - encodingFormat changefeedbase.FormatType + encoder Encoder + scratch bufalloc.ByteAllocator + sink EventSink + cursor hlc.Timestamp + knobs TestingKnobs + decoder cdcevent.Decoder + details ChangefeedConfig + evaluator *cdceval.Evaluator + encodingOpts changefeedbase.EncodingOptions topicDescriptorCache map[TopicIdentifier]TopicDescriptor topicNamer *TopicNamer @@ -256,7 +256,7 @@ func newKVEventToRowConsumer( topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor), topicNamer: topicNamer, evaluator: evaluator, - encodingFormat: encodingOpts.Format, + encodingOpts: encodingOpts, metrics: metrics, pacer: pacer, }, nil @@ -429,9 +429,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit( } } - if c.encodingFormat == changefeedbase.OptFormatParquet { + if c.encodingOpts.Format == changefeedbase.OptFormatParquet { return c.encodeForParquet( - ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, alloc, + ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, + c.encodingOpts, alloc, ) } var keyCopy, valueCopy []byte @@ -478,6 +479,7 @@ func (c *kvEventToRowConsumer) encodeForParquet( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { sinkWithEncoder, ok := c.sink.(SinkWithEncoder) @@ -485,7 +487,7 @@ func (c *kvEventToRowConsumer) encodeForParquet( return errors.AssertionFailedf("Expected a SinkWithEncoder for parquet format, found %T", c.sink) } if err := sinkWithEncoder.EncodeAndEmitRow( - ctx, updatedRow, prevRow, topic, updated, mvcc, alloc, + ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc, ); err != nil { return err } diff 
--git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index 243a8dfd7ddf..9e50c8360adb 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -15,9 +15,11 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/parquet" "github.com/cockroachdb/errors" ) @@ -27,13 +29,17 @@ import ( var includeParquestTestMetadata = buildutil.CrdbTestBuild type parquetWriter struct { - inner *parquet.Writer - datumAlloc []tree.Datum + inner *parquet.Writer + encodingOpts changefeedbase.EncodingOptions + schemaDef *parquet.SchemaDefinition + datumAlloc []tree.Datum } // newParquetSchemaDefintion returns a parquet schema definition based on the // cdcevent.Row and the number of cols in the schema. -func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) { +func newParquetSchemaDefintion( + row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, +) (*parquet.SchemaDefinition, error) { var columnNames []string var columnTypes []*types.T @@ -44,32 +50,54 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int numCols += 1 return nil }); err != nil { - return nil, 0, err + return nil, err } columnNames = append(columnNames, parquetCrdbEventTypeColName) columnTypes = append(columnTypes, types.String) numCols += 1 + columnNames, columnTypes = appendMetadataColsToSchema(columnNames, columnTypes, encodingOpts) + schemaDef, err := parquet.NewSchema(columnNames, columnTypes) if err != nil { - return nil, 0, err + return nil, err + } + return schemaDef, nil +} + +const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps +const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps + +func appendMetadataColsToSchema( + columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions, +) (updatedNames []string, updatedTypes []*types.T) { + if encodingOpts.UpdatedTimestamps { + columnNames = append(columnNames, parquetOptUpdatedTimestampColName) + columnTypes = append(columnTypes, types.String) + } + if encodingOpts.MVCCTimestamps { + columnNames = append(columnNames, parquetOptMVCCTimestampColName) + columnTypes = append(columnTypes, types.String) } - return schemaDef, numCols, nil + return columnNames, columnTypes } // newParquetWriterFromRow constructs a new parquet writer which outputs to // the given sink. This function interprets the schema from the supplied row. 
func newParquetWriterFromRow( - row cdcevent.Row, sink io.Writer, opts ...parquet.Option, + row cdcevent.Row, + sink io.Writer, + encodingOpts changefeedbase.EncodingOptions, + opts ...parquet.Option, ) (*parquetWriter, error) { - schemaDef, numCols, err := newParquetSchemaDefintion(row) + schemaDef, err := newParquetSchemaDefintion(row, encodingOpts) if err != nil { return nil, err } if includeParquestTestMetadata { - if opts, err = addParquetTestMetadata(row, opts); err != nil { + if opts, err = addParquetTestMetadata(row, encodingOpts, opts); err != nil { return nil, err } } @@ -77,13 +105,20 @@ func newParquetWriterFromRow( if err != nil { return nil, err } - return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil + return &parquetWriter{ + inner: writer, + encodingOpts: encodingOpts, + schemaDef: schemaDef, + datumAlloc: make([]tree.Datum, schemaDef.NumColumns()), + }, nil } // addData writes the updatedRow, adding the row's event type. There is no guarantee // that data will be flushed after this function returns. -func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error { - if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil { +func (w *parquetWriter) addData( + updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp, +) error { + if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil { return err } @@ -96,7 +131,13 @@ func (w *parquetWriter) close() error { } // populateDatums writes the appropriate datums into the datumAlloc slice. -func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error { +func populateDatums( + updatedRow cdcevent.Row, + prevRow cdcevent.Row, + encodingOpts changefeedbase.EncodingOptions, + updated, mvcc hlc.Timestamp, + datumAlloc []tree.Datum, +) error { datums := datumAlloc[:0] if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { @@ -106,6 +147,13 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc [] return err } datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString()) + + if encodingOpts.UpdatedTimestamps { + datums = append(datums, tree.NewDString(timestampToString(updated))) + } + if encodingOpts.MVCCTimestamps { + datums = append(datums, tree.NewDString(timestampToString(mvcc))) + } return nil } @@ -116,7 +164,9 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc [] // `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value // columns in a JSON object. The metadata generated by this function contains // key and value column names along with their offsets in the parquet file. -func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) { +func addParquetTestMetadata( + row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, parquetOpts []parquet.Option, +) ([]parquet.Option, error) { // NB: Order matters. When iterating using ForAllColumns, which is used when // writing datums and defining the schema, the order of columns usually // matches the underlying table. If a composite keys defined, the order in @@ -129,7 +179,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet. 
keysInOrder = append(keysInOrder, col.Name) return nil }); err != nil { - return opts, err + return parquetOpts, err } // NB: We do not use ForAllColumns here because it will always contain the @@ -143,7 +193,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet. valuesInOrder = append(valuesInOrder, col.Name) return nil }); err != nil { - return opts, err + return parquetOpts, err } // Iterate over ForAllColumns to determine the offets of each column @@ -162,15 +212,26 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet. idx += 1 return nil }); err != nil { - return opts, err + return parquetOpts, err } valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName) valueCols[parquetCrdbEventTypeColName] = idx idx += 1 - opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)})) - opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)})) - return opts, nil + if encodingOpts.UpdatedTimestamps { + valuesInOrder = append(valuesInOrder, parquetOptUpdatedTimestampColName) + valueCols[parquetOptUpdatedTimestampColName] = idx + idx += 1 + } + if encodingOpts.MVCCTimestamps { + valuesInOrder = append(valuesInOrder, parquetOptMVCCTimestampColName) + valueCols[parquetOptMVCCTimestampColName] = idx + idx += 1 + } + + parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)})) + parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)})) + return parquetOpts, nil } // serializeMap serializes a map to a string. For example, orderedKeys=["b", diff --git a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go index c27db2d0a72d..007182be03a2 100644 --- a/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go @@ -15,6 +15,7 @@ import ( "path/filepath" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" @@ -29,7 +30,7 @@ import ( // This is an extra column that will be added to every parquet file which tells // us about the type of event that generated a particular row. The types are // defined below. 
-const parquetCrdbEventTypeColName string = "__crdb__event_type__" +const parquetCrdbEventTypeColName string = metaSentinel + "event_type" type parquetEventType int @@ -183,6 +184,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { s := parquetSink.wrapped @@ -195,14 +197,14 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow( if file.parquetCodec == nil { var err error file.parquetCodec, err = newParquetWriterFromRow( - updatedRow, &file.buf, + updatedRow, &file.buf, encodingOpts, parquet.WithCompressionCodec(parquetSink.compression)) if err != nil { return err } } - if err := file.parquetCodec.addData(updatedRow, prevRow); err != nil { + if err := file.parquetCodec.addData(updatedRow, prevRow, updated, mvcc); err != nil { return err } diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index 9bc71934d9f5..a55fd3ca8839 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/parquet" @@ -120,8 +121,10 @@ func TestParquetRows(t *testing.T) { ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false) require.NoError(t, err) + encodingOpts := changefeedbase.EncodingOptions{} + if writer == nil { - writer, err = newParquetWriterFromRow(updatedRow, f, parquet.WithMaxRowGroupLength(maxRowGroupSize), + writer, err = newParquetWriterFromRow(updatedRow, f, encodingOpts, parquet.WithMaxRowGroupLength(maxRowGroupSize), parquet.WithCompressionCodec(parquet.CompressionGZIP)) if err != nil { t.Fatalf(err.Error()) @@ -129,12 +132,12 @@ func TestParquetRows(t *testing.T) { numCols = len(updatedRow.ResultColumns()) + 1 } - err = writer.addData(updatedRow, prevRow) + err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{}) require.NoError(t, err) // Save a copy of the datums we wrote. 
- datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1) - err = populateDatums(updatedRow, prevRow, datumRow) + datumRow := make([]tree.Datum, writer.schemaDef.NumColumns()) + err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow) require.NoError(t, err) datums[i] = datumRow } diff --git a/pkg/ccl/changefeedccl/sink.go b/pkg/ccl/changefeedccl/sink.go index 5269d6c213f4..0d730a835f4a 100644 --- a/pkg/ccl/changefeedccl/sink.go +++ b/pkg/ccl/changefeedccl/sink.go @@ -450,10 +450,11 @@ func (s errorWrapperSink) EncodeAndEmitRow( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { if sinkWithEncoder, ok := s.wrapped.(SinkWithEncoder); ok { - return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc) + return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc) } return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.wrapped) } @@ -716,6 +717,7 @@ type SinkWithEncoder interface { prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index dc9c672bcc76..c5df6e32538d 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -623,10 +623,11 @@ func (s *notifyFlushSink) EncodeAndEmitRow( prevRow cdcevent.Row, topic TopicDescriptor, updated, mvcc hlc.Timestamp, + encodingOpts changefeedbase.EncodingOptions, alloc kvevent.Alloc, ) error { if sinkWithEncoder, ok := s.Sink.(SinkWithEncoder); ok { - return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, alloc) + return sinkWithEncoder.EncodeAndEmitRow(ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc) } return errors.AssertionFailedf("Expected a sink with encoder for, found %T", s.Sink) } @@ -1245,18 +1246,28 @@ func (c *cloudFeed) appendParquetTestFeedMessages( return err } - for _, row := range datums { - rowCopy := make([]string, len(valueColumnNamesOrdered)-1) - copy(rowCopy, valueColumnNamesOrdered[:len(valueColumnNamesOrdered)-1]) - rowJSONBuilder, err := json.NewFixedKeysObjectBuilder(rowCopy) - if err != nil { - return err + // Extract metadata columns into metaColumnNameSet. 
+ extractMetaColumns := func(columnNameSet map[string]int) map[string]int { + metaColumnNameSet := make(map[string]int) + for colName, colIdx := range columnNameSet { + switch colName { + case parquetCrdbEventTypeColName: + metaColumnNameSet[colName] = colIdx + case parquetOptUpdatedTimestampColName: + metaColumnNameSet[colName] = colIdx + case parquetOptMVCCTimestampColName: + metaColumnNameSet[colName] = colIdx + default: + } } + return metaColumnNameSet + } + metaColumnNameSet := extractMetaColumns(columnNameSet) + for _, row := range datums { + rowJSONBuilder := json.NewObjectBuilder(len(valueColumnNamesOrdered) - len(metaColumnNameSet)) keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered)) - isDeleted := false - for _, primaryKeyColumnName := range primaryKeysNamesOrdered { datum := row[primaryKeyColumnSet[primaryKeyColumnName]] j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC) @@ -1267,28 +1278,26 @@ func (c *cloudFeed) appendParquetTestFeedMessages( } for _, valueColumnName := range valueColumnNamesOrdered { - if valueColumnName == parquetCrdbEventTypeColName { - if *(row[columnNameSet[valueColumnName]].(*tree.DString)) == *parquetEventDelete.DString() { - isDeleted = true - } - break + if _, isMeta := metaColumnNameSet[valueColumnName]; isMeta { + continue } + datum := row[columnNameSet[valueColumnName]] j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC) if err != nil { return err } - if err := rowJSONBuilder.Set(valueColumnName, j); err != nil { - return err - } + rowJSONBuilder.Add(valueColumnName, j) } - var valueWithAfter *json.FixedKeysObjectBuilder + var valueWithAfter *json.ObjectBuilder + + isDeleted := *(row[metaColumnNameSet[parquetCrdbEventTypeColName]].(*tree.DString)) == *parquetEventDelete.DString() if envelopeType == changefeedbase.OptEnvelopeBare { valueWithAfter = rowJSONBuilder } else { - valueWithAfter, err = json.NewFixedKeysObjectBuilder([]string{"after"}) + valueWithAfter = json.NewObjectBuilder(1) if err != nil { return err } @@ -1297,26 +1306,31 @@ func (c *cloudFeed) appendParquetTestFeedMessages( if err != nil { return err } - if err = valueWithAfter.Set("after", nullJSON); err != nil { - return err - } + valueWithAfter.Add("after", nullJSON) } else { - vbJson, err := rowJSONBuilder.Build() + vbJson := rowJSONBuilder.Build() + valueWithAfter.Add("after", vbJson) + } + + if updatedColIdx, updated := metaColumnNameSet[parquetOptUpdatedTimestampColName]; updated { + j, err := tree.AsJSON(row[updatedColIdx], sessiondatapb.DataConversionConfig{}, time.UTC) if err != nil { return err } - if err = valueWithAfter.Set("after", vbJson); err != nil { + valueWithAfter.Add(changefeedbase.OptUpdatedTimestamps, j) + } + if mvccColIdx, mvcc := metaColumnNameSet[parquetOptMVCCTimestampColName]; mvcc { + j, err := tree.AsJSON(row[mvccColIdx], sessiondatapb.DataConversionConfig{}, time.UTC) + if err != nil { return err } + valueWithAfter.Add(changefeedbase.OptMVCCTimestamps, j) } } keyJSON := keyJSONBuilder.Build() - rowJSON, err := valueWithAfter.Build() - if err != nil { - return err - } + rowJSON := valueWithAfter.Build() var keyBuf bytes.Buffer keyJSON.Format(&keyBuf) diff --git a/pkg/util/parquet/schema.go b/pkg/util/parquet/schema.go index 5ffb4e650b5a..920550b9ee18 100644 --- a/pkg/util/parquet/schema.go +++ b/pkg/util/parquet/schema.go @@ -72,6 +72,10 @@ type SchemaDefinition struct { schema *schema.Schema } +func (sd *SchemaDefinition) NumColumns() int { + return len(sd.cols) +} + // NewSchema 
generates a SchemaDefinition. // // Columns in the returned SchemaDefinition will match the order they appear in // the supplied parameters.
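For context on the NumColumns accessor added above: it lets callers size per-row buffers directly from the schema definition instead of threading a separate column count around, which is how the changefeed writer in this patch now allocates its datum slice (make([]tree.Datum, schemaDef.NumColumns())). A minimal sketch of the pattern, using a hypothetical stand-in type rather than the real parquet package:

    package main

    import "fmt"

    // schemaDefinition is a stand-in for parquet.SchemaDefinition;
    // only the column count matters for this sketch.
    type schemaDefinition struct{ cols []string }

    // NumColumns mirrors the accessor added in the hunk above.
    func (sd *schemaDefinition) NumColumns() int { return len(sd.cols) }

    func main() {
        sd := &schemaDefinition{cols: []string{"a", "__crdb__event_type", "__crdb__updated"}}
        // Size the per-row allocation from the schema itself instead
        // of a separately returned column count.
        datumAlloc := make([]any, sd.NumColumns())
        fmt.Println(len(datumAlloc)) // 3
    }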