Skip to content

Commit

Permalink
changefeedccl: add test coverage for parquet event types
Browse files Browse the repository at this point in the history
When using `format=parquet`, an additional column is produced to
indicate the type of operation that produced the row: create,
update, or delete. This change adds unit test coverage for that
column.

Additionally, the test modified in this change is simplified by
reducing the number of rows and the number of column types it uses.
That extra complexity is unnecessary because all types are already
tested within the util/parquet package.

Informs: cockroachdb#99028
Epic: CRDB-27372
Release note: None
Epic: None
  • Loading branch information
jayshrivastava committed Jun 21, 2023
1 parent 0cadd55 commit 8dde1c4
Showing 1 changed file with 29 additions and 22 deletions.
51 changes: 29 additions & 22 deletions pkg/ccl/changefeedccl/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/parquet"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)
Expand All @@ -50,29 +51,39 @@ func TestParquetRows(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(db)

for _, tc := range []struct {
testName string
createTable string
inserts []string
testName string
createTable string
stmts []string
expectedDatumRows [][]tree.Datum
}{
{
testName: "mixed",
createTable: `CREATE TABLE foo (
int32Col INT4 PRIMARY KEY,
varCharCol VARCHAR(16) ,
charCol CHAR(2),
tsCol TIMESTAMP ,
stringCol STRING ,
decimalCOl DECIMAL(12,2),
uuidCol UUID
)`,
inserts: []string{
`INSERT INTO foo values (0, 'zero', 'CA', now(), 'oiwjfoijsdjif', 'inf', gen_random_uuid())`,
`INSERT INTO foo values (1, 'one', 'NY', now(), 'sdi9fu90d', '-1.90', gen_random_uuid())`,
`INSERT INTO foo values (2, 'two', 'WA', now(), 'sd9fid9fuj', '0.01', gen_random_uuid())`,
`INSERT INTO foo values (3, 'three', 'ON', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (4, 'four', 'NS', now(), '123123', '-11222221.2', gen_random_uuid())`,
`INSERT INTO foo values (5, 'five', 'BC', now(), 'sadklfhkdlsjf', '1.2', gen_random_uuid())`,
`INSERT INTO foo values (6, 'siz', 'AB', now(), '123123', '-11222221.2', gen_random_uuid())`,
stmts: []string{
`INSERT INTO foo VALUES (0, 'a1', '2fec7a4b-0a78-40ce-92e0-d1c0fac70436')`,
`INSERT INTO foo VALUES (1, 'b1', '0ce43188-e4a9-4b73-803b-a253abc57e6b')`,
`INSERT INTO foo VALUES (2, 'c1', '5a02bd48-ba64-4134-9199-844c1517f722')`,
`UPDATE foo SET stringCol = 'changed' WHERE int32Col = 1`,
`DELETE FROM foo WHERE int32Col = 0`,
},
expectedDatumRows: [][]tree.Datum{
{tree.NewDInt(0), tree.NewDString("a1"),
&tree.DUuid{uuid.FromStringOrNil("2fec7a4b-0a78-40ce-92e0-d1c0fac70436")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(1), tree.NewDString("b1"),
&tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(2), tree.NewDString("c1"),
&tree.DUuid{uuid.FromStringOrNil("5a02bd48-ba64-4134-9199-844c1517f722")},
parquetEventTypeDatumStringMap[parquetEventInsert]},
{tree.NewDInt(1), tree.NewDString("changed"),
&tree.DUuid{uuid.FromStringOrNil("0ce43188-e4a9-4b73-803b-a253abc57e6b")},
parquetEventTypeDatumStringMap[parquetEventUpdate]},
{tree.NewDInt(0), tree.DNull, tree.DNull, parquetEventTypeDatumStringMap[parquetEventDelete]},
},
},
} {
Expand All @@ -91,8 +102,8 @@ func TestParquetRows(t *testing.T) {
f, err := os.CreateTemp(os.TempDir(), fileName)
require.NoError(t, err)

numRows := len(tc.inserts)
for _, insertStmt := range tc.inserts {
numRows := len(tc.stmts)
for _, insertStmt := range tc.stmts {
sqlDB.Exec(t, insertStmt)
}

Expand Down Expand Up @@ -122,11 +133,7 @@ func TestParquetRows(t *testing.T) {
err = writer.addData(updatedRow, prevRow, hlc.Timestamp{}, hlc.Timestamp{})
require.NoError(t, err)

// Save a copy of the datums we wrote.
datumRow := make([]tree.Datum, writer.schemaDef.NumColumns())
err = populateDatums(updatedRow, prevRow, encodingOpts, hlc.Timestamp{}, hlc.Timestamp{}, datumRow)
require.NoError(t, err)
datums[i] = datumRow
datums[i] = tc.expectedDatumRows[i]
}

err = writer.close()
Expand Down

0 comments on commit 8dde1c4

Please sign in to comment.