changefeedccl: remove duplicate columns from parquet output
Previously, in changefeeds using CDC queries and parquet format,
the output contained duplicate columns when the table had a
user-defined primary key. This was confusing and unexpected.
This change deduplicates columns when writing the parquet output.

Epic: none

Fixes: #124434

Release note (bug fix): Removes duplicate columns in the parquet
output from changefeeds using CDC queries.
aerfrei committed Dec 10, 2024
1 parent 6f2b16c commit 85cfb84
Showing 4 changed files with 126 additions and 3 deletions.
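The fix below applies a first-wins, name-keyed deduplication at each place the changefeed iterates over a row's columns. The following is a minimal standalone sketch of that pattern (the `column` type here is a hypothetical stand-in for `cdcevent.ResultColumn`, not the repo's actual code):

package main

import "fmt"

// column is a hypothetical stand-in for cdcevent.ResultColumn; only the
// Name field matters for deduplication.
type column struct {
	Name string
	Typ  string
}

// dedupColumns keeps the first occurrence of each column name and drops
// the rest, mirroring the first-wins strategy the diff applies in
// newParquetSchemaDefintion, populateDatums, and addParquetTestMetadata.
func dedupColumns(cols []column) []column {
	seen := make(map[string]bool)
	var out []column
	for _, c := range cols {
		if seen[c.Name] {
			// A column that is both the primary key and a selected column
			// shows up twice; emit it only once.
			continue
		}
		seen[c.Name] = true
		out = append(out, c)
	}
	return out
}

func main() {
	// With `CREATE CHANGEFEED ... AS SELECT id FROM t` and `id` as the
	// primary key, the column list contains `id` twice before the fix.
	cols := []column{{Name: "id", Typ: "INT8"}, {Name: "id", Typ: "INT8"}}
	fmt.Println(dedupColumns(cols)) // [{id INT8}]
}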
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/encoder.go
@@ -22,7 +22,7 @@ type Encoder interface {
 	// `TableDescriptor`, but only the primary key fields will be used. The
 	// returned bytes are only valid until the next call to Encode*.
 	EncodeKey(context.Context, cdcevent.Row) ([]byte, error)
-	// EncodeValue encodes the primary key of the given row. The columns of the
+	// EncodeValue encodes the values of the given row. The columns of the
 	// datums are expected to match 1:1 with the `Columns` field of the
 	// `TableDescriptor`. The returned bytes are only valid until the next call
 	// to Encode*.
31 changes: 29 additions & 2 deletions pkg/ccl/changefeedccl/parquet.go
@@ -52,9 +52,18 @@ func newParquetSchemaDefintion(
 ) (*parquet.SchemaDefinition, error) {
 	var columnNames []string
 	var columnTypes []*types.T
+	seenColumnNames := make(map[string]bool)
 
 	numCols := 0
 	if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
+		if _, ok := seenColumnNames[col.Name]; ok {
+			// If a column is both the primary key and one of the selected columns in
+			// a cdc query, we do not want to duplicate it in the parquet output. We
+			// deduplicate that here and where we populate the datums (see
+			// populateDatums).
+			return nil
+		}
+		seenColumnNames[col.Name] = true
 		columnNames = append(columnNames, col.Name)
 		columnTypes = append(columnTypes, col.Typ)
 		numCols += 1
@@ -164,7 +173,16 @@ func (w *parquetWriter) populateDatums(
 ) error {
 	datums := w.datumAlloc[:0]
 
-	if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
+	seenColumnNames := make(map[string]bool)
+	if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
+		if _, ok := seenColumnNames[col.Name]; ok {
+			// If a column is both the primary key and one of the selected columns in
+			// a cdc query, we do not want to duplicate it in the parquet output. We
+			// deduplicate that here and in the schema definition (see
+			// newParquetSchemaDefintion).
+			return nil
+		}
+		seenColumnNames[col.Name] = true
 		datums = append(datums, d)
 		return nil
 	}); err != nil {
@@ -229,7 +247,7 @@ func addParquetTestMetadata(
 ) ([]parquet.Option, error) {
 	// NB: Order matters. When iterating using ForAllColumns, which is used when
 	// writing datums and defining the schema, the order of columns usually
-	// matches the underlying table. If a composite keys defined, the order in
+	// matches the underlying table. If a composite key is defined, the order in
 	// ForEachKeyColumn may not match. In tests, we want to use the latter
 	// order when printing the keys.
 	keyCols := map[string]int{}
@@ -262,7 +280,16 @@
 	// cdcevent.ResultColumn. The Ordinal() method may return an invalid
 	// number for virtual columns.
 	idx := 0
+	seenColumnNames := make(map[string]bool)
 	if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
+		if _, ok := seenColumnNames[col.Name]; ok {
+			// Since we deduplicate columns with the same name in the parquet output,
+			// we should not even increment our index for columns we've seen before.
+			// Since we have already seen this column name we have also already found
+			// the relevant index.
+			return nil
+		}
+		seenColumnNames[col.Name] = true
 		if _, colIsInKey := keyCols[col.Name]; colIsInKey {
 			keyCols[col.Name] = idx
 		}
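The two dedup sites above must stay in sync: if only the schema definition skipped repeated names, each row would carry more datums than the schema has columns, which is exactly the mismatch the test harness further down reports. A toy illustration of that invariant, using hypothetical names not present in the repo:

package main

import "fmt"

// checkRowWidth is a hypothetical helper (not in the repo) illustrating the
// invariant the schema-side and datum-side dedup preserve together: every
// row carries exactly one datum per distinct column.
func checkRowWidth(schemaCols []string, rowDatums []any) error {
	if len(rowDatums) != len(schemaCols) {
		return fmt.Errorf("row has %d datums, schema has %d columns",
			len(rowDatums), len(schemaCols))
	}
	return nil
}

func main() {
	// One distinct column ("id") but two datums: the pre-fix failure mode.
	fmt.Println(checkRowWidth([]string{"id"}, []any{1, 1}))
}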
66 changes: 66 additions & 0 deletions pkg/ccl/changefeedccl/parquet_test.go
@@ -7,6 +7,7 @@ package changefeedccl
 
 import (
 	"context"
+	"fmt"
 	"math/rand"
 	"os"
 	"slices"
@@ -307,3 +308,68 @@ func TestParquetResolvedTimestamps(t *testing.T) {
 
 	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
 }
+
+func TestParquetDuplicateColumns(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `CREATE TABLE t (id INT8 PRIMARY KEY)`)
+		sqlDB.Exec(t, `INSERT INTO t VALUES (1)`)
+		foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT id FROM t`)
+		defer closeFeed(t, foo)
+
+		// Test that this does not fail with this error:
+		// `Number of datums in parquet output row doesn't match number of distinct
+		// columns, Expected: %d, Received: %d`.
+		assertPayloads(t, foo, []string{
+			`t: [1]->{"id": 1}`,
+		})
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
+}
+
+func TestParquetSpecifiedDuplicateQueryColumns(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `CREATE TABLE t (id INT8 PRIMARY KEY, a INT8)`)
+		sqlDB.Exec(t, `INSERT INTO t VALUES (1, 9)`)
+		foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT a, a, id, id FROM t`)
+		defer closeFeed(t, foo)
+
+		// Test that this does not fail with this error:
+		// `Number of datums in parquet output row doesn't match number of distinct
+		// columns, Expected: %d, Received: %d`.
+		assertPayloads(t, foo, []string{
+			`t: [1]->{"a": 9, "a_1": 9, "id": 1, "id_1": 1}`,
+		})
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
+}
+
+func TestParquetNoUserDefinedPrimaryKey(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+		sqlDB := sqlutils.MakeSQLRunner(s.DB)
+		sqlDB.Exec(t, `CREATE TABLE t (id INT8)`)
+		var rowId int
+		sqlDB.QueryRow(t, `INSERT INTO t VALUES (0) RETURNING rowid`).Scan(&rowId)
+		foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT id FROM t`)
+		defer closeFeed(t, foo)
+
+		// The parquet output always includes the primary key.
+		assertPayloads(t, foo, []string{
+			fmt.Sprintf(`t: [%d]->{"id": 0}`, rowId),
+		})
+	}
+
+	cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
+}
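Note the distinction these tests pin down: when a query explicitly selects the same column twice (`SELECT a, a, id, id`), the parquet output keeps the duplicates under disambiguated names (`a_1`, `id_1`), whereas the implicit re-emission of a user-defined primary key alongside an identically named selected column is what this commit removes.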
30 changes: 30 additions & 0 deletions pkg/ccl/changefeedccl/testfeed_test.go
@@ -1318,10 +1318,40 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
 	}
 	metaColumnNameSet := extractMetaColumns(columnNameSet)
 
+	// Count the number of distinct columns in the output so we can later verify
+	// that it matches the number of datums we output. Duplicating them in the
+	// output or missing one of these keys in the parquet output would be
+	// unexpected.
+	seenColumnNames := make(map[string]struct{})
+	for _, colName := range primaryKeysNamesOrdered {
+		seenColumnNames[colName] = struct{}{}
+	}
+	for _, colName := range valueColumnNamesOrdered {
+		// No key should appear twice within the primary key list or within the
+		// value columns. The role of this check is to verify that a key that
+		// appears as both a primary and value key is not duplicated in the parquet
+		// output.
+		if _, ok := seenColumnNames[colName]; !ok {
+			seenColumnNames[colName] = struct{}{}
+		}
+	}
+
 	for _, row := range datums {
 		rowJSONBuilder := json.NewObjectBuilder(len(valueColumnNamesOrdered) - len(metaColumnNameSet))
 		keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered))
 
+		numDatumsInRow := len(row)
+		numDistinctColumns := len(seenColumnNames)
+		if numDatumsInRow != numDistinctColumns {
+			// If a column is duplicated in the parquet output, we would catch that
+			// in tests by returning this error.
+			return errors.Newf(
+				`Number of datums in parquet output row doesn't match number of distinct columns, Expected: %d, Received: %d`,
+				numDistinctColumns,
+				numDatumsInRow,
+			)
+		}
+
 		for _, primaryKeyColumnName := range primaryKeysNamesOrdered {
 			datum := row[primaryKeyColumnSet[primaryKeyColumnName]]
 			j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
