Skip to content

Commit

Permalink
Merge pull request cockroachdb#91953 from cockroachdb/blathers/backpo…
Browse files Browse the repository at this point in the history
…rt-release-22.1-91870

release-22.1: changefeedccl: fix handling of deletes in multi-column families
  • Loading branch information
HonoreDB authored Nov 21, 2022
2 parents 6474322 + 2d91ef0 commit ba1d8e7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 10 deletions.
21 changes: 14 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1876,9 +1876,9 @@ func TestChangefeedSingleColumnFamily(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(db)

// Table with 2 column families.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY most (a,b), FAMILY rest (c))`)
sqlDB.Exec(t, `INSERT INTO foo values (0, 'dog', 'cat')`)
sqlDB.Exec(t, `INSERT INTO foo values (1, 'dollar', 'cent')`)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING, FAMILY most (a,b), FAMILY rest (c, d))`)
sqlDB.Exec(t, `INSERT INTO foo(a,b,c) values (0, 'dog', 'cat')`)
sqlDB.Exec(t, `INSERT INTO foo(a,b,c) values (1, 'dollar', 'cent')`)

sqlDB.ExpectErr(t, `nosuchfamily`, `CREATE CHANGEFEED FOR foo FAMILY nosuchfamily`)

Expand All @@ -1892,18 +1892,25 @@ func TestChangefeedSingleColumnFamily(t *testing.T) {
fooRest := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest`)
defer closeFeed(t, fooRest)
assertPayloads(t, fooRest, []string{
`foo.rest: [0]->{"after": {"c": "cat"}}`,
`foo.rest: [1]->{"after": {"c": "cent"}}`,
`foo.rest: [0]->{"after": {"c": "cat", "d": null}}`,
`foo.rest: [1]->{"after": {"c": "cent", "d": null}}`,
})

fooBoth := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest, foo FAMILY most`)
defer closeFeed(t, fooBoth)
assertPayloads(t, fooBoth, []string{
`foo.most: [0]->{"after": {"a": 0, "b": "dog"}}`,
`foo.rest: [0]->{"after": {"c": "cat"}}`,
`foo.rest: [0]->{"after": {"c": "cat", "d": null}}`,
`foo.most: [1]->{"after": {"a": 1, "b": "dollar"}}`,
`foo.rest: [1]->{"after": {"c": "cent"}}`,
`foo.rest: [1]->{"after": {"c": "cent", "d": null}}`,
})

sqlDB.Exec(t, `DELETE FROM foo WHERE a = 0`)
assertPayloads(t, fooBoth, []string{
`foo.most: [0]->{"after": null}`,
`foo.rest: [0]->{"after": null}`,
})

}

t.Run(`sinkless`, sinklessTest(testFn))
Expand Down
10 changes: 7 additions & 3 deletions pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,10 +739,14 @@ func (rf *Fetcher) processKV(
}
}
if defaultColumnID == 0 {
return "", "", errors.Errorf("single entry value with no default column id")
if kv.Value.GetTag() == roachpb.ValueType_UNKNOWN {
// Tombstone for a secondary column family, nothing needs to be done.
} else {
return "", "", errors.Errorf("single entry value with no default column id")
}
} else {
prettyKey, prettyValue, err = rf.processValueSingle(ctx, table, defaultColumnID, kv, prettyKey)
}

prettyKey, prettyValue, err = rf.processValueSingle(ctx, table, defaultColumnID, kv, prettyKey)
}
}
if err != nil {
Expand Down

0 comments on commit ba1d8e7

Please sign in to comment.