diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go
index 1ed0d478d892..94428bb2279d 100644
--- a/pkg/jobs/registry_test.go
+++ b/pkg/jobs/registry_test.go
@@ -93,38 +93,16 @@ func writeMutation(
 	}
 }
 
-func writeGCMutation(
-	t *testing.T,
-	kvDB *kv.DB,
-	tableDesc *tabledesc.Mutable,
-	m descpb.TableDescriptor_GCDescriptorMutation,
-) {
-	tableDesc.GCMutations = append(tableDesc.GCMutations, m)
-	tableDesc.Version++
-	if err := catalog.ValidateSelf(tableDesc); err != nil {
-		t.Fatal(err)
-	}
-	if err := kvDB.Put(
-		context.Background(),
-		catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.GetID()),
-		tableDesc.DescriptorProto(),
-	); err != nil {
-		t.Fatal(err)
-	}
-}
-
 type mutationOptions struct {
 	// Set if the desc should have any mutations of any sort.
 	hasMutation bool
-	// Set if the mutation being inserted is a GCMutation.
-	hasGCMutation bool
 	// Set if the desc should have a job that is dropping it.
 	hasDropJob bool
 }
 
 func (m mutationOptions) string() string {
-	return fmt.Sprintf("hasMutation=%s_hasGCMutation=%s_hasDropJob=%s",
-		strconv.FormatBool(m.hasMutation), strconv.FormatBool(m.hasGCMutation),
+	return fmt.Sprintf("hasMutation=%s_hasDropJob=%s",
+		strconv.FormatBool(m.hasMutation),
 		strconv.FormatBool(m.hasDropJob))
 }
 
@@ -172,7 +150,7 @@ func TestRegistryGC(t *testing.T) {
 	writeJob := func(name string, created, finished time.Time, status Status, mutOptions mutationOptions) string {
 		tableName := constructTableName(name, mutOptions)
 		if _, err := sqlDB.Exec(fmt.Sprintf(`
-CREATE DATABASE IF NOT EXISTS t; 
+CREATE DATABASE IF NOT EXISTS t;
 CREATE TABLE t."%s" (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR,i VARCHAR NOT NULL DEFAULT 'i');
 INSERT INTO t."%s" VALUES('a', 'foo');
 `, tableName, tableName)); err != nil {
@@ -187,10 +165,6 @@ INSERT INTO t."%s" VALUES('a', 'foo');
 		writeColumnMutation(t, kvDB, tableDesc, "i", descpb.DescriptorMutation{State: descpb.
 			DescriptorMutation_DELETE_AND_WRITE_ONLY, Direction: descpb.DescriptorMutation_DROP})
 	}
-	if mutOptions.hasGCMutation {
-		writeGCMutation(t, kvDB, tableDesc, descpb.TableDescriptor_GCDescriptorMutation{})
-	}
-
 	payload, err := protoutil.Marshal(&jobspb.Payload{
 		Description: name,
 		// register a mutation on the table so that jobs that reference
@@ -222,58 +196,54 @@ INSERT INTO t."%s" VALUES('a', 'foo');
 
 	// Test the descriptor when any of the following are set.
 	// 1. Mutations
-	// 2. GC Mutations
-	// 3. A drop job
+	// 2. A drop job
 	for _, hasMutation := range []bool{true, false} {
-		for _, hasGCMutation := range []bool{true, false} {
-			for _, hasDropJob := range []bool{true, false} {
-				if !hasMutation && !hasGCMutation && !hasDropJob {
-					continue
-				}
-				mutOptions := mutationOptions{
-					hasMutation:   hasMutation,
-					hasGCMutation: hasGCMutation,
-					hasDropJob:    hasDropJob,
-				}
-				oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
-				oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
-				oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
-					StatusFailed, mutOptions)
-				oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
-					StatusRevertFailed, mutOptions)
-				oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
-					StatusCanceled, mutOptions)
-				newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
-					mutOptions)
-				newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
-				newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
-				newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
-				newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
-					StatusCanceled, mutOptions)
-
-				db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
-					{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
-					{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
-
-				if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
-					t.Fatal(err)
-				}
-				db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
-					{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
-					{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
+		for _, hasDropJob := range []bool{true, false} {
+			if !hasMutation && !hasDropJob {
+				continue
+			}
+			mutOptions := mutationOptions{
+				hasMutation: hasMutation,
+				hasDropJob:  hasDropJob,
+			}
+			oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
+			oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
+			oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
+				StatusFailed, mutOptions)
+			oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
+				StatusRevertFailed, mutOptions)
+			oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
+				StatusCanceled, mutOptions)
+			newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
+				mutOptions)
+			newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
+			newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
+			newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
+			newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
+				StatusCanceled, mutOptions)
+
+			db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
+				{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
+				{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
+
+			if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
+				t.Fatal(err)
+			}
+			db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
+				{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
+				{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
 
-				if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
-					t.Fatal(err)
-				}
-				db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
-					{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
-
-				// Delete the revert failed, and running jobs for the next run of the
-				// test.
-				_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
-					oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
-				require.NoError(t, err)
+			if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
+				t.Fatal(err)
 			}
+			db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
+				{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})
+
+			// Delete the revert failed, and running jobs for the next run of the
+			// test.
+			_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
+				oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
+			require.NoError(t, err)
 		}
 	}
 }
