
Merge #136718
136718: changefeedccl: remove duplicate columns from parquet output r=asg0451 a=aerfrei

Previously, changefeeds using CDC queries with parquet format
emitted duplicate columns in the output when the table had a
user-defined primary key. This was confusing and unexpected.
This change deduplicates columns in the output when writing
to parquet.

Epic: none

Fixes: #124434

Release note (bug fix): Removes duplicate columns in the parquet
output from changefeeds using CDC queries.

Co-authored-by: Aerin Freilich <[email protected]>
craig[bot] and aerfrei committed Dec 16, 2024
2 parents 5d6498d + 85cfb84 commit effc6b2
Showing 4 changed files with 126 additions and 3 deletions.
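
The heart of the change is first-occurrence deduplication of columns by name. Before the diff, a minimal, self-contained sketch of the idea — the `column` type and `dedupeByName` here are hypothetical stand-ins for illustration, not the actual cdcevent API:

package main

import "fmt"

// column is a hypothetical stand-in for cdcevent.ResultColumn.
type column struct {
	Name string
	Typ  string
}

// dedupeByName keeps the first occurrence of each column name, preserving
// order, mirroring what the schema definition and datum population now do.
func dedupeByName(cols []column) []column {
	seen := make(map[string]bool)
	var out []column
	for _, c := range cols {
		if seen[c.Name] {
			continue
		}
		seen[c.Name] = true
		out = append(out, c)
	}
	return out
}

func main() {
	// With a user-defined primary key, a CDC query such as
	// `SELECT id FROM t` used to surface `id` twice: once as a selected
	// column and once as a key column.
	cols := []column{{"id", "INT8"}, {"id", "INT8"}}
	fmt.Println(dedupeByName(cols)) // [{id INT8}]
}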
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/encoder.go
@@ -22,7 +22,7 @@ type Encoder interface {
// `TableDescriptor`, but only the primary key fields will be used. The
// returned bytes are only valid until the next call to Encode*.
EncodeKey(context.Context, cdcevent.Row) ([]byte, error)
-// EncodeValue encodes the primary key of the given row. The columns of the
+// EncodeValue encodes the values of the given row. The columns of the
// datums are expected to match 1:1 with the `Columns` field of the
// `TableDescriptor`. The returned bytes are only valid until the next call
// to Encode*.
31 changes: 29 additions & 2 deletions pkg/ccl/changefeedccl/parquet.go
@@ -52,9 +52,18 @@ func newParquetSchemaDefintion(
) (*parquet.SchemaDefinition, error) {
var columnNames []string
var columnTypes []*types.T
seenColumnNames := make(map[string]bool)

numCols := 0
if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
if _, ok := seenColumnNames[col.Name]; ok {
// If a column is both the primary key and one of the selected columns in
// a cdc query, we do not want to duplicate it in the parquet output. We
// deduplicate that here and where we populate the datums (see
// populateDatums).
return nil
}
seenColumnNames[col.Name] = true
columnNames = append(columnNames, col.Name)
columnTypes = append(columnTypes, col.Typ)
numCols += 1
@@ -164,7 +173,16 @@ func (w *parquetWriter) populateDatums(
) error {
datums := w.datumAlloc[:0]

if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, _ cdcevent.ResultColumn) error {
seenColumnNames := make(map[string]bool)
if err := updatedRow.ForAllColumns().Datum(func(d tree.Datum, col cdcevent.ResultColumn) error {
if _, ok := seenColumnNames[col.Name]; ok {
// If a column is both the primary key and one of the selected columns in
// a cdc query, we do not want to duplicate it in the parquet output. We
// deduplicate that here and in the schema definition (see
// newParquetSchemaDefintion).
return nil
}
seenColumnNames[col.Name] = true
datums = append(datums, d)
return nil
}); err != nil {
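
The dedup must be applied identically in the schema definition and in populateDatums: the parquet writer pairs datums with schema columns by position, so skipping a column in one place but not the other would desynchronize them. A simplified sketch of that positional pairing, under hypothetical names (writeRow is not the real writer API):

package main

import "fmt"

// writeRow pairs datums with schema columns by position, the way a parquet
// writer does; a mismatch between the deduplicated schema and the
// deduplicated datum row is reported as an error.
func writeRow(schemaCols []string, datums []any) (map[string]any, error) {
	if len(datums) != len(schemaCols) {
		return nil, fmt.Errorf("datum count %d does not match column count %d",
			len(datums), len(schemaCols))
	}
	row := make(map[string]any, len(schemaCols))
	for i, name := range schemaCols {
		row[name] = datums[i]
	}
	return row, nil
}

func main() {
	row, err := writeRow([]string{"id"}, []any{1})
	fmt.Println(row, err) // map[id:1] <nil>
}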
@@ -229,7 +247,7 @@ func addParquetTestMetadata(
) ([]parquet.Option, error) {
// NB: Order matters. When iterating using ForAllColumns, which is used when
// writing datums and defining the schema, the order of columns usually
-// matches the underlying table. If a composite keys defined, the order in
+// matches the underlying table. If a composite key is defined, the order in
// ForEachKeyColumn may not match. In tests, we want to use the latter
// order when printing the keys.
keyCols := map[string]int{}
@@ -262,7 +280,16 @@
// cdcevent.ResultColumn. The Ordinal() method may return an invalid
// number for virtual columns.
idx := 0
seenColumnNames := make(map[string]bool)
if err := row.ForAllColumns().Col(func(col cdcevent.ResultColumn) error {
if _, ok := seenColumnNames[col.Name]; ok {
// Since we deduplicate columns with the same name in the parquet output,
// we should not even increment our index for columns we've seen before.
// Since we have already seen this column name we have also already found
// the relevant index.
return nil
}
seenColumnNames[col.Name] = true
if _, colIsInKey := keyCols[col.Name]; colIsInKey {
keyCols[col.Name] = idx
}
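
The index bookkeeping in the hunk above is the subtle part: positions are assigned over the deduplicated column sequence, so a repeated name must not advance the index. A small self-contained sketch (hypothetical column lists; keyCols mirrors the map built in addParquetTestMetadata):

package main

import "fmt"

func main() {
	// `id` appears once as a selected column and again as the appended key
	// column; indices must be assigned over the deduplicated sequence.
	allCols := []string{"a", "id", "id"}
	keyCols := map[string]int{"id": -1} // -1 means "index not yet found"

	idx := 0
	seen := make(map[string]bool)
	for _, name := range allCols {
		if seen[name] {
			continue // already indexed at its first occurrence
		}
		seen[name] = true
		if _, ok := keyCols[name]; ok {
			keyCols[name] = idx
		}
		idx++
	}
	fmt.Println(keyCols) // map[id:1]
}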
66 changes: 66 additions & 0 deletions pkg/ccl/changefeedccl/parquet_test.go
@@ -7,6 +7,7 @@ package changefeedccl

import (
"context"
"fmt"
"math/rand"
"os"
"slices"
@@ -307,3 +308,68 @@ func TestParquetResolvedTimestamps(t *testing.T) {

cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestParquetDuplicateColumns(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE t (id INT8 PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO t VALUES (1)`)
foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT id FROM t`)
defer closeFeed(t, foo)

// Test that this does not fail with the error:
// `Number of datums in parquet output row doesn't match number of distinct
// columns, Expected: %d, Received: %d`.
assertPayloads(t, foo, []string{
`t: [1]->{"id": 1}`,
})
}

cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestParquetSpecifiedDuplicateQueryColumns(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE t (id INT8 PRIMARY KEY, a INT8)`)
sqlDB.Exec(t, `INSERT INTO t VALUES (1, 9)`)
foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT a, a, id, id FROM t`)
defer closeFeed(t, foo)

// Test that this does not fail with the error:
// `Number of datums in parquet output row doesn't match number of distinct
// columns, Expected: %d, Received: %d`.
assertPayloads(t, foo, []string{
`t: [1]->{"a": 9, "a_1": 9, "id": 1, "id_1": 1}`,
})
}

cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}

func TestParquetNoUserDefinedPrimaryKey(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE t (id INT8)`)
var rowId int
sqlDB.QueryRow(t, `INSERT INTO t VALUES (0) RETURNING rowid`).Scan(&rowId)
foo := feed(t, f, `CREATE CHANGEFEED WITH format=parquet,initial_scan='only' AS SELECT id FROM t`)
defer closeFeed(t, foo)

// The parquet output always includes the primary key.
assertPayloads(t, foo, []string{
fmt.Sprintf(`t: [%d]->{"id": 0}`, rowId),
})
}

cdcTest(t, testFn, feedTestForceSink("cloudstorage"))
}
30 changes: 30 additions & 0 deletions pkg/ccl/changefeedccl/testfeed_test.go
@@ -1318,10 +1318,40 @@ func (c *cloudFeed) appendParquetTestFeedMessages(
}
metaColumnNameSet := extractMetaColumns(columnNameSet)

// Count the distinct columns in the output so we can later verify that the
// count matches the number of datums in each row. A duplicated or missing
// column in the parquet output would be unexpected.
seenColumnNames := make(map[string]struct{})
for _, colName := range primaryKeysNamesOrdered {
seenColumnNames[colName] = struct{}{}
}
for _, colName := range valueColumnNamesOrdered {
// No name should appear twice within the primary key list or within the
// value columns. This check verifies that a column appearing as both a
// primary key column and a value column is not duplicated in the parquet
// output.
if _, ok := seenColumnNames[colName]; !ok {
seenColumnNames[colName] = struct{}{}
}
}

for _, row := range datums {
rowJSONBuilder := json.NewObjectBuilder(len(valueColumnNamesOrdered) - len(metaColumnNameSet))
keyJSONBuilder := json.NewArrayBuilder(len(primaryKeysNamesOrdered))

numDatumsInRow := len(row)
numDistinctColumns := len(seenColumnNames)
if numDatumsInRow != numDistinctColumns {
// If a column is duplicated in the parquet output, tests catch it by
// returning this error.
return errors.Newf(
`Number of datums in parquet output row doesn't match number of distinct columns, Expected: %d, Received: %d`,
numDistinctColumns,
numDatumsInRow,
)
}

for _, primaryKeyColumnName := range primaryKeysNamesOrdered {
datum := row[primaryKeyColumnSet[primaryKeyColumnName]]
j, err := tree.AsJSON(datum, sessiondatapb.DataConversionConfig{}, time.UTC)
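
The harness check above reduces to a set union over key and value column names: each parquet row must carry exactly as many datums as there are distinct names. A self-contained sketch with hypothetical inputs:

package main

import "fmt"

// distinctColumnCount returns the number of distinct column names across the
// primary-key and value column lists; each parquet row must carry exactly
// this many datums.
func distinctColumnCount(keyCols, valueCols []string) int {
	seen := make(map[string]struct{})
	for _, c := range keyCols {
		seen[c] = struct{}{}
	}
	for _, c := range valueCols {
		seen[c] = struct{}{}
	}
	return len(seen)
}

func main() {
	// `id` is both the primary key and a selected value column, so it
	// counts once.
	fmt.Println(distinctColumnCount([]string{"id"}, []string{"id", "a"})) // 2
}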
