Skip to content

Commit

Permalink
Merge #76959
Browse files Browse the repository at this point in the history
76959: sql: fix bug in inverted index creation r=ajwerner,rhu713 a=stevendanna

When updating an inverted index, we calculate all index entries for
the old values and new values with the intent of issuing a Delete
for every old entry and a Put for every new entry. To try to save on
work, the updater looks for Del's that are followed by identical Put's
and removes both from the set of index entries to process.

This de-duplication is incorrect for temporary indexes used in the
mvcc-compatible index backfilling process. Consider the following:

```
t1: Write of k=1, v={"a": 1}
t2: CREATE INDEX (i2 is in BACKFILLING, i3 (temporary) is in DELETE_ONLY)
t3: i3 moves to DELETE_AND_WRITE_ONLY
t4: AddSSTable backfill into i2
t5: i2 moves to DELETE_ONLY
```

At this point, `i2` contains 1 inverted entry for k=1. and i3 is
empty. Now, say an update is issued:

```
t6: Write of k=1, v={"a": 1, "b": 2}
```

Index `i2` is in DELETE_ONLY. The delete path has no deduplication
logic, so we just see the delete for a key that looks something like

```
Del /Table/106/2/"a"/1/1/0
```

But, `i3` is in DELETE_AND_WRITE ONLY, so both the delete _and the
associated write_ is elided, so instead of:

```
Put /Table/106/3/"a"/1/1/0
Put (delete) /Table/106/3/"a"/1/1/0
Put /Table/106/3/"b"/2/1/0
```

it only sees a single Put:

```
Put /Table/106/3/"b"/2/1/0
```

Thus, we now have an empty `i2` and a single entry in `i3`.  When we
go to merge i3 into i2, we will get our single entry. But, we are
supposed to have two entries.  When we eventually try to validate `i2`,
it will fail because of the missing entry.

To account for this, we skip de-duplication on temporary indexes.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Feb 27, 2022
2 parents 1cd61cf + 9dcacce commit 6ff86bb
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 2 deletions.
12 changes: 11 additions & 1 deletion pkg/sql/backfill/mvcc_index_merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,20 @@ func (ibm *IndexBackfillMerger) Merge(
var nextStart roachpb.Key
err := ibm.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// For now just grab all of the destination KVs and merge the corresponding entries.
log.VInfof(ctx, 2, "merging batch [%s, %s) into index %d", startKey, endKey, destinationID)
kvs, err := txn.Scan(ctx, startKey, endKey, chunkSize)
if err != nil {
return err
}

var deletedCount int
txn.AddCommitTrigger(func(ctx context.Context) {
log.VInfof(ctx, 2, "merged batch of %d keys (%d deletes) (nextStart: %s) (commit timestamp: %s)",
len(kvs),
deletedCount,
nextStart,
txn.CommitTimestamp(),
)
})
if len(kvs) == 0 {
nextStart = nil
return nil
Expand Down Expand Up @@ -238,6 +247,7 @@ func (ibm *IndexBackfillMerger) Merge(
}

if deleted {
deletedCount++
wb.Del(mergedEntry.Key)
} else {
wb.Put(mergedEntry.Key, mergedEntry.Value)
Expand Down
62 changes: 62 additions & 0 deletions pkg/sql/mvcc_backfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -360,6 +362,66 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT, x DECIMAL DEFAULT (DECIMAL '1.4')
}
}

func TestInvertedIndexMergeEveryStateWrite(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var chunkSize int64 = 1000
var initialRows = 10000
rowIdx := 0

params, _ := tests.CreateTestServerParams()
var writeMore func() error
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeDescTxn: func() error { return writeMore() },
BackfillChunkSize: chunkSize,
},
}

s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())

rnd, _ := randutil.NewPseudoRand()
var mu syncutil.Mutex
writeMore = func() error {
mu.Lock()
defer mu.Unlock()

start := rowIdx
rowIdx += 20
for i := 1; i <= 20; i++ {
json, err := json.Random(20, rnd)
require.NoError(t, err)
_, err = sqlDB.Exec("UPSERT INTO t.test VALUES ($1, $2)", start+i, json.String())
require.NoError(t, err)
}
return nil
}

