From 2d91ef0283cb5d0c7bc04aa0c4454dab11256fbf Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Mon, 14 Nov 2022 20:53:57 +0000 Subject: [PATCH] changefeedccl: fix handling of deletes in multi-column families rowfetchers assume that a value with no column header must be for a single-column column family, for which the column header isn't needed. This doesn't reliably hold true in changefeeds, when such a value might also be a tombstone. It looks like the only reason we never ran into this before is that if a column family has only one column, the rowfetcher won't throw an error in this situation as that column is set as the default. Release note (bug fix): Fixed a bug causing changefeeds to fail when a value is deleted while running on a non-primary column family with multiple columns. --- pkg/ccl/changefeedccl/changefeed_test.go | 21 ++++++++++++++------- pkg/sql/row/fetcher.go | 10 +++++++--- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 91a4a0bdd9e6..8004f9df2f28 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1876,9 +1876,9 @@ func TestChangefeedSingleColumnFamily(t *testing.T) { sqlDB := sqlutils.MakeSQLRunner(db) // Table with 2 column families. - sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, FAMILY most (a,b), FAMILY rest (c))`) - sqlDB.Exec(t, `INSERT INTO foo values (0, 'dog', 'cat')`) - sqlDB.Exec(t, `INSERT INTO foo values (1, 'dollar', 'cent')`) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, c STRING, d STRING, FAMILY most (a,b), FAMILY rest (c, d))`) + sqlDB.Exec(t, `INSERT INTO foo(a,b,c) values (0, 'dog', 'cat')`) + sqlDB.Exec(t, `INSERT INTO foo(a,b,c) values (1, 'dollar', 'cent')`) sqlDB.ExpectErr(t, `nosuchfamily`, `CREATE CHANGEFEED FOR foo FAMILY nosuchfamily`) @@ -1892,18 +1892,25 @@ func TestChangefeedSingleColumnFamily(t *testing.T) { fooRest := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest`) defer closeFeed(t, fooRest) assertPayloads(t, fooRest, []string{ - `foo.rest: [0]->{"after": {"c": "cat"}}`, - `foo.rest: [1]->{"after": {"c": "cent"}}`, + `foo.rest: [0]->{"after": {"c": "cat", "d": null}}`, + `foo.rest: [1]->{"after": {"c": "cent", "d": null}}`, }) fooBoth := feed(t, f, `CREATE CHANGEFEED FOR foo FAMILY rest, foo FAMILY most`) defer closeFeed(t, fooBoth) assertPayloads(t, fooBoth, []string{ `foo.most: [0]->{"after": {"a": 0, "b": "dog"}}`, - `foo.rest: [0]->{"after": {"c": "cat"}}`, + `foo.rest: [0]->{"after": {"c": "cat", "d": null}}`, `foo.most: [1]->{"after": {"a": 1, "b": "dollar"}}`, - `foo.rest: [1]->{"after": {"c": "cent"}}`, + `foo.rest: [1]->{"after": {"c": "cent", "d": null}}`, }) + + sqlDB.Exec(t, `DELETE FROM foo WHERE a = 0`) + assertPayloads(t, fooBoth, []string{ + `foo.most: [0]->{"after": null}`, + `foo.rest: [0]->{"after": null}`, + }) + } t.Run(`sinkless`, sinklessTest(testFn)) diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 6763ed39e8e3..bf0d2492a844 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -739,10 +739,14 @@ func (rf *Fetcher) processKV( } } if defaultColumnID == 0 { - return "", "", errors.Errorf("single entry value with no default column id") + if kv.Value.GetTag() == roachpb.ValueType_UNKNOWN { + // Tombstone for a secondary column family, nothing needs to be done. + } else { + return "", "", errors.Errorf("single entry value with no default column id") + } + } else { + prettyKey, prettyValue, err = rf.processValueSingle(ctx, table, defaultColumnID, kv, prettyKey) } - - prettyKey, prettyValue, err = rf.processValueSingle(ctx, table, defaultColumnID, kv, prettyKey) } } if err != nil {