Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add MERGING descriptor state #75663

Merged
merged 1 commit into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ go_test(
"explain_bundle_test.go",
"explain_test.go",
"explain_tree_test.go",
"force_put_index_test.go",
"indexbackfiller_test.go",
"instrumentation_test.go",
"internal_test.go",
Expand Down
14 changes: 14 additions & 0 deletions pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,20 @@ message DescriptorMutation {
// TODO(ssd): This is currently unused and is being added to
// facilitate the new temporary-index-based backfilling process.
BACKFILLING = 3;

// Operations can use this invisible descriptor to implicitly
// write and delete entries. This is used by the MVCC-compatible
// index backfiller to ensure that unique indexes do not produce
// erroneous conflicts.
//
// Columns: Columns do not use this state. A column descriptor in
// this state is a programming error.
//
// Index: INSERT, UPDATE and DELETE treat this index like a
// DELETE_AND_WRITE_ONLY index, but use Put in instead of CPut or
// InitPut, effectively ignoring unique constraints.
MERGING = 4;

}
optional State state = 3 [(gogoproto.nullable) = false];

Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/catalog/table_elements.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type TableElementMaybeMutation interface {
// mutation in the backfilling state.
Backfilling() bool

// Mergin returns true iff the table element is in a
// mutation in the merging state.
Merging() bool

// Adding returns true iff the table element is in an add mutation.
Adding() bool

Expand Down Expand Up @@ -186,6 +190,24 @@ type Index interface {
NumCompositeColumns() int
GetCompositeColumnID(compositeColumnOrdinal int) descpb.ColumnID
UseDeletePreservingEncoding() bool
// ForcePut, if true, forces all writes to use Put rather than CPut or InitPut.
//
// Users of this options should take great care as it
// effectively mean unique constraints are not respected.
//
// Currently (2022-01-19) this two users: delete preserving
// indexes and merging indexes.
//
// Delete preserving encoding indexes are used only as a log of
// index writes during backfill, thus we can blindly put values into
// them.
//
// New indexes may miss updates during the backfilling process
// that would lead to CPut failures until the missed updates
// are merged into the index. Uniqueness for such indexes is
// checked by the schema changer before they are brought back
// online.
ForcePut() bool
}

// Column is an interface around the column descriptor types.
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/catalog/tabledesc/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,14 @@ func (w index) UseDeletePreservingEncoding() bool {
return w.desc.UseDeletePreservingEncoding && !w.maybeMutation.DeleteOnly()
}

// ForcePut returns true if writes to the index should only use Put (rather than
// CPut or InitPut). This is used by indexes currently being built by the
// MVCC-compliant index backfiller and the temporary indexes that support that
// process.
func (w index) ForcePut() bool {
return w.Merging() || w.desc.UseDeletePreservingEncoding
}

// partitioning is the backing struct for a catalog.Partitioning interface.
type partitioning struct {
desc *descpb.PartitioningDescriptor
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/catalog/tabledesc/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func (mm maybeMutation) Backfilling() bool {
return mm.mutationState == descpb.DescriptorMutation_BACKFILLING
}

// Merging returns true iff the table element is a mutation in the
// merging state.
func (mm maybeMutation) Merging() bool {
return mm.mutationState == descpb.DescriptorMutation_MERGING
}

// Adding returns true iff the table element is in an add mutation.
func (mm maybeMutation) Adding() bool {
return mm.mutationDirection == descpb.DescriptorMutation_ADD
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,18 @@ func validateMutation(m *descpb.DescriptorMutation) error {
"mutation in state %s, direction %s, and no column/index descriptor",
errors.Safe(m.State), errors.Safe(m.Direction))
}

switch m.State {
case descpb.DescriptorMutation_BACKFILLING:
if _, ok := m.Descriptor_.(*descpb.DescriptorMutation_Index); !ok {
return errors.AssertionFailedf("non-index mutation in state %s", errors.Safe(m.State))
}
case descpb.DescriptorMutation_MERGING:
if _, ok := m.Descriptor_.(*descpb.DescriptorMutation_Index); !ok {
return errors.AssertionFailedf("non-index mutation in state %s", errors.Safe(m.State))
}
}

return nil
}

Expand Down
46 changes: 46 additions & 0 deletions pkg/sql/catalog/tabledesc/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1701,6 +1701,52 @@ func TestValidateTableDesc(t *testing.T) {
},
NextColumnID: 2,
}},
{`non-index mutation in state BACKFILLING`,
descpb.TableDescriptor{
ID: 2,
ParentID: 1,
Name: "foo",
FormatVersion: descpb.InterleavedFormatVersion,
Columns: []descpb.ColumnDescriptor{
{ID: 1, Name: "c1"},
},
Mutations: []descpb.DescriptorMutation{
{
Descriptor_: &descpb.DescriptorMutation_Column{
Column: &descpb.ColumnDescriptor{
ID: 2,
Name: "c2",
},
},
Direction: descpb.DescriptorMutation_ADD,
State: descpb.DescriptorMutation_BACKFILLING,
},
},
NextColumnID: 3,
}},
{`non-index mutation in state MERGING`,
descpb.TableDescriptor{
ID: 2,
ParentID: 1,
Name: "foo",
FormatVersion: descpb.InterleavedFormatVersion,
Columns: []descpb.ColumnDescriptor{
{ID: 1, Name: "c1"},
},
Mutations: []descpb.DescriptorMutation{
{
Descriptor_: &descpb.DescriptorMutation_Column{
Column: &descpb.ColumnDescriptor{
ID: 2,
Name: "c2",
},
},
Direction: descpb.DescriptorMutation_ADD,
State: descpb.DescriptorMutation_MERGING,
},
},
NextColumnID: 3,
}},
}
for i, d := range testData {
t.Run(d.err, func(t *testing.T) {
Expand Down
120 changes: 120 additions & 0 deletions pkg/sql/force_put_index_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package sql_test

import (
"context"
gosql "database/sql"
"fmt"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/require"
)

// TestMerginIndexKVOps is another poor-man's logictest that assert
// that indexes in the MERGING state do not see CPuts or InitPuts.
func TestMergingIndexKVOps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

defer lease.TestingDisableTableLeases()()

params, _ := tests.CreateTestServerParams()
// Decrease the adopt loop interval so that retries happen quickly.
params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
s, sqlDB, kvDB := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

_, err := sqlDB.Exec("CREATE DATABASE t; USE t")
require.NoError(t, err)
datadriven.Walk(t, testutils.TestDataPath(t, "index_mutations"), func(t *testing.T, path string) {
datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string {
switch td.Cmd {
case "mutate-index":
if len(td.CmdArgs) < 3 {
td.Fatalf(t, "mutate-index requires at least an index name and a state")
}
tableName := td.CmdArgs[0].Key
name := td.CmdArgs[1].Key
stateStr := strings.ToUpper(td.CmdArgs[2].Key)
state := descpb.DescriptorMutation_State(descpb.DescriptorMutation_State_value[stateStr])

codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec
tableDesc := desctestutils.TestingGetMutableExistingTableDescriptor(kvDB, codec, "t", tableName)
err = mutateIndexByName(kvDB, codec, tableDesc, name, nil, state)
require.NoError(t, err)
case "statement":
_, err := sqlDB.Exec(td.Input)
require.NoError(t, err)
case "kvtrace":
_, err := sqlDB.Exec("SET TRACING=on,kv")
require.NoError(t, err)
_, err = sqlDB.Exec(td.Input)
require.NoError(t, err)
_, err = sqlDB.Exec("SET TRACING=off")
require.NoError(t, err)
return getKVTrace(t, sqlDB)
default:
td.Fatalf(t, "unknown directive: %s", td.Cmd)
}
return ""
})
})
}

func getKVTrace(t *testing.T, db *gosql.DB) string {
// These are the same KVOps looked at by logictest.
allowedKVOpTypes := []string{
"CPut",
"Put",
"InitPut",
"Del",
"DelRange",
"ClearRange",
"Get",
"Scan",
}
var sb strings.Builder
sb.WriteString("SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE ")
for i, op := range allowedKVOpTypes {
if i != 0 {
sb.WriteString("OR ")
}
sb.WriteString(fmt.Sprintf("message like '%s%%'", op))
}
traceMessagesQuery := sb.String()

rows, err := db.Query(traceMessagesQuery)
require.NoError(t, err)

var trace strings.Builder
for rows.Next() {
var s string
require.NoError(t, rows.Scan(&s))
trace.WriteString(s)
trace.WriteRune('\n')
}
require.NoError(t, rows.Err())
return trace.String()
}
4 changes: 2 additions & 2 deletions pkg/sql/row/inserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ func (ri *Inserter) InsertRow(
for i := range entries {
e := &entries[i]

// We don't want to check any conflicts when trying to preserve deletes.
if ri.Helper.Indexes[idx].UseDeletePreservingEncoding() {
if ri.Helper.Indexes[idx].ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, b, &e.Key, &e.Value, traceKV)
} else {
putFn(ctx, b, &e.Key, &e.Value, traceKV)
Expand Down
20 changes: 8 additions & 12 deletions pkg/sql/row/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,10 +421,8 @@ func (ru *Updater) UpdateRow(
continue
}

if index.UseDeletePreservingEncoding() {
// Delete preserving encoding indexes are used only as a log of
// index writes during backfill, thus we can blindly put values into
// them.
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV)
} else {
if traceKV {
Expand Down Expand Up @@ -459,10 +457,8 @@ func (ru *Updater) UpdateRow(
)
}

if index.UseDeletePreservingEncoding() {
// Delete preserving encoding indexes are used only as a log of
// index writes during backfill, thus we can blindly put values into
// them.
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV)
} else {
// In this case, the index now has a k/v that did not exist in the
Expand Down Expand Up @@ -497,7 +493,8 @@ func (ru *Updater) UpdateRow(
// and the old row values do not match the partial index
// predicate.
newEntry := &newEntries[newIdx]
if index.UseDeletePreservingEncoding() {
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV)
} else {
if traceKV {
Expand All @@ -518,9 +515,8 @@ func (ru *Updater) UpdateRow(
}
// We're adding all of the inverted index entries from the row being updated.
for j := range ru.newIndexEntries[i] {
if index.UseDeletePreservingEncoding() {
// Delete preserving encoding indexes are used only as a log of index
// writes during backfill, thus we can blindly put values into them.
if index.ForcePut() {
// See the comemnt on (catalog.Index).ForcePut() for more details.
insertPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV)
} else {
insertInvertedPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV)
Expand Down
Loading