diff --git a/pkg/sql/catalog/descpb/structured.proto b/pkg/sql/catalog/descpb/structured.proto
index 29f7deb8bd81..491835e2c6df 100644
--- a/pkg/sql/catalog/descpb/structured.proto
+++ b/pkg/sql/catalog/descpb/structured.proto
@@ -1108,6 +1108,7 @@ message TableDescriptor {
         (gogoproto.customname) = "JobID",
         deprecated = true];
   }
 
+  // Before 22.1:
   // The schema elements that have been dropped and whose underlying
   // data needs to be gc-ed. These schema elements have already transitioned
   // through the drop state machine when they were in the above mutations
@@ -1115,8 +1116,12 @@ message TableDescriptor {
   // can be reused. This list is separate because mutations can
   // lie in this list for a long time (gc deadline) and should not block
   // the execution of other schema changes on the table.
+  //
+  // Since 22.1 this field is deprecated and no longer maintained.
+  // The index GC job still removes mutations it finds in this list.
   repeated GCDescriptorMutation gc_mutations = 33 [(gogoproto.nullable) = false,
-                                                   (gogoproto.customname) = "GCMutations"];
+                                                   (gogoproto.customname) = "GCMutations",
+                                                   deprecated = true];
 
   optional string create_query = 34 [(gogoproto.nullable) = false];
diff --git a/pkg/sql/catalog/descriptor.go b/pkg/sql/catalog/descriptor.go
index 2f7943667a47..0f8ac459968e 100644
--- a/pkg/sql/catalog/descriptor.go
+++ b/pkg/sql/catalog/descriptor.go
@@ -533,8 +533,6 @@ type TableDescriptor interface {
 	MakePublic() TableDescriptor
 	// AllMutations returns all of the table descriptor's mutations.
 	AllMutations() []Mutation
-	// GetGCMutations returns the table descriptor's GC mutations.
-	GetGCMutations() []descpb.TableDescriptor_GCDescriptorMutation
 	// GetMutationJobs returns the table descriptor's mutation jobs.
 	GetMutationJobs() []descpb.TableDescriptor_MutationJob
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go
index c33098b8483b..07a972323db4 100644
--- a/pkg/sql/catalog/lease/lease_test.go
+++ b/pkg/sql/catalog/lease/lease_test.go
@@ -1118,14 +1118,9 @@ INSERT INTO t.kv VALUES ('a', 'b');
 	}
 
 	testutils.SucceedsSoon(t, func() error {
-		if tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "kv"); len(tableDesc.GetGCMutations()) != 0 {
-			return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
+		return tests.CheckKeyCountE(t, kvDB, tableSpan, 2)
 	})
 
-	tests.CheckKeyCount(t, kvDB, tableSpan, 2)
-
 	// TODO(erik, vivek): Transactions using old descriptors should fail and
 	// rollback when the index keys have been removed by ClearRange
 	// and the consistency issue is resolved. See #31563.
diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go
index 9fa4ff77453d..ba17203ec493 100644
--- a/pkg/sql/catalog/tabledesc/structured.go
+++ b/pkg/sql/catalog/tabledesc/structured.go
@@ -63,11 +63,6 @@ var ErrMissingColumns = errors.New("table must contain at least 1 column")
 
 // ErrMissingPrimaryKey indicates a table with no primary key.
 var ErrMissingPrimaryKey = errors.New("table must contain a primary key")
 
-// ErrIndexGCMutationsList is returned by FindIndexWithID to signal that the
-// index with the given ID does not have a descriptor and is in the garbage
-// collected mutations list.
-var ErrIndexGCMutationsList = errors.New("index in GC mutations list")
-
 // PostDeserializationTableDescriptorChanges are a set of booleans to indicate
 // which types of upgrades or fixes occurred when filling in the descriptor
 // after deserialization.
diff --git a/pkg/sql/catalog/tabledesc/table_desc.go b/pkg/sql/catalog/tabledesc/table_desc.go
index f3b4d254ccbc..ea65b8579aef 100644
--- a/pkg/sql/catalog/tabledesc/table_desc.go
+++ b/pkg/sql/catalog/tabledesc/table_desc.go
@@ -334,11 +334,6 @@ func (desc *wrapper) FindIndexWithID(id descpb.IndexID) (catalog.Index, error) {
 	}); idx != nil {
 		return idx, nil
 	}
-	for _, m := range desc.GCMutations {
-		if m.IndexID == id {
-			return nil, ErrIndexGCMutationsList
-		}
-	}
 	return nil, errors.Errorf("index-id \"%d\" does not exist", id)
 }
 
diff --git a/pkg/sql/gcjob/descriptor_utils.go b/pkg/sql/gcjob/descriptor_utils.go
index 81f98cc9e2a2..b5bb960810ad 100644
--- a/pkg/sql/gcjob/descriptor_utils.go
+++ b/pkg/sql/gcjob/descriptor_utils.go
@@ -23,15 +23,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
 
+// updateDescriptorGCMutations removes the GCMutation for the given
+// index ID. We no longer populate this field, but we still search it
+// to remove existing entries.
 func updateDescriptorGCMutations(
 	ctx context.Context,
 	execCfg *sql.ExecutorConfig,
 	tableID descpb.ID,
 	garbageCollectedIndexID descpb.IndexID,
 ) error {
-	log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
-		tableID, garbageCollectedIndexID)
-	// Remove the mutation from the table descriptor.
 	return sql.DescsTxn(ctx, execCfg, func(
 		ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
 	) error {
@@ -39,18 +39,26 @@ func updateDescriptorGCMutations(
 		if err != nil {
 			return err
 		}
+		found := false
 		for i := 0; i < len(tbl.GCMutations); i++ {
 			other := tbl.GCMutations[i]
 			if other.IndexID == garbageCollectedIndexID {
 				tbl.GCMutations = append(tbl.GCMutations[:i], tbl.GCMutations[i+1:]...)
+				found = true
 				break
 			}
 		}
-		b := txn.NewBatch()
-		if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
-			return err
+		if found {
+			log.Infof(ctx, "updating GCMutations for table %d after removing index %d",
+				tableID, garbageCollectedIndexID)
+			// Remove the mutation from the table descriptor.
+			b := txn.NewBatch()
+			if err := descsCol.WriteDescToBatch(ctx, false /* kvTrace */, tbl, b); err != nil {
+				return err
+			}
+			return txn.Run(ctx, b)
 		}
-		return txn.Run(ctx, b)
+		return nil
 	})
 }
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
index bf58bbfb2217..da0d5688be4d 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
+++ b/pkg/sql/opt/exec/execbuilder/testdata/show_trace_nonmetamorphic
@@ -222,7 +222,7 @@ WHERE message NOT LIKE '%Z/%' AND
   message NOT LIKE 'querying next range at%' AND
   operation != 'dist sender send'
 ----
 batch flow coordinator Del /NamespaceTable/30/1/56/57/"kv"/4/1
