changefeedccl: support the updated and mvcc options with parquet format #104407

Merged 2 commits on Jun 7, 2023
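Before the file-by-file changes, a note on what this enables: with format=parquet, changefeeds can now carry the updated and mvcc_timestamp options, which add per-row timestamp metadata columns to the emitted parquet files. A minimal usage sketch; the connection string, table name, and nodelocal URI below are hypothetical, and parquet still requires a cloud-storage sink:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the Postgres wire protocol.
)

func main() {
	// Hypothetical local cluster; adjust the connection string as needed.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// With this PR, format=parquet no longer rejects updated and
	// mvcc_timestamp; each emitted row gains the corresponding columns.
	_, err = db.Exec(`CREATE CHANGEFEED FOR TABLE t
		INTO 'nodelocal://1/changefeed-out'
		WITH format = parquet, updated, mvcc_timestamp`)
	if err != nil {
		log.Fatal(err)
	}
}
```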
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -122,6 +122,7 @@ go_library(
"//pkg/util/admission/admissionpb",
"//pkg/util/bitarray",
"//pkg/util/bufalloc",
"//pkg/util/buildutil",
"//pkg/util/cache",
"//pkg/util/ctxgroup",
"//pkg/util/duration",
7 changes: 2 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -6091,7 +6091,6 @@ func TestChangefeedHandlesRollingRestart(t *testing.T) {
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{
EnableParquetMetadata: true,
// Filter out draining nodes; normally we rely on dist sql planner
// to do that for us.
FilterDrainingNodes: func(
@@ -6282,10 +6281,8 @@ func TestChangefeedPropagatesTerminalError(t *testing.T) {
DefaultTestTenant: base.TestTenantDisabled,
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
DrainFast: true,
Changefeed: &TestingKnobs{
EnableParquetMetadata: true,
},
DrainFast: true,
Changefeed: &TestingKnobs{},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
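A note on why EnableParquetMetadata disappears from these testing knobs: the parquet writer now keys off buildutil.CrdbTestBuild (see parquet.go below), so every test build gets parquet reader metadata without per-test setup. To my understanding that constant follows the standard build-tag pattern; a sketch under that assumption, not the actual buildutil source:

```go
//go:build crdb_test

// Sketch of a build-tag-driven constant like buildutil.CrdbTestBuild: a
// sibling file guarded by //go:build !crdb_test declares
// const CrdbTestBuild = false, so test-only paths (like the parquet
// metadata gate in this PR) compile away in production binaries.
package buildutil

// CrdbTestBuild is true only when compiled with the crdb_test tag.
const CrdbTestBuild = true
```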
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeedbase/options.go
@@ -443,8 +443,7 @@ var InitialScanOnlyUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, Opt
// columns, so there is no reason to emit duplicate key datums.
//
// TODO(#103129): add support for some of these
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff,
OptMVCCTimestamps, OptUpdatedTimestamps, OptKeyInValue)
var ParquetFormatUnsupportedOptions OptionsSet = makeStringSet(OptEndTime, OptDiff, OptKeyInValue, OptTopicInValue)

// AlterChangefeedUnsupportedOptions are changefeed options that we do not allow
// users to alter.
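The net effect of this hunk: updated and mvcc_timestamp leave the parquet-unsupported set, while topic_in_value joins it (cloud-storage parquet files carry no per-message topic field). A minimal sketch of the set-based validation this feeds into, assuming OptionsSet is the string set that makeStringSet suggests and using the options' string values:

```go
package optsketch

import "fmt"

// OptionsSet mirrors what makeStringSet appears to build: a string set.
type OptionsSet map[string]struct{}

func makeStringSet(opts ...string) OptionsSet {
	s := make(OptionsSet, len(opts))
	for _, o := range opts {
		s[o] = struct{}{}
	}
	return s
}

// Post-PR contents of ParquetFormatUnsupportedOptions.
var parquetUnsupported = makeStringSet("end_time", "diff", "key_in_value", "topic_in_value")

// validateForParquet rejects a changefeed whose options intersect the
// unsupported set; updated and mvcc_timestamp now pass.
func validateForParquet(stmtOpts map[string]string) error {
	for opt := range stmtOpts {
		if _, bad := parquetUnsupported[opt]; bad {
			return fmt.Errorf("cannot use option %s with format=parquet", opt)
		}
	}
	return nil
}
```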
28 changes: 15 additions & 13 deletions pkg/ccl/changefeedccl/event_processing.go
@@ -58,15 +58,15 @@ type frontier interface{ Frontier() hlc.Timestamp }

type kvEventToRowConsumer struct {
frontier
encoder Encoder
scratch bufalloc.ByteAllocator
sink EventSink
cursor hlc.Timestamp
knobs TestingKnobs
decoder cdcevent.Decoder
details ChangefeedConfig
evaluator *cdceval.Evaluator
encodingFormat changefeedbase.FormatType
encoder Encoder
scratch bufalloc.ByteAllocator
sink EventSink
cursor hlc.Timestamp
knobs TestingKnobs
decoder cdcevent.Decoder
details ChangefeedConfig
evaluator *cdceval.Evaluator
encodingOpts changefeedbase.EncodingOptions

topicDescriptorCache map[TopicIdentifier]TopicDescriptor
topicNamer *TopicNamer
@@ -256,7 +256,7 @@ func newKVEventToRowConsumer(
topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor),
topicNamer: topicNamer,
evaluator: evaluator,
encodingFormat: encodingOpts.Format,
encodingOpts: encodingOpts,
metrics: metrics,
pacer: pacer,
}, nil
@@ -429,9 +429,10 @@ func (c *kvEventToRowConsumer) encodeAndEmit(
}
}

if c.encodingFormat == changefeedbase.OptFormatParquet {
if c.encodingOpts.Format == changefeedbase.OptFormatParquet {
return c.encodeForParquet(
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp, alloc,
ctx, updatedRow, prevRow, topic, schemaTS, updatedRow.MvccTimestamp,
c.encodingOpts, alloc,
)
}
var keyCopy, valueCopy []byte
@@ -478,14 +478,15 @@ func (c *kvEventToRowConsumer) encodeForParquet(
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error {
sinkWithEncoder, ok := c.sink.(SinkWithEncoder)
if !ok {
return errors.AssertionFailedf("Expected a SinkWithEncoder for parquet format, found %T", c.sink)
}
if err := sinkWithEncoder.EncodeAndEmitRow(
ctx, updatedRow, prevRow, topic, updated, mvcc, alloc,
ctx, updatedRow, prevRow, topic, updated, mvcc, encodingOpts, alloc,
); err != nil {
return err
}
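The theme of this file's changes: the consumer used to remember only the format, but the parquet path now needs the whole EncodingOptions (to know whether the updated and mvcc columns are wanted), so the struct carries encodingOpts and threads it through to the sink. A compressed sketch of that dispatch, with assumed, simplified type shapes; the real ones live in changefeedccl and changefeedbase:

```go
package dispatchsketch

import (
	"context"
	"fmt"
)

// EncodingOptions is an assumed, trimmed-down shape.
type EncodingOptions struct {
	Format            string
	UpdatedTimestamps bool
	MVCCTimestamps    bool
}

type EventSink interface{ Close() error }

// SinkWithEncoder marks sinks (like the parquet cloud-storage sink) that
// encode rows themselves rather than accepting pre-encoded bytes.
type SinkWithEncoder interface {
	EventSink
	EncodeAndEmitRow(ctx context.Context, opts EncodingOptions) error
}

type consumer struct {
	sink         EventSink
	encodingOpts EncodingOptions // was just the format before this PR
}

func (c *consumer) encodeAndEmit(ctx context.Context) error {
	if c.encodingOpts.Format == "parquet" {
		// The parquet writer is stateful (schema, row groups), so encoding
		// must happen inside the sink; assert for the richer interface.
		s, ok := c.sink.(SinkWithEncoder)
		if !ok {
			return fmt.Errorf("expected a SinkWithEncoder for parquet format, found %T", c.sink)
		}
		return s.EncodeAndEmitRow(ctx, c.encodingOpts)
	}
	// Other formats: encode with the consumer's encoder and emit bytes.
	return nil
}
```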
5 changes: 0 additions & 5 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -1085,11 +1085,6 @@ func cdcTestNamedWithSystem(
// Even if the parquet format is not being used, enable metadata
// in all tests for simplicity.
testServer, cleanupServer := makeServerWithOptions(t, options)
knobs := testServer.TestingKnobs.
DistSQL.(*execinfra.TestingKnobs).
Changefeed.(*TestingKnobs)
knobs.EnableParquetMetadata = true

feedFactory, cleanupSink := makeFeedFactoryWithOptions(t, sinkType, testServer.Server, testServer.DB, options)
feedFactory = maybeUseExternalConnection(feedFactory, testServer.DB, sinkType, options, t)
defer cleanupServer()
114 changes: 87 additions & 27 deletions pkg/ccl/changefeedccl/parquet.go
@@ -15,20 +15,31 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/errors"
)

// includeParquestTestMetadata configures the parquet writer to write
// metadata required for reading parquet files in tests.
var includeParquestTestMetadata = buildutil.CrdbTestBuild

type parquetWriter struct {
inner *parquet.Writer
datumAlloc []tree.Datum
inner *parquet.Writer
encodingOpts changefeedbase.EncodingOptions
schemaDef *parquet.SchemaDefinition
datumAlloc []tree.Datum
}

// newParquetSchemaDefintion returns a parquet schema definition based on the
// cdcevent.Row and the number of cols in the schema.
func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int, error) {
func newParquetSchemaDefintion(
row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions,
) (*parquet.SchemaDefinition, error) {
var columnNames []string
var columnTypes []*types.T

@@ -39,49 +50,75 @@ func newParquetSchemaDefintion(row cdcevent.Row) (*parquet.SchemaDefinition, int
numCols += 1
return nil
}); err != nil {
return nil, 0, err
return nil, err
}

columnNames = append(columnNames, parquetCrdbEventTypeColName)
columnTypes = append(columnTypes, types.String)
numCols += 1

columnNames, columnTypes = appendMetadataColsToSchema(columnNames, columnTypes, encodingOpts)

schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
if err != nil {
return nil, 0, err
return nil, err
}
return schemaDef, nil
}

const parquetOptUpdatedTimestampColName = metaSentinel + changefeedbase.OptUpdatedTimestamps
const parquetOptMVCCTimestampColName = metaSentinel + changefeedbase.OptMVCCTimestamps

func appendMetadataColsToSchema(
columnNames []string, columnTypes []*types.T, encodingOpts changefeedbase.EncodingOptions,
) (updatedNames []string, updatedTypes []*types.T) {
if encodingOpts.UpdatedTimestamps {
columnNames = append(columnNames, parquetOptUpdatedTimestampColName)
columnTypes = append(columnTypes, types.String)
}
return schemaDef, numCols, nil
if encodingOpts.MVCCTimestamps {
columnNames = append(columnNames, parquetOptMVCCTimestampColName)
columnTypes = append(columnTypes, types.String)
}
return columnNames, columnTypes
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row,
sink io.Writer,
knobs *TestingKnobs, /* may be nil */
encodingOpts changefeedbase.EncodingOptions,
opts ...parquet.Option,
) (*parquetWriter, error) {
schemaDef, numCols, err := newParquetSchemaDefintion(row)
schemaDef, err := newParquetSchemaDefintion(row, encodingOpts)
if err != nil {
return nil, err
}

if knobs != nil && knobs.EnableParquetMetadata {
if opts, err = addParquetTestMetadata(row, opts); err != nil {
if includeParquestTestMetadata {
if opts, err = addParquetTestMetadata(row, encodingOpts, opts); err != nil {
return nil, err
}
}
writer, err := newParquetWriter(schemaDef, sink, knobs, opts...)
writer, err := newParquetWriter(schemaDef, sink, opts...)
if err != nil {
return nil, err
}
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, numCols)}, nil
return &parquetWriter{
inner: writer,
encodingOpts: encodingOpts,
schemaDef: schemaDef,
datumAlloc: make([]tree.Datum, schemaDef.NumColumns()),
}, nil
}

// addData writes the updatedRow, adding the row's event type. There is no guarantee
// that data will be flushed after this function returns.
func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error {
if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil {
func (w *parquetWriter) addData(
updatedRow cdcevent.Row, prevRow cdcevent.Row, updated, mvcc hlc.Timestamp,
) error {
if err := populateDatums(updatedRow, prevRow, w.encodingOpts, updated, mvcc, w.datumAlloc); err != nil {
return err
}

@@ -94,7 +131,13 @@ func (w *parquetWriter) close() error {
}

// populateDatums writes the appropriate datums into the datumAlloc slice.
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
func populateDatums(
updatedRow cdcevent.Row,
prevRow cdcevent.Row,
encodingOpts changefeedbase.EncodingOptions,
updated, mvcc hlc.Timestamp,
datumAlloc []tree.Datum,
) error {
datums := datumAlloc[:0]

if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
@@ -104,6 +147,13 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
return err
}
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())

if encodingOpts.UpdatedTimestamps {
datums = append(datums, tree.NewDString(timestampToString(updated)))
}
if encodingOpts.MVCCTimestamps {
datums = append(datums, tree.NewDString(timestampToString(mvcc)))
}
return nil
}

@@ -114,7 +164,9 @@ func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []
// `[0]->{"b": "b", "c": "c"}` with the key columns in square brackets and value
// columns in a JSON object. The metadata generated by this function contains
// key and value column names along with their offsets in the parquet file.
func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.Option, error) {
func addParquetTestMetadata(
row cdcevent.Row, encodingOpts changefeedbase.EncodingOptions, parquetOpts []parquet.Option,
) ([]parquet.Option, error) {
// NB: Order matters. When iterating using ForAllColumns, which is used when
// writing datums and defining the schema, the order of columns usually
matches the underlying table. If a composite key is defined, the order in
@@ -127,7 +179,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
keysInOrder = append(keysInOrder, col.Name)
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}

// NB: We do not use ForAllColumns here because it will always contain the
@@ -141,7 +193,7 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
valuesInOrder = append(valuesInOrder, col.Name)
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}

// Iterate over ForAllColumns to determine the offsets of each column
@@ -160,15 +212,26 @@ func addParquetTestMetadata(row cdcevent.Row, opts []parquet.Option) ([]parquet.
idx += 1
return nil
}); err != nil {
return opts, err
return parquetOpts, err
}
valuesInOrder = append(valuesInOrder, parquetCrdbEventTypeColName)
valueCols[parquetCrdbEventTypeColName] = idx
idx += 1

opts = append(opts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
opts = append(opts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
return opts, nil
if encodingOpts.UpdatedTimestamps {
valuesInOrder = append(valuesInOrder, parquetOptUpdatedTimestampColName)
valueCols[parquetOptUpdatedTimestampColName] = idx
idx += 1
}
if encodingOpts.MVCCTimestamps {
valuesInOrder = append(valuesInOrder, parquetOptMVCCTimestampColName)
valueCols[parquetOptMVCCTimestampColName] = idx
idx += 1
}

parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"keyCols": serializeMap(keysInOrder, keyCols)}))
parquetOpts = append(parquetOpts, parquet.WithMetadata(map[string]string{"allCols": serializeMap(valuesInOrder, valueCols)}))
return parquetOpts, nil
}

// serializeMap serializes a map to a string. For example, orderedKeys=["b",
@@ -213,12 +276,9 @@ func deserializeMap(s string) (orderedKeys []string, m map[string]int, err error
// newParquetWriter allocates a new parquet writer using the provided
// schema definition.
func newParquetWriter(
sch *parquet.SchemaDefinition,
sink io.Writer,
knobs *TestingKnobs, /* may be nil */
opts ...parquet.Option,
sch *parquet.SchemaDefinition, sink io.Writer, opts ...parquet.Option,
) (*parquet.Writer, error) {
if knobs != nil && knobs.EnableParquetMetadata {
if includeParquestTestMetadata {
// To use parquet test utils for reading datums, the writer needs to be
// configured with additional metadata.
return parquet.NewWriterWithReaderMeta(sch, sink, opts...)
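The invariant underpinning this file's changes: appendMetadataColsToSchema and populateDatums must agree on column order (table columns, then the event-type column, then updated, then mvcc_timestamp), or datums land under the wrong parquet columns. A distilled sketch of that contract; metaSentinel is assumed to be "__crdb__", and the types are stand-ins for the real schema and datum machinery:

```go
package ordersketch

import "fmt"

// Assumed sentinel; the real metaSentinel prefixes metadata column names
// so they cannot collide with user columns.
const metaSentinel = "__crdb__"

const (
	eventTypeCol = metaSentinel + "event_type"
	updatedCol   = metaSentinel + "updated"
	mvccCol      = metaSentinel + "mvcc_timestamp"
)

type encodingOpts struct{ updatedTS, mvccTS bool }

// schemaColumns appends metadata columns after the table columns in a
// fixed order, mirroring appendMetadataColsToSchema.
func schemaColumns(tableCols []string, o encodingOpts) []string {
	cols := append([]string(nil), tableCols...)
	cols = append(cols, eventTypeCol)
	if o.updatedTS {
		cols = append(cols, updatedCol)
	}
	if o.mvccTS {
		cols = append(cols, mvccCol)
	}
	return cols
}

// rowDatums appends values in exactly the same order, mirroring
// populateDatums; any divergence silently corrupts the file layout.
func rowDatums(tableVals []string, eventType, updated, mvcc string, o encodingOpts) []string {
	vals := append([]string(nil), tableVals...)
	vals = append(vals, eventType)
	if o.updatedTS {
		vals = append(vals, updated)
	}
	if o.mvccTS {
		vals = append(vals, mvcc)
	}
	return vals
}

func example() {
	o := encodingOpts{updatedTS: true, mvccTS: true}
	fmt.Println(schemaColumns([]string{"a", "b"}, o))
	// [a b __crdb__event_type __crdb__updated __crdb__mvcc_timestamp]
	fmt.Println(rowDatums([]string{"1", "x"}, "c", "1.0,0", "1.0,0", o))
}
```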
10 changes: 6 additions & 4 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
@@ -15,6 +15,7 @@ import (
"path/filepath"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
@@ -29,7 +30,7 @@
// This is an extra column that will be added to every parquet file which tells
// us about the type of event that generated a particular row. The types are
// defined below.
const parquetCrdbEventTypeColName string = "__crdb__event_type__"
const parquetCrdbEventTypeColName string = metaSentinel + "event_type"

type parquetEventType int

@@ -147,7 +148,7 @@ func (parquetSink *parquetCloudStorageSink) EmitResolvedTimestamp(

// TODO: Ideally, we do not create a new schema and writer every time
// we emit a resolved timestamp. Currently, util/parquet does not support it.
writer, err := newParquetWriter(sch, &buf, parquetSink.wrapped.testingKnobs)
writer, err := newParquetWriter(sch, &buf)
if err != nil {
return err
}
@@ -183,6 +184,7 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
prevRow cdcevent.Row,
topic TopicDescriptor,
updated, mvcc hlc.Timestamp,
encodingOpts changefeedbase.EncodingOptions,
alloc kvevent.Alloc,
) error {
s := parquetSink.wrapped
Expand All @@ -195,14 +197,14 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
if file.parquetCodec == nil {
var err error
file.parquetCodec, err = newParquetWriterFromRow(
updatedRow, &file.buf, parquetSink.wrapped.testingKnobs,
updatedRow, &file.buf, encodingOpts,
parquet.WithCompressionCodec(parquetSink.compression))
if err != nil {
return err
}
}

if err := file.parquetCodec.addData(updatedRow, prevRow); err != nil {
if err := file.parquetCodec.addData(updatedRow, prevRow, updated, mvcc); err != nil {
return err
}

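One design note from EncodeAndEmitRow above: the parquet writer is created lazily from the first row routed to each output file (the schema is derived from that row, now together with encodingOpts) and then reused until the file flushes. A stripped-down sketch of that pattern, with stand-in types in place of the real parquet writer:

```go
package lazysketch

import (
	"bytes"
	"fmt"
)

// rowCodec stands in for *parquetWriter: built from the first row so it
// can derive a schema, then reused for every later row in the same file.
type rowCodec struct {
	buf     *bytes.Buffer
	numCols int
}

func newCodecFromRow(row []string, buf *bytes.Buffer) *rowCodec {
	return &rowCodec{buf: buf, numCols: len(row)}
}

func (c *rowCodec) addData(row []string) error {
	if len(row) != c.numCols {
		return fmt.Errorf("schema drift: expected %d columns, got %d", c.numCols, len(row))
	}
	fmt.Fprintln(c.buf, row)
	return nil
}

// outFile mirrors the sink's per-file state: codec is nil until the first
// row arrives, just as file.parquetCodec is in EncodeAndEmitRow.
type outFile struct {
	buf   bytes.Buffer
	codec *rowCodec
}

func (f *outFile) emitRow(row []string) error {
	if f.codec == nil {
		f.codec = newCodecFromRow(row, &f.buf)
	}
	return f.codec.addData(row)
}
```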