Skip to content

Commit

Permalink
sql: add MERGING descriptor state
Browse files Browse the repository at this point in the history
This PR adds a MERGING index descriptor state. This state is similar
to DELETE_AND_WRITE_ONLY, but all writes are handled via Put, even in
cases where they would typically used CPut or InitPut.

This change is in support of the mvcc-compatible index backfiller.

As part of the backfill process, newly added indexes will be added in
a BACKFILLING state in which they see no writes or deletes.

However, this would allow for the following sequence of events. For
simplicity, the delete-preserving temporary index state transitions
have been elided:

    t0: `CREATE UNIQUE INDEX` creates "new index" in BACKFILLING state
         and a delete-preserving "temp index"
    t1: `INSERT INTO` with a = 1 (new index does not see this write)
    t2: Backfill starts at t1 (a=1 included in backfill, ends up in "new index")
    t3: UPDATE a = 2 where a = 1 (new index still in backfilling does not see these writes)
    t4: Backfilling completes
    t5: "new index" to DELETE-ONLY
    t6: "new index" to DELETE-AND-WRITE-ONLY
    t7: `INSERT INTO` into with a = 1
    t8: a = 2 merged from "temp index" into "new index"

Unfortunately, at t7 the user would encounter an erroneous duplicate
key violation when they attempt their insert because the attempted
CPut into "new index" would fail as there is an existing
value (a=1). The correct value of a=2 will not be written into "new
index" until a follow-up merge pass that occurs at t8

The MERGING phase is intended to solve this problem by allowing for
the following sequence:

    t0: `CREATE UNIQUE INDEX` creates "new index" in BACKFILLING state
         and a delete-preserving "temp index"
    t1: `INSERT INTO` with a = 1 (new index does not see this write)
    t2: Backfill starts at t1 (a=1 included in backfill, ends up in "new index")
    t3: UPDATE a = 2 where a = 1 (new index still in backfilling does not see these writes)
    t4: Backfilling completes
    t5: "new index" to DELETE-ONLY
    t5: "new index" to MERGING
    t7: `INSERT INTO` into with a = 1
    t8: a = 2 merged from "temp index" into "new index"
    t9: "new index" to DELETE-AND-WRITE-ONLY

Note that while in the MERGING state, the new index may accumulate
duplicate keys. This is OK because it will be checked for uniqueness
before being brought online.

The tests added here are nearly identical to the tests added for
DeletePreservingEncoding but have been written using a small
data-drive test that supports putting indexes into a mutated state.

Release note: None
  • Loading branch information
stevendanna committed Feb 1, 2022
1 parent 73b6819 commit 0ea9520
Show file tree
Hide file tree
Showing 9 changed files with 398 additions and 14 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ go_test(
"explain_bundle_test.go",
"explain_test.go",
"explain_tree_test.go",
"force_put_index_test.go",
"indexbackfiller_test.go",
"instrumentation_test.go",
"internal_test.go",
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,20 @@ message DescriptorMutation {
// TODO(ssd): This is currently unused and is being added to
// facilitate the new temporary-index-based backfilling process.
BACKFILLING = 3;

// Operations can use this invisible descriptor to implicitly
// write and delete entries. This is used by the MVCC-compatible
// index backfiller to ensure that unique indexes do not produce
// erroneous conflicts.
//
// Columns: Columns do not use this state. A column descriptor in
// this state is a programming error.
//
// Index: INSERT, UPDATE and DELETE treat this index like a
// DELETE_AND_WRITE_ONLY index, but use Put in instead of CPut or
// InitPut, effectively ignoring unique constraints.
MERGING = 4;

}
optional State state = 3 [(gogoproto.nullable) = false];

Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/catalog/table_elements.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type TableElementMaybeMutation interface {
// mutation in the backfilling state.
Backfilling() bool

// Mergin returns true iff the table element is in a
// mutation in the merging state.
Merging() bool

// Adding returns true iff the table element is in an add mutation.
Adding() bool

Expand Down Expand Up @@ -186,6 +190,24 @@ type Index interface {
NumCompositeColumns() int
GetCompositeColumnID(compositeColumnOrdinal int) descpb.ColumnID
UseDeletePreservingEncoding() bool
// ForcePut, if true, forces all writes to use Put rather than CPut or InitPut.
//
// Users of this options should take great care as it
// effectively mean unique constraints are not respected.
//
// Currently (2022-01-19) this two users: delete preserving
// indexes and merging indexes.
//
// Delete preserving encoding indexes are used only as a log of
// index writes during backfill, thus we can blindly put values into
// them.
//
// New indexes will be checked for uniqueness at the end of the may miss
// updates during the backfilling process that would lead to CPut
// failures until the missed updates are merged into the
// index. Uniqueness for such indexes is checked by the schema changer
// before they are brought back online.
ForcePut() bool
}

// Column is an interface around the column descriptor types.
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/catalog/tabledesc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ func (w index) UseDeletePreservingEncoding() bool {
return w.desc.UseDeletePreservingEncoding && !w.maybeMutation.DeleteOnly()
}

// ForcePut returns true if writes to the index should only use Put (rather than
// CPut or InitPut). This is used by indexes currently being built by the
// MVCC-compliant index backfiller and the temporary indexes that support that
// process.
func (w index) ForcePut() bool {
return w.Merging() || w.desc.UseDeletePreservingEncoding
}

// partitioning is the backing struct for a catalog.Partitioning interface.
type partitioning struct {
desc *descpb.PartitioningDescriptor
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/catalog/tabledesc/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func (mm maybeMutation) Backfilling() bool {
return mm.mutationState == descpb.DescriptorMutation_BACKFILLING
}

// Merging returns true iff the table element is a mutation in the
// merging state.
func (mm maybeMutation) Merging() bool {
return mm.mutationState == descpb.DescriptorMutation_MERGING
}

// Adding returns true iff the table element is in an add mutation.
func (mm maybeMutation) Adding() bool {
return mm.mutationDirection == descpb.DescriptorMutation_ADD
Expand Down
120 changes: 120 additions & 0 deletions pkg/sql/force_put_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql_test

import (
"context"
gosql "database/sql"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

// TestMerginIndexKVOps is another poor-man's logictest that assert
// that indexes in the MERGING state do not see CPuts or InitPuts.
func TestMergingIndexKVOps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

defer lease.TestingDisableTableLeases()()

params, _ := tests.CreateTestServerParams()
// Decrease the adopt loop interval so that retries happen quickly.
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

_, err := sqlDB.Exec("CREATE DATABASE t; USE t")
require.NoError(t, err)
datadriven.Walk(t, testutils.TestDataPath(t, "index_mutations"), func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "mutate-index":
if len(td.CmdArgs) < 3 {
td.Fatalf(t, "mutate-index requires at least an index name and a state")
}
tableName := td.CmdArgs[0].Key
name := td.CmdArgs[1].Key
stateStr := strings.ToUpper(td.CmdArgs[2].Key)
state := descpb.DescriptorMutation_State(descpb.DescriptorMutation_State_value[stateStr])

codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
tableDesc := desctestutils.TestingGetMutableExistingTableDescriptor(kvDB, codec, "t", tableName)
err = mutateIndexByName(kvDB, codec, tableDesc, name, nil, state)
require.NoError(t, err)
case "statement":
_, err := sqlDB.Exec(td.Input)
require.NoError(t, err)
case "kvtrace":
_, err := sqlDB.Exec("SET TRACING=on,kv")
require.NoError(t, err)
_, err = sqlDB.Exec(td.Input)
require.NoError(t, err)
_, err = sqlDB.Exec("SET TRACING=off")
require.NoError(t, err)
return getKVTrace(t, sqlDB)
default:
td.Fatalf(t, "unknown directive: %s", td.Cmd)
}
return ""
})
})
}

func getKVTrace(t *testing.T, db *gosql.DB) string {
// These are the same KVOps looked at by logictest.
allowedKVOpTypes := []string{
"CPut",
"Put",
"InitPut",
"Del",
"DelRange",
"ClearRange",
"Get",
"Scan",
}
var sb strings.Builder
sb.WriteString("SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE ")
for i, op := range allowedKVOpTypes {
if i != 0 {
sb.WriteString("OR ")
}
sb.WriteString(fmt.Sprintf("message like '%s%%'", op))
}
traceMessagesQuery := sb.String()

rows, err := db.Query(traceMessagesQuery)
require.NoError(t, err)

var trace strings.Builder
for rows.Next() {
var s string
require.NoError(t, rows.Scan(&s))
trace.WriteString(s)
trace.WriteRune('\n')
}
require.NoError(t, rows.Err())
return trace.String()
}
4 changes: 2 additions & 2 deletions pkg/sql/row/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func (ri *Inserter) InsertRow(
for i := range entries {
e := &entries[i]

// We don't want to check any conflicts when trying to preserve deletes.
if ri.Helper.Indexes[idx].UseDeletePreservingEncoding() {
if ri.Helper.Indexes[idx].ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, b, &e.Key, &e.Value, traceKV)
} else {
putFn(ctx, b, &e.Key, &e.Value, traceKV)
Expand Down
20 changes: 8 additions & 12 deletions pkg/sql/row/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,8 @@ func (ru *Updater) UpdateRow(
continue
}

if index.UseDeletePreservingEncoding() {
// Delete preserving encoding indexes are used only as a log of
// index writes during backfill, thus we can blindly put values into
// them.
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV)
} else {
if traceKV {
Expand Down Expand Up @@ -459,10 +457,8 @@ func (ru *Updater) UpdateRow(
)
}

if index.UseDeletePreservingEncoding() {
// Delete preserving encoding indexes are used only as a log of
// index writes during backfill, thus we can blindly put values into
// them.
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV)
} else {
// In this case, the index now has a k/v that did not exist in the
Expand Down Expand Up @@ -497,7 +493,8 @@ func (ru *Updater) UpdateRow(
// and the old row values do not match the partial index
// predicate.
newEntry := &newEntries[newIdx]
if index.UseDeletePreservingEncoding() {
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV)
} else {
if traceKV {
Expand All @@ -518,9 +515,8 @@ func (ru *Updater) UpdateRow(
}
// We're adding all of the inverted index entries from the row being updated.
for j := range ru.newIndexEntries[i] {
if index.UseDeletePreservingEncoding() {
// Delete preserving encoding indexes are used only as a log of index
// writes during backfill, thus we can blindly put values into them.
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV)
} else {
insertInvertedPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV)
Expand Down
Loading

0 comments on commit 0ea9520

Please sign in to comment.