-batch flow coordinator Put /Table/3/1/58/2/1 -> table: parent_id:56 unexposed_parent_schema_id:57 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges: users: users: owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:0 gc_mutations: create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
+batch flow coordinator Put /Table/3/1/58/2/1 -> table: parent_id:56 unexposed_parent_schema_id:57 columns: nullable:false hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > columns: nullable:true hidden:false inaccessible:false generated_as_identity_type:NOT_IDENTITY_COLUMN virtual:false pg_attribute_num:0 alter_column_type_in_progress:false system_column_kind:NONE > next_column_id:3 families: next_family_id:1 primary_index: interleave:<> partitioning: type:FORWARD created_explicitly:false encoding_type:1 sharded: disabled:false geo_config:<> predicate:"" use_delete_preserving_encoding:false > next_index_id:3 privileges: users: users: owner_proto:"root" version:2 > next_mutation_id:3 format_version:3 state:DROP offline_reason:"" view_query:"" is_materialized_view:false new_schema_change_job_id:0 drop_time:... replacement_of: > audit_mode:DISABLED drop_job_id:0 create_query:"" create_as_of_time:<...> temporary:false partition_all_by:false >
 exec stmt rows affected: 0
 
 # Check that session tracing does not inhibit the fast path for inserts &
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index 47f6314d9609..aa0247cd033e 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -1090,15 +1090,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
 		}
 		isRollback = m.IsRollback()
 		if idx := m.AsIndex(); m.Dropped() && idx != nil {
-			// how we keep track of dropped index names (for, e.g., zone config
-			// lookups), even though in the absence of a GC job there's nothing to
-			// clean them up.
-			scTable.GCMutations = append(
-				scTable.GCMutations,
-				descpb.TableDescriptor_GCDescriptorMutation{
-					IndexID: idx.GetID(),
-				})
-
 			description := sc.job.Payload().Description
 			if isRollback {
 				description = "ROLLBACK of " + description
@@ -1107,7 +1098,6 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
 			if err := sc.createIndexGCJob(ctx, idx.GetID(), txn, description); err != nil {
 				return err
 			}
-
 		}
 		if constraint := m.AsConstraint(); constraint != nil && constraint.Adding() {
 			if constraint.IsForeignKey() && constraint.ForeignKey().Validity == descpb.ConstraintValidity_Unvalidated {
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go
index 39ccf62627cb..3f26c2773b90 100644
--- a/pkg/sql/schema_changer_test.go
+++ b/pkg/sql/schema_changer_test.go
@@ -1538,15 +1538,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 		return nil
 	})
 
-	tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-
-	// There is still a DROP INDEX mutation waiting for GC.
-	if e := 1; len(tableDesc.GetGCMutations()) != e {
-		t.Fatalf("the table has %d instead of %d GC mutations", len(tableDesc.GetGCMutations()), e)
-	} else if m := tableDesc.GetGCMutations()[0]; m.IndexID != 2 && m.DropTime == 0 && m.JobID == 0 {
-		t.Fatalf("unexpected GC mutation %v", m)
-	}
-
 	// There is still some garbage index data that needs to be purged. All the
 	// rows from k = 0 to k = chunkSize - 1 have index values.
 	numGarbageValues := chunkSize
@@ -1565,20 +1556,12 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT);
 	// above garbage left behind.
 	atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
 
+	// No garbage left behind.
 	testutils.SucceedsSoon(t, func() error {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.GetGCMutations()) > 0 {
-			return errors.Errorf("%d GC mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
+		numGarbageValues = 0
+		return sqltestutils.CheckTableKeyCount(ctx, kvDB, 1, maxValue+1+numGarbageValues)
 	})
 
-	// No garbage left behind.
-	numGarbageValues = 0
-	if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, 1, maxValue+1+numGarbageValues); err != nil {
-		t.Fatal(err)
-	}
-
 	// A new attempt cleans up a chunk of data.
 	if attempts != expectedAttempts+1 {
 		t.Fatalf("%d chunk ops, despite allowing only (schema change + reverse) = %d",
 			attempts, expectedAttempts)
 	}
@@ -1908,20 +1891,12 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
 		t.Fatal(err)
 	}
 
-	testutils.SucceedsSoon(t, func() error {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.GetGCMutations()) > 0 {
-			return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
-	})
-
 	ctx := context.Background()
