From 85cfb844b07da0bb777cd050b4fe295ef4873428 Mon Sep 17 00:00:00 2001 From: Aerin Freilich Date: Fri, 6 Dec 2024 11:40:14 -0500 Subject: [PATCH] changefeedccl: remove duplicate columns from parquet output Previously, in changefeeds using CDC queries and parquet, we would see duplicate columns in the output when using a user defined primary key. This was confusing and unexpected. This change has us deduplicating columns in the output when writing to parquet. Epic: none Fixes: #124434 Release note (bug fix): Removes duplicate columns in the parquet output from changefeeds using cdc queries. --- pkg/ccl/changefeedccl/encoder.go | 2 +- pkg/ccl/changefeedccl/parquet.go | 31 +++++++++++- pkg/ccl/changefeedccl/parquet_test.go | 66 ++++++++++++++++++++++++++ pkg/ccl/changefeedccl/testfeed_test.go | 30 ++++++++++++ 4 files changed, 126 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/changefeedccl/encoder.go b/pkg/ccl/changefeedccl/encoder.go index 905acbf6f696..0aad928b62f8 100644 --- a/pkg/ccl/changefeedccl/encoder.go +++ b/pkg/ccl/changefeedccl/encoder.go @@ -22,7 +22,7 @@ type Encoder interface { // `TableDescriptor`, but only the primary key fields will be used. The // returned bytes are only valid until the next call to Encode*. EncodeKey(context.Context, cdcevent.Row) ([]byte, error) - // EncodeValue encodes the primary key of the given row. The columns of the + // EncodeValue encodes the values of the given row. The columns of the // datums are expected to match 1:1 with the `Columns` field of the // `TableDescriptor`. The returned bytes are only valid until the next call // to Encode*. 
diff --git a/pkg/ccl/changefeedccl/parquet.go b/pkg/ccl/changefeedccl/parquet.go index e83425b5354a..c139148fcc63 100644 --- a/pkg/ccl/changefeedccl/parquet.go +++ b/pkg/ccl/changefeedccl/parquet.go @@ -52,9 +52,18 @@ func newParquetSchemaDefintion( ) (*parquet.SchemaDefinition, error) { var columnNames []string var columnTypes []*types.T + seenColumnNames := make(map[string]bool) numCols := 0 if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error { + if _, ok := seenColumnNames[col.Name]; ok { + // If a column is both the primary key and one of the selected columns in + // a cdc query, we do not want to duplicate it in the parquet output. We + // deduplicate that here and where we populate the datums (see + // populateDatums). + return nil + } + seenColumnNames[col.Name] = true columnNames = append(columnNames, col.Name) columnTypes = append(columnTypes, col.Typ) numCols += 1 @@ -164,7 +173,16 @@ func (w *parquetWriter) populateDatums( ) error { datums := w.datumAlloc[:0] - if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error { + seenColumnNames := make(map[string]bool) + if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error { + if _, ok := seenColumnNames[col.Name]; ok { + // If a column is both the primary key and one of the selected columns in + // a cdc query, we do not want to duplicate it in the parquet output. We + // deduplicate that here and in the schema definition (see + // newParquetSchemaDefintion). + return nil + } + seenColumnNames[col.Name] = true datums = append(datums, d) return nil }); err != nil { @@ -229,7 +247,7 @@ func addParquetTestMetadata( ) ([]parquet.Option, error) { // NB: Order matters. When iterating using ForAllColumns, which is used when // writing datums and defining the schema, the order of columns usually - // matches the underlying table. If a composite keys defined, the order in + // matches the underlying table. 
If a composite key is defined, the order in // ForEachKeyColumn may not match. In tests, we want to use the latter // order when printing the keys. keyCols := map[string]int{} @@ -262,7 +280,16 @@ func addParquetTestMetadata( // cdcevent.ResultColumn. The Ordinal() method may return an invalid // number for virtual columns. idx := 0 + seenColumnNames := make(map[string]bool) if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error { + if _, ok := seenColumnNames[col.Name]; ok { + // Since we deduplicate columns with the same name in the parquet output, + // we should not even increment our index for columns we've seen before. + // Since we have already seen this column name we have also already found + // the relevant index. + return nil + } + seenColumnNames[col.Name] = true if _, colIsInKey := keyCols[col.Name]; colIsInKey { keyCols[col.Name] = idx } diff --git a/pkg/ccl/changefeedccl/parquet_test.go b/pkg/ccl/changefeedccl/parquet_test.go index 6d49074aca92..c5731e526202 100644 --- a/pkg/ccl/changefeedccl/parquet_test.go +++ b/pkg/ccl/changefeedccl/parquet_test.go @@ -7,6 +7,7 @@ package changefeedccl import ( "context" + "fmt" "math/rand" "os" "slices" @@ -307,3 +308,68 @@ func TestParquetResolvedTimestamps(t *testing.T) { cdcTest(t, testFn, feedTestForceSink("cloudstorage")) } + +func TestParquetDuplicateColumns(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE t (id INT8 PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO t VALUES (1)`) + foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT id FROM t`) + defer closeFeed(t, foo) + + // Test that this should not fail with this error: + // `Number of datums in parquet output row doesn't match number of distinct + // columns, Expected: %d, Received: %d`. 
+ assertPayloads(t, foo, []string{ + `t: [1]->{"id": 1}`, + }) + } + + cdcTest(t, testFn, feedTestForceSink("cloudstorage")) +} + +func TestParquetSpecifiedDuplicateQueryColumns(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE t (id INT8 PRIMARY KEY, a INT8)`) + sqlDB.Exec(t, `INSERT INTO t VALUES (1, 9)`) + foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT a, a, id, id FROM t`) + defer closeFeed(t, foo) + + // Test that this should not fail with this error: + // `Number of datums in parquet output row doesn't match number of distinct + // columns, Expected: %d, Received: %d`. + assertPayloads(t, foo, []string{ + `t: [1]->{"a": 9, "a_1": 9, "id": 1, "id_1": 1}`, + }) + } + + cdcTest(t, testFn, feedTestForceSink("cloudstorage")) +} + +func TestParquetNoUserDefinedPrimaryKey(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE t (id INT8)`) + var rowId int + sqlDB.QueryRow(t, `INSERT INTO t VALUES (0) RETURNING rowid`).Scan(&rowId) + foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT id FROM t`) + defer closeFeed(t, foo) + + // The parquet output always includes the primary key. 
+ assertPayloads(t, foo, []string{ + fmt.Sprintf(`t: [%d]->{"id": 0}`, rowId), + }) + } + + cdcTest(t, testFn, feedTestForceSink("cloudstorage")) +} diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index a0be728e0daf..f573ef705923 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -1318,10 +1318,40 @@ func (c *cloudFeed) appendParquetTestFeedMessages( } metaColumnNameSet := extractMetaColumns(columnNameSet) + // Count the number of distinct columns in the output so we can later verify + // that it matches the number of datums we output. Duplicating them in the + // output or missing one of these keys in the parquet output would be + // unexpected. + seenColumnNames := make(map[string]struct{}) + for _, colName := range primaryKeysNamesOrdered { + seenColumnNames[colName] = struct{}{} + } + for _, colName := range valueColumnNamesOrdered { + // No key should appear twice within the primary key list or within the + // value columns. The role of this check is to verify that a key that + // appears as both a primary and value key is not duplicated in the parquet + // output. + if _, ok := seenColumnNames[colName]; !ok { + seenColumnNames[colName] = struct{}{} + } + } + for _, row := range datums { rowJSONBuilder := json.NewObjectBuilder(len(valueColumnNamesOrdered) - len(metaColumnNameSet)) keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered)) + numDatumsInRow := len(row) + numDistinctColumns := len(seenColumnNames) + if numDatumsInRow != numDistinctColumns { + // If a column is duplicated in the parquet output, we would catch that in + // tests by throwing this error. 
+ return errors.Newf( + `Number of datums in parquet output row doesn't match number of distinct columns, Expected: %d, Received: %d`, + numDistinctColumns, + numDatumsInRow, + ) + } + for _, primaryKeyColumnName := range primaryKeysNamesOrdered { datum := row[primaryKeyColumnSet[primaryKeyColumnName]] j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)