-
Notifications
You must be signed in to change notification settings - Fork 3.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This change adds the file `parquet.go` which contains helper functions to help create parquet writers and export data via `cdcevent.Row` structs. This change also adds tests to ensure rows are written to parquet files correctly. Epic: None Release note: None
- Loading branch information
1 parent
d3db655
commit e9d3b84
Showing
5 changed files
with
270 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Licensed as a CockroachDB Enterprise file under the Cockroach Community | ||
// License (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt | ||
|
||
package changefeedccl | ||
|
||
import ( | ||
"io" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" | ||
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree" | ||
"github.com/cockroachdb/cockroach/pkg/sql/types" | ||
"github.com/cockroachdb/cockroach/pkg/util/parquet" | ||
) | ||
|
||
type parquetWriter struct { | ||
inner *parquet.Writer | ||
datumAlloc []tree.Datum | ||
} | ||
|
||
// newParquetWriterFromRow constructs a new parquet writer which outputs to | ||
// the given sink. This function interprets the schema from the supplied row. | ||
func newParquetWriterFromRow( | ||
row cdcevent.Row, sink io.Writer, maxRowGroupSize int64, | ||
) (*parquetWriter, error) { | ||
columnNames := make([]string, len(row.ResultColumns())+1) | ||
columnTypes := make([]*types.T, len(row.ResultColumns())+1) | ||
|
||
idx := 0 | ||
if err := row.ForEachColumn().Col(func(col cdcevent.ResultColumn) error { | ||
columnNames[idx] = col.Name | ||
columnTypes[idx] = col.Typ | ||
idx += 1 | ||
return nil | ||
}); err != nil { | ||
return nil, err | ||
} | ||
|
||
columnNames[idx] = parquetCrdbEventTypeColName | ||
columnTypes[idx] = types.String | ||
|
||
schemaDef, err := parquet.NewSchema(columnNames, columnTypes) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
writer, err := parquet.NewWriter(schemaDef, sink, parquet.WithMaxRowGroupLength(maxRowGroupSize)) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &parquetWriter{inner: writer, datumAlloc: make([]tree.Datum, len(columnNames))}, nil | ||
} | ||
|
||
// addData writes the updatedRow, adding the row's event type. There is no guarantee | ||
// that data will be flushed after this function returns. | ||
func (w *parquetWriter) addData(updatedRow cdcevent.Row, prevRow cdcevent.Row) error { | ||
if err := populateDatums(updatedRow, prevRow, w.datumAlloc); err != nil { | ||
return err | ||
} | ||
|
||
return w.inner.AddRow(w.datumAlloc) | ||
} | ||
|
||
// Close closes the writer and flushes any buffered data to the sink. | ||
func (w *parquetWriter) close() error { | ||
return w.inner.Close() | ||
} | ||
|
||
// populateDatums writes the appropriate datums into the datumAlloc slice. | ||
func populateDatums(updatedRow cdcevent.Row, prevRow cdcevent.Row, datumAlloc []tree.Datum) error { | ||
datums := datumAlloc[:0] | ||
|
||
if err := updatedRow.ForEachColumn().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { | ||
datums = append(datums, d) | ||
return nil | ||
}); err != nil { | ||
return err | ||
} | ||
datums = append(datums, getEventTypeDatum(updatedRow, prevRow).DString()) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
// Copyright 2022 The Cockroach Authors. | ||
// | ||
// Licensed as a CockroachDB Enterprise file under the Cockroach Community | ||
// License (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt | ||
|
||
package changefeedccl | ||
|
||
import ( | ||
"context" | ||
"os" | ||
"testing" | ||
|
||
"github.com/cockroachdb/cockroach/pkg/base" | ||
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" | ||
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" | ||
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" | ||
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb" | ||
"github.com/cockroachdb/cockroach/pkg/kv/kvpb" | ||
"github.com/cockroachdb/cockroach/pkg/roachpb" | ||
"github.com/cockroachdb/cockroach/pkg/sql" | ||
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree" | ||
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils" | ||
"github.com/cockroachdb/cockroach/pkg/testutils/skip" | ||
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" | ||
"github.com/cockroachdb/cockroach/pkg/util/leaktest" | ||
"github.com/cockroachdb/cockroach/pkg/util/log" | ||
"github.com/cockroachdb/cockroach/pkg/util/parquet" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestParquetRows(t *testing.T) { | ||
defer leaktest.AfterTest(t)() | ||
defer log.Scope(t).Close(t) | ||
|
||
// Rangefeed reader can time out under stress. | ||
skip.UnderStress(t) | ||
|
||
ctx := context.Background() | ||
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ | ||
// TODO(#98816): cdctest.GetHydratedTableDescriptor does not work with tenant dbs. | ||
// Once it is fixed, this flag can be removed. | ||
DefaultTestTenant: base.TestTenantDisabled, | ||
}) | ||
defer s.Stopper().Stop(ctx) | ||
|
||
maxRowGroupSize := int64(2) | ||
|
||
sqlDB := sqlutils.MakeSQLRunner(db) | ||
|
||
for _, tc := range []struct { | ||
testName string | ||
createTable string | ||
inserts []string | ||
}{ | ||
{ | ||
testName: "mixed", | ||
createTable: `CREATE TABLE foo ( | ||
int32Col INT4 PRIMARY KEY, | ||
varCharCol VARCHAR(16) , | ||
charCol CHAR(2), | ||
tsCol TIMESTAMP , | ||
stringCol STRING , | ||
decimalCOl DECIMAL(12,2), | ||
uuidCol UUID | ||
)`, | ||
inserts: []string{ | ||
`INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`, | ||
`INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`, | ||
`INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`, | ||
`INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`, | ||
`INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`, | ||
`INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`, | ||
`INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`, | ||
}, | ||
}, | ||
} { | ||
t.Run(tc.testName, func(t *testing.T) { | ||
sqlDB.Exec(t, tc.createTable) | ||
defer func() { | ||
sqlDB.Exec(t, "DROP TABLE foo") | ||
}() | ||
|
||
popRow, cleanup, decoder := makeRangefeedReaderAndDecoder(t, s) | ||
defer cleanup() | ||
|
||
fileName := "TestParquetRows" | ||
var writer *parquetWriter | ||
var numCols int | ||
f, err := os.CreateTemp(os.TempDir(), fileName) | ||
require.NoError(t, err) | ||
|
||
numRows := len(tc.inserts) | ||
for _, insertStmt := range tc.inserts { | ||
sqlDB.Exec(t, insertStmt) | ||
} | ||
|
||
datums := make([][]tree.Datum, numRows) | ||
for i := 0; i < numRows; i++ { | ||
v := popRow(t) | ||
|
||
updatedRow, err := decoder.DecodeKV( | ||
ctx, roachpb.KeyValue{Key: v.Key, Value: v.Value}, cdcevent.CurrentRow, v.Timestamp(), false) | ||
require.NoError(t, err) | ||
|
||
prevRow, err := decoder.DecodeKV( | ||
ctx, roachpb.KeyValue{Key: v.Key, Value: v.PrevValue}, cdcevent.PrevRow, v.Timestamp(), false) | ||
require.NoError(t, err) | ||
|
||
if writer == nil { | ||
writer, err = newParquetWriterFromRow(updatedRow, f, maxRowGroupSize) | ||
if err != nil { | ||
t.Fatalf(err.Error()) | ||
} | ||
numCols = len(updatedRow.ResultColumns()) + 1 | ||
} | ||
|
||
err = writer.addData(updatedRow, prevRow) | ||
require.NoError(t, err) | ||
|
||
// Save a copy of the datums we wrote. | ||
datumRow := make([]tree.Datum, len(updatedRow.ResultColumns())+1) | ||
err = populateDatums(updatedRow, prevRow, datumRow) | ||
require.NoError(t, err) | ||
datums[i] = datumRow | ||
} | ||
|
||
err = writer.close() | ||
require.NoError(t, err) | ||
|
||
parquet.ReadFileAndVerifyDatums(t, f.Name(), numRows, numCols, writer.inner, datums) | ||
}) | ||
} | ||
} | ||
|
||
func makeRangefeedReaderAndDecoder( | ||
t *testing.T, s serverutils.TestServerInterface, | ||
) (func(t *testing.T) *kvpb.RangeFeedValue, func(), cdcevent.Decoder) { | ||
tableDesc := cdctest.GetHydratedTableDescriptor(t, s.ExecutorConfig(), "foo") | ||
popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), tableDesc) | ||
targets := changefeedbase.Targets{} | ||
targets.Add(changefeedbase.Target{ | ||
Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, | ||
TableID: tableDesc.GetID(), | ||
FamilyName: "primary", | ||
}) | ||
sqlExecCfg := s.ExecutorConfig().(sql.ExecutorConfig) | ||
ctx := context.Background() | ||
decoder, err := cdcevent.NewEventDecoder(ctx, &sqlExecCfg, targets, false, false) | ||
require.NoError(t, err) | ||
return popRow, cleanup, decoder | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters