Skip to content

Commit

Permalink
changefeedccl: add parquet writer
Browse files Browse the repository at this point in the history
This change adds the file `parquet.go` which contains
helper functions to help create parquet writers
and export data via `cdcevent.Row` structs.

This change also adds tests to ensure rows are written
to parquet files correctly.

Epic: None
Release note: None
  • Loading branch information
jayshrivastava committed Apr 6, 2023
1 parent 442e3bc commit 1c002e8
Show file tree
Hide file tree
Showing 5 changed files with 270 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"metrics.go",
"name.go",
"parallel_io.go",
"parquet.go",
"parquet_sink_cloudstorage.go",
"retry.go",
"scheduled_changefeed.go",
Expand Down Expand Up @@ -137,6 +138,7 @@ go_library(
"//pkg/util/metric",
"//pkg/util/metric/aggmetric",
"//pkg/util/mon",
"//pkg/util/parquet",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
Expand Down Expand Up @@ -187,6 +189,7 @@ go_test(
"main_test.go",
"name_test.go",
"nemeses_test.go",
"parquet_test.go",
"scheduled_changefeed_test.go",
"schema_registry_test.go",
"show_changefeed_jobs_test.go",
Expand Down Expand Up @@ -291,6 +294,7 @@ go_test(
"//pkg/util/log",
"//pkg/util/log/eventpb",
"//pkg/util/mon",
"//pkg/util/parquet",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
Expand Down
85 changes: 85 additions & 0 deletions pkg/ccl/changefeedccl/parquet.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"io"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
)

type parquetWriter struct {
inner *parquet.Writer
datumAlloc []tree.Datum
}

// newParquetWriterFromRow constructs a new parquet writer which outputs to
// the given sink. This function interprets the schema from the supplied row.
func newParquetWriterFromRow(
row cdcevent.Row, sink io.Writer, maxRowGroupSize int64,
) (*parquetWriter, error) {
columnNames := make([]string, len(row.ResultColumns())+1)
columnTypes := make([]*types.T, len(row.ResultColumns())+1)

idx := 0
if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error {
columnNames[idx] = col.Name
columnTypes[idx] = col.Typ
idx += 1
return nil
}); err != nil {
return nil, err
}

columnNames[idx] = parquetCrdbEventTypeColName
columnTypes[idx] = types.String

schemaDef, err := parquet.NewSchema(columnNames, columnTypes)
if err != nil {
return nil, err
}

writer, err := parquet.NewWriter(schemaDef, sink, parquet.WithMaxRowGroupLength(maxRowGroupSize))
if err != nil {
return nil, err
}
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, len(columnNames))}, nil
}

// addData writes the updatedRow, adding the row's event type. There is no guarantee
// that data will be flushed after this function returns.
func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error {
if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil {
return err
}

return w.inner.AddRow(w.datumAlloc)
}

// Close closes the writer and flushes any buffered data to the sink.
func (w *parquetWriter) close() error {
return w.inner.Close()
}

// populateDatums writes the appropriate datums into the datumAlloc slice.
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error {
datums := datumAlloc[:0]

if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
datums = append(datums, d)
return nil
}); err != nil {
return err
}
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString())
return nil
}
36 changes: 26 additions & 10 deletions pkg/ccl/changefeedccl/parquet_sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,24 @@ var includeParquetTestMetadata = false
// defined below.
const parquetCrdbEventTypeColName string = "__crdb_event_type__"

type parquetEventType int

const (
parquetEventInsert string = "c"
parquetEventUpdate string = "u"
parquetEventDelete string = "d"
parquetEventInsert parquetEventType = iota
parquetEventUpdate
parquetEventDelete
)

var parquetEventTypeDatumStringMap = map[parquetEventType]*tree.DString{
parquetEventInsert: tree.NewDString("c"),
parquetEventUpdate: tree.NewDString("u"),
parquetEventDelete: tree.NewDString("d"),
}

func (e parquetEventType) DString() *tree.DString {
return parquetEventTypeDatumStringMap[e]
}

// We need a separate sink for parquet format because the parquet encoder has to
// write metadata to the parquet file (buffer) after each flush. This means that the
// parquet encoder should have access to the buffer object inside
Expand Down Expand Up @@ -178,13 +190,8 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
return err
}

if updatedRow.IsDeleted() {
parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventDelete)
} else if prevRow.IsInitialized() && !prevRow.IsDeleted() {
parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventUpdate)
} else {
parquetRow[parquetCrdbEventTypeColName] = []byte(parquetEventInsert)
}
et := getEventTypeDatum(updatedRow, prevRow)
parquetRow[parquetCrdbEventTypeColName] = []byte(et.DString().String())

