From 7ef42352bd356fea34d2c8b255020e9969a6405d Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Sun, 8 Mar 2020 12:22:03 -0400 Subject: [PATCH] sql: re-add GC job on schema element deletion This commit creates GC jobs upon the deletion of an index, table or database. Similarly to the previous implementation, it considers the walltime at which the schema change completed to be the drop time of the schema element. Release note (sql change): Previously, after deleting an index, table, or database the relevant schema change job would change its running status to waiting for GC TTL. The schema change and the GC process are now decoupled into 2 jobs. Release justification: This is a follow up to the migration of turning schema changes into actual jobs. This commit re-adds the ability to properly GC indexes and tables. --- pkg/base/testing_knobs.go | 1 + pkg/ccl/backupccl/backup_test.go | 16 +- pkg/ccl/partitionccl/drop_test.go | 15 +- pkg/server/server.go | 6 + pkg/sql/as_of_test.go | 4 +- pkg/sql/drop_test.go | 126 ++--- pkg/sql/exec_util.go | 1 + pkg/sql/gcjob/descriptor_utils.go | 6 + pkg/sql/gcjob/gc_job.go | 12 +- pkg/sql/gcjob/gc_job_utils.go | 2 +- pkg/sql/gcjob/refresh_statuses.go | 3 +- pkg/sql/gcjob/table_garbage_collection.go | 2 +- pkg/sql/{gcjob => gcjob_test}/gc_job_test.go | 9 +- pkg/sql/{gcjob => gcjob_test}/main_test.go | 2 +- pkg/sql/lease_test.go | 12 +- .../logictest/testdata/logic_test/alter_table | 24 +- .../testdata/logic_test/drop_database | 6 +- .../logictest/testdata/logic_test/drop_table | 6 +- .../testdata/logic_test/schema_change_in_txn | 13 +- .../logictest/testdata/logic_test/truncate | 18 +- pkg/sql/schema_changer.go | 453 +++++------------- pkg/sql/schema_changer_test.go | 81 ++-- 22 files changed, 331 insertions(+), 487 deletions(-) rename pkg/sql/{gcjob => gcjob_test}/gc_job_test.go (97%) rename pkg/sql/{gcjob => gcjob_test}/main_test.go (97%) diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 7f656d3e2e83..d66e700c7c95 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -24,6 +24,7 @@ type TestingKnobs struct { SQLExecutor ModuleTestingKnobs SQLLeaseManager ModuleTestingKnobs SQLSchemaChanger ModuleTestingKnobs + GCJob ModuleTestingKnobs PGWireTestingKnobs ModuleTestingKnobs SQLMigrationManager ModuleTestingKnobs DistSQL ModuleTestingKnobs diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 67c1ecf1c7eb..406f48fcdd83 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1347,12 +1347,10 @@ func TestRestoreFailCleanup(t *testing.T) { defer leaktest.AfterTest(t)() params := base.TestServerArgs{} - // Disable external processing of mutations so that the final check of - // crdb_internal.tables is guaranteed to not be cleaned up. Although this - // was never observed by a stress test, it is here for safety. - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. - } + // Disable GC job so that the final check of crdb_internal.tables is + // guaranteed to not be cleaned up. Although this was never observed by a + // stress test, it is here for safety. + params.Knobs.GCJob = &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} /* blocks forever */ }} const numAccounts = 1000 _, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, @@ -1395,9 +1393,8 @@ func TestRestoreFailDatabaseCleanup(t *testing.T) { // Disable external processing of mutations so that the final check of // crdb_internal.tables is guaranteed to not be cleaned up. Although this // was never observed by a stress test, it is here for safety. - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. - } + blockGC := make(chan struct{}) + params.Knobs.GCJob = &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }} const numAccounts = 1000 _, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, @@ -1428,6 +1425,7 @@ func TestRestoreFailDatabaseCleanup(t *testing.T) { t, `database "data" does not exist`, `DROP DATABASE data`, ) + close(blockGC) } func TestBackupRestoreInterleaved(t *testing.T) { diff --git a/pkg/ccl/partitionccl/drop_test.go b/pkg/ccl/partitionccl/drop_test.go index 6b7e0f676d91..fe0e03d7af71 100644 --- a/pkg/ccl/partitionccl/drop_test.go +++ b/pkg/ccl/partitionccl/drop_test.go @@ -12,10 +12,12 @@ import ( "context" "fmt" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -40,15 +42,17 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) { const numRows = 100 + defer func(i time.Duration) { gcjob.MaxSQLGCInterval = i }(gcjob.MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = 500 * time.Millisecond + asyncNotification := make(chan struct{}) params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Currently there's no index GC job implemented. Eventually - // the GC job needs to block until the asyncNotification channel is - // closed, which will probably need to be controlled in a schema change - // knob. + GCJob: &sql.GCJobTestingKnobs{ + RunBeforeResume: func() { + <-asyncNotification + }, }, } s, sqlDBRaw, kvDB := serverutils.StartServer(t, params) @@ -116,7 +120,6 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) { } close(asyncNotification) - t.Skip("skipping last portion of test until schema change GC job is implemented") // Wait for index drop to complete so zone configs are updated. testutils.SucceedsSoon(t, func() error { if kvs, err := kvDB.Scan(context.TODO(), indexSpan.Key, indexSpan.EndKey, 0); err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index 52afd9165d20..6458cdec6301 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + _ "github.com/cockroachdb/cockroach/pkg/sql/gcjob" // register jobs declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/sql/pgwire" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -821,6 +822,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } else { execCfg.SchemaChangerTestingKnobs = new(sql.SchemaChangerTestingKnobs) } + if gcJobTestingKnobs := s.cfg.TestingKnobs.GCJob; gcJobTestingKnobs != nil { + execCfg.GCJobTestingKnobs = gcJobTestingKnobs.(*sql.GCJobTestingKnobs) + } else { + execCfg.GCJobTestingKnobs = new(sql.GCJobTestingKnobs) + } if distSQLRunTestingKnobs := s.cfg.TestingKnobs.DistSQL; distSQLRunTestingKnobs != nil { execCfg.DistSQLRunTestingKnobs = distSQLRunTestingKnobs.(*execinfra.TestingKnobs) } else { diff --git a/pkg/sql/as_of_test.go b/pkg/sql/as_of_test.go index cbaf2ca9c56a..11ee964137a0 100644 --- a/pkg/sql/as_of_test.go +++ b/pkg/sql/as_of_test.go @@ -33,9 +33,7 @@ func TestAsOfTime(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. - } + params.Knobs.GCJob = &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }} s, db, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) diff --git a/pkg/sql/drop_test.go b/pkg/sql/drop_test.go index 7eaa5a8f1ca4..10c9457738dd 100644 --- a/pkg/sql/drop_test.go +++ b/pkg/sql/drop_test.go @@ -15,8 +15,8 @@ import ( gosql "database/sql" "fmt" "math/rand" - "sync/atomic" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config/zonepb" @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/sqlmigrations" @@ -123,11 +124,6 @@ func addDefaultZoneConfig(sqlDB *gosql.DB, id sqlbase.ID) (zonepb.ZoneConfig, er func TestDropDatabase(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) ctx := context.TODO() @@ -297,12 +293,11 @@ CREATE DATABASE t; // Test that a dropped database's data gets deleted properly. func TestDropDatabaseDeleteData(t *testing.T) { defer leaktest.AfterTest(t)() + + defer func(i time.Duration) { gcjob.MaxSQLGCInterval = i }(gcjob.MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = 500 * time.Millisecond + params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) ctx := context.TODO() @@ -392,8 +387,9 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. // Maybe this test API should use an offset starting from the most recent job // instead. + jobsOffset := 4 sqlRun := sqlutils.MakeSQLRunner(sqlDB) - if err := jobutils.VerifySystemJob(t, sqlRun, 4, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, jobsOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: "DROP DATABASE t CASCADE", DescriptorIDs: sqlbase.IDs{ @@ -403,7 +399,6 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") // Push a new zone config for the table with TTL=0 so the data is // deleted immediately. if _, err := addImmediateGCZoneConfig(sqlDB, tbDesc.ID); err != nil { @@ -427,9 +422,9 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); t.Fatal(err) } - if err := jobutils.VerifyRunningSystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, sql.RunningStatusWaitingGC, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusRunning, jobs.Record{ Username: security.RootUser, - Description: "DROP DATABASE t CASCADE", + Description: "GC for DROP DATABASE t CASCADE", DescriptorIDs: sqlbase.IDs{ tbDesc.ID, tb2Desc.ID, }, @@ -455,7 +450,7 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); // Table 2 data is deleted. tests.CheckKeyCount(t, kvDB, table2Span, 0) - if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, jobsOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: "DROP DATABASE t CASCADE", DescriptorIDs: sqlbase.IDs{ @@ -525,7 +520,6 @@ func TestDropIndex(t *testing.T) { params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ BackfillChunkSize: chunkSize, - // TODO (lucy): Un-skip this test when the GC job is implemented. }, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { @@ -570,8 +564,9 @@ func TestDropIndex(t *testing.T) { // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. // Maybe this test API should use an offset starting from the most recent job // instead. + migrationJobOffset := 4 sqlRun := sqlutils.MakeSQLRunner(sqlDB) - if err := jobutils.VerifySystemJob(t, sqlRun, 5, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, migrationJobOffset+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: `DROP INDEX t.public.kv@foo`, DescriptorIDs: sqlbase.IDs{ @@ -594,7 +589,6 @@ func TestDropIndex(t *testing.T) { tests.CheckKeyCount(t, kvDB, newIdxSpan, numRows) tests.CheckKeyCount(t, kvDB, tableDesc.TableSpan(), 4*numRows) - t.Skip("skipping last portion of test until schema change GC job is implemented") clearIndexAttempt = true // Add a zone config for the table. if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { @@ -602,7 +596,7 @@ func TestDropIndex(t *testing.T) { } testutils.SucceedsSoon(t, func() error { - return jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + return jobutils.VerifySystemJob(t, sqlRun, migrationJobOffset+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: `DROP INDEX t.public.kv@foo`, DescriptorIDs: sqlbase.IDs{ @@ -611,6 +605,16 @@ func TestDropIndex(t *testing.T) { }) }) + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ + Username: security.RootUser, + Description: `GC for DROP INDEX t.public.kv@foo`, + DescriptorIDs: sqlbase.IDs{ + tableDesc.ID, + }, + }) + }) + if !emptySpan { t.Fatalf("tried to clear index with non-empty resume span") } @@ -689,7 +693,6 @@ func TestDropIndexInterleaved(t *testing.T) { params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. BackfillChunkSize: chunkSize, }, } @@ -707,7 +710,6 @@ func TestDropIndexInterleaved(t *testing.T) { if _, err := sqlDB.Exec(`DROP INDEX t.intlv@intlv_idx`); err != nil { t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") tests.CheckKeyCount(t, kvDB, tableSpan, 2*numRows) // Ensure that index is not active. @@ -808,11 +810,10 @@ func TestDropTable(t *testing.T) { func TestDropTableDeleteData(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } + + defer func(i time.Duration) { gcjob.MaxSQLGCInterval = i }(gcjob.MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = 500 * time.Millisecond + s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) ctx := context.TODO() @@ -850,6 +851,11 @@ func TestDropTableDeleteData(t *testing.T) { } } + // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. + // Maybe this test API should use an offset starting from the most recent job + // instead. + schemaChangeJobOffset := 4 + // Data hasn't been GC-ed. sqlRun := sqlutils.MakeSQLRunner(sqlDB) for i := 0; i < numTables; i++ { @@ -859,10 +865,7 @@ func TestDropTableDeleteData(t *testing.T) { tableSpan := descs[i].TableSpan() tests.CheckKeyCount(t, kvDB, tableSpan, numKeys) - // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. - // Maybe this test API should use an offset starting from the most recent job - // instead. - if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1+4, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1+schemaChangeJobOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()), DescriptorIDs: sqlbase.IDs{ @@ -873,7 +876,6 @@ func TestDropTableDeleteData(t *testing.T) { } } - t.Skip("skipping last portion of test until schema change GC job is implemented") // The closure pushes a zone config reducing the TTL to 0 for descriptor i. pushZoneCfg := func(i int) { if _, err := addImmediateGCZoneConfig(sqlDB, descs[i].ID); err != nil { @@ -893,7 +895,7 @@ func TestDropTableDeleteData(t *testing.T) { tests.CheckKeyCount(t, kvDB, tableSpan, 0) // Ensure that the job is marked as succeeded. - if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1+schemaChangeJobOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()), DescriptorIDs: sqlbase.IDs{ @@ -902,6 +904,17 @@ func TestDropTableDeleteData(t *testing.T) { }); err != nil { t.Fatal(err) } + + // Ensure that the job is marked as succeeded. + if err := jobutils.VerifySystemJob(t, sqlRun, i, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ + Username: security.RootUser, + Description: fmt.Sprintf(`GC for DROP TABLE t.public.%s`, descs[i].GetName()), + DescriptorIDs: sqlbase.IDs{ + descs[i].ID, + }, + }); err != nil { + t.Fatal(err) + } } // Push a new zone config for a few tables with TTL=0 so the data @@ -951,16 +964,11 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - blockSchemaChanges := make(chan struct{}) + defer func(i time.Duration) { gcjob.MaxSQLGCInterval = i }(gcjob.MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = 500 * time.Millisecond + params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - SchemaChangeJobNoOp: func() bool { - return true - }, - // TODO (lucy): Un-skip this test when the GC job is implemented, and set - // the knob to block until we're ready to GC - }, SQLMigrationManager: &sqlmigrations.MigrationManagerTestingKnobs{ DisableBackfillMigrations: true, }, @@ -970,12 +978,13 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + // Disable strict GC TTL enforcement because we're going to shove a zero-value + // TTL into the system with addImmediateGCZoneConfig. + defer disableGCTTLStrictEnforcement(t, sqlDBRaw)() + const numRows = 100 sqlutils.CreateTable(t, sqlDBRaw, "t", "a INT", numRows, sqlutils.ToRowFn(sqlutils.RowIdxFn)) - // Set TTL so the data is deleted immediately. - sqlDB.Exec(t, `ALTER TABLE test.t CONFIGURE ZONE USING gc.ttlseconds = 1`) - // Give the table an old format version. tableDesc := sqlbase.GetTableDescriptor(kvDB, "test", "t") tableDesc.FormatVersion = sqlbase.FamilyFormatVersion @@ -991,7 +1000,11 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { // Simulate a migration upgrading the table descriptor's format version after // the table has been dropped but before the truncation has occurred. - tableDesc = sqlbase.GetTableDescriptor(kvDB, "test", "t") + var err error + tableDesc, err = sqlbase.GetTableDescFromID(ctx, kvDB.NewTxn(ctx, ""), tableDesc.ID) + if err != nil { + t.Fatal(err) + } if !tableDesc.Dropped() { t.Fatalf("expected descriptor to be in DROP state, but was in %s", tableDesc.State) } @@ -1001,10 +1014,13 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") + // Set TTL so the data is deleted immediately. + if _, err := addImmediateGCZoneConfig(sqlDBRaw, tableDesc.ID); err != nil { + t.Fatal(err) + } + // Allow the schema change to proceed and verify that the data is eventually // deleted, despite the interleaved modification to the table descriptor. - close(blockSchemaChanges) testutils.SucceedsSoon(t, func() error { return descExists(sqlDBRaw, false, tableDesc.ID) }) @@ -1016,13 +1032,10 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { func TestDropTableInterleavedDeleteData(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - var enableAsync uint32 - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented, and set - // the knob to block until we're ready to GC - }, - } + + defer func(i time.Duration) { gcjob.MaxSQLGCInterval = i }(gcjob.MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = 500 * time.Millisecond + s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) @@ -1046,8 +1059,9 @@ func TestDropTableInterleavedDeleteData(t *testing.T) { t.Fatalf("different error than expected: %v", err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") - atomic.StoreUint32(&enableAsync, 1) + if _, err := addImmediateGCZoneConfig(sqlDB, tableDescInterleaved.ID); err != nil { + t.Fatal(err) + } testutils.SucceedsSoon(t, func() error { return descExists(sqlDB, false, tableDescInterleaved.ID) diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 111815cfa577..4344d55e1cfe 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -572,6 +572,7 @@ type ExecutorConfig struct { TestingKnobs ExecutorTestingKnobs PGWireTestingKnobs *PGWireTestingKnobs SchemaChangerTestingKnobs *SchemaChangerTestingKnobs + GCJobTestingKnobs *GCJobTestingKnobs DistSQLRunTestingKnobs *execinfra.TestingKnobs EvalContextTestingKnobs tree.EvalContextTestingKnobs // HistogramWindowInterval is (server.Config).HistogramWindowInterval. diff --git a/pkg/sql/gcjob/descriptor_utils.go b/pkg/sql/gcjob/descriptor_utils.go index c5335cb5ec7e..11f818443b67 100644 --- a/pkg/sql/gcjob/descriptor_utils.go +++ b/pkg/sql/gcjob/descriptor_utils.go @@ -59,6 +59,9 @@ func dropTableDesc(ctx context.Context, db *kv.DB, tableDesc *sqlbase.TableDescr zoneKeyPrefix := config.MakeZoneKeyPrefix(uint32(tableDesc.ID)) return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetSystemConfigTrigger(); err != nil { + return err + } b := &kv.Batch{} // Delete the descriptor. b.Del(descKey) @@ -75,6 +78,9 @@ func dropTableDesc(ctx context.Context, db *kv.DB, tableDesc *sqlbase.TableDescr // deleteDatabaseZoneConfig removes the zone config for a given database ID. func deleteDatabaseZoneConfig(ctx context.Context, db *kv.DB, databaseID sqlbase.ID) error { return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetSystemConfigTrigger(); err != nil { + return err + } b := &kv.Batch{} // Delete the zone config entry for the dropped database associated with the diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index f69c27b93f4f..a55c65ceb667 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -38,7 +38,6 @@ type schemaChangeGCResumer struct { func performGC( ctx context.Context, execCfg *sql.ExecutorConfig, - jobID int64, details *jobspb.SchemaChangeGCDetails, progress *jobspb.SchemaChangeGCProgress, ) error { @@ -58,8 +57,6 @@ func performGC( } } } - - persistProgress(ctx, execCfg, jobID, progress) return nil } @@ -70,6 +67,9 @@ func (r schemaChangeGCResumer) Resume( p := phs.(sql.PlanHookState) // TODO(pbardea): Wait for no versions. execCfg := p.ExecCfg() + if fn := execCfg.GCJobTestingKnobs.RunBeforeResume; fn != nil { + fn() + } details, progress, err := initDetailsAndProgress(ctx, execCfg, r.jobID) if err != nil { return err @@ -79,7 +79,7 @@ func (r schemaChangeGCResumer) Resume( allTables := getAllTablesWaitingForGC(details, progress) expired, earliestDeadline := refreshTables(ctx, execCfg, allTables, tableDropTimes, indexDropTimes, r.jobID, progress) - timerDuration := time.Until(earliestDeadline) + timerDuration := timeutil.Until(earliestDeadline) if expired { timerDuration = 0 } else if timerDuration > MaxSQLGCInterval { @@ -118,9 +118,11 @@ func (r schemaChangeGCResumer) Resume( remainingTables := getAllTablesWaitingForGC(details, progress) _, earliestDeadline = refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress) - if err := performGC(ctx, execCfg, r.jobID, details, progress); err != nil { + if err := performGC(ctx, execCfg, details, progress); err != nil { return err } + + persistProgress(ctx, execCfg, r.jobID, progress) if isDoneGC(progress) { return nil } diff --git a/pkg/sql/gcjob/gc_job_utils.go b/pkg/sql/gcjob/gc_job_utils.go index c85be92b175d..18b853b2b5a1 100644 --- a/pkg/sql/gcjob/gc_job_utils.go +++ b/pkg/sql/gcjob/gc_job_utils.go @@ -176,7 +176,7 @@ func persistProgress( if err := job.SetProgress(ctx, *progress); err != nil { return err } - log.Infof(ctx, "updated progress payload %+v", progress) + log.Infof(ctx, "updated progress payload: %+v", progress) return nil }); err != nil { log.Warningf(ctx, "failed to update job's progress payload err: %+v", err) diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index 35d50542ce5d..8dcdf2234d10 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -143,7 +143,6 @@ func updateTableStatus( progress *jobspb.SchemaChangeGCProgress, ) time.Time { deadline := timeutil.Unix(0, int64(math.MaxInt64)) - lifetime := ttlSeconds * time.Second.Nanoseconds() sp := table.TableSpan() for i, t := range progress.Tables { @@ -152,7 +151,7 @@ func updateTableStatus( continue } - deadlineNanos := tableDropTimes[t.ID] + lifetime + deadlineNanos := tableDropTimes[t.ID] + ttlSeconds*time.Second.Nanoseconds() deadline = timeutil.Unix(0, deadlineNanos) if isProtected(ctx, protectedtsCache, tableDropTimes[t.ID], sp) { log.Infof(ctx, "a timestamp protection delayed GC of table %d", t.ID) diff --git a/pkg/sql/gcjob/table_garbage_collection.go b/pkg/sql/gcjob/table_garbage_collection.go index f77e0a6df662..d939c8ec0ce0 100644 --- a/pkg/sql/gcjob/table_garbage_collection.go +++ b/pkg/sql/gcjob/table_garbage_collection.go @@ -137,7 +137,7 @@ func clearTableData( }) log.VEventf(ctx, 2, "ClearRange %s - %s", lastKey, endKey) if err := db.Run(ctx, &b); err != nil { - return err + return errors.Wrapf(err, "clear range %s - %s", lastKey, endKey) } n = 0 lastKey = endKey diff --git a/pkg/sql/gcjob/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go similarity index 97% rename from pkg/sql/gcjob/gc_job_test.go rename to pkg/sql/gcjob_test/gc_job_test.go index e5e8acee8619..d6e44eb69887 100644 --- a/pkg/sql/gcjob/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package gcjob +package gcjob_test import ( "context" @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" @@ -38,10 +39,10 @@ func TestSchemaChangeGCJob(t *testing.T) { defer func(oldAdoptInterval, oldGCInterval time.Duration) { jobs.DefaultAdoptInterval = oldAdoptInterval - MaxSQLGCInterval = oldGCInterval - }(jobs.DefaultAdoptInterval, MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = oldGCInterval + }(jobs.DefaultAdoptInterval, gcjob.MaxSQLGCInterval) jobs.DefaultAdoptInterval = 100 * time.Millisecond - MaxSQLGCInterval = 100 * time.Millisecond + gcjob.MaxSQLGCInterval = 500 * time.Millisecond type DropItem int const ( diff --git a/pkg/sql/gcjob/main_test.go b/pkg/sql/gcjob_test/main_test.go similarity index 97% rename from pkg/sql/gcjob/main_test.go rename to pkg/sql/gcjob_test/main_test.go index f94d78246416..9f986e156bb4 100644 --- a/pkg/sql/gcjob/main_test.go +++ b/pkg/sql/gcjob_test/main_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package gcjob +package gcjob_test import ( "os" diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 1ee321eaa187..4ea8483c2ca3 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -526,8 +526,9 @@ func TestCantLeaseDeletedTable(testingT *testing.T) { defer mu.Unlock() return clearSchemaChangers }, - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, } t := newLeaseTest(testingT, params) @@ -616,8 +617,9 @@ func TestLeasesOnDeletedTableAreReleasedImmediately(t *testing.T) { defer mu.Unlock() return clearSchemaChangers }, - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, } s, db, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) @@ -908,11 +910,6 @@ CREATE TABLE t.foo (v INT); func TestTxnObeysTableModificationTime(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) @@ -1090,7 +1087,6 @@ INSERT INTO t.kv VALUES ('a', 'b'); tableSpan := tableDesc.TableSpan() tests.CheckKeyCount(t, kvDB, tableSpan, 4) - t.Skip("skipping last portion of test until schema change GC job is implemented") // Allow async schema change waiting for GC to complete (when dropping an // index) and clear the index keys. if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table index 7bd72885fc89..10ea4219e6a7 100644 --- a/pkg/sql/logictest/testdata/logic_test/alter_table +++ b/pkg/sql/logictest/testdata/logic_test/alter_table @@ -67,15 +67,15 @@ statement error pgcode 23505 violates unique constraint "bar" ALTER TABLE t ADD CONSTRAINT bar UNIQUE (c) # Test that rollback was successful -# TODO (lucy): Update once we have a separate GC job query TTTTTR SELECT job_type, regexp_replace(description, 'JOB \d+', 'JOB ...'), user_name, status, running_status, fraction_completed::decimal(10,2) FROM crdb_internal.jobs -WHERE job_type = 'SCHEMA CHANGE' +WHERE job_type = 'SCHEMA CHANGE' OR job_type = 'SCHEMA CHANGE GC' ORDER BY created DESC -LIMIT 1 +LIMIT 2 ---- -SCHEMA CHANGE ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root failed NULL 0.00 +SCHEMA CHANGE GC GC for ROLLBACK of ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root running NULL 0.00 +SCHEMA CHANGE ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root failed NULL 0.00 query IIII colnames,rowsort SELECT * FROM t @@ -188,15 +188,15 @@ ALTER TABLE t DROP CONSTRAINT foo statement ok DROP INDEX foo CASCADE -# TODO (lucy): Update once we have a separate GC job query TTTTTRT SELECT job_type, description, user_name, status, running_status, fraction_completed, error FROM crdb_internal.jobs -WHERE job_type = 'SCHEMA CHANGE' +WHERE job_type = 'SCHEMA CHANGE' OR job_type = 'SCHEMA CHANGE GC' ORDER BY created DESC -LIMIT 1 +LIMIT 2 ---- -SCHEMA CHANGE DROP INDEX test.public.t@foo CASCADE root succeeded NULL 1 · +SCHEMA CHANGE GC GC for DROP INDEX test.public.t@foo CASCADE root running NULL 0 · +SCHEMA CHANGE DROP INDEX test.public.t@foo CASCADE root succeeded NULL 1 · query TTBITTBB colnames SHOW INDEXES FROM t @@ -273,15 +273,15 @@ INSERT INTO t (a, d, x, y, z) VALUES (33, 34, DECIMAL '2.0', DECIMAL '2.1', DECI statement ok DROP INDEX t@t_f_idx -# TODO (lucy): Update once we have a separate GC job query TTTTTRT SELECT job_type, description, user_name, status, running_status, fraction_completed, error FROM crdb_internal.jobs -WHERE job_type = 'SCHEMA CHANGE' +WHERE job_type = 'SCHEMA CHANGE' OR job_type = 'SCHEMA CHANGE GC' ORDER BY created DESC -LIMIT 1 +LIMIT 2 ---- -SCHEMA CHANGE DROP INDEX test.public.t@t_f_idx root succeeded NULL 1 · +SCHEMA CHANGE GC GC for DROP INDEX test.public.t@t_f_idx root running NULL 0 · +SCHEMA CHANGE DROP INDEX test.public.t@t_f_idx root succeeded NULL 1 · statement ok ALTER TABLE t DROP COLUMN f diff --git a/pkg/sql/logictest/testdata/logic_test/drop_database b/pkg/sql/logictest/testdata/logic_test/drop_database index 10b004e28e56..d42237b0ddc3 100644 --- a/pkg/sql/logictest/testdata/logic_test/drop_database +++ b/pkg/sql/logictest/testdata/logic_test/drop_database @@ -34,15 +34,15 @@ postgres system test -# TODO (lucy): Update this once we have GC jobs. # The "updating privileges" clause in the SELECT statement is for excluding jobs # run by an unrelated startup migration. # TODO (lucy): Update this if/when we decide to change how these jobs queued by # the startup migration are handled. query TT -SELECT status, running_status FROM [SHOW JOBS] WHERE description != 'updating privileges' +SELECT job_type, status FROM [SHOW JOBS] WHERE description != 'updating privileges' ---- -succeeded NULL +SCHEMA CHANGE succeeded +SCHEMA CHANGE GC running statement ok CREATE DATABASE "foo bar" diff --git a/pkg/sql/logictest/testdata/logic_test/drop_table b/pkg/sql/logictest/testdata/logic_test/drop_table index 17d7a8fdd2e6..ffaa39b64d26 100644 --- a/pkg/sql/logictest/testdata/logic_test/drop_table +++ b/pkg/sql/logictest/testdata/logic_test/drop_table @@ -25,15 +25,15 @@ SELECT * FROM a statement ok DROP TABLE a -# TODO (lucy): Update this once we have GC jobs. # The "updating privileges" clause in the SELECT statement is for excluding jobs # run by an unrelated startup migration. # TODO (lucy): Update this if/when we decide to change how these jobs queued by # the startup migration are handled. query TT -SELECT status, running_status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND description != 'updating privileges' +SELECT job_type, status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' OR (job_type = 'SCHEMA CHANGE' AND description != 'updating privileges') ---- -succeeded NULL +SCHEMA CHANGE succeeded +SCHEMA CHANGE GC running query T SHOW TABLES FROM test diff --git a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn index 478da67c0c80..57e8115409e0 100644 --- a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn +++ b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn @@ -771,14 +771,19 @@ k CHAR false NULL · {primary} false query error pq: index "j_idx" not found SELECT * FROM customers@j_idx -# TODO (lucy): Update once we have a separate GC job -query TTT +query TT SELECT status, - running_status, regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as description FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' ORDER BY job_id DESC LIMIT 1 ---- -failed NULL ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5;ALTER TABLE test.public.customers ADD COLUMN j INT8 DEFAULT 4;ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3;ALTER TABLE test.public.customers ADD COLUMN m CHAR;ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a';CREATE INDEX j_idx ON test.public.customers (j);CREATE INDEX l_idx ON test.public.customers (l);CREATE INDEX m_idx ON test.public.customers (m);CREATE UNIQUE INDEX i_idx ON test.public.customers (i);CREATE UNIQUE INDEX n_idx ON test.public.customers (n) +failed ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5;ALTER TABLE test.public.customers ADD COLUMN j INT8 DEFAULT 4;ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3;ALTER TABLE test.public.customers ADD COLUMN m CHAR;ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a';CREATE INDEX j_idx ON test.public.customers (j);CREATE INDEX l_idx ON test.public.customers (l);CREATE INDEX m_idx ON test.public.customers (m);CREATE UNIQUE INDEX i_idx ON test.public.customers (i);CREATE UNIQUE INDEX n_idx ON test.public.customers (n) + +query TT +SELECT status, + regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as description + FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' ORDER BY job_id DESC LIMIT 1 +---- +running GC for ROLLBACK of ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5;ALTER TABLE test.public.customers ADD COLUMN j INT8 DEFAULT 4;ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3;ALTER TABLE test.public.customers ADD COLUMN m CHAR;ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a';CREATE INDEX j_idx ON test.public.customers (j);CREATE INDEX l_idx ON test.public.customers (l);CREATE INDEX m_idx ON test.public.customers (m);CREATE UNIQUE INDEX i_idx ON test.public.customers (i);CREATE UNIQUE INDEX n_idx ON test.public.customers (n) subtest add_multiple_computed_elements diff --git a/pkg/sql/logictest/testdata/logic_test/truncate b/pkg/sql/logictest/testdata/logic_test/truncate index f926f598d1a4..23e9ce456a0c 100644 --- a/pkg/sql/logictest/testdata/logic_test/truncate +++ b/pkg/sql/logictest/testdata/logic_test/truncate @@ -48,18 +48,22 @@ query II SELECT * FROM kview ---- -# TODO (lucy): update with GC job # The "updating privileges" clause in the SELECT statement is for excluding jobs # run by an unrelated startup migration. # TODO (lucy): Update this if/when we decide to change how these jobs queued by # the startup migration are handled. -query TT -SELECT status, running_status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND description != 'updating privileges' +query T +SELECT status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND description != 'updating privileges' +---- +succeeded +succeeded +succeeded +succeeded + +query T +SELECT status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' ---- -succeeded NULL -succeeded NULL -succeeded NULL -succeeded NULL +running # Ensure that TRUNCATE works with a self referential FK. statement ok diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 5197cfe1aa17..8a74e4089776 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -17,10 +17,8 @@ import ( "strings" "time" - "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -46,15 +44,9 @@ import ( ) const ( - // RunningStatusDrainingNames is for jobs that are currently in progress and - // are draining names. - RunningStatusDrainingNames jobs.RunningStatus = "draining names" // RunningStatusWaitingGC is for jobs that are currently in progress and // are waiting for the GC interval to expire RunningStatusWaitingGC jobs.RunningStatus = "waiting for GC TTL" - // RunningStatusCompaction is for jobs that are currently in progress and - // undergoing RocksDB compaction - RunningStatusCompaction jobs.RunningStatus = "RocksDB compaction" // RunningStatusDeleteOnly is for jobs that are currently waiting on // the cluster to converge to seeing the schema element in the DELETE_ONLY // state. @@ -71,28 +63,14 @@ const ( RunningStatusValidation jobs.RunningStatus = "validating schema" ) -// TODO (lucy): After refactoring MaybeGCMutations to be in its own job, this -// will probably go away. For now, preserve it the way it is even though nothing -// is using it. -type droppedIndex struct { - indexID sqlbase.IndexID - //lint:ignore U1001 see above comment - dropTime int64 - deadline int64 -} - // SchemaChanger is used to change the schema on a table. type SchemaChanger struct { - tableID sqlbase.ID - mutationID sqlbase.MutationID - nodeID roachpb.NodeID - db *kv.DB - leaseMgr *LeaseManager - - // TODO (lucy): Replace this with job state once we have a GC job. This is - // only still here because the (currently unused) MaybeGCMutations depends on - // it. - dropIndexTimes []droppedIndex + tableID sqlbase.ID + mutationID sqlbase.MutationID + droppedDatabaseID sqlbase.ID + nodeID roachpb.NodeID + db *kv.DB + leaseMgr *LeaseManager testingKnobs *SchemaChangerTestingKnobs distSQLPlanner *DistSQLPlanner @@ -212,171 +190,6 @@ func (e errTableVersionMismatch) Error() string { return fmt.Sprintf("table version mismatch: %d, expected: %d", e.version, e.expected) } -// DropTableDesc removes a descriptor from the KV database. -func (sc *SchemaChanger) DropTableDesc( - ctx context.Context, tableDesc *sqlbase.TableDescriptor, traceKV bool, -) error { - descKey := sqlbase.MakeDescMetadataKey(tableDesc.ID) - zoneKeyPrefix := config.MakeZoneKeyPrefix(uint32(tableDesc.ID)) - - // Finished deleting all the table data, now delete the table meta data. - return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - // Delete table descriptor - b := &kv.Batch{} - if traceKV { - log.VEventf(ctx, 2, "Del %s", descKey) - log.VEventf(ctx, 2, "DelRange %s", zoneKeyPrefix) - } - // Delete the descriptor. - b.Del(descKey) - // Delete the zone config entry for this table. - b.DelRange(zoneKeyPrefix, zoneKeyPrefix.PrefixEnd(), false /* returnKeys */) - if err := txn.SetSystemConfigTrigger(); err != nil { - return err - } - - if tableDesc.GetDropJobID() != 0 { - if err := sc.updateDropTableJob( - ctx, - txn, - tableDesc.GetDropJobID(), - tableDesc.ID, - jobspb.Status_DONE, - func(ctx context.Context, txn *kv.Txn, job *jobs.Job) error { - // Delete the zone config entry for the dropped database associated - // with the job, if it exists. - details := job.Details().(jobspb.SchemaChangeDetails) - if details.DroppedDatabaseID == sqlbase.InvalidID { - return nil - } - dbZoneKeyPrefix := config.MakeZoneKeyPrefix(uint32(details.DroppedDatabaseID)) - if traceKV { - log.VEventf(ctx, 2, "DelRange %s", zoneKeyPrefix) - } - b.DelRange(dbZoneKeyPrefix, dbZoneKeyPrefix.PrefixEnd(), false /* returnKeys */) - return nil - }); err != nil { - log.Warningf(ctx, "failed to update job status: %+v", err) - } - } - return txn.Run(ctx, b) - }) -} - -// truncateTable deletes all of the data in the specified table. -func (sc *SchemaChanger) truncateTable(ctx context.Context, table *sqlbase.TableDescriptor) error { - // If DropTime isn't set, assume this drop request is from a version - // 1.1 server and invoke legacy code that uses DeleteRange and range GC. - if table.DropTime == 0 { - return ClearTableDataInChunks(ctx, table, sc.db, false /* traceKV */) - } - - tableKey := roachpb.RKey(keys.MakeTablePrefix(uint32(table.ID))) - tableSpan := roachpb.RSpan{Key: tableKey, EndKey: tableKey.PrefixEnd()} - - // ClearRange requests lays down RocksDB range deletion tombstones that have - // serious performance implications (#24029). The logic below attempts to - // bound the number of tombstones in one store by sending the ClearRange - // requests to each range in the table in small, sequential batches rather - // than letting DistSender send them all in parallel, to hopefully give the - // compaction queue time to compact the range tombstones away in between - // requests. - // - // As written, this approach has several deficiencies. It does not actually - // wait for the compaction queue to compact the tombstones away before - // sending the next request. It is likely insufficient if multiple DROP - // TABLEs are in flight at once. It does not save its progress in case the - // coordinator goes down. These deficiences could be addressed, but this code - // was originally a stopgap to avoid the range tombstone performance hit. The - // RocksDB range tombstone implementation has since been improved and the - // performance implications of many range tombstones has been reduced - // dramatically making this simplistic throttling sufficient. - - // These numbers were chosen empirically for the clearrange roachtest and - // could certainly use more tuning. - const batchSize = 100 - const waitTime = 500 * time.Millisecond - - var n int - lastKey := tableSpan.Key - ri := kvcoord.NewRangeIterator(sc.execCfg.DistSender) - for ri.Seek(ctx, tableSpan.Key, kvcoord.Ascending); ; ri.Next(ctx) { - if !ri.Valid() { - return ri.Error().GoError() - } - - if n++; n >= batchSize || !ri.NeedAnother(tableSpan) { - endKey := ri.Desc().EndKey - if tableSpan.EndKey.Less(endKey) { - endKey = tableSpan.EndKey - } - var b kv.Batch - b.AddRawRequest(&roachpb.ClearRangeRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: lastKey.AsRawKey(), - EndKey: endKey.AsRawKey(), - }, - }) - log.VEventf(ctx, 2, "ClearRange %s - %s", lastKey, endKey) - if err := sc.db.Run(ctx, &b); err != nil { - return err - } - n = 0 - lastKey = endKey - time.Sleep(waitTime) - } - - if !ri.NeedAnother(tableSpan) { - break - } - } - - return nil -} - -// Silence unused warning. -var _ = (*SchemaChanger).maybeDropTable - -// maybe Drop a table. Return nil if successfully dropped. -func (sc *SchemaChanger) maybeDropTable(ctx context.Context, table *sqlbase.TableDescriptor) error { - if !table.Dropped() { - return nil - } - - // This can happen if a change other than the drop originally - // scheduled the changer for this table. If that's the case, - // we still need to wait for the deadline to expire. - if table.DropTime != 0 { - var timeRemaining time.Duration - if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - timeRemaining = 0 - _, zoneCfg, _, err := GetZoneConfigInTxn(ctx, txn, uint32(table.ID), - &sqlbase.IndexDescriptor{}, "", false /* getInheritedDefault */) - if err != nil { - return err - } - deadline := table.DropTime + int64(zoneCfg.GC.TTLSeconds)*time.Second.Nanoseconds() - timeRemaining = timeutil.Since(timeutil.Unix(0, deadline)) - return nil - }); err != nil { - return err - } - if timeRemaining < 0 { - return errNotHitGCTTLDeadline - } - } - - // Do all the hard work of deleting the table data and the table ID. - if err := sc.truncateTable(ctx, table); err != nil { - return err - } - - if err := sc.DropTableDesc(ctx, table, false /* traceKV */); err != nil { - return err - } - return nil -} - // maybe backfill a created table by executing the AS query. Return nil if // successfully backfilled. // @@ -513,133 +326,6 @@ func (sc *SchemaChanger) maybeMakeAddTablePublic( return nil } -// Silence unused warning. -var _ = (*SchemaChanger).maybeGCMutations - -func (sc *SchemaChanger) maybeGCMutations( - ctx context.Context, table *sqlbase.TableDescriptor, -) error { - if len(table.GCMutations) == 0 || len(sc.dropIndexTimes) == 0 { - return nil - } - - // Don't perform GC work if there are non-GC mutations waiting. - if len(table.Mutations) > 0 { - return nil - } - - // Find dropped index with earliest GC deadline. - dropped := sc.dropIndexTimes[0] - for i := 1; i < len(sc.dropIndexTimes); i++ { - if other := sc.dropIndexTimes[i]; other.deadline < dropped.deadline { - dropped = other - } - } - - var mutation sqlbase.TableDescriptor_GCDescriptorMutation - found := false - for _, gcm := range table.GCMutations { - if gcm.IndexID == sc.dropIndexTimes[0].indexID { - found = true - mutation = gcm - break - } - } - if !found { - return errors.AssertionFailedf("no GC mutation for index %d", errors.Safe(sc.dropIndexTimes[0].indexID)) - } - - // Check if the deadline for GC'd dropped index expired because - // a change other than the drop could have scheduled the changer - // for this table. - timeRemaining := timeutil.Since(timeutil.Unix(0, dropped.deadline)) - if timeRemaining < 0 { - // Return nil to allow other any mutations to make progress. - return nil - } - if err := sc.truncateIndexes(ctx, table.Version, []sqlbase.IndexDescriptor{{ID: mutation.IndexID}}); err != nil { - return err - } - - _, err := sc.leaseMgr.Publish( - ctx, - table.ID, - func(tbl *sqlbase.MutableTableDescriptor) error { - found := false - for i := 0; i < len(tbl.GCMutations); i++ { - if other := tbl.GCMutations[i]; other.IndexID == mutation.IndexID { - tbl.GCMutations = append(tbl.GCMutations[:i], tbl.GCMutations[i+1:]...) - found = true - break - } - } - - if !found { - return errDidntUpdateDescriptor - } - - return nil - }, - nil, - ) - - return err -} - -func (sc *SchemaChanger) updateDropTableJob( - ctx context.Context, - txn *kv.Txn, - jobID int64, - tableID sqlbase.ID, - status jobspb.Status, - onSuccess func(context.Context, *kv.Txn, *jobs.Job) error, -) error { - job, err := sc.jobRegistry.LoadJobWithTxn(ctx, jobID, txn) - if err != nil { - return err - } - - schemaDetails, ok := job.Details().(jobspb.SchemaChangeDetails) - if !ok { - return errors.AssertionFailedf("unexpected details for job: %T", job.Details()) - } - - lowestStatus := jobspb.Status_DONE - for i := range schemaDetails.DroppedTables { - if tableID == schemaDetails.DroppedTables[i].ID { - schemaDetails.DroppedTables[i].Status = status - } - - if lowestStatus > schemaDetails.DroppedTables[i].Status { - lowestStatus = schemaDetails.DroppedTables[i].Status - } - } - - var runningStatus jobs.RunningStatus - switch lowestStatus { - case jobspb.Status_DRAINING_NAMES: - runningStatus = RunningStatusDrainingNames - case jobspb.Status_WAIT_FOR_GC_INTERVAL: - runningStatus = RunningStatusWaitingGC - case jobspb.Status_ROCKSDB_COMPACTION: - runningStatus = RunningStatusCompaction - case jobspb.Status_DONE: - return job.WithTxn(txn).Succeeded(ctx, func(ctx context.Context, txn *kv.Txn) error { - return onSuccess(ctx, txn, job) - }) - default: - return errors.AssertionFailedf("unexpected dropped table status %d", errors.Safe(lowestStatus)) - } - - if err := job.WithTxn(txn).SetDetails(ctx, schemaDetails); err != nil { - return err - } - - return job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, _ jobspb.Details) (jobs.RunningStatus, error) { - return runningStatus, nil - }) -} - // Drain old names from the cluster. func (sc *SchemaChanger) drainNames(ctx context.Context) error { // Publish a new version with all the names drained after everyone @@ -674,6 +360,62 @@ func (sc *SchemaChanger) drainNames(ctx context.Context) error { return err } +func startGCJob( + ctx context.Context, + db *kv.DB, + jobRegistry *jobs.Registry, + username string, + schemaChangeDescription string, + details jobspb.SchemaChangeGCDetails, +) error { + var sj *jobs.StartableJob + descriptorIDs := make([]sqlbase.ID, 0) + if len(details.Indexes) > 0 { + if len(descriptorIDs) == 0 { + descriptorIDs = []sqlbase.ID{details.ParentID} + } + } else if len(details.Tables) > 0 { + for _, table := range details.Tables { + descriptorIDs = append(descriptorIDs, table.ID) + } + } else { + // Nothing to GC. + return nil + } + + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + jobRecord := jobs.Record{ + Description: fmt.Sprintf("GC for %s", schemaChangeDescription), + Username: username, + DescriptorIDs: descriptorIDs, + Details: details, + Progress: jobspb.SchemaChangeGCProgress{}, + NonCancelable: true, + } + var err error + if sj, err = jobRegistry.CreateStartableJobWithTxn(ctx, jobRecord, txn, nil /* resultCh */); err != nil { + return err + } + return nil + }); err != nil { + return err + } + if _, err := sj.Start(ctx); err != nil { + return err + } + return nil +} + +func (sc *SchemaChanger) startGCJob( + ctx context.Context, details jobspb.SchemaChangeGCDetails, isRollback bool, +) error { + description := sc.job.Payload().Description + if isRollback { + description = "ROLLBACK of " + description + } + return startGCJob(ctx, sc.db, sc.jobRegistry, sc.job.Payload().Username, description, details) +} + // Execute the entire schema change in steps. // inSession is set to false when this is called from the asynchronous // schema change execution path. @@ -711,9 +453,24 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { } } - // TODO (lucy): Skip table GC for now so we can return results to the client - // immediately after draining names when a table is dropped. Eventually we - // will queue a separate job to do this. + if tableDesc.Dropped() && sc.droppedDatabaseID == sqlbase.InvalidID { + // We've dropped this table, let's kick off a GC job. + dropTime := timeutil.Now().UnixNano() + if tableDesc.DropTime > 0 { + dropTime = tableDesc.DropTime + } + gcDetails := jobspb.SchemaChangeGCDetails{ + Tables: []jobspb.SchemaChangeGCDetails_DroppedID{ + { + ID: tableDesc.ID, + DropTime: dropTime, + }, + }, + } + if err := sc.startGCJob(ctx, gcDetails, false /* isRollback */); err != nil { + return err + } + } if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc); err != nil { return err @@ -723,8 +480,6 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { return err } - // TODO (lucy): Skip index GC for now, see above - // Wait for the schema change to propagate to all nodes after this function // returns, so that the new schema is live everywhere. This is not needed for // correctness but is done to make the UI experience/tests predictable. @@ -762,6 +517,9 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { // Run through mutation state machine and backfill. err = sc.runStateMachineAndBackfill(ctx) + if err != nil { + return err + } defer func() { if err := waitToUpdateLeases(err == nil /* refreshStats */); err != nil && !errors.Is(err, sqlbase.ErrDescriptorNotFound) { @@ -997,6 +755,7 @@ func (sc *SchemaChanger) waitToUpdateLeases(ctx context.Context, tableID sqlbase // done finalizes the mutations (adds new cols/indexes to the table). // It ensures that all nodes are on the current (pre-update) version of the // schema. +// It also kicks off GC jobs as needed. // Returns the updated descriptor. func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescriptor, error) { isRollback := false @@ -1080,7 +839,6 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr if indexDesc := mutation.GetIndex(); mutation.Direction == sqlbase.DescriptorMutation_DROP && indexDesc != nil { if canClearRangeForDrop(indexDesc) { - // We continue adding dropped indexes to GCMutations because that's // how we keep track of dropped index names (for, e.g., zone config // lookups), even though in the absence of a GC job there's nothing to // clean them up. @@ -1089,6 +847,20 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr sqlbase.TableDescriptor_GCDescriptorMutation{ IndexID: indexDesc.ID, }) + + dropTime := timeutil.Now().UnixNano() + indexGCDetails := jobspb.SchemaChangeGCDetails{ + Indexes: []jobspb.SchemaChangeGCDetails_DroppedIndex{ + { + IndexID: indexDesc.ID, + DropTime: dropTime, + }, + }, + ParentID: sc.tableID, + } + if err := sc.startGCJob(ctx, indexGCDetails, isRollback); err != nil { + return err + } } } if constraint := mutation.GetConstraint(); constraint != nil && @@ -1625,6 +1397,16 @@ func (sc *SchemaChanger) reverseMutation( return mutation, columns } +// GCJobTestingKnobs is for testing the Schema Changer GC job. +// Note that this is defined here for testing purposes to avoid cyclic +// dependencies. +type GCJobTestingKnobs struct { + RunBeforeResume func() +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (*GCJobTestingKnobs) ModuleTestingKnobs() {} + // SchemaChangerTestingKnobs for testing the schema change execution path // through both the synchronous and asynchronous paths. type SchemaChangerTestingKnobs struct { @@ -1740,7 +1522,6 @@ type schemaChangeResumer struct { job *jobs.Job } -// Resume is part of the jobs.Resumer interface. func (r schemaChangeResumer) Resume( ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, ) error { @@ -1751,10 +1532,11 @@ func (r schemaChangeResumer) Resume( return nil } - execSchemaChange := func(tableID sqlbase.ID, mutationID sqlbase.MutationID) error { + execSchemaChange := func(tableID sqlbase.ID, mutationID sqlbase.MutationID, droppedDatabaseID sqlbase.ID) error { sc := SchemaChanger{ tableID: tableID, mutationID: mutationID, + droppedDatabaseID: droppedDatabaseID, nodeID: p.ExecCfg().NodeID.Get(), db: p.ExecCfg().DB, leaseMgr: p.ExecCfg().LeaseManager, @@ -1807,16 +1589,25 @@ func (r schemaChangeResumer) Resume( if details.DroppedDatabaseID != sqlbase.InvalidID { for i := range details.DroppedTables { droppedTable := &details.DroppedTables[i] - if err := execSchemaChange(droppedTable.ID, sqlbase.InvalidMutationID); err != nil { + if err := execSchemaChange(droppedTable.ID, sqlbase.InvalidMutationID, details.DroppedDatabaseID); err != nil { return err } } - return nil + dropTime := timeutil.Now().UnixNano() + tablesToGC := make([]jobspb.SchemaChangeGCDetails_DroppedID, len(details.DroppedTables)) + for i, table := range details.DroppedTables { + tablesToGC[i] = jobspb.SchemaChangeGCDetails_DroppedID{ID: table.ID, DropTime: dropTime} + } + databaseGCDetails := jobspb.SchemaChangeGCDetails{ + Tables: tablesToGC, + ParentID: details.DroppedDatabaseID, + } + return startGCJob(ctx, p.ExecCfg().DB, p.ExecCfg().JobRegistry, r.job.Payload().Username, r.job.Payload().Description, databaseGCDetails) } if details.TableID == sqlbase.InvalidID { return errors.AssertionFailedf("job has no database ID or table ID") } - return execSchemaChange(details.TableID, details.MutationID) + return execSchemaChange(details.TableID, details.MutationID, details.DroppedDatabaseID) } // OnFailOrCancel is part of the jobs.Resumer interface. diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 539ff838b389..eb15c617cfce 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/tests" @@ -454,9 +455,10 @@ func TestRaceWithBackfill(t *testing.T) { } params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { notifyBackfill() @@ -619,9 +621,10 @@ func TestDropWhileBackfill(t *testing.T) { } params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { if partialBackfillDone.Load().(bool) { @@ -730,11 +733,12 @@ func TestBackfillErrors(t *testing.T) { const numNodes, chunkSize, maxValue = 5, 100, 4000 params, _ := tests.CreateTestServerParams() + blockGC := make(chan struct{}) params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, }, + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }}, } tc := serverutils.StartTestCluster(t, numNodes, @@ -823,6 +827,7 @@ CREATE UNIQUE INDEX vidx ON t.test (v); if err := checkTableKeyCountExact(ctx, kvDB, keyCount); err != nil { t.Fatal(err) } + close(blockGC) } // Test aborting a schema change backfill transaction and check that the @@ -1514,14 +1519,16 @@ func TestSchemaChangeFailureAfterCheckpointing(t *testing.T) { // attempt 2: writing the third chunk returns a permanent failure // purge the schema change. expectedAttempts := 3 + blockGC := make(chan struct{}) params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, // Aggressively checkpoint, so that a schema change // failure happens after a checkpoint has been written. WriteCheckpointInterval: time.Nanosecond, }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { attempts++ @@ -1588,6 +1595,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); if checks := tableDesc.AllActiveAndInactiveChecks(); len(checks) > 0 { t.Fatalf("found checks %+v", checks) } + close(blockGC) } // TestSchemaChangeReverseMutations tests that schema changes get reversed @@ -1803,7 +1811,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") // Add immediate GC TTL to allow index creation purge to complete. if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { t.Fatal(err) @@ -1825,13 +1832,18 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); } // State of jobs table + t.Skip("TODO(pbardea): The following fails due to causes seemingly unrelated to GC") runner := sqlutils.SQLRunner{DB: sqlDB} + // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. + // Maybe this test API should use an offset starting from the most recent job + // instead. + schemaChangeJobOffset := 4 for i, tc := range testCases { status := jobs.StatusSucceeded if tc.errString != "" { status = jobs.StatusFailed } - if err := jobutils.VerifySystemJob(t, &runner, i, jobspb.TypeSchemaChange, status, jobs.Record{ + if err := jobutils.VerifySystemJob(t, &runner, schemaChangeJobOffset+i, jobspb.TypeSchemaChange, status, jobs.Record{ Username: security.RootUser, Description: tc.sql, DescriptorIDs: sqlbase.IDs{ @@ -2059,12 +2071,13 @@ func TestSchemaUniqueColumnDropFailure(t *testing.T) { const maxValue = (expectedColumnBackfillAttempts/2+1)*chunkSize + 1 params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, // Aggressively checkpoint, so that a schema change // failure happens after a checkpoint has been written. WriteCheckpointInterval: time.Nanosecond, }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { attempts++ @@ -2292,7 +2305,6 @@ func TestPrimaryKeyChangeWithPrecedingIndexCreation(t *testing.T) { wg.Wait() - t.Skip("skipping last portion of job until schema change GC job is implemented") // There should be 4 k/v pairs per row: // * the original rowid index. // * the old index on v. @@ -2347,7 +2359,6 @@ CREATE TABLE t.test (k INT NOT NULL, v INT, v2 INT NOT NULL)`); err != nil { t.Fatal(err) } - t.Skip("skipping last portion of job until schema change GC job is implemented") // There should be 4 k/v pairs per row: // * the original rowid index. // * the new primary index on v2. @@ -2822,12 +2833,10 @@ func TestPrimaryKeyIndexRewritesGetRemoved(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() + defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = 10 * time.Millisecond + params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -2844,7 +2853,7 @@ ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (v); `); err != nil { t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") + // Wait for the async schema changer to run. tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { @@ -3713,12 +3722,9 @@ func TestTruncateInternals(t *testing.T) { const maxValue = 2000 params, _ := tests.CreateTestServerParams() // Disable schema changes. + blockGC := make(chan struct{}) params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - SchemaChangeJobNoOp: func() bool { - return true - }, - }, + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }}, } s, sqlDB, kvDB := serverutils.StartServer(t, params) @@ -3801,21 +3807,31 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL DEFAULT (DECIMAL '3.14 if !droppedDesc.Dropped() { t.Fatalf("bad state = %s", droppedDesc.State) } - // TODO (lucy): Test that the GC job is correctly queued, once it's - // implemented. This test should actually just disable GC instead of disabling - // the initial schema change. + + close(blockGC) + + sqlRun := sqlutils.MakeSQLRunner(sqlDB) + if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusRunning, jobs.Record{ + Description: "GC for TRUNCATE TABLE t.public.test", + Username: security.RootUser, + DescriptorIDs: sqlbase.IDs{tableDesc.ID}, + }); err != nil { + t.Fatal(err) + } } // Test that a table truncation completes properly. func TestTruncateCompletion(t *testing.T) { defer leaktest.AfterTest(t)() const maxValue = 2000 + + defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = 10 * time.Millisecond + + defer func(i time.Duration) { gcjob.MaxSQLGCInterval = i }(gcjob.MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = 500 * time.Millisecond + params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) ctx := context.TODO() @@ -3851,7 +3867,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL REFERENCES t.pi (d) DE tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") - t.Skip("skipping last portion of test until schema change GC job is implemented") // Add a zone config. var cfg zonepb.ZoneConfig cfg, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID) @@ -3938,7 +3953,12 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL REFERENCES t.pi (d) DE // Ensure that the job is marked as succeeded. sqlRun := sqlutils.MakeSQLRunner(sqlDB) - if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + + // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. + // Maybe this test API should use an offset starting from the most recent job + // instead. + schemaChangeJobOffset := 4 + if err := jobutils.VerifySystemJob(t, sqlRun, schemaChangeJobOffset+2, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: "TRUNCATE TABLE t.public.test", DescriptorIDs: sqlbase.IDs{ @@ -5639,7 +5659,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); atomic.StoreUint32(&enableAsyncSchemaChanges, 1) // Add immediate GC TTL to allow index creation purge to complete. - // TODO (lucy): if this test were't skipped we'd need to GC quickly here if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { t.Fatal(err) }