-	// Check that the number of k-v pairs is accurate.
-	if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, 3, maxValue); err != nil {
-		t.Fatal(err)
-	}
+	testutils.SucceedsSoon(t, func() error {
+		// Check that the number of k-v pairs is accurate.
+		return sqltestutils.CheckTableKeyCount(ctx, kvDB, 3, maxValue)
+	})
 
 	// State of jobs table
 	skip.WithIssue(t, 51796, "TODO(pbardea): The following fails due to causes seemingly unrelated to GC")
@@ -2273,7 +2248,7 @@ func TestVisibilityDuringPrimaryKeyChange(t *testing.T) {
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE t;
 CREATE TABLE t.test (x INT PRIMARY KEY, y INT NOT NULL, z INT, INDEX i (z));
-INSERT INTO t.test VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3); 
+INSERT INTO t.test VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
 `); err != nil {
 		t.Fatal(err)
 	}
@@ -2850,11 +2825,11 @@ func TestPrimaryKeyChangeKVOps(t *testing.T) {
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE t;
 CREATE TABLE t.test (
-	x INT PRIMARY KEY, 
-	y INT NOT NULL, 
-	z INT, 
-	a INT, 
-	b INT, 
+	x INT PRIMARY KEY,
+	y INT NOT NULL,
+	z INT,
+	a INT,
+	b INT,
 	c INT,
 	FAMILY (x), FAMILY (y), FAMILY (z, a), FAMILY (b), FAMILY (c)
 )
@@ -3184,7 +3159,7 @@ func TestMultiplePrimaryKeyChanges(t *testing.T) {
 	if _, err := sqlDB.Exec(`
 CREATE DATABASE t;
 CREATE TABLE t.test (x INT NOT NULL, y INT NOT NULL, z INT NOT NULL, w int, INDEX i (w));
-INSERT INTO t.test VALUES (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3); 
+INSERT INTO t.test VALUES (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3);
 `); err != nil {
 		t.Fatal(err)
 	}
@@ -6016,103 +5991,6 @@ ALTER TABLE t.test2 ADD FOREIGN KEY (k) REFERENCES t.test;
 	}
 }
 
-// TestOrphanedGCMutationsRemoved tests that if a table descriptor has a
-// GCMutations which references a job that does not exist anymore, that it will
-// eventually be cleaned up anyway. One way this can arise is when a table
-// was backed up right after an index deletion.
-func TestOrphanedGCMutationsRemoved(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-	skip.WithIssue(t, 51796, "TODO (lucy): get rid of this test once GCMutations goes away")
-	params, _ := tests.CreateTestServerParams()
-	const chunkSize = 200
-	// Disable synchronous schema change processing so that the mutations get
-	// processed asynchronously.
-	var enableAsyncSchemaChanges uint32
-	params.Knobs = base.TestingKnobs{
-		SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
-			BackfillChunkSize: chunkSize,
-		},
-	}
-	s, sqlDB, kvDB := serverutils.StartServer(t, params)
-	defer s.Stopper().Stop(context.Background())
-
-	// Disable strict GC TTL enforcement because we're going to shove a zero-value
-	// TTL into the system with AddImmediateGCZoneConfig.
-	defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()
-
-	retryOpts := retry.Options{
-		InitialBackoff: 20 * time.Millisecond,
-		MaxBackoff:     200 * time.Millisecond,
-		Multiplier:     2,
-	}
-
-	// Create a k-v table.
-	if _, err := sqlDB.Exec(`
-CREATE DATABASE t;
-CREATE TABLE t.test (k INT PRIMARY KEY, v INT8);
-`); err != nil {
-		t.Fatal(err)
-	}
-	if _, err := sqlDB.Exec(`CREATE INDEX t_v ON t.test(v)`); err != nil {
-		t.Fatal(err)
-	}
-
-	// Add some data.
-	const maxValue = chunkSize + 1
-	if err := sqltestutils.BulkInsertIntoTable(sqlDB, maxValue); err != nil {
-		t.Fatal(err)
-	}
-
-	tableDesc := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-	// Wait until indexes are created.
-	for r := retry.Start(retryOpts); r.Next(); {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.PublicNonPrimaryIndexes()) == 1 {
-			break
-		}
-	}
-
-	if _, err := sqlDB.Exec(`DROP INDEX t.t_v`); err != nil {
-		t.Fatal(err)
-	}
-
-	tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-	if e := 1; e != len(tableDesc.GetGCMutations()) {
-		t.Fatalf("e = %d, v = %d", e, len(tableDesc.GetGCMutations()))
-	}
-
-	// Delete the associated job.
-	jobID := tableDesc.GetGCMutations()[0].JobID
-	if _, err := sqlDB.Exec(fmt.Sprintf("DELETE FROM system.jobs WHERE id=%d", jobID)); err != nil {
-		t.Fatal(err)
-	}
-
-	// Ensure the GCMutations has not yet been completed.
-	tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-	if e := 1; e != len(tableDesc.GetGCMutations()) {
-		t.Fatalf("e = %d, v = %d", e, len(tableDesc.GetGCMutations()))
-	}
-
-	// Enable async schema change processing for purged schema changes.
-	atomic.StoreUint32(&enableAsyncSchemaChanges, 1)
-
-	// Add immediate GC TTL to allow index creation purge to complete.
-	if _, err := sqltestutils.AddImmediateGCZoneConfig(sqlDB, tableDesc.GetID()); err != nil {
-		t.Fatal(err)
-	}
-
-	// Ensure that GC mutations that cannot find their job will eventually be
-	// cleared.
-	testutils.SucceedsSoon(t, func() error {
-		tableDesc = catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "t", "test")
-		if len(tableDesc.GetGCMutations()) > 0 {
-			return errors.Errorf("%d gc mutations remaining", len(tableDesc.GetGCMutations()))
-		}
-		return nil
-	})
-}
-
 // TestMultipleRevert starts a schema change then cancels it. After the canceled
 // job, after reversing the mutations the job is set up to throw an error so
 // that mutations are attempted to be reverted again. The mutation shouldn't be
@@ -6612,7 +6490,7 @@ func TestFailureToMarkCanceledReversalLeadsToCanceledStatus(t *testing.T) {
 		jobsErrGroup.Go(func() error {
 			return testutils.SucceedsSoonError(func() error {
 				return sqlDB.QueryRow(`