if err = file.parquetCodec.parquetWriter.AddData(parquetRow); err != nil {
return err
Expand All @@ -204,6 +211,15 @@ func (parquetSink *parquetCloudStorageSink) EncodeAndEmitRow(
return nil
}

func getEventTypeDatum(updatedRow cdcevent.Row, prevRow cdcevent.Row) parquetEventType {
if updatedRow.IsDeleted() {
return parquetEventDelete
} else if prevRow.IsInitialized() && !prevRow.IsDeleted() {
return parquetEventUpdate
}
return parquetEventInsert
}

func makeParquetWriterWrapper(
ctx context.Context, row cdcevent.Row, buf *bytes.Buffer, compression parquet.CompressionCodec,
) (*parquetFileWriter, error) {
Expand Down
154 changes: 154 additions & 0 deletions pkg/ccl/changefeedccl/parquet_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
"context"
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/stretchr/testify/require"
)

func TestParquetRows(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Rangefeed reader can time out under stress.
skip.UnderStress(t)

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
// TODO(#98816): cdctest.GetHydratedTableDescriptor does not work with tenant dbs.
// Once it is fixed, this flag can be removed.
DefaultTestTenant: base.TestTenantDisabled,
})
defer s.Stopper().Stop(ctx)

maxRowGroupSize := int64(2)

sqlDB := sqlutils.MakeSQLRunner(db)

for _, tc := range []struct {
testName string
createTable string
inserts []string
}{
{
testName: "mixed",
createTable: `CREATE TABLE foo (
int32Col INT4 PRIMARY KEY,
varCharCol VARCHAR(16) ,
charCol CHAR(2),
tsCol TIMESTAMP ,
stringCol STRING ,
decimalCOl DECIMAL(12,2),
uuidCol UUID
)`,
inserts: []string{
`INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`,
`INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`,
`INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`,
`INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`,
`INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`,
},
},
} {
t.Run(tc.testName, func(t *testing.T) {
sqlDB.Exec(t, tc.createTable)
defer func() {
sqlDB.Exec(t, "DROP TABLE foo")
}()

popRow, cleanup, decoder := makeRangefeedReaderAndDecoder(t, s)
defer cleanup()

fileName := "TestParquetRows"
var writer *parquetWriter
var numCols int
f, err := os.CreateTemp(os.TempDir(), fileName)
require.NoError(t, err)

numRows := len(tc.inserts)
for _, insertStmt := range tc.inserts {
sqlDB.Exec(t, insertStmt)
}

datums := make([][]tree.Datum, numRows)
for i := 0; i < numRows; i++ {
v := popRow(t)

updatedRow, err := decoder.DecodeKV(
ctx, roachpb.KeyValue{Key: v.Key, Value: v.Value}, cdcevent.CurrentRow, v.Timestamp(), false)
require.NoError(t, err)

prevRow, err := decoder.DecodeKV(
ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false)
require.NoError(t, err)

if writer == nil {
writer, err = newParquetWriterFromRow(updatedRow, f, maxRowGroupSize)
if err != nil {
t.Fatalf(err.Error())
}
numCols = len(updatedRow.ResultColumns()) + 1
}

err = writer.addData(updatedRow, prevRow)
require.NoError(t, err)

// Save a copy of the datums we wrote.
datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1)
err = populateDatums(updatedRow, prevRow, datumRow)
require.NoError(t, err)
datums[i] = datumRow
}

err = writer.close()
require.NoError(t, err)

parquet.ReadFileAndVerifyDatums(t, f.Name(), numRows, numCols, writer.inner, datums)
})
}
}

func makeRangefeedReaderAndDecoder(
t *testing.T, s serverutils.TestServerInterface,
) (func(t *testing.T) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) {
tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo")
popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc)
targets := changefeedbase.Targets{}
targets.Add(changefeedbase.Target{
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY,
TableID: tableDesc.GetID(),
FamilyName: "primary",
})
sqlExecCfg := s.ExecutorConfig().(sql.ExecutorConfig)
ctx := context.Background()
decoder, err := cdcevent.NewEventDecoder(ctx, &sqlExecCfg, targets, false, false)
require.NoError(t, err)
return popRow, cleanup, decoder
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/testfeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1344,7 +1344,7 @@ func (c *cloudFeed) appendParquetTestFeedMessages(

for k, v := range row {
if k == parquetCrdbEventTypeColName {
if string(v.([]byte)) == parquetEventDelete {
if string(v.([]byte)) == parquetEventDelete.DString().String() {
isDeleted = true
}
continue
Expand Down

0 comments on commit 1c002e8

Please sign in to comment.