if _, err := sqlDB.Exec(fmt.Sprintf(`SET CLUSTER SETTING bulkio.index_backfill.merge_batch_size = %d`, chunkSize)); err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (k INT PRIMARY KEY, v JSONB);
`); err != nil {
t.Fatal(err)
}

// Initial insert
for i := 0; i < initialRows; i++ {
json, err := json.Random(20, rnd)
require.NoError(t, err)
_, err = sqlDB.Exec("INSERT INTO t.test VALUES ($1, $2)", i, json.String())
require.NoError(t, err)
}

_, err := sqlDB.Exec("CREATE INVERTED INDEX invidx ON t.test (v)")
require.NoError(t, err)
}

// TestIndexBackfillMergeTxnRetry tests that the merge completes
// successfully even in the face of a transaction retry.
func TestIndexBackfillMergeTxnRetry(t *testing.T) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/row/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (ru *Updater) UpdateRow(
return nil, err
}
}
if ru.Helper.Indexes[i].GetType() == descpb.IndexDescriptor_INVERTED {
if ru.Helper.Indexes[i].GetType() == descpb.IndexDescriptor_INVERTED && !ru.Helper.Indexes[i].IsTemporaryIndexForBackfill() {
// Deduplicate the keys we're adding and removing if we're updating an
// inverted index. For example, imagine a table with an inverted index on j:
//
Expand All @@ -335,6 +335,10 @@ func (ru *Updater) UpdateRow(
// want to delete the /foo/bar key and re-add it, that would be wasted work.
// So, we are going to remove keys from both the new and old index entry
// array if they're identical.
//
// We don't do this deduplication on temporary indexes used during the
// backfill because any deletes that are elided here are not elided on the
// newly added index when it is in DELETE_ONLY.
newIndexEntries := ru.newIndexEntries[i]
oldIndexEntries := ru.oldIndexEntries[i]
sort.Slice(oldIndexEntries, func(i, j int) bool {
Expand Down
42 changes: 42 additions & 0 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,48 @@ func TestRollbackOfAddingTable(t *testing.T) {
require.NoError(t, err)
}

func TestUniqueViolationsAreCaught(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

readyToMerge := make(chan struct{})
startMerge := make(chan struct{})
params, _ := tests.CreateTestServerParams()
params.Knobs = base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeTempIndexMerge: func() {
close(readyToMerge)
<-startMerge
},
},
}
server, sqlDB, _ := serverutils.StartServer(t, params)
defer server.Stopper().Stop(context.Background())

_, err := sqlDB.Exec(`CREATE DATABASE t;
CREATE TABLE t.test (pk INT PRIMARY KEY, v INT);
INSERT INTO t.test VALUES (1,1), (2,2), (3,3)
`)
require.NoError(t, err)
grp := ctxgroup.WithContext(context.Background())
grp.GoCtx(func(ctx context.Context) error {
_, err := sqlDB.Exec(`CREATE UNIQUE INDEX ON t.test (v)`)
return err
})

<-readyToMerge
// This conflicts with the new index but doesn't conflict with
// the online indexes. It should produce a failure on
// validation.
_, err = sqlDB.Exec(`INSERT INTO t.test VALUES (4, 1), (5, 2)`)
require.NoError(t, err)

close(startMerge)
err = grp.Wait()
require.Error(t, err)
}

// Test schema change backfills are not affected by various operations
// that run simultaneously.
func TestRaceWithBackfill(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/testdata/index_mutations/delete_preserving_update
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,13 @@ UPDATE tii SET b = ARRAY[1, 2, 2, NULL, 4, 4]
----
Scan /Table/109/{1-2}
Put /Table/109/1/1/0 -> /TUPLE/
Put (delete) /Table/109/2/NULL/1/0
Put (delete) /Table/109/2/1/1/0
Put (delete) /Table/109/2/2/1/0
Put (delete) /Table/109/2/3/1/0
Put /Table/109/2/NULL/1/0 -> /BYTES/0x0a0103
Put /Table/109/2/1/1/0 -> /BYTES/0x0a0103
Put /Table/109/2/2/1/0 -> /BYTES/0x0a0103
Put /Table/109/2/4/1/0 -> /BYTES/0x0a0103

# ---------------------------------------------------------
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/testdata/index_mutations/delete_preserving_upsert
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ UPSERT INTO tii VALUES (1, ARRAY[1, 2, 2, NULL, 4, 4])
----
Scan /Table/109/1/1/0
Put /Table/109/1/1/0 -> /TUPLE/
Put (delete) /Table/109/2/NULL/1/0
Put (delete) /Table/109/2/1/1/0
Put (delete) /Table/109/2/2/1/0
Put (delete) /Table/109/2/3/1/0
Put /Table/109/2/NULL/1/0 -> /BYTES/0x0a0103
Put /Table/109/2/1/1/0 -> /BYTES/0x0a0103
Put /Table/109/2/2/1/0 -> /BYTES/0x0a0103
Put /Table/109/2/4/1/0 -> /BYTES/0x0a0103

# ---------------------------------------------------------
Expand Down

0 comments on commit 6ff86bb

Please sign in to comment.