From 942330118de8ed2809594a6ddbf893bf5e16c574 Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Mon, 1 Nov 2021 14:00:12 +0000
Subject: [PATCH] sql: introduce MVCC-compliant index backfiller

Previously, the index backfilling process depended upon non-MVCC-compliant
AddSSTable calls which potentially rewrote previously read historical values.

To support an MVCC-compliant AddSSTable that writes at the _current_
timestamp, this change implements a new backfilling process described in the
following RFC:

https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20211004_incremental_index_backfiller.md

In summary, the new index backfilling process depends on backfilling the new
index while it is in a BACKFILLING state (added in #72281). In this state it
receives no writes or deletes. Writes that occur during the backfilling
process are captured by a "temporary index." This temporary index uses the
DeletePreservingEncoding to ensure it captures deletes as well as writes.
After the bulk backfill using the MVCC-compliant AddSSTable, the index is
moved into a MERGING state (added in #75663) in which it receives writes and
deletes. Writes previously captured by the temporary index are then
transactionally merged into the newly added index.

This feature is currently behind a new boolean cluster setting which defaults
to true. Schema changes that contain both old- and new-style backfills are
rejected. Reverting the default to false will require updating various tests,
since many tests depend on the exact index IDs of newly added indexes.

Release note: None

Co-authored-by: Rui Hu
---
 .../settings/settings-for-tenants.txt          |   2 +-
 docs/generated/settings/settings.html          |   2 +-
 pkg/ccl/backupccl/restore_job.go               |   2 +-
 pkg/ccl/multiregionccl/BUILD.bazel             |   1 +
 .../multiregionccl/regional_by_row_test.go     |  31 +-
 pkg/ccl/partitionccl/drop_test.go              |   6 +-
 pkg/ccl/partitionccl/partition_test.go         |   6 +-
 pkg/clusterversion/cockroach_versions.go       |  11 +-
 pkg/clusterversion/key_string.go               |   5 +-
 pkg/jobs/errors.go                             |   6 +
 pkg/sql/add_column.go                          |   2 +-
 pkg/sql/alter_primary_key.go                   |  27 +-
 pkg/sql/alter_table.go                         |   2 +-
 pkg/sql/backfill.go                            | 232 ++++++++++-
 pkg/sql/backfill/backfill.go                   |   3 +-
 pkg/sql/catalog/table_elements.go              |  42 ++
 pkg/sql/catalog/tabledesc/BUILD.bazel          |   1 +
 pkg/sql/catalog/tabledesc/index.go             |  10 +
 pkg/sql/catalog/tabledesc/index_test.go        |  27 +-
 pkg/sql/catalog/tabledesc/structured.go        | 120 +++++-
 pkg/sql/create_index.go                        |  11 +-
 pkg/sql/delete_preserving_index_test.go        |  47 +--
 pkg/sql/descriptor_mutation_test.go            |  12 +-
 pkg/sql/distsql_plan_backfill.go               |  14 +-
 pkg/sql/drop_index.go                          |   2 +-
 pkg/sql/drop_test.go                           |  14 +-
 pkg/sql/execinfrapb/processors_bulk_io.proto   |  17 +-
 pkg/sql/index_backfiller.go                    |   2 +-
 pkg/sql/indexbackfiller_test.go                |  44 ++-
 .../testdata/logic_test/alter_primary_key      |  28 +-
 .../logictest/testdata/logic_test/alter_table  |  15 +-
 .../testdata/logic_test/dependencies           |  18 +-
 pkg/sql/logictest/testdata/logic_test/jobs     |  70 ++--
 .../logictest/testdata/logic_test/pg_catalog   |  20 +-
 pkg/sql/logictest/testdata/logic_test/ranges   | Bin 31555 -> 32078 bytes
 .../testdata/logic_test/schema_change_in_txn   |   6 +-
 .../logictest/testdata/logic_test/zigzag_join  |   4 +
 .../opt/exec/execbuilder/testdata/geospatial   |   4 +
 ...ndary_index_column_families_nonmetamorphic  |   4 +
 .../testdata/show_trace_nonmetamorphic         |   6 +-
 pkg/sql/region_util.go                         |   4 +
 pkg/sql/rowexec/indexbackfiller.go             |   1 +
 pkg/sql/schema_changer.go                      | 243 +++++++++++-
 pkg/sql/schema_changer_helpers_test.go         |   3 +-
 pkg/sql/schema_changer_test.go                 | 364 +++++++++++++++---
 .../scexec/exec_backfill_test.go               |   2 +-
 .../scexec/scmutationexec/helpers.go           |   4 +-
 pkg/sql/truncate.go                            |   5 +-
 48 files changed, 1246 insertions(+), 256 deletions(-)

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 3cb80096fc09..af325df1f071 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -176,4 +176,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
 trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used.
 trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used.
 trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used.
-version version 21.2-66 set the active cluster version in the format '.'
+version version 21.2-68 set the active cluster version in the format '.'
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 2c38db0a9e83..e7e06b56857a 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -189,6 +189,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used.
 trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used.
 trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used.
-versionversion21.2-66set the active cluster version in the format '.'
+versionversion21.2-68set the active cluster version in the format '.'
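For orientation before the per-file diffs: the lifecycle described in the commit
message amounts to two parallel state sequences, one for the new index and one
for its temporary companion. The following standalone Go program is a sketch
for illustration only (the state lists are transcribed from the commit message
and the backfill.go comment later in this patch; nothing below is code from the
patch itself):

package main

import "fmt"

func main() {
	// States the new index moves through, per the commit message.
	newIndex := []string{
		"BACKFILLING",           // receives no writes or deletes; bulk backfill runs
		"DELETE_ONLY",           // stepped up once the bulk backfill completes
		"MERGING",               // receives writes; temporary index contents merged in
		"DELETE_AND_WRITE_ONLY", // validated, then made public
	}
	// States the temporary index moves through alongside it.
	tempIndex := []string{
		"DELETE_ONLY",           // created together with the new index
		"DELETE_AND_WRITE_ONLY", // captures writes, and deletes via the delete-preserving encoding
		"DELETE_ONLY",           // stepped down after the merge; then dropped and GC'd
	}
	fmt.Println("new index: ", newIndex)
	fmt.Println("temp index:", tempIndex)
}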
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 509049a7e784..28bf6cff574f 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1902,7 +1902,7 @@ func (r *restoreResumer) publishDescriptors( } newIdx := found.IndexDescDeepCopy() mutTable.RemovePublicNonPrimaryIndex(found.Ordinal()) - if err := mutTable.AddIndexMutation(&newIdx, descpb.DescriptorMutation_ADD); err != nil { + if err := mutTable.AddIndexMutation(ctx, &newIdx, descpb.DescriptorMutation_ADD, r.settings); err != nil { return err } } diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index 1165e54f31e7..9c6cedb3979b 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -42,6 +42,7 @@ go_test( "//pkg/ccl/testutilsccl", "//pkg/ccl/utilccl", "//pkg/jobs", + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvbase", diff --git a/pkg/ccl/multiregionccl/regional_by_row_test.go b/pkg/ccl/multiregionccl/regional_by_row_test.go index 95d6b5b2470e..379949222865 100644 --- a/pkg/ccl/multiregionccl/regional_by_row_test.go +++ b/pkg/ccl/multiregionccl/regional_by_row_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/testutilsccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" @@ -568,6 +569,10 @@ func TestIndexCleanupAfterAlterFromRegionalByRow(t *testing.T) { {locality: "REGIONAL BY ROW AS region_col"}, } { t.Run(tc.locality, func(t *testing.T) { + // Don't allow gc jobs to complete so that we + // can validate that they were created. + blockGC := make(chan struct{}) + knobs := base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ // Disable the merge queue because it makes this test flakey @@ -583,6 +588,7 @@ func TestIndexCleanupAfterAlterFromRegionalByRow(t *testing.T) { }, // Decrease the adopt loop interval so that retries happen quickly. JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ jobspb.JobID) error { <-blockGC; return nil }}, } _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( @@ -680,21 +686,32 @@ CREATE TABLE regional_by_row ( return nil } + expectedGCJobsForDrops := 4 + expectedGCJobsForTempIndexes := 4 // Now check that we have the right number of index GC jobs pending. - err = queryIndexGCJobsAndValidateCount(`running`, 4) + err = queryIndexGCJobsAndValidateCount(`running`, expectedGCJobsForDrops+expectedGCJobsForTempIndexes) require.NoError(t, err) err = queryIndexGCJobsAndValidateCount(`succeeded`, 0) require.NoError(t, err) + queryAndEnsureThatIndexGCJobsSucceeded := func(count int) func() error { + return func() error { return queryIndexGCJobsAndValidateCount(`succeeded`, count) } + } + + // Unblock GC jobs. + close(blockGC) + // The GC jobs for the temporary indexes should be cleaned up immediately. + testutils.SucceedsSoon(t, queryAndEnsureThatIndexGCJobsSucceeded(expectedGCJobsForTempIndexes)) + // The GC jobs for the drops should still be waiting out the GC TTL. + err = queryIndexGCJobsAndValidateCount(`running`, expectedGCJobsForDrops) + require.NoError(t, err) + // Change gc.ttlseconds to speed up the cleanup. 
 	_, err = sqlDB.Exec(`ALTER TABLE regional_by_row CONFIGURE ZONE USING gc.ttlseconds = 1`)
 	require.NoError(t, err)
 
 	// Validate that indexes are cleaned up.
-	queryAndEnsureThatFourIndexGCJobsSucceeded := func() error {
-		return queryIndexGCJobsAndValidateCount(`succeeded`, 4)
-	}
-	testutils.SucceedsSoon(t, queryAndEnsureThatFourIndexGCJobsSucceeded)
+	testutils.SucceedsSoon(t, queryAndEnsureThatIndexGCJobsSucceeded(expectedGCJobsForDrops+expectedGCJobsForTempIndexes))
 	err = queryIndexGCJobsAndValidateCount(`running`, 0)
 	require.NoError(t, err)
 })
@@ -918,7 +935,7 @@ func TestIndexDescriptorUpdateForImplicitColumns(t *testing.T) {
 	t.Run("primary index", func(t *testing.T) {
 		tdb.Exec(t, `CREATE TABLE test.t1 (
-			a INT PRIMARY KEY, 
+			a INT PRIMARY KEY,
 			b test.public.crdb_internal_region NOT NULL
 		) LOCALITY GLOBAL`)
 		indexes := fetchIndexes("t1")
@@ -944,7 +961,7 @@
 	t.Run("secondary index", func(t *testing.T) {
 		tdb.Exec(t, `CREATE TABLE test.t2 (
-			a INT PRIMARY KEY, 
+			a INT PRIMARY KEY,
 			b test.public.crdb_internal_region NOT NULL,
 			c INT NOT NULL,
 			d INT NOT NULL,
diff --git a/pkg/ccl/partitionccl/drop_test.go b/pkg/ccl/partitionccl/drop_test.go
index e1f7e6751f8f..0b5e807ed1f5 100644
--- a/pkg/ccl/partitionccl/drop_test.go
+++ b/pkg/ccl/partitionccl/drop_test.go
@@ -104,8 +104,8 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) {
 		partition string
 	}{
 		{1, ""},
-		{3, ""},
-		{3, "p2"},
+		{4, ""},
+		{4, "p2"},
 	}
 	for _, target := range subzones {
 		if exists := subzoneExists(cfg, target.index, target.partition); !exists {
@@ -157,7 +157,7 @@ func TestDropIndexPartitionedByUserDefinedTypeCCL(t *testing.T) {
 		t.Helper()
 		var id int
 		tdb.QueryRow(t, `
-SELECT job_id 
+SELECT job_id
 FROM crdb_internal.jobs
 WHERE description LIKE $1
 `, description).Scan(&id)
diff --git a/pkg/ccl/partitionccl/partition_test.go b/pkg/ccl/partitionccl/partition_test.go
index 2715f1a42bc1..750ec41049ea 100644
--- a/pkg/ccl/partitionccl/partition_test.go
+++ b/pkg/ccl/partitionccl/partition_test.go
@@ -1462,12 +1462,12 @@ ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (y)
 
 	// Our subzones should be spans prefixed with dropped copy of i1,
 	// dropped copy of i2, new copy of i1, and new copy of i2.
-	// These have ID's 2, 3, 6 and 7 respectively.
+	// These have IDs 2, 3, 8 and 10 respectively.
 	expectedSpans := []roachpb.Key{
 		table.IndexSpan(keys.SystemSQLCodec, 2 /* indexID */).Key,
 		table.IndexSpan(keys.SystemSQLCodec, 3 /* indexID */).Key,
-		table.IndexSpan(keys.SystemSQLCodec, 6 /* indexID */).Key,
-		table.IndexSpan(keys.SystemSQLCodec, 7 /* indexID */).Key,
+		table.IndexSpan(keys.SystemSQLCodec, 8 /* indexID */).Key,
+		table.IndexSpan(keys.SystemSQLCodec, 10 /* indexID */).Key,
 	}
 	if len(zone.SubzoneSpans) != len(expectedSpans) {
 		t.Fatalf("expected subzones to have length %d", len(expectedSpans))
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go
index bb22e5b3b23f..a72286e77f74 100644
--- a/pkg/clusterversion/cockroach_versions.go
+++ b/pkg/clusterversion/cockroach_versions.go
@@ -283,6 +283,12 @@ const (
 	// to disable the data propagation mechanism it and the entire spanconfig
 	// infrastructure obviates.
 	DisableSystemConfigGossipTrigger
+	// MVCCIndexBackfiller supports MVCC-compliant index
+	// backfillers via a new BACKFILLING index state,
+	// delete-preserving temporary indexes, and a post-backfill
+	// merging process.
+ MVCCIndexBackfiller + // ************************************************* // Step (1): Add new versions here. // Do not add new versions to a patch release. @@ -450,7 +456,10 @@ var versionsSingleton = keyedVersions{ Key: DisableSystemConfigGossipTrigger, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 66}, }, - + { + Key: MVCCIndexBackfiller, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 68}, + }, // ************************************************* // Step (2): Add new versions here. // Do not add new versions to a patch release. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index b48bc5a4f3d7..59d8a966076e 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -42,11 +42,12 @@ func _() { _ = x[TenantSettingsTable-31] _ = x[EnablePebbleFormatVersionBlockProperties-32] _ = x[DisableSystemConfigGossipTrigger-33] + _ = x[MVCCIndexBackfiller-34] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTrigger" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenantDeleteCommentsWithDroppedIndexesRemoveIncompatibleDatabasePrivilegesAddRaftAppliedIndexTermMigrationPostAddRaftAppliedIndexTermMigrationDontProposeWriteTimestampForLeaseTransfersTenantSettingsTableEnablePebbleFormatVersionBlockPropertiesDisableSystemConfigGossipTriggerMVCCIndexBackfiller" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642, 674, 710, 742, 778, 820, 839, 879, 911, 930} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/jobs/errors.go b/pkg/jobs/errors.go index 6ec709683cc0..8ad50f3da7ea 100644 --- a/pkg/jobs/errors.go +++ b/pkg/jobs/errors.go @@ -48,6 +48,12 @@ func IsPermanentJobError(err error) bool { return errors.Is(err, 
errJobPermanentSentinel) } +// IsPauseSelfError checks whether the given error is a +// PauseRequestError. +func IsPauseSelfError(err error) bool { + return errors.Is(err, errPauseSelfSentinel) +} + // errPauseSelfSentinel exists so the errors returned from PauseRequestErr can // be marked with it. var errPauseSelfSentinel = errors.New("job requested it be paused") diff --git a/pkg/sql/add_column.go b/pkg/sql/add_column.go index fcfc13608783..f492056a8c1f 100644 --- a/pkg/sql/add_column.go +++ b/pkg/sql/add_column.go @@ -128,7 +128,7 @@ func (p *planner) addColumnImpl( n.tableDesc.AddColumnMutation(col, descpb.DescriptorMutation_ADD) if idx != nil { - if err := n.tableDesc.AddIndexMutation(idx, descpb.DescriptorMutation_ADD); err != nil { + if err := n.tableDesc.AddIndexMutation(params.ctx, idx, descpb.DescriptorMutation_ADD, params.p.ExecCfg().Settings); err != nil { return err } } diff --git a/pkg/sql/alter_primary_key.go b/pkg/sql/alter_primary_key.go index 9d2a0c8ebab2..546d28d4badb 100644 --- a/pkg/sql/alter_primary_key.go +++ b/pkg/sql/alter_primary_key.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" @@ -238,7 +239,7 @@ func (p *planner) AlterPrimaryKey( } } - if err := tableDesc.AddIndexMutation(newPrimaryIndexDesc, descpb.DescriptorMutation_ADD); err != nil { + if err := tableDesc.AddIndexMutation(ctx, newPrimaryIndexDesc, descpb.DescriptorMutation_ADD, p.ExecCfg().Settings); err != nil { return err } version := p.ExecCfg().Settings.Version.ActiveVersion(ctx) @@ -357,7 +358,7 @@ func (p *planner) AlterPrimaryKey( // Set correct version and encoding type. newUniqueIdx.Version = descpb.PrimaryIndexWithStoredColumnsVersion newUniqueIdx.EncodingType = descpb.SecondaryIndexEncoding - if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newUniqueIdx, newPrimaryIndexDesc); err != nil { + if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newUniqueIdx, newPrimaryIndexDesc, p.ExecCfg().Settings); err != nil { return err } // Copy the old zone configuration into the newly created unique index for PARTITION ALL BY. @@ -484,7 +485,7 @@ func (p *planner) AlterPrimaryKey( newIndex.Name = tabledesc.GenerateUniqueName(basename, nameExists) newIndex.Version = descpb.PrimaryIndexWithStoredColumnsVersion newIndex.EncodingType = descpb.SecondaryIndexEncoding - if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newIndex, newPrimaryIndexDesc); err != nil { + if err := addIndexMutationWithSpecificPrimaryKey(ctx, tableDesc, &newIndex, newPrimaryIndexDesc, p.ExecCfg().Settings); err != nil { return err } @@ -683,17 +684,30 @@ func addIndexMutationWithSpecificPrimaryKey( table *tabledesc.Mutable, toAdd *descpb.IndexDescriptor, primary *descpb.IndexDescriptor, + settings *cluster.Settings, ) error { // Reset the ID so that a call to AllocateIDs will set up the index. toAdd.ID = 0 - if err := table.AddIndexMutation(toAdd, descpb.DescriptorMutation_ADD); err != nil { + if err := table.AddIndexMutation(ctx, toAdd, descpb.DescriptorMutation_ADD, settings); err != nil { return err } if err := table.AllocateIDsWithoutValidation(ctx); err != nil { return err } - // Use the columns in the given primary index to construct this indexes - // KeySuffixColumnIDs list. 
+ + setKeySuffixColumnIDsFromPrimary(toAdd, primary) + if tempIdx := catalog.FindCorrespondingTemporaryIndexByID(table, toAdd.ID); tempIdx != nil { + setKeySuffixColumnIDsFromPrimary(tempIdx.IndexDesc(), primary) + } + + return nil +} + +// setKeySuffixColumnIDsFromPrimary uses the columns in the given +// primary index to construct this toAdd's KeySuffixColumnIDs list. +func setKeySuffixColumnIDsFromPrimary( + toAdd *descpb.IndexDescriptor, primary *descpb.IndexDescriptor, +) { presentColIDs := catalog.MakeTableColSet(toAdd.KeyColumnIDs...) presentColIDs.UnionWith(catalog.MakeTableColSet(toAdd.StoreColumnIDs...)) toAdd.KeySuffixColumnIDs = nil @@ -702,5 +716,4 @@ func addIndexMutationWithSpecificPrimaryKey( toAdd.KeySuffixColumnIDs = append(toAdd.KeySuffixColumnIDs, colID) } } - return nil } diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 2cd6d136d7ff..f5c2d526994d 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -305,7 +305,7 @@ func (n *alterTableNode) startExec(params runParams) error { "index %q being dropped, try again later", d.Name) } } - if err := n.tableDesc.AddIndexMutation(&idx, descpb.DescriptorMutation_ADD); err != nil { + if err := n.tableDesc.AddIndexMutation(params.ctx, &idx, descpb.DescriptorMutation_ADD, params.p.ExecCfg().Settings); err != nil { return err } diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 44fa4050c71c..4e8d49bc216a 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -193,6 +193,7 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error { // mutations. Collect the elements that are part of the mutation. var addedIndexSpans []roachpb.Span var addedIndexes []descpb.IndexID + var temporaryIndexes []descpb.IndexID var constraintsToDrop []catalog.ConstraintToUpdate var constraintsToAddBeforeValidation []catalog.ConstraintToUpdate @@ -247,8 +248,12 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error { // that don't, so preserve the flag if its already been flipped. needColumnBackfill = needColumnBackfill || catalog.ColumnNeedsBackfill(col) } else if idx := m.AsIndex(); idx != nil { - addedIndexSpans = append(addedIndexSpans, tableDesc.IndexSpan(sc.execCfg.Codec, idx.GetID())) - addedIndexes = append(addedIndexes, idx.GetID()) + if idx.IsTemporaryIndexForBackfill() { + temporaryIndexes = append(temporaryIndexes, idx.GetID()) + } else { + addedIndexSpans = append(addedIndexSpans, tableDesc.IndexSpan(sc.execCfg.Codec, idx.GetID())) + addedIndexes = append(addedIndexes, idx.GetID()) + } } else if c := m.AsConstraint(); c != nil { isValidating := c.IsCheck() && c.Check().Validity == descpb.ConstraintValidity_Validating || c.IsForeignKey() && c.ForeignKey().Validity == descpb.ConstraintValidity_Validating || @@ -317,7 +322,7 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error { // Add new indexes. if len(addedIndexSpans) > 0 { // Check if bulk-adding is enabled and supported by indexes (ie non-unique). - if err := sc.backfillIndexes(ctx, version, addedIndexSpans, addedIndexes); err != nil { + if err := sc.backfillIndexes(ctx, version, addedIndexSpans, addedIndexes, temporaryIndexes); err != nil { return err } } @@ -869,6 +874,7 @@ func (sc *SchemaChanger) distIndexBackfill( version descpb.DescriptorVersion, targetSpans []roachpb.Span, addedIndexes []descpb.IndexID, + writeAtRequestTimestamp bool, filter backfill.MutationFilter, ) error { @@ -880,6 +886,7 @@ func (sc *SchemaChanger) distIndexBackfill( // Gather the initial resume spans for the table. 
 	var todoSpans []roachpb.Span
 	var mutationIdx int
+
 	if err := DescsTxn(ctx, sc.execCfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
 		todoSpans, _, mutationIdx, err = rowexec.GetResumeSpans(
 			ctx, sc.jobRegistry, txn, sc.execCfg.Codec, col, sc.descID, sc.mutationID, filter)
@@ -975,7 +982,7 @@
 		true /* distribute */)
 	indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
 	chunkSize := sc.getChunkSize(indexBatchSize)
-	spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, chunkSize, addedIndexes)
+	spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, readAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes)
 	if err != nil {
 		return err
 	}
@@ -1381,7 +1388,8 @@ func (sc *SchemaChanger) validateIndexes(ctx context.Context) error {
 			break
 		}
 		idx := m.AsIndex()
-		if idx == nil || idx.Dropped() {
+		// NB: temporary indexes should be Dropped by this point.
+		if idx == nil || idx.Dropped() || idx.IsTemporaryIndexForBackfill() {
 			continue
 		}
 		switch idx.GetType() {
@@ -1842,6 +1850,79 @@ func ValidateForwardIndexes(
 // backfillIndexes fills the missing columns in the indexes of the
 // leased tables.
 //
+//
+// If temporaryIndexes is non-empty, we assume that we are using the
+// MVCC-compatible backfilling process. This mutation has already been
+// checked to ensure all newly added indexes are using one type of
+// index backfill.
+//
+// The MVCC-compatible index backfilling process has a goal of not
+// having to issue AddSSTable requests with backdated timestamps.
+//
+// To do this, we backfill new indexes while they are in a BACKFILLING
+// state in which they do not see writes or deletes. While the
+// backfill is running, a temporary index captures all in-flight
+// writes.
+//
+// When the backfill is completed, the backfilling index is stepped up
+// to MERGING, and then writes and deletes missed during the backfill
+// are merged from the temporary index.
+//
+// Finally, the new index is brought into the DELETE_AND_WRITE_ONLY
+// state for validation.
+// +// ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +// │ │ │ │ │ │ +// │ PrimaryIndex │ │ NewIndex │ │ TempIndex │ +// t0 │ (PUBLIC) │ │ (BACKFILLING) │ │ (DELETE_ONLY) │ +// │ │ │ │ │ │ +// └─────────────────┘ └─────────────────┘ └────────┬────────┘ +// │ +// ┌────────▼────────┐ +// │ │ +// │ TempIndex │ +// t1 │(DELETE_AND_WRITE) │ +// │ │ │ +// └────────┬────────┘ │ +// │ │ +// ┌─────────────────┐ ┌─────────────────┐ ┌────────▼────────┐ │ TempIndex receiving writes +// │ │ │ │ │ │ │ +// │ PrimaryIndex ├────────►│ NewIndex │ │ TempIndex │ │ +// t2 │ (PUBLIC) │ Backfill│ (BACKFILLING) │ │(DELETE_AND_WRITE│ │ +// │ │ │ │ │ │ │ +// └─────────────────┘ └────────┬────────┘ └─────────────────┘ │ +// │ │ +// ┌────────▼────────┐ │ +// │ │ │ +// │ NewIndex │ │ +// t3 │ (DELETE_ONLY) │ │ +// │ │ │ +// └────────┬────────┘ │ +// │ │ +// ┌────────▼────────┐ │ +// │ │ │ +// │ NewIndex │ │ │ +// │ (MERGING) │ │ │ +// t4 │ │ │ │ NewIndex receiving writes +// └─────────────────┘ │ │ +// │ │ +// ┌─────────────────┐ ┌─────────────────┐ │ │ +// │ │ │ │ │ │ +// │ NewIndex │◄────────────┤ TempIndex │ │ │ +// t5 │ (MERGING) │ BatchMerge │(DELETE_AND_WRITE│ │ │ +// │ │ │ │ │ │ +// └────────┬────────┘ └───────┬─────────┘ │ │ +// │ │ │ │ +// ┌────────▼────────┐ ┌───────▼─────────┐ │ │ +// │ │ │ │ │ │ +// │ NewIndex │ │ TempIndex │ │ +// t6 │(DELETE_AND_WRITE) │ (DELETE_ONLY) │ │ +// │ │ │ │ │ +// └───────┬─────────┘ └───────┬─────────┘ │ +// │ │ +// │ │ +// ▼ ▼ +// [validate and make public] [ dropped ] +// // This operates over multiple goroutines concurrently and is thus not // able to reuse the original kv.Txn safely. func (sc *SchemaChanger) backfillIndexes( @@ -1849,12 +1930,13 @@ func (sc *SchemaChanger) backfillIndexes( version descpb.DescriptorVersion, addingSpans []roachpb.Span, addedIndexes []descpb.IndexID, + temporaryIndexes []descpb.IndexID, ) error { - log.Infof(ctx, "backfilling %d indexes", len(addingSpans)) - - if fn := sc.testingKnobs.RunBeforeIndexBackfill; fn != nil { - fn() - } + // If temporary indexes is non-empty, we want a MVCC-compliant + // backfill. If it is empty, we assume this is an older schema + // change using the non-MVCC-compliant flow. + writeAtRequestTimestamp := len(temporaryIndexes) != 0 + log.Infof(ctx, "backfilling %d indexes: %v (writeAtRequestTimestamp: %v)", len(addingSpans), addingSpans, writeAtRequestTimestamp) // Split off a new range for each new index span. But only do so for the // system tenant. Secondary tenants do not have mandatory split points @@ -1868,16 +1950,137 @@ func (sc *SchemaChanger) backfillIndexes( } } + if fn := sc.testingKnobs.RunBeforeIndexBackfill; fn != nil { + fn() + } + + // NB: The index backfilling process and index merging process + // use different ResumeSpans to track their progress, so it is + // safe to pass addedIndexes here even if the merging has + // already started. if err := sc.distIndexBackfill( - ctx, version, addingSpans, addedIndexes, backfill.IndexMutationFilter, + ctx, version, addingSpans, addedIndexes, writeAtRequestTimestamp, backfill.IndexMutationFilter, ); err != nil { return err } + if writeAtRequestTimestamp { + if fn := sc.testingKnobs.RunBeforeTempIndexMerge; fn != nil { + fn() + } + + // Steps backfilled adding indexes from BACKFILLING to + // MERGING. 
+		if err := sc.RunStateMachineAfterIndexBackfill(ctx); err != nil {
+			return err
+		}
+
+		if err := sc.mergeFromTemporaryIndex(ctx, version, addedIndexes, temporaryIndexes); err != nil {
+			return err
+		}
+
+		if fn := sc.testingKnobs.RunAfterTempIndexMerge; fn != nil {
+			fn()
+		}
+
+		if err := sc.runStateMachineAfterTempIndexMerge(ctx); err != nil {
+			return err
+		}
+	}
+
+	if fn := sc.testingKnobs.RunAfterIndexBackfill; fn != nil {
+		fn()
+	}
+
 	log.Info(ctx, "finished backfilling indexes")
 	return sc.validateIndexes(ctx)
 }
 
+func (sc *SchemaChanger) mergeFromTemporaryIndex(
+	ctx context.Context,
+	version descpb.DescriptorVersion,
+	addingIndexes []descpb.IndexID,
+	temporaryIndexes []descpb.IndexID,
+) error {
+	var tbl *tabledesc.Mutable
+	if err := sc.txn(ctx, func(
+		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
+	) error {
+		var err error
+		tbl, err = descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn)
+		return err
+	}); err != nil {
+		return err
+	}
+	table := tabledesc.NewBuilder(&tbl.ClusterVersion).BuildImmutableTable()
+	for i, addIdx := range addingIndexes {
+		tempIdx := temporaryIndexes[i]
+		log.Infof(ctx, "merging from %d -> %d on %v", tempIdx, addIdx, table)
+		sourceSpan := table.IndexSpan(sc.execCfg.Codec, tempIdx)
+		err := sc.Merge(ctx, sc.execCfg.Codec, table, tempIdx, addIdx, sourceSpan)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// runStateMachineAfterTempIndexMerge steps any DELETE_AND_WRITE_ONLY
+// temporary indexes to DELETE_ONLY, changes their direction to DROP,
+// and steps any MERGING indexes to DELETE_AND_WRITE_ONLY.
+func (sc *SchemaChanger) runStateMachineAfterTempIndexMerge(ctx context.Context) error {
+	var runStatus jobs.RunningStatus
+	return sc.txn(ctx, func(
+		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
+	) error {
+		tbl, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn)
+		if err != nil {
+			return err
+		}
+		runStatus = ""
+		// Apply mutations belonging to the same version.
+		for _, m := range tbl.AllMutations() {
+			if m.MutationID() != sc.mutationID {
+				// Mutations are applied in a FIFO order. Only apply the first set of
+				// mutations if they have the mutation ID we're looking for.
+				break
+			}
+			idx := m.AsIndex()
+			if idx == nil {
+				// Don't touch anything but indexes.
+				continue
+			}
+
+			if idx.IsTemporaryIndexForBackfill() && m.Adding() && m.WriteAndDeleteOnly() {
+				log.Infof(ctx, "dropping temporary index: %d", idx.IndexDesc().ID)
+				tbl.Mutations[m.MutationOrdinal()].State = descpb.DescriptorMutation_DELETE_ONLY
+				tbl.Mutations[m.MutationOrdinal()].Direction = descpb.DescriptorMutation_DROP
+				runStatus = RunningStatusDeleteOnly
+			} else if m.Merging() {
+				tbl.Mutations[m.MutationOrdinal()].State = descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY
+			}
+		}
+		if runStatus == "" || tbl.Dropped() {
+			return nil
+		}
+		if err := descsCol.WriteDesc(
+			ctx, true /* kvTrace */, tbl, txn,
+		); err != nil {
+			return err
+		}
+		if sc.job != nil {
+			if err := sc.job.RunningStatus(ctx, txn, func(
+				ctx context.Context, details jobspb.Details,
+			) (jobs.RunningStatus, error) {
+				return runStatus, nil
+			}); err != nil {
+				return errors.Wrap(err, "failed to update job status")
+			}
+		}
+		return nil
+	})
+}
+
 // truncateAndBackfillColumns performs the backfill operation on the given leased
 // table descriptors.
 //
@@ -1959,6 +2162,13 @@ func runSchemaChangesInTxn(
 			continue
 		}
 
+		// Skip temporary index mutations, since an index
+		// creation inside a transaction doesn't use the
+		// AddSSTable-based backfiller.
+		if idx := m.AsIndex(); idx != nil && idx.IsTemporaryIndexForBackfill() {
+			continue
+		}
+
 		immutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildImmutableTable()
 
 		if m.Adding() {
diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go
index 527f2554a51a..4ba3887d1ab3 100644
--- a/pkg/sql/backfill/backfill.go
+++ b/pkg/sql/backfill/backfill.go
@@ -63,7 +63,8 @@ func ColumnMutationFilter(m catalog.Mutation) bool {
 
 // IndexMutationFilter is a filter that allows mutations that add indexes.
 func IndexMutationFilter(m catalog.Mutation) bool {
-	return m.AsIndex() != nil && m.Adding()
+	idx := m.AsIndex()
+	return idx != nil && !idx.IsTemporaryIndexForBackfill() && m.Adding()
 }
 
 // ColumnBackfiller is capable of running a column backfill for all
diff --git a/pkg/sql/catalog/table_elements.go b/pkg/sql/catalog/table_elements.go
index 4f1440b78e6f..d1719dfd07e3 100644
--- a/pkg/sql/catalog/table_elements.go
+++ b/pkg/sql/catalog/table_elements.go
@@ -220,6 +220,11 @@ type Index interface {
 	// It is derived from the statement time at which the relevant statement
 	// was issued.
 	CreatedAt() time.Time
+
+	// IsTemporaryIndexForBackfill returns true iff the index is a
+	// temporary index being used by an in-progress index backfill.
+	IsTemporaryIndexForBackfill() bool
 }
 
 // Column is an interface around the column descriptor types.
@@ -680,6 +685,43 @@ func FindDeleteOnlyNonPrimaryIndex(desc TableDescriptor, test func(idx Index) bo
 	return findIndex(desc.DeleteOnlyNonPrimaryIndexes(), test)
 }
 
+// FindCorrespondingTemporaryIndexByID finds the temporary index that
+// corresponds to the currently mutated index identified by ID. It
+// assumes that the temporary index for a given index ID exists
+// directly after it in the mutations array.
+//
+// Callers should take care that AllocateIDs() has been called before
+// using this function.
+func FindCorrespondingTemporaryIndexByID(desc TableDescriptor, id descpb.IndexID) Index {
+	mutations := desc.AllMutations()
+	var ord int
+	for _, m := range mutations {
+		idx := m.AsIndex()
+		if idx != nil && idx.IndexDesc().ID == id {
+			// We want the mutation after this mutation
+			// since the temporary index is added directly
+			// after.
+			ord = m.MutationOrdinal() + 1
+		}
+	}
+
+	// A temporary index will never be found at index 0 since we
+	// always add them _after_ the index they correspond to.
+	if ord == 0 {
+		return nil
+	}
+
+	if len(mutations) >= ord+1 {
+		candidateMutation := mutations[ord]
+		if idx := candidateMutation.AsIndex(); idx != nil {
+			if idx.IsTemporaryIndexForBackfill() {
+				return idx
+			}
+		}
+	}
+	return nil
+}
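+
+// A hedged usage sketch (newIdxID and tableDesc are placeholder names,
+// not identifiers from this patch): after AllocateIDs has run, callers
+// such as configureZoneConfigForNewIndexPartitioning in create_index.go
+// locate the companion index like so:
+//
+//	ids := []descpb.IndexID{newIdxID}
+//	if tempIdx := FindCorrespondingTemporaryIndexByID(tableDesc, newIdxID); tempIdx != nil {
+//		ids = append(ids, tempIdx.GetID())
+//	}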
 
 // UserDefinedTypeColsHaveSameVersion returns whether one table descriptor's
 // columns with user defined type metadata have the same versions of metadata
 // as in the other descriptor. Note that this function is only valid on two
diff --git a/pkg/sql/catalog/tabledesc/BUILD.bazel b/pkg/sql/catalog/tabledesc/BUILD.bazel
index 81ce91234e10..e049819464a2 100644
--- a/pkg/sql/catalog/tabledesc/BUILD.bazel
+++ b/pkg/sql/catalog/tabledesc/BUILD.bazel
@@ -22,6 +22,7 @@ go_library(
         "//pkg/geo/geoindex",
         "//pkg/keys",
         "//pkg/roachpb",
+        "//pkg/settings",
         "//pkg/settings/cluster",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/catconstants",
diff --git a/pkg/sql/catalog/tabledesc/index.go b/pkg/sql/catalog/tabledesc/index.go
index e8c57d7cedd0..1c9aced84586 100644
--- a/pkg/sql/catalog/tabledesc/index.go
+++ b/pkg/sql/catalog/tabledesc/index.go
@@ -360,6 +360,16 @@ func (w index) CreatedAt() time.Time {
 	return timeutil.Unix(0, w.desc.CreatedAtNanos)
 }
 
+// IsTemporaryIndexForBackfill returns true iff the index is a
+// temporary index being used by an in-progress index backfill.
+//
+// TODO(ssd): This could be its own boolean or we could store the ID
+// of the index it is a temporary index for.
+func (w index) IsTemporaryIndexForBackfill() bool {
+	return w.desc.UseDeletePreservingEncoding
+}
+
 // partitioning is the backing struct for a catalog.Partitioning interface.
 type partitioning struct {
 	desc *catpb.PartitioningDescriptor
diff --git a/pkg/sql/catalog/tabledesc/index_test.go b/pkg/sql/catalog/tabledesc/index_test.go
index 20754df27620..db64510da33b 100644
--- a/pkg/sql/catalog/tabledesc/index_test.go
+++ b/pkg/sql/catalog/tabledesc/index_test.go
@@ -460,7 +460,7 @@ func TestLatestIndexDescriptorVersionValues(t *testing.T) {
 
 		switch desc.GetName() {
 		case "t":
-			require.Equal(t, 6, len(nonPrimaries))
+			require.Equal(t, 10, len(nonPrimaries))
 			for _, np := range nonPrimaries {
 				switch np.GetName() {
 				case "tsec":
@@ -493,6 +493,31 @@
 					require.Equal(t, descpb.SecondaryIndexEncoding, np.GetEncodingType())
 					require.Equal(t, descpb.PrimaryIndexWithStoredColumnsVersion, np.GetVersion())
 
+				case "t_a_crdb_internal_dpe_key":
+					// Temporary index for new index based on old primary index (t_a_key).
+					require.True(t, np.IsMutation())
+					require.Equal(t, descpb.SecondaryIndexEncoding, np.GetEncodingType())
+					require.Equal(t, descpb.PrimaryIndexWithStoredColumnsVersion, np.GetVersion())
+
+				case "t_b_crdb_internal_dpe_idx":
+					// Temporary index for tsec_rewrite_for_primary_key_change.
+					require.True(t, np.IsMutation())
+					require.Equal(t, descpb.SecondaryIndexEncoding, np.GetEncodingType())
+					require.Equal(t, descpb.PrimaryIndexWithStoredColumnsVersion, np.GetVersion())
+
+				case "t_c_crdb_internal_dpe_key":
+					// Temporary index for t_c_key_rewrite_for_primary_key_change.
+					require.True(t, np.IsMutation())
+					require.True(t, np.IsUnique())
+					require.Equal(t, descpb.SecondaryIndexEncoding, np.GetEncodingType())
+					require.Equal(t, descpb.PrimaryIndexWithStoredColumnsVersion, np.GetVersion())
+
+				case "t_d_crdb_internal_dpe_key":
+					// Temporary index for new_primary_key.
+					require.True(t, np.IsMutation())
+					require.Equal(t, descpb.PrimaryIndexEncoding, np.GetEncodingType())
+					require.Equal(t, descpb.PrimaryIndexWithStoredColumnsVersion, np.GetVersion())
+
 				default:
 					t.Fatalf("unexpected index or index mutation %q", np.GetName())
 				}
diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go
index 1609a4a80899..1edc9fe12063 100644
--- a/pkg/sql/catalog/tabledesc/structured.go
+++ b/pkg/sql/catalog/tabledesc/structured.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/docs"
"github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" @@ -66,6 +67,16 @@ var ErrMissingColumns = errors.New("table must contain at least 1 column") // ErrMissingPrimaryKey indicates a table with no primary key. var ErrMissingPrimaryKey = errors.New("table must contain a primary key") +// UseMVCCCompliantIndexCreation controls whether index additions will +// use the MVCC compliant scheme which requires both temporary indexes +// and a different initial state. +var UseMVCCCompliantIndexCreation = settings.RegisterBoolSetting( + settings.TenantWritable, + "sql.mvcc_compliant_index_creation.enabled", + "if true, schema changes will use the an index backfiller designed for MVCC-compliant bulk operations", + true, +) + // DescriptorType returns the type of this descriptor. func (desc *wrapper) DescriptorType() catalog.DescriptorType { return catalog.Table @@ -168,6 +179,14 @@ func BuildIndexName(tableDesc *Mutable, idx *descpb.IndexDescriptor) (string, er segments = append(segments, segmentName) } + // Add a segment for delete preserving indexes so that + // temporary indexes used by the index backfiller are easily + // identifiable and so that we don't cause too many changes in + // the index names generated by a series of operations. + if idx.UseDeletePreservingEncoding { + segments = append(segments, "crdb_internal_dpe") + } + // Add the final segment. if idx.Unique { segments = append(segments, "key") @@ -1680,7 +1699,7 @@ func (desc *Mutable) MakeMutationComplete(m descpb.DescriptorMutation) error { primaryIndexCopy := desc.GetPrimaryIndex().IndexDescDeepCopy() // Move the old primary index from the table descriptor into the mutations queue // to schedule it for deletion. - if err := desc.AddIndexMutation(&primaryIndexCopy, descpb.DescriptorMutation_DROP); err != nil { + if err := desc.AddDropIndexMutation(&primaryIndexCopy); err != nil { return err } @@ -1741,7 +1760,7 @@ func (desc *Mutable) MakeMutationComplete(m descpb.DescriptorMutation) error { desc.RemovePublicNonPrimaryIndex(oldIndexIdx) // Add a drop mutation for the old index. The code that calls this function will schedule // a schema change job to pick up all of these index drop mutations. - if err := desc.AddIndexMutation(oldIndexCopy, descpb.DescriptorMutation_DROP); err != nil { + if err := desc.AddDropIndexMutation(oldIndexCopy); err != nil { return err } } @@ -2009,11 +2028,60 @@ func (desc *Mutable) AddColumnMutation( desc.addMutation(m) } +// AddDropIndexMutation adds a a dropping index mutation for the given +// index descriptor. +func (desc *Mutable) AddDropIndexMutation(idx *descpb.IndexDescriptor) error { + if err := desc.checkValidIndex(idx); err != nil { + return err + } + m := descpb.DescriptorMutation{ + Descriptor_: &descpb.DescriptorMutation_Index{Index: idx}, + Direction: descpb.DescriptorMutation_DROP, + } + desc.addMutation(m) + return nil +} + // AddIndexMutation adds an index mutation to desc.Mutations. 
 // AddIndexMutation adds an index mutation to desc.Mutations.
 func (desc *Mutable) AddIndexMutation(
+	ctx context.Context,
+	idx *descpb.IndexDescriptor,
+	direction descpb.DescriptorMutation_Direction,
+	settings *cluster.Settings,
+) error {
+	if !settings.Version.IsActive(ctx, clusterversion.MVCCIndexBackfiller) || !UseMVCCCompliantIndexCreation.Get(&settings.SV) {
+		return desc.DeprecatedAddIndexMutation(idx, direction)
+	}
+
+	if err := desc.checkValidIndex(idx); err != nil {
+		return err
+	}
+	m := descpb.DescriptorMutation{
+		Descriptor_: &descpb.DescriptorMutation_Index{Index: idx},
+		Direction:   direction,
+	}
+	desc.addMutation(m)
+	return nil
+}
+
+// DeprecatedAddIndexMutation adds an index mutation to desc.Mutations that
+// assumes that the first state an added index should be placed into
+// is DELETE_ONLY rather than BACKFILLING.
+func (desc *Mutable) DeprecatedAddIndexMutation(
 	idx *descpb.IndexDescriptor, direction descpb.DescriptorMutation_Direction,
 ) error {
+	if err := desc.checkValidIndex(idx); err != nil {
+		return err
+	}
+	m := descpb.DescriptorMutation{
+		Descriptor_: &descpb.DescriptorMutation_Index{Index: idx},
+		Direction:   direction,
+	}
+	desc.deprecatedAddMutation(m)
+	return nil
+}
+
+func (desc *Mutable) checkValidIndex(idx *descpb.IndexDescriptor) error {
 	switch idx.Type {
 	case descpb.IndexDescriptor_FORWARD:
 		if err := checkColumnsValidForIndex(desc, idx.KeyColumnNames); err != nil {
@@ -2024,12 +2092,6 @@ func (desc *Mutable) AddIndexMutation(
 			return err
 		}
 	}
-
-	m := descpb.DescriptorMutation{
-		Descriptor_: &descpb.DescriptorMutation_Index{Index: idx},
-		Direction:   direction,
-	}
-	desc.addMutation(m)
 	return nil
 }
 
@@ -2064,11 +2126,51 @@ func (desc *Mutable) AddComputedColumnSwapMutation(swap *descpb.ComputedColumnSw
 func (desc *Mutable) addMutation(m descpb.DescriptorMutation) {
 	switch m.Direction {
 	case descpb.DescriptorMutation_ADD:
-		m.State = descpb.DescriptorMutation_DELETE_ONLY
+		switch m.Descriptor_.(type) {
+		case *descpb.DescriptorMutation_Index:
+			m.State = descpb.DescriptorMutation_BACKFILLING
+		default:
+			m.State = descpb.DescriptorMutation_DELETE_ONLY
+		}
+	case descpb.DescriptorMutation_DROP:
+		m.State = descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY
+	}
+	desc.addMutationWithNextID(m)
+	// If we are adding an index, we add another mutation for the
+	// temporary index used by the index backfiller.
+	//
+	// The index backfiller code currently assumes that it can
+	// always find the temporary indexes in the Mutations array,
+	// in the same order as the adding indexes.
+	if idxMut, ok := m.Descriptor_.(*descpb.DescriptorMutation_Index); ok {
+		if m.Direction == descpb.DescriptorMutation_ADD {
+			tempIndex := *protoutil.Clone(idxMut.Index).(*descpb.IndexDescriptor)
+			tempIndex.UseDeletePreservingEncoding = true
+			tempIndex.ID = 0
+			tempIndex.Name = ""
+			m2 := descpb.DescriptorMutation{
+				Descriptor_: &descpb.DescriptorMutation_Index{Index: &tempIndex},
+				Direction:   descpb.DescriptorMutation_ADD,
+				State:       descpb.DescriptorMutation_DELETE_ONLY,
+			}
+			desc.addMutationWithNextID(m2)
+		}
+	}
+}
+
+// deprecatedAddMutation assumes that new indexes are added in the
+// DELETE_ONLY state.
+func (desc *Mutable) deprecatedAddMutation(m descpb.DescriptorMutation) { + switch m.Direction { + case descpb.DescriptorMutation_ADD: + m.State = descpb.DescriptorMutation_DELETE_ONLY case descpb.DescriptorMutation_DROP: m.State = descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY } + desc.addMutationWithNextID(m) +} + +func (desc *Mutable) addMutationWithNextID(m descpb.DescriptorMutation) { // For tables created in the same transaction the next mutation ID will // not have been allocated and the added mutation will use an invalid ID. // This is fine because the mutation will be processed immediately. diff --git a/pkg/sql/create_index.go b/pkg/sql/create_index.go index f144728f92ed..4da4f752523c 100644 --- a/pkg/sql/create_index.go +++ b/pkg/sql/create_index.go @@ -648,13 +648,14 @@ func (n *createIndexNode) startExec(params runParams) error { } mutationIdx := len(n.tableDesc.Mutations) - if err := n.tableDesc.AddIndexMutation(indexDesc, descpb.DescriptorMutation_ADD); err != nil { + if err := n.tableDesc.AddIndexMutation(params.ctx, indexDesc, descpb.DescriptorMutation_ADD, params.p.ExecCfg().Settings); err != nil { return err } version := params.ExecCfg().Settings.Version.ActiveVersion(params.ctx) if err := n.tableDesc.AllocateIDs(params.ctx, version); err != nil { return err } + if err := params.p.configureZoneConfigForNewIndexPartitioning( params.ctx, n.tableDesc, @@ -759,13 +760,19 @@ func (p *planner) configureZoneConfigForNewIndexPartitioning( if err != nil { return err } + + indexIDs := []descpb.IndexID{indexDesc.ID} + if idx := catalog.FindCorrespondingTemporaryIndexByID(tableDesc, indexDesc.ID); idx != nil { + indexIDs = append(indexIDs, idx.GetID()) + } + if err := ApplyZoneConfigForMultiRegionTable( ctx, p.txn, p.ExecCfg(), regionConfig, tableDesc, - applyZoneConfigForMultiRegionTableOptionNewIndexes(indexDesc.ID), + applyZoneConfigForMultiRegionTableOptionNewIndexes(indexIDs...), ); err != nil { return err } diff --git a/pkg/sql/delete_preserving_index_test.go b/pkg/sql/delete_preserving_index_test.go index a29394ea0b0e..252ed1f657fb 100644 --- a/pkg/sql/delete_preserving_index_test.go +++ b/pkg/sql/delete_preserving_index_test.go @@ -57,16 +57,15 @@ func TestDeletePreservingIndexEncoding(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) params, _ := tests.CreateTestServerParams() - startBackfill := make(chan bool) - atBackfillStage := make(chan bool) + mergeFinished := make(chan struct{}) + completeSchemaChange := make(chan struct{}) errorChan := make(chan error, 1) params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - RunBeforeIndexBackfill: func() { - // Wait until we get a signal to begin backfill. - atBackfillStage <- true - <-startBackfill + RunAfterTempIndexMerge: func() { + mergeFinished <- struct{}{} + <-completeSchemaChange }, }, // Disable backfill migrations, we still need the jobs table migration. @@ -93,38 +92,22 @@ func TestDeletePreservingIndexEncoding(t *testing.T) { finishedSchemaChange.Done() }() + <-mergeFinished - <-atBackfillStage - // Find the descriptors for the indices. + // Find the descriptor for the temporary index mutation. 
codec := keys.SystemSQLCodec tableDesc := desctestutils.TestingGetMutableExistingTableDescriptor(kvDB, codec, "d", "t") var index *descpb.IndexDescriptor - var ord int - for idx, i := range tableDesc.Mutations { - if i.GetIndex() != nil { + for _, i := range tableDesc.Mutations { + if i.GetIndex() != nil && i.GetIndex().UseDeletePreservingEncoding == deletePreservingEncoding { index = i.GetIndex() - ord = idx + break } } - if index == nil { return nil, nil, errors.Newf("Could not find index mutation") } - if deletePreservingEncoding { - // Mutate index descriptor to use the delete-preserving encoding. - index.UseDeletePreservingEncoding = true - tableDesc.Mutations[ord].Descriptor_ = &descpb.DescriptorMutation_Index{Index: index} - - if err := kvDB.Put( - context.Background(), - catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.GetID()), - tableDesc.DescriptorProto(), - ); err != nil { - return nil, nil, err - } - } - // Make some transactions. now := kvDB.Clock().Now() if _, err := sqlDB.Exec(dataSQL); err != nil { @@ -132,7 +115,7 @@ func TestDeletePreservingIndexEncoding(t *testing.T) { } end := kvDB.Clock().Now() - // Grab the revision histories for both indices. + // Grab the revision histories for the index. prefix := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec, tableDesc.GetID(), index.ID) prefixEnd := append(prefix, []byte("\xff")...) @@ -141,7 +124,7 @@ func TestDeletePreservingIndexEncoding(t *testing.T) { return nil, nil, err } - startBackfill <- true + completeSchemaChange <- struct{}{} finishedSchemaChange.Wait() if err := <-errorChan; err != nil { t.Logf("Schema change with delete_preserving=%v encountered an error: %s, continuing...", deletePreservingEncoding, err) @@ -219,16 +202,13 @@ func TestDeletePreservingIndexEncoding(t *testing.T) { if err := resetTestData(); err != nil { t.Fatalf("error while resetting test data %s", err) } - delEncRevisions, delEncPrefix, err := getRevisionsForTest(test.setupSQL, test.schemaChangeSQL, test.dataSQL, true) if err != nil { t.Fatalf("error while getting delete encoding revisions %s", err) } - if err := resetTestData(); err != nil { t.Fatalf("error while resetting test data %s", err) } - defaultRevisions, defaultPrefix, err := getRevisionsForTest(test.setupSQL, test.schemaChangeSQL, test.dataSQL, false) if err != nil { t.Fatalf("error while getting default revisions %s", err) @@ -682,7 +662,8 @@ func TestMergeProcess(t *testing.T) { tableDesc, srcIndex.GetID(), dstIndex.GetID(), - tableDesc.IndexSpan(codec, srcIndex.GetID())); err != nil { + tableDesc.IndexSpan(codec, srcIndex.GetID()), + ); err != nil { t.Fatal(err) } diff --git a/pkg/sql/descriptor_mutation_test.go b/pkg/sql/descriptor_mutation_test.go index ce88508d62ac..5eb633056731 100644 --- a/pkg/sql/descriptor_mutation_test.go +++ b/pkg/sql/descriptor_mutation_test.go @@ -1177,15 +1177,19 @@ CREATE TABLE t.test (k CHAR PRIMARY KEY, v CHAR UNIQUE); state descpb.DescriptorMutation_State }{ {"d", 1, descpb.DescriptorMutation_DELETE_ONLY}, - {"test_d_key", 1, descpb.DescriptorMutation_DELETE_ONLY}, + {"test_d_key", 1, descpb.DescriptorMutation_BACKFILLING}, + {"test_d_crdb_internal_dpe_key", 1, descpb.DescriptorMutation_DELETE_ONLY}, {"e", 1, descpb.DescriptorMutation_DELETE_ONLY}, - {"test_e_key", 1, descpb.DescriptorMutation_DELETE_ONLY}, + {"test_e_key", 1, descpb.DescriptorMutation_BACKFILLING}, + {"test_e_crdb_internal_dpe_key", 1, descpb.DescriptorMutation_DELETE_ONLY}, {"f", 1, descpb.DescriptorMutation_DELETE_ONLY}, // Second schema change. 
{"g", 2, descpb.DescriptorMutation_DELETE_ONLY}, - {"idx_f", 2, descpb.DescriptorMutation_DELETE_ONLY}, + {"idx_f", 2, descpb.DescriptorMutation_BACKFILLING}, + {"test_f_crdb_internal_dpe_key", 2, descpb.DescriptorMutation_DELETE_ONLY}, // Third. - {"idx_g", 3, descpb.DescriptorMutation_DELETE_ONLY}, + {"idx_g", 3, descpb.DescriptorMutation_BACKFILLING}, + {"test_g_crdb_internal_dpe_key", 3, descpb.DescriptorMutation_DELETE_ONLY}, // Drop mutations start off in the DELETE_AND_WRITE_ONLY state. // UNIQUE column deletion gets split into two mutations with the same ID. {"test_v_key", 4, descpb.DescriptorMutation_DELETE_AND_WRITE_ONLY}, diff --git a/pkg/sql/distsql_plan_backfill.go b/pkg/sql/distsql_plan_backfill.go index fe75a1297edb..6f559ef4fd99 100644 --- a/pkg/sql/distsql_plan_backfill.go +++ b/pkg/sql/distsql_plan_backfill.go @@ -36,16 +36,18 @@ func initColumnBackfillerSpec( func initIndexBackfillerSpec( desc descpb.TableDescriptor, writeAsOf, readAsOf hlc.Timestamp, + writeAtRequestTimestamp bool, chunkSize int64, indexesToBackfill []descpb.IndexID, ) (execinfrapb.BackfillerSpec, error) { return execinfrapb.BackfillerSpec{ - Table: desc, - WriteAsOf: writeAsOf, - ReadAsOf: readAsOf, - Type: execinfrapb.BackfillerSpec_Index, - ChunkSize: chunkSize, - IndexesToBackfill: indexesToBackfill, + Table: desc, + WriteAsOf: writeAsOf, + WriteAtRequestTimestamp: writeAtRequestTimestamp, + ReadAsOf: readAsOf, + Type: execinfrapb.BackfillerSpec_Index, + ChunkSize: chunkSize, + IndexesToBackfill: indexesToBackfill, }, nil } diff --git a/pkg/sql/drop_index.go b/pkg/sql/drop_index.go index 53365249c85e..302dc675b8d2 100644 --- a/pkg/sql/drop_index.go +++ b/pkg/sql/drop_index.go @@ -517,7 +517,7 @@ func (p *planner) dropIndexByName( // contain the same field any more due to other schema changes // intervening since the initial lookup. So we send the recent // copy idxEntry for drop instead. - if err := tableDesc.AddIndexMutation(&idxEntry, descpb.DescriptorMutation_DROP); err != nil { + if err := tableDesc.AddDropIndexMutation(&idxEntry); err != nil { return err } tableDesc.RemovePublicNonPrimaryIndex(idxOrdinal) diff --git a/pkg/sql/drop_test.go b/pkg/sql/drop_test.go index c1a5e91de1ef..10560eb2c878 100644 --- a/pkg/sql/drop_test.go +++ b/pkg/sql/drop_test.go @@ -489,7 +489,17 @@ func TestDropIndex(t *testing.T) { }) testutils.SucceedsSoon(t, func() error { - return jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ + Username: security.RootUserName(), + Description: `GC for temporary index used during index backfill`, + DescriptorIDs: descpb.IDs{ + tableDesc.GetID(), + }, + }); err != nil { + return err + } + + return jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUserName(), Description: `GC for DROP INDEX t.public.kv@foo`, DescriptorIDs: descpb.IDs{ @@ -765,7 +775,7 @@ func TestDropTableDeleteData(t *testing.T) { // Ensure that the job is marked as succeeded. 
testutils.SucceedsSoon(t, func() error { - return jobutils.VerifySystemJob(t, sqlRun, i, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ + return jobutils.VerifySystemJob(t, sqlRun, (i*2)+1, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUserName(), Description: fmt.Sprintf(`GC for DROP TABLE t.public.%s`, descs[i].GetName()), DescriptorIDs: descpb.IDs{ diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index e9a8632c9029..557713708c66 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -74,6 +74,15 @@ message BackfillerSpec { reserved 6; optional int32 initial_splits = 11 [(gogoproto.nullable) = false]; + + // WriteAtRequestTimestamp controls the corresponding AddSSTable request + // option which updates all MVCC timestamps in the SST to the request + // timestamp, even if the request gets pushed. This ensures the writes + // comply with the timestamp cache and closed timestamp. + // + // Note that older nodes do not respect this flag so callers should + // check MVCCAddSSTable before setting this option. + optional bool write_at_request_timestamp = 12 [(gogoproto.nullable) = false]; } // JobProgress identifies the job to report progress on. This reporting @@ -161,10 +170,10 @@ message ReadImportDataSpec { message StreamIngestionDataSpec { reserved 1; - + // StreamID is the ID of the stream (which is shared across the producer and consumer). optional uint64 stream_id = 5 [(gogoproto.nullable) = false, (gogoproto.customname) = "StreamID"]; - + // PartitionSpecs specify how to subscribe to the i'th partition. repeated string partition_ids = 6; // PartitionSpecs specify how to subscribe to the i'th partition. @@ -172,14 +181,14 @@ message StreamIngestionDataSpec { // PartitionAddresses locate the partitions that produce events to be // ingested. We don't set the casttype to avoid depending on ccl packages. repeated string partition_addresses = 8; - + // The processor will ingest events from StartTime onwards. optional util.hlc.Timestamp start_time = 2 [(gogoproto.nullable) = false]; // StreamAddress locate the stream so that a stream client can be initialized. optional string stream_address = 3 [(gogoproto.nullable) = false]; // JobID is the job ID of the stream ingestion job. optional int64 job_id = 4 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"]; - + } message StreamIngestionFrontierSpec { diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index f2a1ffdfaa4e..ec90367fc2de 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -173,7 +173,7 @@ func (ib *IndexBackfillPlanner) plan( // TODO(ajwerner): Adopt util.ConstantWithMetamorphicTestRange for the // batch size. Also plumb in a testing knob. 
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV) - spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, chunkSize, indexesToBackfill) + spec, err := initIndexBackfillerSpec(*td.TableDesc(), writeAsOf, readAsOf, false /* writeAtRequestTimestamp */, chunkSize, indexesToBackfill) if err != nil { return err } diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index 0f718b1020bb..214835487022 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/backfill" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -61,6 +62,9 @@ func TestIndexBackfiller(t *testing.T) { moveToTScan := make(chan bool) moveToBackfill := make(chan bool) + moveToTMerge := make(chan bool) + backfillDone := make(chan bool) + params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ RunBeforePublishWriteAndDelete: func() { @@ -74,6 +78,10 @@ func TestIndexBackfiller(t *testing.T) { <-moveToTScan return nil }, + RunBeforeTempIndexMerge: func() { + backfillDone <- true + <-moveToTMerge + }, RunBeforeIndexBackfill: func() { // Wait until we get a signal to begin backfill. <-moveToBackfill @@ -104,7 +112,6 @@ func TestIndexBackfiller(t *testing.T) { // The sequence of events here exactly matches the test cases in // docs/tech-notes/index-backfill.md. If you update this, please remember to // update the tech note as well. - execOrFail("CREATE DATABASE t") execOrFail("CREATE TABLE t.kv (k int PRIMARY KEY, v char)") execOrFail("INSERT INTO t.kv VALUES (1, 'a'), (3, 'c'), (4, 'e'), (6, 'f'), (7, 'g'), (9, 'h')") @@ -117,16 +124,21 @@ func TestIndexBackfiller(t *testing.T) { finishedSchemaChange.Done() }() - // Wait until the schema change has moved the cluster into DELETE_ONLY mode. + // tempIndex: DELETE_ONLY + // newIndex BACKFILLING <-moveToTDelete - execOrFail("DELETE FROM t.kv WHERE k=9") - execOrFail("INSERT INTO t.kv VALUES (9, 'h')") + execOrFail("DELETE FROM t.kv WHERE k=9") // new_index: nothing, temp_index: sees delete + execOrFail("INSERT INTO t.kv VALUES (9, 'h')") // new_index: nothing, temp_index: nothing // Move to WRITE_ONLY mode. + // tempIndex: DELETE_AND_WRITE_ONLY + // newIndex BACKFILLING moveToTWrite <- true - execOrFail("INSERT INTO t.kv VALUES (2, 'b')") + execOrFail("INSERT INTO t.kv VALUES (2, 'b')") // new_index: nothing, temp_index: sees insert // Pick our scan timestamp. + // tempIndex: DELETE_AND_WRITE_ONLY + // newIndex BACKFILLING moveToTScan <- true execOrFail("UPDATE t.kv SET v = 'd' WHERE k = 3") execOrFail("UPDATE t.kv SET k = 5 WHERE v = 'e'") @@ -135,6 +147,10 @@ func TestIndexBackfiller(t *testing.T) { // Begin the backfill. moveToBackfill <- true + <-backfillDone + execOrFail("INSERT INTO t.kv VALUES (10, 'z')") // new_index: nothing, temp_index: sees insert + moveToTMerge <- true + finishedSchemaChange.Wait() pairsPrimary := queryPairs(t, sqlDB, "SELECT k, v FROM t.kv ORDER BY k ASC") @@ -206,7 +222,7 @@ func TestIndexBackfillerComputedAndGeneratedColumns(t *testing.T) { // setupDesc should mutate the descriptor such that the mutation with // id 1 contains an index backfill. 
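+		// setupDesc receives a context and the cluster settings so
+		// that AddIndexMutation (and the subsequent AllocateIDs call)
+		// can consult the active cluster version and the
+		// MVCC-compliant index creation cluster setting when staging
+		// the index mutation.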
- setupDesc func(t *testing.T, mut *tabledesc.Mutable) + setupDesc func(t *testing.T, ctx context.Context, mut *tabledesc.Mutable, settings *cluster.Settings) indexToBackfill descpb.IndexID expectedContents [][]string } @@ -228,7 +244,7 @@ INSERT INTO foo VALUES (1, 2), (2, 3), (3, 4); {"2", "7"}, {"3", "13"}, }, - setupDesc: func(t *testing.T, mut *tabledesc.Mutable) { + setupDesc: func(t *testing.T, ctx context.Context, mut *tabledesc.Mutable, settings *cluster.Settings) { indexToBackfill := descpb.IndexDescriptor{ Name: "virtual_column_backed_index", ID: mut.NextIndexID, @@ -252,9 +268,10 @@ INSERT INTO foo VALUES (1, 2), (2, 3), (3, 4); } mut.NextIndexID++ mut.NextConstraintID++ - require.NoError(t, mut.AddIndexMutation( - &indexToBackfill, descpb.DescriptorMutation_ADD, + require.NoError(t, mut.AddIndexMutation(ctx, + &indexToBackfill, descpb.DescriptorMutation_ADD, settings, )) + require.NoError(t, mut.AllocateIDs(context.Background(), settings.Version.ActiveVersion(ctx))) }, }, // This test will inject a new primary index and perform a primary key swap @@ -273,7 +290,7 @@ INSERT INTO foo VALUES (1), (10), (100); {"10", "42", "52"}, {"100", "42", "142"}, }, - setupDesc: func(t *testing.T, mut *tabledesc.Mutable) { + setupDesc: func(t *testing.T, ctx context.Context, mut *tabledesc.Mutable, settings *cluster.Settings) { columnWithDefault := descpb.ColumnDescriptor{ Name: "def", ID: mut.NextColumnID, @@ -337,9 +354,10 @@ INSERT INTO foo VALUES (1), (10), (100); } mut.NextIndexID++ mut.NextConstraintID++ - require.NoError(t, mut.AddIndexMutation( - &indexToBackfill, descpb.DescriptorMutation_ADD, + require.NoError(t, mut.AddIndexMutation(ctx, + &indexToBackfill, descpb.DescriptorMutation_ADD, settings, )) + require.NoError(t, mut.AllocateIDs(context.Background(), settings.Version.ActiveVersion(ctx))) mut.AddPrimaryKeySwapMutation(&descpb.PrimaryKeySwap{ OldPrimaryIndexId: 1, NewPrimaryIndexId: 2, @@ -480,7 +498,7 @@ INSERT INTO foo VALUES (1), (10), (100); if err != nil { return err } - test.setupDesc(t, mut) + test.setupDesc(t, ctx, mut, settings) span := mut.PrimaryIndexSpan(execCfg.Codec) resumeSpanList := make([]jobspb.ResumeSpanList, len(mut.Mutations)) for i := range mut.Mutations { diff --git a/pkg/sql/logictest/testdata/logic_test/alter_primary_key b/pkg/sql/logictest/testdata/logic_test/alter_primary_key index 3b2ee26d07f7..f3edd92b1c0a 100644 --- a/pkg/sql/logictest/testdata/logic_test/alter_primary_key +++ b/pkg/sql/logictest/testdata/logic_test/alter_primary_key @@ -229,13 +229,13 @@ SELECT index_id, index_name FROM crdb_internal.table_indexes WHERE descriptor_na ---- 4 i3 9 t_pkey -10 t_x_key -11 i1 -12 i2 -13 i4 -14 i5 -15 i6 -16 i7 +11 t_x_key +13 i1 +15 i2 +17 i4 +19 i5 +21 i6 +23 i7 # Make sure that each index can index join against the new primary key; @@ -384,8 +384,8 @@ query IT SELECT index_id, index_name FROM crdb_internal.table_indexes WHERE descriptor_name = 't' ORDER BY index_id ---- 3 t_pkey -4 t_x_key -5 i1 +5 t_x_key +7 i1 query III SELECT * FROM t@t_pkey @@ -839,7 +839,7 @@ query IT SELECT index_id, index_name FROM crdb_internal.table_indexes WHERE descriptor_name = 't' ORDER BY index_id ---- 3 t_pkey -4 t_y_idx +5 t_y_idx # Repeat the above test using ALTER PRIMARY KEY. @@ -876,7 +876,7 @@ query IT SELECT index_id, index_name FROM crdb_internal.table_indexes WHERE descriptor_name = 't' ORDER BY index_id ---- 3 t_pkey -4 t_y_idx +5 t_y_idx # Test when multiple indexes get created and destroyed. 
 statement ok
@@ -920,9 +920,9 @@ query IT
 SELECT index_id, index_name FROM crdb_internal.table_indexes WHERE descriptor_name = 't' ORDER BY index_id
 ----
 5 t_pkey
-6 i1
-7 i2
-8 i3
+7 i1
+9 i2
+11 i3
 
 # Regression for #45889.
 # Primary key changes on a hash sharded index that just change the bucket
diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table
index 3c74ce98db30..b5b2d1f6e95c 100644
--- a/pkg/sql/logictest/testdata/logic_test/alter_table
+++ b/pkg/sql/logictest/testdata/logic_test/alter_table
@@ -79,15 +79,20 @@ statement error pgcode 42703 column "dne" does not exist
 ALTER TABLE t ADD CONSTRAINT dne_unique UNIQUE (dne)
 
 # Test that rollback was successful
-query TTTTTR
-SELECT job_type, regexp_replace(description, 'JOB \d+', 'JOB ...'), user_name, status, running_status, fraction_completed::decimal(10,2)
+#
+# We ignore the job status because GC for temporary indexes used
+# during backfills may already be running, rather than waiting for
+# the GC TTL, depending on the timing.
+query TTTR
+SELECT job_type, regexp_replace(description, 'JOB \d+', 'JOB ...'), user_name, fraction_completed::decimal(10,2)
 FROM crdb_internal.jobs
 WHERE job_type = 'SCHEMA CHANGE' OR job_type = 'SCHEMA CHANGE GC'
 ORDER BY created DESC
-LIMIT 2
+LIMIT 3
 ----
-SCHEMA CHANGE GC GC for ROLLBACK of ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root running waiting for GC TTL 0.00
-SCHEMA CHANGE ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root failed NULL 0.00
+SCHEMA CHANGE GC GC for temporary index used during index backfill root 0.00
+SCHEMA CHANGE GC GC for ROLLBACK of ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root 0.00
+SCHEMA CHANGE ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root 0.00
 
 query IIII colnames,rowsort
 SELECT * FROM t
diff --git a/pkg/sql/logictest/testdata/logic_test/dependencies b/pkg/sql/logictest/testdata/logic_test/dependencies
index 0d18cbab6770..902d855c3fe7 100644
--- a/pkg/sql/logictest/testdata/logic_test/dependencies
+++ b/pkg/sql/logictest/testdata/logic_test/dependencies
@@ -50,8 +50,8 @@ ORDER BY descriptor_id, index_id
 descriptor_id descriptor_name index_id index_name index_type is_unique is_inverted is_sharded
 106 test_kv 1 test_kv_pkey primary true false false
 106 test_kv 2 test_v_idx secondary true false false
-106 test_kv 3 test_v_idx2 secondary false false false
-106 test_kv 4 test_v_idx3 secondary false false false
+106 test_kv 4 test_v_idx2 secondary false false false
+106 test_kv 6 test_v_idx3 secondary false false false
 107 test_kvr1 1 test_kvr1_pkey primary true false false
 108 test_kvr2 1 test_kvr2_pkey primary true false false
 108 test_kvr2 2 test_kvr2_v_key secondary true false false
@@ -72,13 +72,13 @@ descriptor_id descriptor_name index_id index_name column_type col
 106 test_kv 1 test_kv_pkey key 1 k ASC false
 106 test_kv 2 test_v_idx extra 1 NULL NULL false
 106 test_kv 2 test_v_idx key 2 v ASC false
-106 test_kv 3 test_v_idx2 extra 1 NULL NULL false
-106 test_kv 3 test_v_idx2 key 2 v DESC false
-106 test_kv 3 test_v_idx2 storing 3 NULL NULL false
-106 test_kv 4 test_v_idx3 composite 3 NULL NULL false
-106 test_kv 4 test_v_idx3 extra 1 NULL NULL false
-106 test_kv 4 test_v_idx3 key 3 w ASC false
-106 test_kv 4 test_v_idx3 storing 2 NULL NULL false
+106 test_kv 4 test_v_idx2 extra 1 NULL NULL false
+106 test_kv 4 test_v_idx2 key 2 v DESC false
+106 test_kv 4 test_v_idx2 storing 3 NULL NULL false
+106 test_kv 6 test_v_idx3 composite 3 NULL NULL false
+106 test_kv 6
test_v_idx3 extra 1 NULL NULL false +106 test_kv 6 test_v_idx3 key 3 w ASC false +106 test_kv 6 test_v_idx3 storing 2 NULL NULL false 107 test_kvr1 1 test_kvr1_pkey key 1 k ASC false 108 test_kvr2 1 test_kvr2_pkey key 3 rowid ASC false 108 test_kvr2 2 test_kvr2_v_key extra 3 NULL NULL false diff --git a/pkg/sql/logictest/testdata/logic_test/jobs b/pkg/sql/logictest/testdata/logic_test/jobs index 2e6187aa27e4..2477fd88a3cf 100644 --- a/pkg/sql/logictest/testdata/logic_test/jobs +++ b/pkg/sql/logictest/testdata/logic_test/jobs @@ -17,19 +17,21 @@ CREATE TABLE t(x INT); INSERT INTO t(x) VALUES (1); CREATE INDEX ON t(x) query TTT SELECT job_type, description, user_name FROM [SHOW JOBS] WHERE user_name = 'root' ---- -SCHEMA CHANGE updating version for users table root -SCHEMA CHANGE updating version for role options table root -SCHEMA CHANGE updating privileges for database 104 root -SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +SCHEMA CHANGE updating version for users table root +SCHEMA CHANGE updating version for role options table root +SCHEMA CHANGE updating privileges for database 104 root +SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +SCHEMA CHANGE GC GC for temporary index used during index backfill root query TTT SELECT job_type, description, user_name FROM crdb_internal.jobs WHERE user_name = 'root' ---- -AUTO SPAN CONFIG RECONCILIATION reconciling span configurations root -SCHEMA CHANGE updating version for users table root -SCHEMA CHANGE updating version for role options table root -SCHEMA CHANGE updating privileges for database 104 root -SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +AUTO SPAN CONFIG RECONCILIATION reconciling span configurations root +SCHEMA CHANGE updating version for users table root +SCHEMA CHANGE updating version for role options table root +SCHEMA CHANGE updating privileges for database 104 root +SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +SCHEMA CHANGE GC GC for temporary index used during index backfill root user testuser @@ -52,12 +54,14 @@ CREATE TABLE u(x INT); INSERT INTO u(x) VALUES (1); CREATE INDEX ON u(x); query TTT SELECT job_type, description, user_name FROM [SHOW JOBS] ---- -SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE GC GC for temporary index used during index backfill testuser query TTT SELECT job_type, description, user_name FROM crdb_internal.jobs ---- -SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE GC GC for temporary index used during index backfill testuser # And root can see both. 
@@ -66,21 +70,25 @@ user root query TTT SELECT job_type, description, user_name FROM [SHOW JOBS] WHERE user_name IN ('root', 'testuser') ---- -SCHEMA CHANGE updating version for users table root -SCHEMA CHANGE updating version for role options table root -SCHEMA CHANGE updating privileges for database 104 root -SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root -SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE updating version for users table root +SCHEMA CHANGE updating version for role options table root +SCHEMA CHANGE updating privileges for database 104 root +SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +SCHEMA CHANGE GC GC for temporary index used during index backfill root +SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE GC GC for temporary index used during index backfill testuser query TTT SELECT job_type, description, user_name FROM crdb_internal.jobs WHERE user_name IN ('root', 'testuser') ---- -AUTO SPAN CONFIG RECONCILIATION reconciling span configurations root -SCHEMA CHANGE updating version for users table root -SCHEMA CHANGE updating version for role options table root -SCHEMA CHANGE updating privileges for database 104 root -SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root -SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +AUTO SPAN CONFIG RECONCILIATION reconciling span configurations root +SCHEMA CHANGE updating version for users table root +SCHEMA CHANGE updating version for role options table root +SCHEMA CHANGE updating privileges for database 104 root +SCHEMA CHANGE CREATE INDEX ON test.public.t (x) root +SCHEMA CHANGE GC GC for temporary index used during index backfill root +SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE GC GC for temporary index used during index backfill testuser statement ok CREATE USER testuser2 @@ -105,13 +113,15 @@ user testuser query TTT SELECT job_type, description, user_name FROM crdb_internal.jobs ---- -SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser -SCHEMA CHANGE CREATE INDEX ON test.public.t1 (x) testuser2 -SCHEMA CHANGE DROP TABLE test.public.t1 testuser2 -SCHEMA CHANGE GC GC for DROP TABLE test.public.t1 testuser2 +SCHEMA CHANGE CREATE INDEX ON test.public.u (x) testuser +SCHEMA CHANGE GC GC for temporary index used during index backfill testuser +SCHEMA CHANGE CREATE INDEX ON test.public.t1 (x) testuser2 +SCHEMA CHANGE GC GC for temporary index used during index backfill testuser2 +SCHEMA CHANGE DROP TABLE test.public.t1 testuser2 +SCHEMA CHANGE GC GC for DROP TABLE test.public.t1 testuser2 statement ok -PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC') +PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC' AND description LIKE 'GC for DROP%') user root @@ -120,7 +130,7 @@ CREATE TABLE t2(x INT); DROP TABLE t2 let $job_id -SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'root' AND job_type = 'SCHEMA CHANGE GC' +SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'root' AND job_type = 'SCHEMA CHANGE GC' AND description LIKE 'GC for DROP%' user testuser @@ -134,13 +144,13 @@ statement ok ALTER ROLE testuser NOCONTROLJOB let $job_id -SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC' +SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC' AND description LIKE 'GC for DROP%' user testuser # testuser should no longer have the ability to control jobs. 
statement error pq: user testuser does not have CONTROLJOB privilege -PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC') +PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC' AND description LIKE 'GC for DROP%') user root diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 4df73c3a94e3..ae1aeef6bedd 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -559,8 +559,8 @@ oid relname relnamespace reltype reloftype relowner rel 2129466852 t6_pkey 3082627813 0 0 1546506610 2631952481 0 0 2129466855 t6_expr_idx 3082627813 0 0 1546506610 2631952481 0 0 2129466854 t6_expr_expr1_idx 3082627813 0 0 1546506610 2631952481 0 0 -2129466849 t6_expr_key 3082627813 0 0 1546506610 2631952481 0 0 -2129466848 t6_expr_idx1 3082627813 0 0 1546506610 2631952481 0 0 +2129466848 t6_expr_key 3082627813 0 0 1546506610 2631952481 0 0 +2129466850 t6_expr_idx1 3082627813 0 0 1546506610 2631952481 0 0 121 mv1 3082627813 100121 0 1546506610 0 0 0 784389845 mv1_pkey 3082627813 0 0 1546506610 2631952481 0 0 @@ -728,8 +728,8 @@ attrelid relname attname atttypid attstattarget 2129466855 t6_expr_idx crdb_internal_idx_expr 20 0 8 5 0 -1 2129466854 t6_expr_expr1_idx crdb_internal_idx_expr_1 25 0 -1 7 0 -1 2129466854 t6_expr_expr1_idx crdb_internal_idx_expr_2 20 0 8 8 0 -1 -2129466849 t6_expr_key crdb_internal_idx_expr_3 25 0 -1 9 0 -1 -2129466848 t6_expr_idx1 crdb_internal_idx_expr_4 16 0 1 10 0 -1 +2129466848 t6_expr_key crdb_internal_idx_expr_3 25 0 -1 9 0 -1 +2129466850 t6_expr_idx1 crdb_internal_idx_expr_4 16 0 1 10 0 -1 121 mv1 ?column? 20 0 8 1 0 -1 121 mv1 rowid 20 0 8 2 0 -1 784389845 mv1_pkey rowid 20 0 8 2 0 -1 @@ -992,8 +992,8 @@ crdb_oid schemaname tablename indexname tablespace 2129466852 public t6 t6_pkey NULL 2129466855 public t6 t6_expr_idx NULL 2129466854 public t6 t6_expr_expr1_idx NULL -2129466849 public t6 t6_expr_key NULL -2129466848 public t6 t6_expr_idx1 NULL +2129466848 public t6 t6_expr_key NULL +2129466850 public t6 t6_expr_idx1 NULL 784389845 public mv1 mv1_pkey NULL query OTTT colnames @@ -1016,8 +1016,8 @@ crdb_oid tablename indexname indexdef 2129466852 t6 t6_pkey CREATE UNIQUE INDEX t6_pkey ON constraint_db.public.t6 USING btree (rowid ASC) 2129466855 t6 t6_expr_idx CREATE INDEX t6_expr_idx ON constraint_db.public.t6 USING btree ((a + b) ASC) 2129466854 t6 t6_expr_expr1_idx CREATE INDEX t6_expr_expr1_idx ON constraint_db.public.t6 USING btree (lower(c) ASC, (a + b) ASC) -2129466849 t6 t6_expr_key CREATE UNIQUE INDEX t6_expr_key ON constraint_db.public.t6 USING btree (lower(c) ASC) -2129466848 t6 t6_expr_idx1 CREATE INDEX t6_expr_idx1 ON constraint_db.public.t6 USING btree ((m = 'foo'::public.mytype) ASC) WHERE (m = 'foo'::public.mytype) +2129466848 t6 t6_expr_key CREATE UNIQUE INDEX t6_expr_key ON constraint_db.public.t6 USING btree (lower(c) ASC) +2129466850 t6 t6_expr_idx1 CREATE INDEX t6_expr_idx1 ON constraint_db.public.t6 USING btree ((m = 'foo'::public.mytype) ASC) WHERE (m = 'foo'::public.mytype) 784389845 mv1 mv1_pkey CREATE UNIQUE INDEX mv1_pkey ON constraint_db.public.mv1 USING btree (rowid ASC) ## pg_catalog.pg_index @@ -1271,7 +1271,7 @@ oid conname connamespace contype condef 108480825 uwi_b_c 3082627813 u UNIQUE WITHOUT INDEX (b, c) 192087236 fk_b_c 3082627813 f FOREIGN KEY (b, c) REFERENCES t4(b, c) MATCH FULL ON UPDATE RESTRICT 296187876 
check_c 3082627813 c CHECK ((c != ''::STRING)) -1002858067 t6_expr_key 3082627813 u UNIQUE (lower(c) ASC) +1002858066 t6_expr_key 3082627813 u UNIQUE (lower(c) ASC) 1034567609 uwi_b_partial 3082627813 u UNIQUE WITHOUT INDEX (b) WHERE (c = 'foo'::STRING) 1568726274 index_key 3082627813 u UNIQUE (b ASC, c ASC) 1568726275 t1_a_key 3082627813 u UNIQUE (a ASC) @@ -1295,7 +1295,7 @@ check_b c false false true 114 0 uwi_b_c u false false true 116 0 0 fk_b_c f false false true 117 0 0 check_c c false false true 114 0 0 -t6_expr_key u false false true 120 0 2129466849 +t6_expr_key u false false true 120 0 2129466848 uwi_b_partial u false false true 116 0 0 index_key u false false true 110 0 3687884464 t1_a_key u false false true 110 0 3687884465 diff --git a/pkg/sql/logictest/testdata/logic_test/ranges b/pkg/sql/logictest/testdata/logic_test/ranges index 093e850986062c289b98c506c7a90fcf010e353e..f35b36022fc866c53a1ec585216d8bce369fe329 100644 GIT binary patch delta 570 zcmY*VO>5gg5GBV7dh4aP2Pv!tirl1$FC~G}7;Gp68bgj^r5#sWt#_r}4Yp4H5&0XX z6!H^#@6YL>r;cp*Bf~Dt*L(A3zW+S>@$2Z@*KT`^MXduOg$PA0Sry{U_L_Cs!iM(vV&&@j|d(A3mXFwo@E1p+R8LnBjtlg;ua`kVl4tqlYK diff --git a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn index 13942ad6c93d..abd838312984 100644 --- a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn +++ b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn @@ -734,15 +734,15 @@ SELECT * FROM customers@j_idx query TT SELECT status, - regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as description + regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as desc FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' ORDER BY job_id DESC LIMIT 1 ---- failed ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5; ALTER TABLE test.public.customers ADD COLUMN j INT8 DEFAULT 4; ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3; ALTER TABLE test.public.customers ADD COLUMN m CHAR; ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a'; CREATE INDEX j_idx ON test.public.customers (j); CREATE INDEX l_idx ON test.public.customers (l); CREATE INDEX m_idx ON test.public.customers (m); CREATE UNIQUE INDEX i_idx ON test.public.customers (i); CREATE UNIQUE INDEX n_idx ON test.public.customers (n) query TT SELECT status, - regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as description - FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' ORDER BY job_id DESC LIMIT 1 + regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as descr + FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' AND description LIKE 'GC for ROLL%' ORDER BY job_id DESC LIMIT 1 ---- running GC for ROLLBACK of ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5; ALTER TABLE test.public.customers ADD COLUMN j INT8 DEFAULT 4; ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3; ALTER TABLE test.public.customers ADD COLUMN m CHAR; ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a'; CREATE INDEX j_idx ON test.public.customers (j); CREATE INDEX l_idx ON test.public.customers (l); CREATE INDEX m_idx ON test.public.customers (m); CREATE UNIQUE INDEX i_idx ON test.public.customers (i); CREATE UNIQUE INDEX n_idx ON test.public.customers (n) diff --git a/pkg/sql/logictest/testdata/logic_test/zigzag_join b/pkg/sql/logictest/testdata/logic_test/zigzag_join index e7d4e75825b4..e70e8f794596 100644 --- a/pkg/sql/logictest/testdata/logic_test/zigzag_join +++ 
b/pkg/sql/logictest/testdata/logic_test/zigzag_join @@ -2,6 +2,10 @@ # Zigzag join tests on non-inverted indexes. # ------------------------------------------------------------------------------ +# TODO(ssd): index id test dependency cleanup +statement ok +SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = false + statement ok CREATE TABLE a (n INT PRIMARY KEY, a INT, b INT, c STRING, INDEX a_idx(a), INDEX b_idx(b), INDEX bc_idx(b,c)); INSERT INTO a SELECT a,a,a%3,'foo' FROM generate_series(1,10) AS g(a) ; diff --git a/pkg/sql/opt/exec/execbuilder/testdata/geospatial b/pkg/sql/opt/exec/execbuilder/testdata/geospatial index 9eeeb62384db..46df95ba8c3f 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/geospatial +++ b/pkg/sql/opt/exec/execbuilder/testdata/geospatial @@ -3,6 +3,10 @@ # https://github.com/cockroachdb/cockroach/issues/49582 # LogicTest: !3node-tenant +# TODO(ssd): index id test dependency cleanup +statement ok +SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = false + statement ok CREATE TABLE b( a int primary key, diff --git a/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families_nonmetamorphic index 83350327cf59..68d48dea0c4d 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families_nonmetamorphic +++ b/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families_nonmetamorphic @@ -1,5 +1,9 @@ # LogicTest: local !metamorphic +# TODO(ssd): index id test dependency cleanup +statement ok +SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = false + statement ok CREATE TABLE t1 ( x INT PRIMARY KEY, y INT, z INT, a INT, b INT, diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic index c2f73191d4b3..f00341080305 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic +++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic @@ -75,7 +75,7 @@ SET tracing = on,kv,results; CREATE UNIQUE INDEX woo ON t.kv(v); SET tracing = o query TT $trace_query ---- -batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 > next_index_id:3 privileges: users: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... 
constraint_id:2 > state:DELETE_ONLY direction:ADD mutation_id:1 rollback:false > next_mutation_id:2 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:3 > +batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 > next_index_id:4 privileges: users: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:2 > state:BACKFILLING direction:ADD mutation_id:1 rollback:false > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:true created_at_nanos:... constraint_id:3 > state:DELETE_ONLY direction:ADD mutation_id:1 rollback:false > next_mutation_id:2 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:4 > sql query rows affected: 0 statement ok @@ -173,7 +173,7 @@ SET tracing = on,kv,results; DROP INDEX t.kv@woo CASCADE; SET tracing = off query TT $trace_query ---- -batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 > next_index_id:3 privileges: users: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... 
constraint_id:2 > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:3 > +batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 > next_index_id:4 privileges: users: users: owner_proto:"root" version:2 > mutations: interleave:<> partitioning: type:FORWARD created_explicitly:true encoding_type:0 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:2 > state:DELETE_AND_WRITE_ONLY direction:DROP mutation_id:2 rollback:false > next_mutation_id:3 format_version:3 state:PUBLIC offline_reason:"" view_query:"" is_materialized_view:false mutationJobs:<...> drop_time:0 replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:4 > sql query rows affected: 0 statement ok @@ -183,7 +183,7 @@ query TT $trace_query ---- batch flow coordinator Del /NamespaceTable/30/1/106/107/"kv"/4/1 -batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 > next_index_id:3 privileges: users: users: owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false drop_time:... 
replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:3 > +batch flow coordinator Put /Table/3/1/108/2/1 -> table: parent_id:106 unexposed_parent_schema_id:107 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false created_at_nanos:... constraint_id:1 > next_index_id:4 privileges: users: users: owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false exclude_data_from_backup:false next_constraint_id:4 > sql query rows affected: 0 # Check that session tracing does not inhibit the fast path for inserts & diff --git a/pkg/sql/region_util.go b/pkg/sql/region_util.go index ba9bb1022937..a1285368a6cd 100644 --- a/pkg/sql/region_util.go +++ b/pkg/sql/region_util.go @@ -1712,8 +1712,12 @@ func (p *planner) validateZoneConfigForMultiRegionTable( if pkSwap.HasLocalityConfig() { _ = pkSwap.ForEachNewIndexIDs(func(id descpb.IndexID) error { regionalByRowNewIndexes[uint32(id)] = struct{}{} + if idx := catalog.FindCorrespondingTemporaryIndexByID(desc, id); idx != nil { + regionalByRowNewIndexes[uint32(idx.GetID())] = struct{}{} + } return nil }) + } // There can only be one pkSwap at a time, so break now. break diff --git a/pkg/sql/rowexec/indexbackfiller.go b/pkg/sql/rowexec/indexbackfiller.go index 04c00e19be69..c474b9176e3e 100644 --- a/pkg/sql/rowexec/indexbackfiller.go +++ b/pkg/sql/rowexec/indexbackfiller.go @@ -200,6 +200,7 @@ func (ib *indexBackfiller) ingestIndexEntries( SkipDuplicates: ib.ContainsInvertedIndex(), BatchTimestamp: ib.spec.ReadAsOf, InitialSplitsIfUnordered: int(ib.spec.InitialSplits), + WriteAtRequestTime: ib.spec.WriteAtRequestTimestamp, } adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB, ib.spec.WriteAsOf, opts) if err != nil { diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 58dc31dc2dba..2183b39069f3 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -74,6 +74,10 @@ const ( // the cluster to converge to seeing the schema element in the // DELETE_AND_WRITE_ONLY state. RunningStatusDeleteAndWriteOnly jobs.RunningStatus = "waiting in DELETE-AND-WRITE_ONLY" + // RunningStatusMerging is for jobs that are currently waiting on + // the cluster to converge to seeing the schema element in the + // MERGING state. + RunningStatusMerging jobs.RunningStatus = "waiting in MERGING" // RunningStatusBackfill is for jobs that are currently running a backfill // for a schema element. 
 RunningStatusBackfill jobs.RunningStatus = "populating schema"
@@ -106,6 +110,11 @@ type SchemaChanger struct {
 	settings       *cluster.Settings
 	execCfg        *ExecutorConfig
 	ieFactory      sqlutil.SessionBoundInternalExecutorFactory
+
+	// mvccCompliantAddIndex is set to true early in exec if we
+	// find that the schema change was created under the
+	// mvcc-compliant regime.
+	mvccCompliantAddIndex bool
 }
 
 // NewSchemaChangerForTesting only for tests.
@@ -633,6 +642,52 @@ func (sc *SchemaChanger) getTargetDescriptor(ctx context.Context) (catalog.Descr
 	return desc, nil
 }
 
+func (sc *SchemaChanger) checkForMVCCCompliantAddIndexMutations(
+	ctx context.Context, desc catalog.Descriptor,
+) error {
+	tableDesc, ok := desc.(catalog.TableDescriptor)
+	if !ok {
+		return nil
+	}
+
+	nonTempAddingIndexes := 0
+	tempIndexes := 0
+
+	for _, m := range tableDesc.AllMutations() {
+		if m.MutationID() != sc.mutationID {
+			break
+		}
+
+		idx := m.AsIndex()
+		if idx == nil {
+			continue
+		}
+
+		if idx.IsTemporaryIndexForBackfill() {
+			tempIndexes++
+		} else if m.Adding() {
+			nonTempAddingIndexes++
+		}
+	}
+
+	if tempIndexes > 0 {
+		sc.mvccCompliantAddIndex = true
+
+		if tempIndexes != nonTempAddingIndexes {
+			return errors.Newf("expected %d temporary indexes, but found %d; schema change may have been constructed during cluster version upgrade",
+				tempIndexes,
+				nonTempAddingIndexes)
+		}
+
+		settings := sc.execCfg.Settings
+		mvccCompliantBackfillSupported := settings.Version.IsActive(ctx, clusterversion.MVCCIndexBackfiller) && tabledesc.UseMVCCCompliantIndexCreation.Get(&settings.SV)
+		if !mvccCompliantBackfillSupported {
+			return errors.Newf("schema change requires MVCC-compliant backfiller, but MVCC-compliant backfiller is not supported")
+		}
+	}
+	return nil
+}
+
 // Execute the entire schema change in steps.
 // inSession is set to false when this is called from the asynchronous
 // schema change execution path.
@@ -656,6 +711,10 @@ func (sc *SchemaChanger) exec(ctx context.Context) error {
 		return err
 	}
 
+	if err := sc.checkForMVCCCompliantAddIndexMutations(ctx, desc); err != nil {
+		return err
+	}
+
 	log.Infof(ctx,
 		"schema change on %q (v%d) starting execution...",
 		desc.GetName(), desc.GetVersion(),
@@ -775,6 +834,10 @@ func (sc *SchemaChanger) exec(ctx context.Context) error {
 		}
 		// Go through the recording motions. See comment above.
 		sqltelemetry.RecordError(ctx, err, &sc.settings.SV)
+		if jobs.IsPauseSelfError(err) {
+			// For testing only
+			return err
+		}
 	}
 
 	// Run through mutation state machine and backfill.
@@ -1021,7 +1084,7 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill(ctx context.Context) erro
 			}
 			// else if DELETE_AND_WRITE_ONLY, then the state change has already moved forward.
 		} else if m.Dropped() {
-			if m.WriteAndDeleteOnly() {
+			if m.WriteAndDeleteOnly() || m.Merging() {
 				tbl.Mutations[m.MutationOrdinal()].State = descpb.DescriptorMutation_DELETE_ONLY
 				runStatus = RunningStatusDeleteOnly
 			}
@@ -1070,10 +1133,102 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill(ctx context.Context) erro
 	return nil
 }
 
+// RunStateMachineAfterIndexBackfill moves the state machine forward and
+// waits to ensure that all nodes are seeing the latest version of the
+// table.
+//
+// Adding mutations in the BACKFILLING state move through DELETE ->
+// MERGING.
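+//
+// Together with RunStateMachineBeforeBackfill, the progression for an
+// index added under the MVCC-compliant scheme is BACKFILLING ->
+// DELETE_ONLY -> MERGING, after which the writes captured by the
+// temporary index are merged in and the index is validated and made
+// public. The temporary index itself moves from DELETE_ONLY to
+// DELETE_AND_WRITE_ONLY before the backfill begins, and is dropped
+// and garbage collected once the merge completes.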
+func (sc *SchemaChanger) RunStateMachineAfterIndexBackfill(ctx context.Context) error { + // Step through the state machine twice: + // - BACKFILLING -> DELETE + // - DELETE -> MERGING + log.Info(ctx, "stepping through state machine after index backfill") + if err := sc.stepStateMachineAfterIndexBackfill(ctx); err != nil { + return err + } + if err := sc.stepStateMachineAfterIndexBackfill(ctx); err != nil { + return err + } + log.Info(ctx, "finished stepping through state machine") + return nil +} + +func (sc *SchemaChanger) stepStateMachineAfterIndexBackfill(ctx context.Context) error { + log.Info(ctx, "stepping through state machine") + + var runStatus jobs.RunningStatus + if err := sc.txn(ctx, func( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, + ) error { + tbl, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn) + if err != nil { + return err + } + runStatus = "" + for _, m := range tbl.AllMutations() { + if m.MutationID() != sc.mutationID { + // Mutations are applied in a FIFO order. Only apply the first set of + // mutations if they have the mutation ID we're looking for. + break + } + idx := m.AsIndex() + if idx == nil { + // Don't touch anything but indexes + continue + } + + if m.Adding() { + if m.Backfilling() { + tbl.Mutations[m.MutationOrdinal()].State = descpb.DescriptorMutation_DELETE_ONLY + runStatus = RunningStatusDeleteOnly + } else if m.DeleteOnly() { + tbl.Mutations[m.MutationOrdinal()].State = descpb.DescriptorMutation_MERGING + runStatus = RunningStatusMerging + } + } + } + if runStatus == "" || tbl.Dropped() { + return nil + } + if err := descsCol.WriteDesc( + ctx, true /* kvTrace */, tbl, txn, + ); err != nil { + return err + } + if sc.job != nil { + if err := sc.job.RunningStatus(ctx, txn, func( + ctx context.Context, details jobspb.Details, + ) (jobs.RunningStatus, error) { + return runStatus, nil + }); err != nil { + return errors.Wrap(err, "failed to update job status") + } + } + return nil + }); err != nil { + return err + } + return nil +} + +func (sc *SchemaChanger) createTemporaryIndexGCJob( + ctx context.Context, indexID descpb.IndexID, txn *kv.Txn, jobDesc string, +) error { + minimumDropTime := int64(1) + return sc.createIndexGCJobWithDropTime(ctx, indexID, txn, jobDesc, minimumDropTime) +} + func (sc *SchemaChanger) createIndexGCJob( ctx context.Context, indexID descpb.IndexID, txn *kv.Txn, jobDesc string, ) error { dropTime := timeutil.Now().UnixNano() + return sc.createIndexGCJobWithDropTime(ctx, indexID, txn, jobDesc, dropTime) +} + +func (sc *SchemaChanger) createIndexGCJobWithDropTime( + ctx context.Context, indexID descpb.IndexID, txn *kv.Txn, jobDesc string, dropTime int64, +) error { indexGCDetails := jobspb.SchemaChangeGCDetails{ Indexes: []jobspb.SchemaChangeGCDetails_DroppedIndex{ { @@ -1209,9 +1364,14 @@ func (sc *SchemaChanger) done(ctx context.Context) error { if isRollback { description = "ROLLBACK of " + description } - - if err := sc.createIndexGCJob(ctx, idx.GetID(), txn, description); err != nil { - return err + if idx.IsTemporaryIndexForBackfill() { + if err := sc.createTemporaryIndexGCJob(ctx, idx.GetID(), txn, "temporary index used during index backfill"); err != nil { + return err + } + } else { + if err := sc.createIndexGCJob(ctx, idx.GetID(), txn, description); err != nil { + return err + } } } if constraint := m.AsConstraint(); constraint != nil && constraint.Adding() { @@ -1774,8 +1934,14 @@ func (sc *SchemaChanger) maybeReverseMutations(ctx context.Context, causingError continue } - 
log.Warningf(ctx, "reverse schema change mutation: %+v", scTable.Mutations[m.MutationOrdinal()])
-		scTable.Mutations[m.MutationOrdinal()], columns = sc.reverseMutation(scTable.Mutations[m.MutationOrdinal()], false /*notStarted*/, columns)
+		// Always move temporary indexes to dropping (DELETE_ONLY in
+		// the DROP direction).
+		if idx := m.AsIndex(); idx != nil && idx.IsTemporaryIndexForBackfill() {
+			scTable.Mutations[m.MutationOrdinal()].State = descpb.DescriptorMutation_DELETE_ONLY
+			scTable.Mutations[m.MutationOrdinal()].Direction = descpb.DescriptorMutation_DROP
+		} else {
+			log.Warningf(ctx, "reverse schema change mutation: %+v", scTable.Mutations[m.MutationOrdinal()])
+			scTable.Mutations[m.MutationOrdinal()], columns = sc.reverseMutation(scTable.Mutations[m.MutationOrdinal()], false /*notStarted*/, columns)
+		}
 
 		// If the mutation is for validating a constraint that is being added,
 		// drop the constraint because validation has failed.
@@ -1949,6 +2115,16 @@ func (sc *SchemaChanger) maybeDropValidatingConstraint(
 	return nil
 }
 
+// startingStateForAddIndexMutations returns the correct starting state
+// for add index mutations based on whether this schema change is using
+// temporary indexes or not.
+func (sc *SchemaChanger) startingStateForAddIndexMutations() descpb.DescriptorMutation_State {
+	if sc.mvccCompliantAddIndex {
+		return descpb.DescriptorMutation_BACKFILLING
+	}
+	return descpb.DescriptorMutation_DELETE_ONLY
+}
+
 // deleteIndexMutationsWithReversedColumns deletes mutations with a
 // different mutationID than the schema changer and with an index that
 // references one of the reversed columns. Execute this as a breadth
@@ -1971,7 +2147,7 @@ func (sc *SchemaChanger) deleteIndexMutationsWithReversedColumns(
-				// DROP. All mutations with the ADD direction start off in
-				// the DELETE_ONLY state.
+				// DROP. All mutations with the ADD direction start off in
+				// the starting state for add index mutations (DELETE_ONLY,
+				// or BACKFILLING for MVCC-compliant index additions).
 				if mutation.Direction != descpb.DescriptorMutation_ADD ||
-					mutation.State != descpb.DescriptorMutation_DELETE_ONLY {
+					mutation.State != sc.startingStateForAddIndexMutations() {
 					panic(errors.AssertionFailedf("mutation in bad state: %+v", mutation))
 				}
 				log.Warningf(ctx, "drop schema change mutation: %+v", mutation)
@@ -2032,8 +2208,14 @@ func (sc *SchemaChanger) reverseMutation(
 			return mutation, columns
 		}
 
-		if notStarted && mutation.State != descpb.DescriptorMutation_DELETE_ONLY {
-			panic(errors.AssertionFailedf("mutation in bad state: %+v", mutation))
+		if notStarted {
+			startingState := descpb.DescriptorMutation_DELETE_ONLY
+			if idx := mutation.GetIndex(); idx != nil {
+				startingState = sc.startingStateForAddIndexMutations()
+			}
+			if mutation.State != startingState {
+				panic(errors.AssertionFailedf("mutation in bad state: %+v", mutation))
+			}
 		}
 
 	case descpb.DescriptorMutation_DROP:
@@ -2110,6 +2292,20 @@ type SchemaChangerTestingKnobs struct {
 	// fixing the index backfill scan timestamp.
 	RunBeforeIndexBackfill func()
 
+	// RunAfterIndexBackfill is called after the index backfill
+	// process is complete (including the temporary index merge)
+	// but before the final validation of the indexes.
+	RunAfterIndexBackfill func()
+
+	// RunBeforeTempIndexMerge is called just before starting the
+	// merge from the temporary index into the new index, after
+	// the backfill scan timestamp has been fixed.
+	RunBeforeTempIndexMerge func()
+
+	// RunAfterTempIndexMerge is called after the merge completes,
+	// before validating and making the next index public.
+	RunAfterTempIndexMerge func()
+
 	// RunBeforeMaterializedViewRefreshCommit is called before committing a
 	// materialized view refresh.
 	RunBeforeMaterializedViewRefreshCommit func() error
@@ -2144,6 +2340,10 @@ type SchemaChangerTestingKnobs struct {
 	// RunBeforeResume runs at the start of the Resume hook.
 	RunBeforeResume func(jobID jobspb.JobID) error
 
+	// RunBeforeDescTxn runs at the start of every call to
+	// (*SchemaChanger).txn.
+	RunBeforeDescTxn func() error
+
 	// OldNamesDrainedNotification is called during a schema change,
 	// after all leases on the version of the descriptor with the old
 	// names are gone, and just before the mapping of the old names to the
@@ -2190,6 +2390,12 @@ func (*SchemaChangerTestingKnobs) ModuleTestingKnobs() {}
 func (sc *SchemaChanger) txn(
 	ctx context.Context, f func(context.Context, *kv.Txn, *descs.Collection) error,
 ) error {
+	if fn := sc.testingKnobs.RunBeforeDescTxn; fn != nil {
+		if err := fn(); err != nil {
+			return err
+		}
+	}
+
 	return sc.execCfg.CollectionFactory.Txn(ctx, sc.execCfg.InternalExecutor, sc.db, f)
 }
 
@@ -2768,6 +2974,23 @@ func (sc *SchemaChanger) getDependentMutationsJobs(
 	return dependentJobs, nil
 }
 
+// shouldSplitAndScatter returns true if the given index mutation
+// should have its ranges pre-split before backfilling begins.
+func (sc *SchemaChanger) shouldSplitAndScatter(
+	tableDesc *tabledesc.Mutable, m catalog.Mutation, idx catalog.Index,
+) bool {
+	if idx == nil {
+		return false
+	}
+
+	if m.Adding() && idx.IsSharded() && !idx.IsTemporaryIndexForBackfill() {
+		if sc.mvccCompliantAddIndex {
+			return m.Backfilling()
+		}
+		return m.DeleteOnly()
+	}
+	return false
+}
+
 func (sc *SchemaChanger) preSplitHashShardedIndexRanges(ctx context.Context) error {
 	if err := sc.txn(ctx, func(
 		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
@@ -2799,7 +3022,7 @@ func (sc *SchemaChanger) preSplitHashShardedIndexRanges(ctx context.Context) err
 			break
 		}
 
-		if idx := m.AsIndex(); m.Adding() && m.DeleteOnly() && idx != nil {
+		if idx := m.AsIndex(); sc.shouldSplitAndScatter(tableDesc, m, idx) {
 			if idx.IsSharded() {
 				splitAtShards := calculateSplitAtShards(maxHashShardedIndexRangePreSplit.Get(&sc.settings.SV), idx.GetSharded().ShardBuckets)
 				for _, shard := range splitAtShards {
diff --git a/pkg/sql/schema_changer_helpers_test.go b/pkg/sql/schema_changer_helpers_test.go
index b3ce0f0fc79f..ed04edc31d64 100644
--- a/pkg/sql/schema_changer_helpers_test.go
+++ b/pkg/sql/schema_changer_helpers_test.go
@@ -31,7 +31,8 @@ func (sc *SchemaChanger) TestingDistIndexBackfill(
 	addedIndexes []descpb.IndexID,
 	filter backfill.MutationFilter,
 ) error {
-	return sc.distIndexBackfill(ctx, version, targetSpans, addedIndexes, filter)
+	return sc.distIndexBackfill(ctx, version, targetSpans, addedIndexes, true, filter)
 }
 
 // SetJob sets the job.
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 0b24a74cb9a1..d2287a240a10 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -33,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -61,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -229,8 +232,8 @@ INSERT INTO t.test VALUES ('a', 'b'), ('c', 'd'); kvDB, keys.SystemSQLCodec, "t", "test") // A long running schema change operation runs through - // a state machine that increments the version by 3. - expectedVersion := tableDesc.Version + 3 + // a state machine that increments the version by 6. + expectedVersion := tableDesc.Version + 6 // Run some schema change if _, err := sqlDB.Exec(` @@ -396,10 +399,13 @@ func runSchemaChangeWithOperations( wg.Wait() // for schema change to complete. - // Verify the number of keys left behind in the table to validate schema - // change operations. This is wrapped in SucceedsSoon to handle cases where - // dropped indexes are expected to be GC'ed immediately after the schema - // change completes. + // Verify the number of keys left behind in the table to + // validate schema change operations. We wait for any SCHEMA + // CHANGE GC jobs to complete to ensure our key count doesn't + // include keys from a temporary index. + if _, err := sqlDB.Exec(`SHOW JOBS WHEN COMPLETE (SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC')`); err != nil { + t.Fatal(err) + } testutils.SucceedsSoon(t, func() error { return sqltestutils.CheckTableKeyCount(ctx, kvDB, keyMultiple, maxValue+numInserts) }) @@ -505,12 +511,13 @@ func TestRaceWithBackfill(t *testing.T) { backfillNotification = nil } } + params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ BackfillChunkSize: chunkSize, }, // Disable GC job. - GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ jobspb.JobID) error { select {} }}, + // GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ jobspb.JobID) error { select {} }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { notifyBackfill() @@ -1041,12 +1048,11 @@ COMMIT; ctx := context.Background() // Verify the number of keys left behind in the table to validate - // schema change operations. - if err := sqltestutils.CheckTableKeyCount( - ctx, kvDB, testCase.expectedNumKeysPerRow, maxValue, - ); err != nil { - t.Fatal(err) - } + // schema change operations. We expect this to fail until garbage + // collection on the temporary index completes. 
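+	// The GC job for the temporary index is created with a minimal
+	// drop time, so it does not need to wait out the GC TTL.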
+	testutils.SucceedsSoon(t, func() error {
+		return sqltestutils.CheckTableKeyCount(ctx, kvDB, testCase.expectedNumKeysPerRow, maxValue)
+	})
 
 	if err := sqlutils.RunScrub(sqlDB, "t", "test"); err != nil {
 		t.Fatal(err)
@@ -1324,6 +1330,7 @@ func TestSchemaChangeRetryOnVersionChange(t *testing.T) {
 	currChunk := 0
 	var numBackfills uint32
 	seenSpan := roachpb.Span{}
+	unblockGC := make(chan struct{})
 	params.Knobs = base.TestingKnobs{
 		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
 			RunBeforeBackfill: func() error {
@@ -1334,6 +1341,16 @@
 			BackfillChunkSize: maxValue / 10,
 			AlwaysUpdateIndexBackfillDetails: true,
 		},
+		// Block the GC job during the test. The index we add
+		// creates a GC job to clean up the temporary index
+		// used during backfill. If that GC job runs, it will
+		// bump the table version, causing an extra backfill
+		// that our assertions don't account for.
+		GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func(_ jobspb.JobID) error {
+			<-unblockGC
+			t.Log("gc unblocked")
+			return nil
+		}},
 		DistSQL: &execinfra.TestingKnobs{
 			RunBeforeBackfillChunk: func(sp roachpb.Span) error {
 				currChunk++
@@ -1372,6 +1389,10 @@
 	s, sqlDB, kvDB := serverutils.StartServer(t, params)
 	defer s.Stopper().Stop(context.Background())
+	defer func() {
+		t.Log("unblocking GC")
+		close(unblockGC)
+	}()
 
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE t;
@@ -1437,21 +1458,21 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 	addIndexSchemaChange(t, sqlDB, kvDB, maxValue, 2)
 	if num := atomic.SwapUint32(&numBackfills, 0); num != 2 {
-		t.Fatalf("expected %d backfills, but seen %d", 2, num)
+		t.Fatalf("expected %d backfills, but saw %d", 2, num)
 	}
 
 	currChunk = 0
 	seenSpan = roachpb.Span{}
 	addColumnSchemaChange(t, sqlDB, kvDB, maxValue, 2)
 	if num := atomic.SwapUint32(&numBackfills, 0); num != 2 {
-		t.Fatalf("expected %d backfills, but seen %d", 2, num)
+		t.Fatalf("expected %d backfills, but saw %d", 2, num)
 	}
 
 	currChunk = 0
 	seenSpan = roachpb.Span{}
 	dropColumnSchemaChange(t, sqlDB, kvDB, maxValue, 2)
 	if num := atomic.SwapUint32(&numBackfills, 0); num != 2 {
-		t.Fatalf("expected %d backfills, but seen %d", 2, num)
+		t.Fatalf("expected %d backfills, but saw %d", 2, num)
 	}
 }
 
@@ -2951,8 +2972,9 @@ CREATE TABLE t.test (
 		wg.Done()
 	}()
 
-	// Wait for the new primary index to move to the DELETE_AND_WRITE_ONLY
-	// state, which happens right before backfilling of the index begins.
+	// Wait for the temporary indexes for the new primary indexes
+	// to move to the DELETE_AND_WRITE_ONLY state, which happens
+	// right before backfilling of the index begins.
 	<-backfillNotification
 
 	scanToArray := func(rows *gosql.Rows) []string {
@@ -2973,18 +2995,36 @@ CREATE TABLE t.test (
 	INSERT INTO t.test VALUES (1, 2, 3, NULL, NULL, 6);
 	SET TRACING=off;
 	SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE
-		message LIKE 'InitPut /Table/%d/2%%' ORDER BY message;`, tableID))
+		message LIKE '%%Put /Table/%d%%' ORDER BY message;`, tableID))
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	expected := []string{
-		fmt.Sprintf("InitPut /Table/%d/2/2/0 -> /TUPLE/1:1:Int/1", tableID),
+		// The first CPuts are to the primary index.
+		fmt.Sprintf("CPut /Table/%d/1/1/0 -> /TUPLE/", tableID),
+		// TODO (rohany): this k/v is spurious and should be removed
+		// when #45343 is fixed.
+ fmt.Sprintf("CPut /Table/%d/1/1/1/1 -> /INT/2", tableID), + fmt.Sprintf("CPut /Table/%d/1/1/2/1 -> /TUPLE/3:3:Int/3", tableID), + fmt.Sprintf("CPut /Table/%d/1/1/4/1 -> /INT/6", tableID), + // Temporary index that exists during the + // backfill. This should have the same number of Puts + // as there are CPuts above. + fmt.Sprintf("Put /Table/%d/3/2/0 -> /BYTES/0x0a030a1302", tableID), // TODO (rohany): this k/v is spurious and should be removed // when #45343 is fixed. - fmt.Sprintf("InitPut /Table/%d/2/2/1/1 -> /INT/2", tableID), - fmt.Sprintf("InitPut /Table/%d/2/2/2/1 -> /TUPLE/3:3:Int/3", tableID), - fmt.Sprintf("InitPut /Table/%d/2/2/4/1 -> /INT/6", tableID), + fmt.Sprintf("Put /Table/%d/3/2/1/1 -> /BYTES/0x0a020104", tableID), + fmt.Sprintf("Put /Table/%d/3/2/2/1 -> /BYTES/0x0a030a3306", tableID), + fmt.Sprintf("Put /Table/%d/3/2/4/1 -> /BYTES/0x0a02010c", tableID), + + // ALTER PRIMARY KEY makes an additional unique index + // based on the old primary key. + fmt.Sprintf("Put /Table/%d/5/1/0 -> /BYTES/0x0a02038a", tableID), + + // Indexes 2 and 4 which are currently being added + // should have no writes because they are in the + // BACKFILLING state at this point. } require.Equal(t, expected, scanToArray(rows)) @@ -2993,18 +3033,31 @@ CREATE TABLE t.test ( SET TRACING=on, kv, results; DELETE FROM t.test WHERE y = 2; SET TRACING=off; - SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE - message LIKE 'Del /Table/%d/2%%' ORDER BY message;`, tableID)) + SELECT message FROM [SHOW KV TRACE FOR SESSION] + WHERE + message LIKE 'Del /Table/%[1]d%%' OR + message LIKE 'Put (delete) /Table/%[1]d%%' + ORDER BY message;`, tableID)) if err != nil { t.Fatal(err) } expected = []string{ - fmt.Sprintf("Del /Table/%d/2/2/0", tableID), - fmt.Sprintf("Del /Table/%d/2/2/1/1", tableID), - fmt.Sprintf("Del /Table/%d/2/2/2/1", tableID), - fmt.Sprintf("Del /Table/%d/2/2/3/1", tableID), - fmt.Sprintf("Del /Table/%d/2/2/4/1", tableID), + // Primary index should see this delete. + fmt.Sprintf("Del /Table/%d/1/1/0", tableID), + fmt.Sprintf("Del /Table/%d/1/1/1/1", tableID), + fmt.Sprintf("Del /Table/%d/1/1/2/1", tableID), + fmt.Sprintf("Del /Table/%d/1/1/3/1", tableID), + fmt.Sprintf("Del /Table/%d/1/1/4/1", tableID), + + // The temporary indexes are delete-preserving -- they + // should see the delete and issue Puts. + fmt.Sprintf("Put (delete) /Table/%d/3/2/0", tableID), + fmt.Sprintf("Put (delete) /Table/%d/3/2/1/1", tableID), + fmt.Sprintf("Put (delete) /Table/%d/3/2/2/1", tableID), + fmt.Sprintf("Put (delete) /Table/%d/3/2/3/1", tableID), + fmt.Sprintf("Put (delete) /Table/%d/3/2/4/1", tableID), + fmt.Sprintf("Put (delete) /Table/%d/5/1/0", tableID), } require.Equal(t, expected, scanToArray(rows)) @@ -3015,24 +3068,25 @@ CREATE TABLE t.test ( UPDATE t.test SET y = 3 WHERE y = 2; SET TRACING=off; SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE - message LIKE 'Put /Table/%d/2%%' OR - message LIKE 'Del /Table/%d/2%%' OR - message LIKE 'CPut /Table/%d/2%%';`, tableID, tableID, tableID)) + message LIKE 'Put /Table/%[1]d/%%' OR + message LIKE 'Del /Table/%[1]d/%%' OR + message LIKE 'CPut /Table/%[1]d/%%';`, tableID)) if err != nil { t.Fatal(err) } expected = []string{ - fmt.Sprintf("Del /Table/%d/2/2/0", tableID), - fmt.Sprintf("CPut /Table/%d/2/3/0 -> /TUPLE/1:1:Int/1 (expecting does not exist)", tableID), - // TODO (rohany): this k/v is spurious and should be removed - // when #45343 is fixed. 
- fmt.Sprintf("Del /Table/%d/2/2/1/1", tableID), - fmt.Sprintf("CPut /Table/%d/2/3/1/1 -> /INT/3 (expecting does not exist)", tableID), - fmt.Sprintf("Del /Table/%d/2/2/2/1", tableID), - fmt.Sprintf("CPut /Table/%d/2/3/2/1 -> /TUPLE/3:3:Int/3 (expecting does not exist)", tableID), - fmt.Sprintf("Del /Table/%d/2/2/4/1", tableID), - fmt.Sprintf("CPut /Table/%d/2/3/4/1 -> /INT/6 (expecting does not exist)", tableID), + // The primary index should see the update + fmt.Sprintf("Put /Table/%d/1/1/1/1 -> /INT/3", tableID), + // The temporary index for the newly added index sees + // a Put in all families. + fmt.Sprintf("Put /Table/%d/3/3/0 -> /BYTES/0x0a030a1302", tableID), + fmt.Sprintf("Put /Table/%d/3/3/1/1 -> /BYTES/0x0a020106", tableID), + fmt.Sprintf("Put /Table/%d/3/3/2/1 -> /BYTES/0x0a030a3306", tableID), + fmt.Sprintf("Put /Table/%d/3/3/4/1 -> /BYTES/0x0a02010c", tableID), + // TODO(ssd): double-check that this trace makes + // sense. + fmt.Sprintf("Put /Table/%d/5/1/0 -> /BYTES/0x0a02038b", tableID), } require.Equal(t, expected, scanToArray(rows)) @@ -3042,17 +3096,22 @@ CREATE TABLE t.test ( UPDATE t.test SET z = NULL, b = 5, c = NULL WHERE y = 3; SET TRACING=off; SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE - message LIKE 'Put /Table/%d/2%%' OR - message LIKE 'Del /Table/%d/2%%' OR - message LIKE 'CPut /Table/%d/2%%';`, tableID, tableID, tableID)) + message LIKE 'Put /Table/%[1]d/%%' OR + message LIKE 'Del /Table/%[1]d/%%' OR + message LIKE 'CPut /Table/%[1]d/2%%';`, tableID)) if err != nil { t.Fatal(err) } expected = []string{ - fmt.Sprintf("Del /Table/%d/2/3/2/1", tableID), - fmt.Sprintf("CPut /Table/%d/2/3/3/1 -> /INT/5 (expecting does not exist)", tableID), - fmt.Sprintf("Del /Table/%d/2/3/4/1", tableID), + + fmt.Sprintf("Del /Table/%d/1/1/2/1", tableID), + fmt.Sprintf("Put /Table/%d/1/1/3/1 -> /INT/5", tableID), + fmt.Sprintf("Del /Table/%d/1/1/4/1", tableID), + + // TODO(ssd): double-check that this trace makes + // sense. + fmt.Sprintf("Put /Table/%d/3/3/3/1 -> /BYTES/0x0a02010a", tableID), } require.Equal(t, expected, scanToArray(rows)) @@ -3309,7 +3368,10 @@ func TestGrantRevokeWhileIndexBackfill(t *testing.T) { } return nil }, - RunAfterBackfillChunk: func() { + BulkAdderFlushesEveryBatch: true, + }, + SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ + RunAfterIndexBackfill: func() { if backfillCompleteNotification != nil { // Close channel to notify that the schema change // backfill is complete and not finalized. @@ -3318,8 +3380,8 @@ func TestGrantRevokeWhileIndexBackfill(t *testing.T) { <-continueSchemaChangeNotification } }, - BulkAdderFlushesEveryBatch: true, }, + // Disable backfill migrations, we still need the jobs table migration. 
 	StartupMigrationManager: &startupmigrations.MigrationManagerTestingKnobs{
 		DisableBackfillMigrations: true,
@@ -7674,3 +7736,201 @@ CREATE TABLE t.test (x INT);`,
 		})
 	}
 }
+
+func TestMixedAddIndexStyleFails(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	params, _ := tests.CreateTestServerParams()
+	params.Knobs.Server = &server.TestingKnobs{
+		DisableAutomaticVersionUpgrade: make(chan struct{}),
+		BinaryVersionOverride:          clusterversion.ByKey(clusterversion.MVCCIndexBackfiller - 1),
+	}
+
+	s, sqlDB, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	_, err := sqlDB.Exec("CREATE TABLE t (a INT PRIMARY KEY, b INT, c INT)")
+	require.NoError(t, err)
+
+	txn, err := sqlDB.Begin()
+	require.NoError(t, err)
+	_, err = txn.Exec("CREATE INDEX ON t (b)")
+	require.NoError(t, err)
+
+	waitOnce := &sync.Once{}
+	wait := make(chan struct{})
+	s.ClusterSettings().Version.SetOnChange(func(_ context.Context, newVersion clusterversion.ClusterVersion) {
+		if newVersion.IsActive(clusterversion.MVCCIndexBackfiller) {
+			waitOnce.Do(func() { close(wait) })
+		}
+	})
+	close(params.Knobs.Server.(*server.TestingKnobs).DisableAutomaticVersionUpgrade)
+	t.Log("waiting for version change")
+	<-wait
+	_, err = txn.Exec("CREATE INDEX ON t (c)")
+	require.NoError(t, err)
+
+	err = txn.Commit()
+	require.Error(t, err, "expected 1 temporary indexes, but found 2; schema change may have been constructed during cluster version upgrade")
+}
+
+func TestAddIndexResumeAfterSettingFlippedFails(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	params, _ := tests.CreateTestServerParams()
+
+	changeSetting := make(chan struct{})
+	wait := make(chan struct{})
+	params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{
+		RunBeforeResume: func(jobID jobspb.JobID) error {
+			close(changeSetting)
+			<-wait
+			return nil
+		},
+	}
+	s, sqlDB, _ := serverutils.StartServer(t, params)
+	defer s.Stopper().Stop(ctx)
+
+	errC := make(chan error)
+
+	go func() {
+		_, err := sqlDB.Exec("CREATE TABLE t (a INT PRIMARY KEY, b INT, c INT)")
+		require.NoError(t, err)
+		_, err = sqlDB.Exec("CREATE INDEX ON t (b)")
+		errC <- err
+	}()
+
+	<-changeSetting
+	_, err := sqlDB.Exec("SET CLUSTER SETTING sql.mvcc_compliant_index_creation.enabled = false")
+	require.NoError(t, err)
+	close(wait)
+
+	require.Error(t, <-errC, "schema change requires MVCC-compliant backfiller, but MVCC-compliant backfiller is not supported")
+}
+
+func TestPauseBeforeRandomDescTxn(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+
+	type testCase struct {
+		name      string
+		setupSQL  string
+		changeSQL string
+		verify    func(t *testing.T, sqlRunner *sqlutils.SQLRunner)
+	}
+
+	// We run the schema change twice: first to find out how many
+	// sc.txn calls there are, and then a second time, pausing at
+	// a randomly chosen one. Counting the txns up front gives
+	// each transaction an equal probability of being the pause
+	// point.
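+	// Pausing at an arbitrary sc.txn call and then resuming the
+	// job also checks that the schema change can recover no
+	// matter which of its descriptor-update transactions was the
+	// last one to commit.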
+	getTxnCount := func(t *testing.T, tc testCase) int {
+		var (
+			count       int32 // accessed atomically
+			shouldCount int32 // accessed atomically
+		)
+		params, _ := tests.CreateTestServerParams()
+		params.Knobs = base.TestingKnobs{
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
+				RunBeforeDescTxn: func() error {
+					if atomic.LoadInt32(&shouldCount) == 1 {
+						atomic.AddInt32(&count, 1)
+					}
+					return nil
+				},
+			},
+		}
+		s, sqlDB, _ := serverutils.StartServer(t, params)
+		sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
+		defer s.Stopper().Stop(ctx)
+
+		sqlRunner.Exec(t, tc.setupSQL)
+		atomic.StoreInt32(&shouldCount, 1)
+		sqlRunner.Exec(t, tc.changeSQL)
+		return int(atomic.LoadInt32(&count))
+	}
+
+	runWithPauseAt := func(t *testing.T, tc testCase, pauseAt int) {
+		var (
+			count       int32 // accessed atomically
+			shouldPause int32 // accessed atomically
+			jobID       jobspb.JobID
+		)
+
+		params, _ := tests.CreateTestServerParams()
+		params.Knobs = base.TestingKnobs{
+			JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+			SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
+				RunBeforeResume: func(id jobspb.JobID) error {
+					jobID = id
+					return nil
+				},
+				RunBeforeDescTxn: func() error {
+					if atomic.LoadInt32(&shouldPause) == 0 {
+						return nil
+					}
+					current := int(atomic.AddInt32(&count, 1))
+					if current == pauseAt {
+						atomic.StoreInt32(&shouldPause, 0)
+						return jobs.MarkPauseRequestError(errors.Newf("paused sc.txn call %d", current))
+					}
+					return nil
+				},
+			},
+		}
+		s, sqlDB, _ := serverutils.StartServer(t, params)
+		sqlRunner := sqlutils.MakeSQLRunner(sqlDB)
+		defer s.Stopper().Stop(ctx)
+
+		sqlRunner.Exec(t, tc.setupSQL)
+		atomic.StoreInt32(&shouldPause, 1)
+		sqlRunner.ExpectErr(t, ".*paused sc.txn call.*", tc.changeSQL)
+		sqlRunner.Exec(t, "RESUME JOB $1", jobID)
+
+		row := sqlRunner.QueryRow(t, "SELECT status FROM [SHOW JOB WHEN COMPLETE $1]", jobID)
+		var status string
+		row.Scan(&status)
+		require.Equal(t, "succeeded", status)
+		tc.verify(t, sqlRunner)
+	}
+
+	rnd, _ := randutil.NewTestRand()
+	for _, tc := range []testCase{
+		{
+			name: "create index",
+			setupSQL: `
+CREATE TABLE t (pk INT PRIMARY KEY, b INT);
+INSERT INTO t VALUES (1, 1), (2, 2), (3, 3);
+`,
+			changeSQL: "CREATE INDEX ON t (b)",
+			verify: func(t *testing.T, sqlRunner *sqlutils.SQLRunner) {
+				rows := sqlutils.MatrixToStr(sqlRunner.QueryStr(t, "SELECT * FROM t@t_b_idx"))
+				require.Equal(t, "1, 1\n2, 2\n3, 3\n", rows)
+			},
+		},
+	} {
+		txnCount := getTxnCount(t, tc)
+
+		const testAll = false
+		if testAll {
+			for i := 1; i <= txnCount; i++ {
+				t.Run(fmt.Sprintf("%s_pause_at_txn_%d", tc.name, i), func(t *testing.T) {
+					runWithPauseAt(t, tc, i)
+				})
+			}
+		} else {
+			pauseAt := rnd.Intn(txnCount) + 1
+			t.Run(fmt.Sprintf("%s_pause_at_txn_%d", tc.name, pauseAt), func(t *testing.T) {
+				runWithPauseAt(t, tc, pauseAt)
+			})
+		}
+	}
+}
diff --git a/pkg/sql/schemachanger/scexec/exec_backfill_test.go b/pkg/sql/schemachanger/scexec/exec_backfill_test.go
index dc14c933e012..4e9711f85151 100644
--- a/pkg/sql/schemachanger/scexec/exec_backfill_test.go
+++ b/pkg/sql/schemachanger/scexec/exec_backfill_test.go
@@ -77,7 +77,7 @@ func TestExecBackfill(t *testing.T) {
 			keySuffixColumnIDs = append(keySuffixColumnIDs, id)
 		}
 	}
-	require.NoError(t, mut.AddIndexMutation(&descpb.IndexDescriptor{
+	require.NoError(t, mut.DeprecatedAddIndexMutation(&descpb.IndexDescriptor{
 		Name:    name,
 		ID:      id,
 		Version: descpb.PrimaryIndexWithStoredColumnsVersion,
diff --git a/pkg/sql/schemachanger/scexec/scmutationexec/helpers.go b/pkg/sql/schemachanger/scexec/scmutationexec/helpers.go
index d6cf1c0a3842..f41bdca66bc1 100644
--- a/pkg/sql/schemachanger/scexec/scmutationexec/helpers.go
+++ b/pkg/sql/schemachanger/scexec/scmutationexec/helpers.go
@@ -189,7 +189,7 @@ func enqueueDropColumnMutation(tbl *tabledesc.Mutable, col *descpb.ColumnDescrip
 }
 
 func enqueueAddIndexMutation(tbl *tabledesc.Mutable, idx *descpb.IndexDescriptor) error {
-	if err := tbl.AddIndexMutation(idx, descpb.DescriptorMutation_ADD); err != nil {
+	if err := tbl.DeprecatedAddIndexMutation(idx, descpb.DescriptorMutation_ADD); err != nil {
 		return err
 	}
 	tbl.NextMutationID--
@@ -197,7 +197,7 @@ func enqueueAddIndexMutation(tbl *tabledesc.Mutable, idx *descpb.IndexDescriptor
 }
 
 func enqueueDropIndexMutation(tbl *tabledesc.Mutable, idx *descpb.IndexDescriptor) error {
-	if err := tbl.AddIndexMutation(idx, descpb.DescriptorMutation_DROP); err != nil {
+	if err := tbl.DeprecatedAddIndexMutation(idx, descpb.DescriptorMutation_DROP); err != nil {
 		return err
 	}
 	tbl.NextMutationID--
diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go
index c3035530888e..72a4cb5126a8 100644
--- a/pkg/sql/truncate.go
+++ b/pkg/sql/truncate.go
@@ -324,7 +324,10 @@ func checkTableForDisallowedMutationsWithTruncate(desc *tabledesc.Mutable) error
 	for i, m := range desc.AllMutations() {
 		if idx := m.AsIndex(); idx != nil {
 			// Do not allow dropping indexes.
-			if !m.Adding() {
+			//
+			// TODO(ssd): Are we definitely OK to allow
+			// truncate with these temporary drops?
+			if !m.Adding() && !idx.IsTemporaryIndexForBackfill() {
 				return unimplemented.Newf(
 					"TRUNCATE concurrent with ongoing schema change",
 					"cannot perform TRUNCATE on %q which has indexes being dropped", desc.GetName())
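
Reviewer note: the /BYTES/0x0a... values and "Put (delete)" messages in the KV traces above come from the temporary index's delete-preserving encoding. Instead of issuing a real KV delete, which would leave nothing for a later scan to find, the temporary index records the delete as an ordinary Put of a wrapped value. A minimal sketch of the idea, using a hypothetical wrapper type in place of the patch's actual protobuf encoding:

	package main

	import "fmt"

	// wrappedValue stands in for the wrapper value that a
	// delete-preserving index writes. The name and layout here are
	// hypothetical; only the shape of the idea matches the patch.
	type wrappedValue struct {
		value   []byte // index entry payload; nil when this records a delete
		deleted bool   // true when the original operation was a delete
	}

	// capture turns both writes and deletes into Puts. A real Del
	// would make the key invisible to the merge step, losing the
	// fact that the row was deleted while the backfill ran.
	func capture(val []byte, isDelete bool) wrappedValue {
		if isDelete {
			return wrappedValue{deleted: true} // "Put (delete)" in the trace
		}
		return wrappedValue{value: val}
	}

	func main() {
		fmt.Printf("write:  %+v\n", capture([]byte{0x0a, 0x02}, false))
		fmt.Printf("delete: %+v\n", capture(nil, true))
	}

When the merge runs, entries with deleted set become deletes against the new index and the rest become ordinary writes, which is why the traces assert a Put on the temporary index (index ID 3) for every operation on the table.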