diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 1aa07cb9d66d..ba86e494e32e 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -485,6 +485,7 @@ go_test( "explain_bundle_test.go", "explain_test.go", "explain_tree_test.go", + "force_put_index_test.go", "indexbackfiller_test.go", "instrumentation_test.go", "internal_test.go", diff --git a/pkg/sql/catalog/descpb/structured.proto b/pkg/sql/catalog/descpb/structured.proto index 00610515c139..847094d177a1 100644 --- a/pkg/sql/catalog/descpb/structured.proto +++ b/pkg/sql/catalog/descpb/structured.proto @@ -696,6 +696,20 @@ message DescriptorMutation { // TODO(ssd): This is currently unused and is being added to // facilitate the new temporary-index-based backfilling process. BACKFILLING = 3; + + // Operations can use this invisible descriptor to implicitly + // write and delete entries. This is used by the MVCC-compatible + // index backfiller to ensure that unique indexes do not produce + // erroneous conflicts. + // + // Columns: Columns do not use this state. A column descriptor in + // this state is a programming error. + // + // Index: INSERT, UPDATE and DELETE treat this index like a + // DELETE_AND_WRITE_ONLY index, but use Put instead of CPut or + // InitPut, effectively ignoring unique constraints. + MERGING = 4; + } optional State state = 3 [(gogoproto.nullable) = false]; diff --git a/pkg/sql/catalog/table_elements.go b/pkg/sql/catalog/table_elements.go index f8ec82b129d7..df7bc09da00c 100644 --- a/pkg/sql/catalog/table_elements.go +++ b/pkg/sql/catalog/table_elements.go @@ -45,6 +45,10 @@ type TableElementMaybeMutation interface { // mutation in the backfilling state. Backfilling() bool + // Merging returns true iff the table element is in a + // mutation in the merging state. + Merging() bool + // Adding returns true iff the table element is in an add mutation. 
Adding() bool @@ -186,6 +190,24 @@ type Index interface { NumCompositeColumns() int GetCompositeColumnID(compositeColumnOrdinal int) descpb.ColumnID UseDeletePreservingEncoding() bool + // ForcePut, if true, forces all writes to use Put rather than CPut or InitPut. + // + // Users of this option should take great care as it + // effectively means unique constraints are not respected. + // + // Currently (2022-01-19) this has two users: delete preserving + // indexes and merging indexes. + // + // Delete preserving encoding indexes are used only as a log of + // index writes during backfill, thus we can blindly put values into + // them. + // + // New indexes may miss + // updates during the backfilling process that would lead to CPut + // failures until the missed updates are merged into the + // index. Uniqueness for such indexes is checked by the schema changer + // before they are brought back online. + ForcePut() bool } // Column is an interface around the column descriptor types. diff --git a/pkg/sql/catalog/tabledesc/index.go b/pkg/sql/catalog/tabledesc/index.go index 1e1c90f73d3b..9545ece32832 100644 --- a/pkg/sql/catalog/tabledesc/index.go +++ b/pkg/sql/catalog/tabledesc/index.go @@ -338,6 +338,14 @@ func (w index) UseDeletePreservingEncoding() bool { return w.desc.UseDeletePreservingEncoding && !w.maybeMutation.DeleteOnly() } +// ForcePut returns true if writes to the index should only use Put (rather than +// CPut or InitPut). This is used by indexes currently being built by the +// MVCC-compliant index backfiller and the temporary indexes that support that +// process. +func (w index) ForcePut() bool { + return w.Merging() || w.desc.UseDeletePreservingEncoding +} + // partitioning is the backing struct for a catalog.Partitioning interface. 
type partitioning struct { desc *descpb.PartitioningDescriptor diff --git a/pkg/sql/catalog/tabledesc/mutation.go b/pkg/sql/catalog/tabledesc/mutation.go index 39fd4d350788..27cbc06070e7 100644 --- a/pkg/sql/catalog/tabledesc/mutation.go +++ b/pkg/sql/catalog/tabledesc/mutation.go @@ -68,6 +68,12 @@ func (mm maybeMutation) Backfilling() bool { return mm.mutationState == descpb.DescriptorMutation_BACKFILLING } +// Merging returns true iff the table element is a mutation in the +// merging state. +func (mm maybeMutation) Merging() bool { + return mm.mutationState == descpb.DescriptorMutation_MERGING +} + // Adding returns true iff the table element is in an add mutation. func (mm maybeMutation) Adding() bool { return mm.mutationDirection == descpb.DescriptorMutation_ADD diff --git a/pkg/sql/force_put_index_test.go b/pkg/sql/force_put_index_test.go new file mode 100644 index 000000000000..cbb27184a54f --- /dev/null +++ b/pkg/sql/force_put_index_test.go @@ -0,0 +1,120 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package sql_test + +import ( + "context" + gosql "database/sql" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +// TestMergingIndexKVOps is another poor-man's logictest that asserts +// that indexes in the MERGING state do not see CPuts or InitPuts. +func TestMergingIndexKVOps(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + defer lease.TestingDisableTableLeases()() + + params, _ := tests.CreateTestServerParams() + // Decrease the adopt loop interval so that retries happen quickly. 
+ params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() + s, sqlDB, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + _, err := sqlDB.Exec("CREATE DATABASE t; USE t") + require.NoError(t, err) + datadriven.Walk(t, testutils.TestDataPath(t, "index_mutations"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "mutate-index": + if len(td.CmdArgs) < 3 { + td.Fatalf(t, "mutate-index requires at least an index name and a state") + } + tableName := td.CmdArgs[0].Key + name := td.CmdArgs[1].Key + stateStr := strings.ToUpper(td.CmdArgs[2].Key) + state := descpb.DescriptorMutation_State(descpb.DescriptorMutation_State_value[stateStr]) + + codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec + tableDesc := desctestutils.TestingGetMutableExistingTableDescriptor(kvDB, codec, "t", tableName) + err = mutateIndexByName(kvDB, codec, tableDesc, name, nil, state) + require.NoError(t, err) + case "statement": + _, err := sqlDB.Exec(td.Input) + require.NoError(t, err) + case "kvtrace": + _, err := sqlDB.Exec("SET TRACING=on,kv") + require.NoError(t, err) + _, err = sqlDB.Exec(td.Input) + require.NoError(t, err) + _, err = sqlDB.Exec("SET TRACING=off") + require.NoError(t, err) + return getKVTrace(t, sqlDB) + default: + td.Fatalf(t, "unknown directive: %s", td.Cmd) + } + return "" + }) + }) +} + +func getKVTrace(t *testing.T, db *gosql.DB) string { + // These are the same KVOps looked at by logictest. 
+ allowedKVOpTypes := []string{ + "CPut", + "Put", + "InitPut", + "Del", + "DelRange", + "ClearRange", + "Get", + "Scan", + } + var sb strings.Builder + sb.WriteString("SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE ") + for i, op := range allowedKVOpTypes { + if i != 0 { + sb.WriteString("OR ") + } + sb.WriteString(fmt.Sprintf("message like '%s%%'", op)) + } + traceMessagesQuery := sb.String() + + rows, err := db.Query(traceMessagesQuery) + require.NoError(t, err) + + var trace strings.Builder + for rows.Next() { + var s string + require.NoError(t, rows.Scan(&s)) + trace.WriteString(s) + trace.WriteRune('\n') + } + require.NoError(t, rows.Err()) + return trace.String() +} diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index 8cda4572c676..407970067441 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -192,8 +192,8 @@ func (ri *Inserter) InsertRow( for i := range entries { e := &entries[i] - // We don't want to check any conflicts when trying to preserve deletes. - if ri.Helper.Indexes[idx].UseDeletePreservingEncoding() { + if ri.Helper.Indexes[idx].ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, b, &e.Key, &e.Value, traceKV) } else { putFn(ctx, b, &e.Key, &e.Value, traceKV) diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 4aa25addaeae..c5d35b3f4e56 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -421,10 +421,8 @@ func (ru *Updater) UpdateRow( continue } - if index.UseDeletePreservingEncoding() { - // Delete preserving encoding indexes are used only as a log of - // index writes during backfill, thus we can blindly put values into - // them. + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. 
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV) } else { if traceKV { @@ -459,10 +457,8 @@ func (ru *Updater) UpdateRow( ) } - if index.UseDeletePreservingEncoding() { - // Delete preserving encoding indexes are used only as a log of - // index writes during backfill, thus we can blindly put values into - // them. + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV) } else { // In this case, the index now has a k/v that did not exist in the @@ -497,7 +493,8 @@ func (ru *Updater) UpdateRow( // and the old row values do not match the partial index // predicate. newEntry := &newEntries[newIdx] - if index.UseDeletePreservingEncoding() { + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV) } else { if traceKV { @@ -518,9 +515,8 @@ func (ru *Updater) UpdateRow( } // We're adding all of the inverted index entries from the row being updated. for j := range ru.newIndexEntries[i] { - if index.UseDeletePreservingEncoding() { - // Delete preserving encoding indexes are used only as a log of index - // writes during backfill, thus we can blindly put values into them. + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV) } else { insertInvertedPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV) diff --git a/pkg/sql/testdata/index_mutations/merging b/pkg/sql/testdata/index_mutations/merging new file mode 100644 index 000000000000..d7971ecf0583 --- /dev/null +++ b/pkg/sql/testdata/index_mutations/merging @@ -0,0 +1,217 @@ +# The following tests are similar to +# pkg/sql/opt/exec/execbuilder/testdata/ but are here as they depend +# on index mutations. 
+ +# +# Ensure MERGING indexes force Puts even on unique indexes. +# +statement +CREATE TABLE ti ( + a INT PRIMARY KEY, + b INT, + FAMILY (a, b) +); +CREATE UNIQUE INDEX test_index_to_mutate ON ti (b); +INSERT INTO ti VALUES (1, 1), (2, 2), (4, 4) +---- + +mutate-index ti test_index_to_mutate MERGING +---- + +# Insert that would conflict in DELETE_AND_WRITE_ONLY does not conflict +kvtrace +INSERT INTO ti VALUES (3, 1) +---- +CPut /Table/56/1/3/0 -> /TUPLE/2:2:Int/1 +Put /Table/56/2/1/0 -> /BYTES/0x8b + +kvtrace +UPDATE ti SET b = 2 WHERE a = 4 +---- +Scan /Table/56/1/4/0 +Put /Table/56/1/4/0 -> /TUPLE/2:2:Int/2 +Del /Table/56/2/4/0 +Put /Table/56/2/2/0 -> /BYTES/0x8c + +kvtrace +UPSERT INTO ti VALUES (5, 1) +---- +Scan /Table/56/1/5/0 +CPut /Table/56/1/5/0 -> /TUPLE/2:2:Int/1 +Put /Table/56/2/1/0 -> /BYTES/0x8d + +kvtrace +UPSERT INTO ti VALUES (2, 1) +---- +Scan /Table/56/1/2/0 +Put /Table/56/1/2/0 -> /TUPLE/2:2:Int/1 +Del /Table/56/2/2/0 +Put /Table/56/2/1/0 -> /BYTES/0x8a + +kvtrace +INSERT INTO ti VALUES (6, 1) ON CONFLICT DO NOTHING +---- +Scan /Table/56/1/6/0 +CPut /Table/56/1/6/0 -> /TUPLE/2:2:Int/1 +Put /Table/56/2/1/0 -> /BYTES/0x8e + +kvtrace +INSERT INTO ti VALUES (1, 2) ON CONFLICT (a) DO UPDATE SET b = excluded.b +---- +Scan /Table/56/1/1/0 +Put /Table/56/1/1/0 -> /TUPLE/2:2:Int/2 +Del /Table/56/2/1/0 +Put /Table/56/2/2/0 -> /BYTES/0x89 + +# --------------------------------------------------------- +# Partial Index With ForcePut +# --------------------------------------------------------- +statement +CREATE TABLE tpfp ( + a INT PRIMARY KEY, + b INT, + c STRING, + FAMILY (a, b, c), + UNIQUE INDEX partial (c) WHERE a > b AND c IN ('foo', 'foobar') +) +---- + +mutate-index tpfp partial MERGING +---- + +statement +INSERT INTO tpfp VALUES (3, 4, 'bar') +---- + +# Update a row that doesn't match the partial index. 
+kvtrace +UPDATE tpfp SET b = b + 1 +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/5/1:3:Bytes/bar + +# Update a row that didn't match the partial index before but matches after. +kvtrace +UPDATE tpfp SET b = b - 3, c = 'foo' +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/2/1:3:Bytes/foo +Put /Table/57/2/"foo"/0 -> /BYTES/0x8b + +# Update a row that matches the partial index before and after, but the index +# entry doesn't change. +kvtrace +UPDATE tpfp SET b = b - 1 +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/1/1:3:Bytes/foo + +# Update a row that matches the partial index before and after, and the index +# entry changes. +kvtrace +UPDATE tpfp SET b = b + 1, c = 'foobar' +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/2/1:3:Bytes/foobar +Del /Table/57/2/"foo"/0 +Put /Table/57/2/"foobar"/0 -> /BYTES/0x8b + +# Update a row that matches the partial index before but not after. +kvtrace +UPDATE tpfp SET c = 'baz' +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/2/1:3:Bytes/baz +Del /Table/57/2/"foobar"/0 + +# --------------------------------------------------------- +# Expression Index With ForcePut +# --------------------------------------------------------- +statement +CREATE TABLE tefp ( + k INT PRIMARY KEY, + a INT, + b INT, + FAMILY (k, a, b), + UNIQUE INDEX t_a_plus_b_idx ((a + b)) +) +---- + +statement +INSERT INTO tefp VALUES (1, 2, 100) +---- + +# Update a row which changes the index entry. +kvtrace +UPDATE tefp SET a = a + 1, b = b + 100 +---- +Scan /Table/58/{1-2} +Put /Table/58/1/1/0 -> /TUPLE/2:2:Int/3/1:3:Int/200 +Del /Table/58/2/102/0 +CPut /Table/58/2/203/0 -> /BYTES/0x89 (expecting does not exist) + +# Update a row with different values without changing the index entry. 
+kvtrace +UPDATE tefp SET a = a + 1, b = b - 1 +---- +Scan /Table/58/{1-2} +Put /Table/58/1/1/0 -> /TUPLE/2:2:Int/4/1:3:Int/199 + +# --------------------------------------------------------- +# Inverted Index With ForcePut +# --------------------------------------------------------- + +statement +CREATE TABLE tifp ( + a INT PRIMARY KEY, + b INT[], + FAMILY (a,b), + INVERTED INDEX inverted (b) + ) +---- + +mutate-index tifp inverted MERGING +---- + +statement +INSERT INTO tifp VALUES (1, ARRAY[1, 2, 3, 2, 2, NULL, 3]) +---- + +# Update a row that has 1 new entry and 1 removed entry in the index. +kvtrace +UPDATE tifp SET b = ARRAY[1, 2, 2, NULL, 4, 4] +---- +Scan /Table/59/{1-2} +Put /Table/59/1/1/0 -> /TUPLE/ +Del /Table/59/2/3/1/0 +Put /Table/59/2/4/1/0 -> /BYTES/ + +# --------------------------------------------------------- +# Multicolumn Inverted Index With ForcePut +# --------------------------------------------------------- +statement ok +CREATE TABLE tmfp ( + a INT PRIMARY KEY, + b INT, + c JSON, + FAMILY (a, b, c), + INVERTED INDEX inverted (b, c) +) +---- + +mutate-index tmfp inverted MERGING +---- + +statement +INSERT INTO tmfp VALUES (1, 2, '{"a": "foo", "b": "bar"}'::json) +---- + +kvtrace +UPDATE tmfp SET b = 3, c = '{"a": "foobar", "c": "baz"}'::json +---- +Scan /Table/60/{1-2} +Put /Table/60/1/1/0 -> /TUPLE/2:2:Int/3/ +Del /Table/60/2/2/"a"/"foo"/1/0 +Del /Table/60/2/2/"b"/"bar"/1/0 +Put /Table/60/2/3/"a"/"foobar"/1/0 -> /BYTES/ +Put /Table/60/2/3/"c"/"baz"/1/0 -> /BYTES/