-SELECT job_id FROM crdb_internal.jobs 
+SELECT job_id FROM crdb_internal.jobs
 WHERE description LIKE '%` + idxName + `%'`).Scan(&jobIDs[i])
 			})
 		})
@@ -6700,7 +6578,7 @@ func TestCancelMultipleQueued(t *testing.T) {
 		jobsErrGroup.Go(func() error {
 			return testutils.SucceedsSoonError(func() error {
 				return sqlDB.QueryRow(`
-SELECT job_id FROM crdb_internal.jobs 
+SELECT job_id FROM crdb_internal.jobs
 WHERE description LIKE '%` + idxName + `%'`).Scan(&jobIDs[i])
 			})
 		})
diff --git a/pkg/sql/tests/data.go b/pkg/sql/tests/data.go
index 37e41865e2f9..35f5c6854470 100644
--- a/pkg/sql/tests/data.go
+++ b/pkg/sql/tests/data.go
@@ -19,17 +19,28 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/errors"
 )
 
 // CheckKeyCount checks that the number of keys in the provided span matches
 // numKeys.
 func CheckKeyCount(t *testing.T, kvDB *kv.DB, span roachpb.Span, numKeys int) {
 	t.Helper()
-	if kvs, err := kvDB.Scan(context.TODO(), span.Key, span.EndKey, 0); err != nil {
+	if err := CheckKeyCountE(t, kvDB, span, numKeys); err != nil {
 		t.Fatal(err)
+	}
+}
+
+// CheckKeyCountE returns an error if the number of keys in the
+// provided span does not match numKeys.
+func CheckKeyCountE(t *testing.T, kvDB *kv.DB, span roachpb.Span, numKeys int) error {
+	t.Helper()
+	if kvs, err := kvDB.Scan(context.TODO(), span.Key, span.EndKey, 0); err != nil {
+		return err
 	} else if l := numKeys; len(kvs) != l {
-		t.Fatalf("expected %d key value pairs, but got %d", l, len(kvs))
+		return errors.Newf("expected %d key value pairs, but got %d", l, len(kvs))
 	}
+	return nil
 }
 
 // CreateKVTable creates a basic table named t. that stores key/value