From 0ea952053bc0bac5e91b78285645e96b1946ebbe Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Fri, 28 Jan 2022 18:46:25 +0000
Subject: [PATCH] sql: add MERGING descriptor state

This PR adds a MERGING index descriptor state. This state is similar to
DELETE_AND_WRITE_ONLY, but all writes are handled via Put, even in cases
where they would typically use CPut or InitPut.

This change is in support of the mvcc-compatible index backfiller. As part
of the backfill process, newly added indexes will be added in a BACKFILLING
state in which they see no writes or deletes. However, this would allow for
the following sequence of events. For simplicity, the delete-preserving
temporary index state transitions have been elided:

t0: `CREATE UNIQUE INDEX` creates "new index" in BACKFILLING state and a delete-preserving "temp index"
t1: `INSERT INTO` with a = 1 (new index does not see this write)
t2: Backfill starts at t1 (a=1 included in backfill, ends up in "new index")
t3: UPDATE a = 2 where a = 1 (new index still in backfilling does not see these writes)
t4: Backfilling completes
t5: "new index" to DELETE-ONLY
t6: "new index" to DELETE-AND-WRITE-ONLY
t7: `INSERT INTO` with a = 1
t8: a = 2 merged from "temp index" into "new index"

Unfortunately, at t7 the user would encounter an erroneous duplicate key
violation when they attempt their insert because the attempted CPut into
"new index" would fail as there is an existing value (a=1). The correct
value of a=2 will not be written into "new index" until a follow-up merge
pass that occurs at t8.

The MERGING phase is intended to solve this problem by allowing for the
following sequence:

t0: `CREATE UNIQUE INDEX` creates "new index" in BACKFILLING state and a delete-preserving "temp index"
t1: `INSERT INTO` with a = 1 (new index does not see this write)
t2: Backfill starts at t1 (a=1 included in backfill, ends up in "new index")
t3: UPDATE a = 2 where a = 1 (new index still in backfilling does not see these writes)
t4: Backfilling completes
t5: "new index" to DELETE-ONLY
t6: "new index" to MERGING
t7: `INSERT INTO` with a = 1
t8: a = 2 merged from "temp index" into "new index"
t9: "new index" to DELETE-AND-WRITE-ONLY

Note that while in the MERGING state, the new index may accumulate duplicate
keys. This is OK because it will be checked for uniqueness before being
brought online.

The tests added here are nearly identical to the tests added for
DeletePreservingEncoding but have been written using a small data-driven
test that supports putting indexes into a mutated state.
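To make the intended write-path behavior concrete, here is a minimal,
self-contained Go sketch (not the code in this patch; `kvBatch`, `index`, and
`writeIndexEntry` are hypothetical stand-ins for the real batch and descriptor
types) of how a row writer can branch on a ForcePut-style predicate so that
MERGING and delete-preserving indexes always receive blind Puts while online
unique indexes keep their conflict-checking CPuts:

    package main

    import "fmt"

    // kvBatch stands in for a KV write batch; only the two methods used below exist here.
    type kvBatch struct{}

    func (b *kvBatch) Put(key, value string) { fmt.Printf("Put %s -> %s\n", key, value) }

    func (b *kvBatch) CPut(key, value string) {
        fmt.Printf("CPut %s -> %s (expecting does not exist)\n", key, value)
    }

    // index is a minimal stand-in for an index descriptor: all the write path
    // needs to know here is whether conflict checks must be skipped.
    type index struct {
        merging          bool // mutation is in the MERGING state
        deletePreserving bool // temporary index using the delete-preserving encoding
    }

    // forcePut mirrors the rule this patch introduces: blind Puts for merging and
    // delete-preserving indexes, conflict-checked writes for everything else.
    func (idx index) forcePut() bool { return idx.merging || idx.deletePreserving }

    func writeIndexEntry(b *kvBatch, idx index, key, value string) {
        if idx.forcePut() {
            // Skipping the existence check means duplicate keys can accumulate;
            // uniqueness is re-verified before the index is brought online.
            b.Put(key, value)
            return
        }
        b.CPut(key, value)
    }

    func main() {
        b := &kvBatch{}
        writeIndexEntry(b, index{merging: true}, "/Table/56/2/1/0", "/BYTES/0x8b") // MERGING: Put
        writeIndexEntry(b, index{}, "/Table/56/2/1/0", "/BYTES/0x8b")              // online unique index: CPut
    }

The kvtrace expectations added in pkg/sql/testdata/index_mutations/merging
exercise the same branch in the real write paths (pkg/sql/row/inserter.go and
pkg/sql/row/updater.go).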
Release note: None --- pkg/sql/BUILD.bazel | 1 + pkg/sql/catalog/descpb/structured.proto | 14 ++ pkg/sql/catalog/table_elements.go | 22 +++ pkg/sql/catalog/tabledesc/index.go | 8 + pkg/sql/catalog/tabledesc/mutation.go | 6 + pkg/sql/force_put_index_test.go | 120 +++++++++++++ pkg/sql/row/inserter.go | 4 +- pkg/sql/row/updater.go | 20 +-- pkg/sql/testdata/index_mutations/merging | 217 +++++++++++++++++++++++ 9 files changed, 398 insertions(+), 14 deletions(-) create mode 100644 pkg/sql/force_put_index_test.go create mode 100644 pkg/sql/testdata/index_mutations/merging diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 1aa07cb9d66d..ba86e494e32e 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -485,6 +485,7 @@ go_test( "explain_bundle_test.go", "explain_test.go", "explain_tree_test.go", + "force_put_index_test.go", "indexbackfiller_test.go", "instrumentation_test.go", "internal_test.go", diff --git a/pkg/sql/catalog/descpb/structured.proto b/pkg/sql/catalog/descpb/structured.proto index 00610515c139..847094d177a1 100644 --- a/pkg/sql/catalog/descpb/structured.proto +++ b/pkg/sql/catalog/descpb/structured.proto @@ -696,6 +696,20 @@ message DescriptorMutation { // TODO(ssd): This is currently unused and is being added to // facilitate the new temporary-index-based backfilling process. BACKFILLING = 3; + + // Operations can use this invisible descriptor to implicitly + // write and delete entries. This is used by the MVCC-compatible + // index backfiller to ensure that unique indexes do not produce + // erroneous conflicts. + // + // Columns: Columns do not use this state. A column descriptor in + // this state is a programming error. + // + // Index: INSERT, UPDATE and DELETE treat this index like a + // DELETE_AND_WRITE_ONLY index, but use Put instead of CPut or + // InitPut, effectively ignoring unique constraints. + MERGING = 4; + } optional State state = 3 [(gogoproto.nullable) = false]; diff --git a/pkg/sql/catalog/table_elements.go b/pkg/sql/catalog/table_elements.go index f8ec82b129d7..df7bc09da00c 100644 --- a/pkg/sql/catalog/table_elements.go +++ b/pkg/sql/catalog/table_elements.go @@ -45,6 +45,10 @@ type TableElementMaybeMutation interface { // mutation in the backfilling state. Backfilling() bool + // Merging returns true iff the table element is in a + // mutation in the merging state. + Merging() bool + // Adding returns true iff the table element is in an add mutation. Adding() bool @@ -186,6 +190,24 @@ type Index interface { NumCompositeColumns() int GetCompositeColumnID(compositeColumnOrdinal int) descpb.ColumnID UseDeletePreservingEncoding() bool + // ForcePut, if true, forces all writes to use Put rather than CPut or InitPut. + // + // Users of this option should take great care as it + // effectively means unique constraints are not respected. + // + // Currently (2022-01-19) this has two users: delete-preserving + // indexes and merging indexes. + // + // Delete preserving encoding indexes are used only as a log of + // index writes during backfill, thus we can blindly put values into + // them. + // + // New indexes in the MERGING state may miss + // updates during the backfilling process that would lead to CPut + // failures until the missed updates are merged into the + // index. Uniqueness for such indexes is checked by the schema changer + // before they are brought back online. + ForcePut() bool } // Column is an interface around the column descriptor types. 
diff --git a/pkg/sql/catalog/tabledesc/index.go b/pkg/sql/catalog/tabledesc/index.go index 1e1c90f73d3b..9545ece32832 100644 --- a/pkg/sql/catalog/tabledesc/index.go +++ b/pkg/sql/catalog/tabledesc/index.go @@ -338,6 +338,14 @@ func (w index) UseDeletePreservingEncoding() bool { return w.desc.UseDeletePreservingEncoding && !w.maybeMutation.DeleteOnly() } +// ForcePut returns true if writes to the index should only use Put (rather than +// CPut or InitPut). This is used by indexes currently being built by the +// MVCC-compliant index backfiller and the temporary indexes that support that +// process. +func (w index) ForcePut() bool { + return w.Merging() || w.desc.UseDeletePreservingEncoding +} + // partitioning is the backing struct for a catalog.Partitioning interface. type partitioning struct { desc *descpb.PartitioningDescriptor diff --git a/pkg/sql/catalog/tabledesc/mutation.go b/pkg/sql/catalog/tabledesc/mutation.go index 39fd4d350788..27cbc06070e7 100644 --- a/pkg/sql/catalog/tabledesc/mutation.go +++ b/pkg/sql/catalog/tabledesc/mutation.go @@ -68,6 +68,12 @@ func (mm maybeMutation) Backfilling() bool { return mm.mutationState == descpb.DescriptorMutation_BACKFILLING } +// Merging returns true iff the table element is a mutation in the +// merging state. +func (mm maybeMutation) Merging() bool { + return mm.mutationState == descpb.DescriptorMutation_MERGING +} + // Adding returns true iff the table element is in an add mutation. func (mm maybeMutation) Adding() bool { return mm.mutationDirection == descpb.DescriptorMutation_ADD diff --git a/pkg/sql/force_put_index_test.go b/pkg/sql/force_put_index_test.go new file mode 100644 index 000000000000..cbb27184a54f --- /dev/null +++ b/pkg/sql/force_put_index_test.go @@ -0,0 +1,120 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql_test + +import ( + "context" + gosql "database/sql" + "fmt" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" +) + +// TestMergingIndexKVOps is another poor-man's logictest that asserts +// that indexes in the MERGING state do not see CPuts or InitPuts. +func TestMergingIndexKVOps(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + defer lease.TestingDisableTableLeases()() + + params, _ := tests.CreateTestServerParams() + // Decrease the adopt loop interval so that retries happen quickly. 
+ params.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() + s, sqlDB, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + _, err := sqlDB.Exec("CREATE DATABASE t; USE t") + require.NoError(t, err) + datadriven.Walk(t, testutils.TestDataPath(t, "index_mutations"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "mutate-index": + if len(td.CmdArgs) < 3 { + td.Fatalf(t, "mutate-index requires a table name, an index name, and a state") + } + tableName := td.CmdArgs[0].Key + name := td.CmdArgs[1].Key + stateStr := strings.ToUpper(td.CmdArgs[2].Key) + state := descpb.DescriptorMutation_State(descpb.DescriptorMutation_State_value[stateStr]) + + codec := s.ExecutorConfig().(sql.ExecutorConfig).Codec + tableDesc := desctestutils.TestingGetMutableExistingTableDescriptor(kvDB, codec, "t", tableName) + err = mutateIndexByName(kvDB, codec, tableDesc, name, nil, state) + require.NoError(t, err) + case "statement": + _, err := sqlDB.Exec(td.Input) + require.NoError(t, err) + case "kvtrace": + _, err := sqlDB.Exec("SET TRACING=on,kv") + require.NoError(t, err) + _, err = sqlDB.Exec(td.Input) + require.NoError(t, err) + _, err = sqlDB.Exec("SET TRACING=off") + require.NoError(t, err) + return getKVTrace(t, sqlDB) + default: + td.Fatalf(t, "unknown directive: %s", td.Cmd) + } + return "" + }) + }) +} + +func getKVTrace(t *testing.T, db *gosql.DB) string { + // These are the same KVOps looked at by logictest. + allowedKVOpTypes := []string{ + "CPut", + "Put", + "InitPut", + "Del", + "DelRange", + "ClearRange", + "Get", + "Scan", + } + var sb strings.Builder + sb.WriteString("SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE ") + for i, op := range allowedKVOpTypes { + if i != 0 { + sb.WriteString("OR ") + } + sb.WriteString(fmt.Sprintf("message like '%s%%'", op)) + } + traceMessagesQuery := sb.String() + + rows, err := db.Query(traceMessagesQuery) + require.NoError(t, err) + + var trace strings.Builder + for rows.Next() { + var s string + require.NoError(t, rows.Scan(&s)) + trace.WriteString(s) + trace.WriteRune('\n') + } + require.NoError(t, rows.Err()) + return trace.String() +} diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index 8cda4572c676..407970067441 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -192,8 +192,8 @@ func (ri *Inserter) InsertRow( for i := range entries { e := &entries[i] - // We don't want to check any conflicts when trying to preserve deletes. - if ri.Helper.Indexes[idx].UseDeletePreservingEncoding() { + if ri.Helper.Indexes[idx].ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, b, &e.Key, &e.Value, traceKV) } else { putFn(ctx, b, &e.Key, &e.Value, traceKV) diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 4aa25addaeae..c5d35b3f4e56 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -421,10 +421,8 @@ func (ru *Updater) UpdateRow( continue } - if index.UseDeletePreservingEncoding() { - // Delete preserving encoding indexes are used only as a log of - // index writes during backfill, thus we can blindly put values into - // them. + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. 
insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV) } else { if traceKV { @@ -459,10 +457,8 @@ func (ru *Updater) UpdateRow( ) } - if index.UseDeletePreservingEncoding() { - // Delete preserving encoding indexes are used only as a log of - // index writes during backfill, thus we can blindly put values into - // them. + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV) } else { // In this case, the index now has a k/v that did not exist in the @@ -497,7 +493,8 @@ func (ru *Updater) UpdateRow( // and the old row values do not match the partial index // predicate. newEntry := &newEntries[newIdx] - if index.UseDeletePreservingEncoding() { + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, batch, &newEntry.Key, &newEntry.Value, traceKV) } else { if traceKV { @@ -518,9 +515,8 @@ func (ru *Updater) UpdateRow( } // We're adding all of the inverted index entries from the row being updated. for j := range ru.newIndexEntries[i] { - if index.UseDeletePreservingEncoding() { - // Delete preserving encoding indexes are used only as a log of index - // writes during backfill, thus we can blindly put values into them. + if index.ForcePut() { + // See the comment on (catalog.Index).ForcePut() for more details. insertPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV) } else { insertInvertedPutFn(ctx, batch, &ru.newIndexEntries[i][j].Key, &ru.newIndexEntries[i][j].Value, traceKV) diff --git a/pkg/sql/testdata/index_mutations/merging b/pkg/sql/testdata/index_mutations/merging new file mode 100644 index 000000000000..d7971ecf0583 --- /dev/null +++ b/pkg/sql/testdata/index_mutations/merging @@ -0,0 +1,217 @@ +# The following tests are similar to +# pkg/sql/opt/exec/execbuilder/testdata/ but are here as they depend +# on index mutations. + +# +# Ensure MERGING indexes force Puts even on unique indexes. 
+# +statement +CREATE TABLE ti ( + a INT PRIMARY KEY, + b INT, + FAMILY (a, b) +); +CREATE UNIQUE INDEX test_index_to_mutate ON ti (b); +INSERT INTO ti VALUES (1, 1), (2, 2), (4, 4) +---- + +mutate-index ti test_index_to_mutate MERGING +---- + +# Insert that would conflict in DELETE_AND_WRITE_ONLY does not conflict +kvtrace +INSERT INTO ti VALUES (3, 1) +---- +CPut /Table/56/1/3/0 -> /TUPLE/2:2:Int/1 +Put /Table/56/2/1/0 -> /BYTES/0x8b + +kvtrace +UPDATE ti SET b = 2 WHERE a = 4 +---- +Scan /Table/56/1/4/0 +Put /Table/56/1/4/0 -> /TUPLE/2:2:Int/2 +Del /Table/56/2/4/0 +Put /Table/56/2/2/0 -> /BYTES/0x8c + +kvtrace +UPSERT INTO ti VALUES (5, 1) +---- +Scan /Table/56/1/5/0 +CPut /Table/56/1/5/0 -> /TUPLE/2:2:Int/1 +Put /Table/56/2/1/0 -> /BYTES/0x8d + +kvtrace +UPSERT INTO ti VALUES (2, 1) +---- +Scan /Table/56/1/2/0 +Put /Table/56/1/2/0 -> /TUPLE/2:2:Int/1 +Del /Table/56/2/2/0 +Put /Table/56/2/1/0 -> /BYTES/0x8a + +kvtrace +INSERT INTO ti VALUES (6, 1) ON CONFLICT DO NOTHING +---- +Scan /Table/56/1/6/0 +CPut /Table/56/1/6/0 -> /TUPLE/2:2:Int/1 +Put /Table/56/2/1/0 -> /BYTES/0x8e + +kvtrace +INSERT INTO ti VALUES (1, 2) ON CONFLICT (a) DO UPDATE SET b = excluded.b +---- +Scan /Table/56/1/1/0 +Put /Table/56/1/1/0 -> /TUPLE/2:2:Int/2 +Del /Table/56/2/1/0 +Put /Table/56/2/2/0 -> /BYTES/0x89 + +# --------------------------------------------------------- +# Partial Index With ForcePut +# --------------------------------------------------------- +statement +CREATE TABLE tpfp ( + a INT PRIMARY KEY, + b INT, + c STRING, + FAMILY (a, b, c), + UNIQUE INDEX partial (c) WHERE a > b AND c IN ('foo', 'foobar') +) +---- + +mutate-index tpfp partial MERGING +---- + +statement +INSERT INTO tpfp VALUES (3, 4, 'bar') +---- + +# Update a row that doesn't match the partial index. +kvtrace +UPDATE tpfp SET b = b + 1 +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/5/1:3:Bytes/bar + +# Update a row that didn't match the partial index before but matches after. +kvtrace +UPDATE tpfp SET b = b - 3, c = 'foo' +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/2/1:3:Bytes/foo +Put /Table/57/2/"foo"/0 -> /BYTES/0x8b + +# Update a row that matches the partial index before and after, but the index +# entry doesn't change. +kvtrace +UPDATE tpfp SET b = b - 1 +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/1/1:3:Bytes/foo + +# Update a row that matches the partial index before and after, and the index +# entry changes. +kvtrace +UPDATE tpfp SET b = b + 1, c = 'foobar' +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/2/1:3:Bytes/foobar +Del /Table/57/2/"foo"/0 +Put /Table/57/2/"foobar"/0 -> /BYTES/0x8b + +# Update a row that matches the partial index before but not after. +kvtrace +UPDATE tpfp SET c = 'baz' +---- +Scan /Table/57/{1-2} +Put /Table/57/1/3/0 -> /TUPLE/2:2:Int/2/1:3:Bytes/baz +Del /Table/57/2/"foobar"/0 + +# --------------------------------------------------------- +# Expression Index With ForcePut +# --------------------------------------------------------- +statement +CREATE TABLE tefp ( + k INT PRIMARY KEY, + a INT, + b INT, + FAMILY (k, a, b), + UNIQUE INDEX t_a_plus_b_idx ((a + b)) +) +---- + +statement +INSERT INTO tefp VALUES (1, 2, 100) +---- + +# Update a row which changes the index entry. 
+kvtrace +UPDATE tefp SET a = a + 1, b = b + 100 +---- +Scan /Table/58/{1-2} +Put /Table/58/1/1/0 -> /TUPLE/2:2:Int/3/1:3:Int/200 +Del /Table/58/2/102/0 +CPut /Table/58/2/203/0 -> /BYTES/0x89 (expecting does not exist) + +# Update a row with different values without changing the index entry. +kvtrace +UPDATE tefp SET a = a + 1, b = b - 1 +---- +Scan /Table/58/{1-2} +Put /Table/58/1/1/0 -> /TUPLE/2:2:Int/4/1:3:Int/199 + +# --------------------------------------------------------- +# Inverted Index With ForcePut +# --------------------------------------------------------- + +statement +CREATE TABLE tifp ( + a INT PRIMARY KEY, + b INT[], + FAMILY (a,b), + INVERTED INDEX inverted (b) + ) +---- + +mutate-index tifp inverted MERGING +---- + +statement +INSERT INTO tifp VALUES (1, ARRAY[1, 2, 3, 2, 2, NULL, 3]) +---- + +# Update a row that has 1 new entry and 1 removed entry in the index. +kvtrace +UPDATE tifp SET b = ARRAY[1, 2, 2, NULL, 4, 4] +---- +Scan /Table/59/{1-2} +Put /Table/59/1/1/0 -> /TUPLE/ +Del /Table/59/2/3/1/0 +Put /Table/59/2/4/1/0 -> /BYTES/ + +# --------------------------------------------------------- +# Multicolumn Inverted Index With ForcePut +# --------------------------------------------------------- +statement ok +CREATE TABLE tmfp ( + a INT PRIMARY KEY, + b INT, + c JSON, + FAMILY (a, b, c), + INVERTED INDEX inverted (b, c) +) +---- + +mutate-index tmfp inverted MERGING +---- + +statement +INSERT INTO tmfp VALUES (1, 2, '{"a": "foo", "b": "bar"}'::json) +---- + +kvtrace +UPDATE tmfp SET b = 3, c = '{"a": "foobar", "c": "baz"}'::json +---- +Scan /Table/60/{1-2} +Put /Table/60/1/1/0 -> /TUPLE/2:2:Int/3/ +Del /Table/60/2/2/"a"/"foo"/1/0 +Del /Table/60/2/2/"b"/"bar"/1/0 +Put /Table/60/2/3/"a"/"foobar"/1/0 -> /BYTES/ +Put /Table/60/2/3/"c"/"baz"/1/0 -> /BYTES/