diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 7f656d3e2e83..d66e700c7c95 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -24,6 +24,7 @@ type TestingKnobs struct { SQLExecutor ModuleTestingKnobs SQLLeaseManager ModuleTestingKnobs SQLSchemaChanger ModuleTestingKnobs + GCJob ModuleTestingKnobs PGWireTestingKnobs ModuleTestingKnobs SQLMigrationManager ModuleTestingKnobs DistSQL ModuleTestingKnobs diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 67c1ecf1c7eb..406f48fcdd83 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -1347,12 +1347,10 @@ func TestRestoreFailCleanup(t *testing.T) { defer leaktest.AfterTest(t)() params := base.TestServerArgs{} - // Disable external processing of mutations so that the final check of - // crdb_internal.tables is guaranteed to not be cleaned up. Although this - // was never observed by a stress test, it is here for safety. - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. - } + // Disable GC job so that the final check of crdb_internal.tables is + // guaranteed to not be cleaned up. Although this was never observed by a + // stress test, it is here for safety. + params.Knobs.GCJob = &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} /* blocks forever */ }} const numAccounts = 1000 _, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, @@ -1395,9 +1393,8 @@ func TestRestoreFailDatabaseCleanup(t *testing.T) { // Disable external processing of mutations so that the final check of // crdb_internal.tables is guaranteed to not be cleaned up. Although this // was never observed by a stress test, it is here for safety. - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. 
- } + blockGC := make(chan struct{}) + params.Knobs.GCJob = &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }} const numAccounts = 1000 _, _, sqlDB, dir, cleanup := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, @@ -1428,6 +1425,7 @@ func TestRestoreFailDatabaseCleanup(t *testing.T) { t, `database "data" does not exist`, `DROP DATABASE data`, ) + close(blockGC) } func TestBackupRestoreInterleaved(t *testing.T) { diff --git a/pkg/ccl/partitionccl/drop_test.go b/pkg/ccl/partitionccl/drop_test.go index 6b7e0f676d91..ae9d4defc92a 100644 --- a/pkg/ccl/partitionccl/drop_test.go +++ b/pkg/ccl/partitionccl/drop_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -40,15 +41,16 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) { const numRows = 100 + defer gcjob.SetSmallMaxGCIntervalForTest()() + asyncNotification := make(chan struct{}) params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Currently there's no index GC job implemented. Eventually - // the GC job needs to block until the asyncNotification channel is - // closed, which will probably need to be controlled in a schema change - // knob. + GCJob: &sql.GCJobTestingKnobs{ + RunBeforeResume: func() { + <-asyncNotification + }, }, } s, sqlDBRaw, kvDB := serverutils.StartServer(t, params) @@ -116,7 +118,6 @@ func TestDropIndexWithZoneConfigCCL(t *testing.T) { } close(asyncNotification) - t.Skip("skipping last portion of test until schema change GC job is implemented") // Wait for index drop to complete so zone configs are updated. 
testutils.SucceedsSoon(t, func() error { if kvs, err := kvDB.Scan(context.TODO(), indexSpan.Key, indexSpan.EndKey, 0); err != nil { diff --git a/pkg/server/server.go b/pkg/server/server.go index 1f2c1400853f..b70095d7bce1 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -63,6 +63,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + _ "github.com/cockroachdb/cockroach/pkg/sql/gcjob" // register jobs declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/sql/pgwire" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -821,6 +822,11 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { } else { execCfg.SchemaChangerTestingKnobs = new(sql.SchemaChangerTestingKnobs) } + if gcJobTestingKnobs := s.cfg.TestingKnobs.GCJob; gcJobTestingKnobs != nil { + execCfg.GCJobTestingKnobs = gcJobTestingKnobs.(*sql.GCJobTestingKnobs) + } else { + execCfg.GCJobTestingKnobs = new(sql.GCJobTestingKnobs) + } if distSQLRunTestingKnobs := s.cfg.TestingKnobs.DistSQL; distSQLRunTestingKnobs != nil { execCfg.DistSQLRunTestingKnobs = distSQLRunTestingKnobs.(*execinfra.TestingKnobs) } else { diff --git a/pkg/sql/as_of_test.go b/pkg/sql/as_of_test.go index cbaf2ca9c56a..11ee964137a0 100644 --- a/pkg/sql/as_of_test.go +++ b/pkg/sql/as_of_test.go @@ -33,9 +33,7 @@ func TestAsOfTime(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs.SQLSchemaChanger = &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. 
- } + params.Knobs.GCJob = &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }} s, db, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) diff --git a/pkg/sql/drop_test.go b/pkg/sql/drop_test.go index 7eaa5a8f1ca4..4194df0f792c 100644 --- a/pkg/sql/drop_test.go +++ b/pkg/sql/drop_test.go @@ -15,7 +15,6 @@ import ( gosql "database/sql" "fmt" "math/rand" - "sync/atomic" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -29,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/sqlmigrations" @@ -123,11 +123,6 @@ func addDefaultZoneConfig(sqlDB *gosql.DB, id sqlbase.ID) (zonepb.ZoneConfig, er func TestDropDatabase(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) ctx := context.TODO() @@ -297,12 +292,10 @@ CREATE DATABASE t; // Test that a dropped database's data gets deleted properly. func TestDropDatabaseDeleteData(t *testing.T) { defer leaktest.AfterTest(t)() + + defer gcjob.SetSmallMaxGCIntervalForTest()() + params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. 
- }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) ctx := context.TODO() @@ -392,8 +385,9 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. // Maybe this test API should use an offset starting from the most recent job // instead. + const migrationJobOffset = 4 sqlRun := sqlutils.MakeSQLRunner(sqlDB) - if err := jobutils.VerifySystemJob(t, sqlRun, 4, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, migrationJobOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: "DROP DATABASE t CASCADE", DescriptorIDs: sqlbase.IDs{ @@ -403,7 +397,6 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") // Push a new zone config for the table with TTL=0 so the data is // deleted immediately. 
if _, err := addImmediateGCZoneConfig(sqlDB, tbDesc.ID); err != nil { @@ -427,15 +420,15 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); t.Fatal(err) } - if err := jobutils.VerifyRunningSystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, sql.RunningStatusWaitingGC, jobs.Record{ - Username: security.RootUser, - Description: "DROP DATABASE t CASCADE", - DescriptorIDs: sqlbase.IDs{ - tbDesc.ID, tb2Desc.ID, - }, - }); err != nil { - t.Fatal(err) - } + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusRunning, jobs.Record{ + Username: security.RootUser, + Description: "GC for DROP DATABASE t CASCADE", + DescriptorIDs: sqlbase.IDs{ + tbDesc.ID, tb2Desc.ID, + }, + }) + }) if _, err := addImmediateGCZoneConfig(sqlDB, tb2Desc.ID); err != nil { t.Fatal(err) @@ -455,7 +448,7 @@ INSERT INTO t.kv2 VALUES ('c', 'd'), ('a', 'b'), ('e', 'a'); // Table 2 data is deleted. tests.CheckKeyCount(t, kvDB, table2Span, 0) - if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, migrationJobOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: "DROP DATABASE t CASCADE", DescriptorIDs: sqlbase.IDs{ @@ -525,7 +518,6 @@ func TestDropIndex(t *testing.T) { params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ BackfillChunkSize: chunkSize, - // TODO (lucy): Un-skip this test when the GC job is implemented. }, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { @@ -570,8 +562,9 @@ func TestDropIndex(t *testing.T) { // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. // Maybe this test API should use an offset starting from the most recent job // instead. 
+ const migrationJobOffset = 4 sqlRun := sqlutils.MakeSQLRunner(sqlDB) - if err := jobutils.VerifySystemJob(t, sqlRun, 5, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, migrationJobOffset+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: `DROP INDEX t.public.kv@foo`, DescriptorIDs: sqlbase.IDs{ @@ -594,7 +587,6 @@ func TestDropIndex(t *testing.T) { tests.CheckKeyCount(t, kvDB, newIdxSpan, numRows) tests.CheckKeyCount(t, kvDB, tableDesc.TableSpan(), 4*numRows) - t.Skip("skipping last portion of test until schema change GC job is implemented") clearIndexAttempt = true // Add a zone config for the table. if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { @@ -602,7 +594,7 @@ func TestDropIndex(t *testing.T) { } testutils.SucceedsSoon(t, func() error { - return jobutils.VerifySystemJob(t, sqlRun, 1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + return jobutils.VerifySystemJob(t, sqlRun, migrationJobOffset+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: `DROP INDEX t.public.kv@foo`, DescriptorIDs: sqlbase.IDs{ @@ -611,6 +603,16 @@ func TestDropIndex(t *testing.T) { }) }) + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ + Username: security.RootUser, + Description: `GC for DROP INDEX t.public.kv@foo`, + DescriptorIDs: sqlbase.IDs{ + tableDesc.ID, + }, + }) + }) + if !emptySpan { t.Fatalf("tried to clear index with non-empty resume span") } @@ -689,7 +691,6 @@ func TestDropIndexInterleaved(t *testing.T) { params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. 
BackfillChunkSize: chunkSize, }, } @@ -707,7 +708,6 @@ func TestDropIndexInterleaved(t *testing.T) { if _, err := sqlDB.Exec(`DROP INDEX t.intlv@intlv_idx`); err != nil { t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") tests.CheckKeyCount(t, kvDB, tableSpan, 2*numRows) // Ensure that index is not active. @@ -808,11 +808,9 @@ func TestDropTable(t *testing.T) { func TestDropTableDeleteData(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } + + defer gcjob.SetSmallMaxGCIntervalForTest()() + s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) ctx := context.TODO() @@ -850,6 +848,11 @@ func TestDropTableDeleteData(t *testing.T) { } } + // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. + // Maybe this test API should use an offset starting from the most recent job + // instead. + const migrationJobOffset = 4 + // Data hasn't been GC-ed. sqlRun := sqlutils.MakeSQLRunner(sqlDB) for i := 0; i < numTables; i++ { @@ -859,10 +862,7 @@ func TestDropTableDeleteData(t *testing.T) { tableSpan := descs[i].TableSpan() tests.CheckKeyCount(t, kvDB, tableSpan, numKeys) - // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. - // Maybe this test API should use an offset starting from the most recent job - // instead. 
- if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1+4, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1+migrationJobOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()), DescriptorIDs: sqlbase.IDs{ @@ -873,7 +873,6 @@ func TestDropTableDeleteData(t *testing.T) { } } - t.Skip("skipping last portion of test until schema change GC job is implemented") // The closure pushes a zone config reducing the TTL to 0 for descriptor i. pushZoneCfg := func(i int) { if _, err := addImmediateGCZoneConfig(sqlDB, descs[i].ID); err != nil { @@ -893,7 +892,7 @@ func TestDropTableDeleteData(t *testing.T) { tests.CheckKeyCount(t, kvDB, tableSpan, 0) // Ensure that the job is marked as succeeded. - if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + if err := jobutils.VerifySystemJob(t, sqlRun, 2*i+1+migrationJobOffset, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: fmt.Sprintf(`DROP TABLE t.public.%s`, descs[i].GetName()), DescriptorIDs: sqlbase.IDs{ @@ -902,6 +901,17 @@ func TestDropTableDeleteData(t *testing.T) { }); err != nil { t.Fatal(err) } + + // Ensure that the GC job is marked as succeeded. 
+ testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob(t, sqlRun, i, jobspb.TypeSchemaChangeGC, jobs.StatusSucceeded, jobs.Record{ + Username: security.RootUser, + Description: fmt.Sprintf(`GC for DROP TABLE t.public.%s`, descs[i].GetName()), + DescriptorIDs: sqlbase.IDs{ + descs[i].ID, + }, + }) + }) } // Push a new zone config for a few tables with TTL=0 so the data @@ -951,16 +961,10 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - blockSchemaChanges := make(chan struct{}) + defer gcjob.SetSmallMaxGCIntervalForTest()() + params, _ := tests.CreateTestServerParams() params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - SchemaChangeJobNoOp: func() bool { - return true - }, - // TODO (lucy): Un-skip this test when the GC job is implemented, and set - // the knob to block until we're ready to GC - }, SQLMigrationManager: &sqlmigrations.MigrationManagerTestingKnobs{ DisableBackfillMigrations: true, }, @@ -970,12 +974,13 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(sqlDBRaw) + // Disable strict GC TTL enforcement because we're going to shove a zero-value + // TTL into the system with addImmediateGCZoneConfig. + defer disableGCTTLStrictEnforcement(t, sqlDBRaw)() + const numRows = 100 sqlutils.CreateTable(t, sqlDBRaw, "t", "a INT", numRows, sqlutils.ToRowFn(sqlutils.RowIdxFn)) - // Set TTL so the data is deleted immediately. - sqlDB.Exec(t, `ALTER TABLE test.t CONFIGURE ZONE USING gc.ttlseconds = 1`) - // Give the table an old format version. tableDesc := sqlbase.GetTableDescriptor(kvDB, "test", "t") tableDesc.FormatVersion = sqlbase.FamilyFormatVersion @@ -991,7 +996,11 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { // Simulate a migration upgrading the table descriptor's format version after // the table has been dropped but before the truncation has occurred. 
- tableDesc = sqlbase.GetTableDescriptor(kvDB, "test", "t") + var err error + tableDesc, err = sqlbase.GetTableDescFromID(ctx, kvDB.NewTxn(ctx, ""), tableDesc.ID) + if err != nil { + t.Fatal(err) + } if !tableDesc.Dropped() { t.Fatalf("expected descriptor to be in DROP state, but was in %s", tableDesc.State) } @@ -1001,10 +1010,13 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") + // Set TTL so the data is deleted immediately. + if _, err := addImmediateGCZoneConfig(sqlDBRaw, tableDesc.ID); err != nil { + t.Fatal(err) + } + // Allow the schema change to proceed and verify that the data is eventually // deleted, despite the interleaved modification to the table descriptor. - close(blockSchemaChanges) testutils.SucceedsSoon(t, func() error { return descExists(sqlDBRaw, false, tableDesc.ID) }) @@ -1016,13 +1028,9 @@ func TestDropTableWhileUpgradingFormat(t *testing.T) { func TestDropTableInterleavedDeleteData(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - var enableAsync uint32 - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented, and set - // the knob to block until we're ready to GC - }, - } + + defer gcjob.SetSmallMaxGCIntervalForTest()() + s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) @@ -1046,8 +1054,9 @@ func TestDropTableInterleavedDeleteData(t *testing.T) { t.Fatalf("different error than expected: %v", err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") - atomic.StoreUint32(&enableAsync, 1) + if _, err := addImmediateGCZoneConfig(sqlDB, tableDescInterleaved.ID); err != nil { + t.Fatal(err) + } testutils.SucceedsSoon(t, func() error { return descExists(sqlDB, false, tableDescInterleaved.ID) diff --git a/pkg/sql/exec_util.go 
b/pkg/sql/exec_util.go index 111815cfa577..4344d55e1cfe 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -572,6 +572,7 @@ type ExecutorConfig struct { TestingKnobs ExecutorTestingKnobs PGWireTestingKnobs *PGWireTestingKnobs SchemaChangerTestingKnobs *SchemaChangerTestingKnobs + GCJobTestingKnobs *GCJobTestingKnobs DistSQLRunTestingKnobs *execinfra.TestingKnobs EvalContextTestingKnobs tree.EvalContextTestingKnobs // HistogramWindowInterval is (server.Config).HistogramWindowInterval. diff --git a/pkg/sql/gcjob/descriptor_utils.go b/pkg/sql/gcjob/descriptor_utils.go index c5335cb5ec7e..3987c6e067b8 100644 --- a/pkg/sql/gcjob/descriptor_utils.go +++ b/pkg/sql/gcjob/descriptor_utils.go @@ -59,14 +59,14 @@ func dropTableDesc(ctx context.Context, db *kv.DB, tableDesc *sqlbase.TableDescr zoneKeyPrefix := config.MakeZoneKeyPrefix(uint32(tableDesc.ID)) return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetSystemConfigTrigger(); err != nil { + return err + } b := &kv.Batch{} // Delete the descriptor. b.Del(descKey) // Delete the zone config entry for this table. b.DelRange(zoneKeyPrefix, zoneKeyPrefix.PrefixEnd(), false /* returnKeys */) - if err := txn.SetSystemConfigTrigger(); err != nil { - return err - } return txn.Run(ctx, b) }) @@ -76,6 +76,9 @@ func dropTableDesc(ctx context.Context, db *kv.DB, tableDesc *sqlbase.TableDescr func deleteDatabaseZoneConfig(ctx context.Context, db *kv.DB, databaseID sqlbase.ID) error { return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { b := &kv.Batch{} + if err := txn.SetSystemConfigTrigger(); err != nil { + return err + } // Delete the zone config entry for the dropped database associated with the // job, if it exists. 
diff --git a/pkg/sql/gcjob/gc_job.go b/pkg/sql/gcjob/gc_job.go index f69c27b93f4f..aa17e6ae018c 100644 --- a/pkg/sql/gcjob/gc_job.go +++ b/pkg/sql/gcjob/gc_job.go @@ -31,36 +31,52 @@ var ( MaxSQLGCInterval = 5 * time.Minute ) +// SetSmallMaxGCIntervalForTest sets the MaxSQLGCInterval and then returns a closure +// that resets it. +// This is to be used in tests like: +// defer SetSmallMaxGCIntervalForTest()() +func SetSmallMaxGCIntervalForTest() func() { + oldInterval := MaxSQLGCInterval + MaxSQLGCInterval = 500 * time.Millisecond + return func() { + MaxSQLGCInterval = oldInterval + } +} + type schemaChangeGCResumer struct { jobID int64 } +// performGC GCs any schema elements that are in the DELETING state and returns +// a bool indicating if it GC'd any elements. func performGC( ctx context.Context, execCfg *sql.ExecutorConfig, - jobID int64, details *jobspb.SchemaChangeGCDetails, progress *jobspb.SchemaChangeGCProgress, -) error { +) (bool, error) { + didGC := false if details.Indexes != nil { - if err := gcIndexes(ctx, execCfg, details.ParentID, progress); err != nil { - return errors.Wrap(err, "attempting to GC indexes") + if didGCIndex, err := gcIndexes(ctx, execCfg, details.ParentID, progress); err != nil { + return false, errors.Wrap(err, "attempting to GC indexes") + } else if didGCIndex { + didGC = true } } else if details.Tables != nil { - if err := gcTables(ctx, execCfg, progress); err != nil { - return errors.Wrap(err, "attempting to GC tables") + if didGCTable, err := gcTables(ctx, execCfg, progress); err != nil { + return false, errors.Wrap(err, "attempting to GC tables") + } else if didGCTable { + didGC = true } // Drop database zone config when all the tables have been GCed. 
if details.ParentID != sqlbase.InvalidID && isDoneGC(progress) { if err := deleteDatabaseZoneConfig(ctx, execCfg.DB, details.ParentID); err != nil { - return errors.Wrap(err, "deleting database zone config") + return false, errors.Wrap(err, "deleting database zone config") } } } - - persistProgress(ctx, execCfg, jobID, progress) - return nil + return didGC, nil } // Resume is part of the jobs.Resumer interface. @@ -70,6 +86,9 @@ func (r schemaChangeGCResumer) Resume( p := phs.(sql.PlanHookState) // TODO(pbardea): Wait for no versions. execCfg := p.ExecCfg() + if fn := execCfg.GCJobTestingKnobs.RunBeforeResume; fn != nil { + fn() + } details, progress, err := initDetailsAndProgress(ctx, execCfg, r.jobID) if err != nil { return err @@ -79,7 +98,7 @@ func (r schemaChangeGCResumer) Resume( allTables := getAllTablesWaitingForGC(details, progress) expired, earliestDeadline := refreshTables(ctx, execCfg, allTables, tableDropTimes, indexDropTimes, r.jobID, progress) - timerDuration := time.Until(earliestDeadline) + timerDuration := timeutil.Until(earliestDeadline) if expired { timerDuration = 0 } else if timerDuration > MaxSQLGCInterval { @@ -118,9 +137,12 @@ func (r schemaChangeGCResumer) Resume( remainingTables := getAllTablesWaitingForGC(details, progress) _, earliestDeadline = refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress) - if err := performGC(ctx, execCfg, r.jobID, details, progress); err != nil { + if didWork, err := performGC(ctx, execCfg, details, progress); err != nil { return err + } else if didWork { + persistProgress(ctx, execCfg, r.jobID, progress) } + if isDoneGC(progress) { return nil } diff --git a/pkg/sql/gcjob/gc_job_utils.go b/pkg/sql/gcjob/gc_job_utils.go index c85be92b175d..18b853b2b5a1 100644 --- a/pkg/sql/gcjob/gc_job_utils.go +++ b/pkg/sql/gcjob/gc_job_utils.go @@ -176,7 +176,7 @@ func persistProgress( if err := job.SetProgress(ctx, *progress); err != nil { return err } - log.Infof(ctx, "updated 
progress payload %+v", progress) + log.Infof(ctx, "updated progress payload: %+v", progress) return nil }); err != nil { log.Warningf(ctx, "failed to update job's progress payload err: %+v", err) diff --git a/pkg/sql/gcjob/index_garbage_collection.go b/pkg/sql/gcjob/index_garbage_collection.go index 1add46992227..33565eaaa22b 100644 --- a/pkg/sql/gcjob/index_garbage_collection.go +++ b/pkg/sql/gcjob/index_garbage_collection.go @@ -30,7 +30,8 @@ func gcIndexes( execCfg *sql.ExecutorConfig, parentID sqlbase.ID, progress *jobspb.SchemaChangeGCProgress, -) error { +) (bool, error) { + didGC := false droppedIndexes := progress.Indexes if log.V(2) { log.Infof(ctx, "GC is being considered on table %d for indexes indexes: %+v", parentID, droppedIndexes) @@ -42,7 +43,7 @@ func gcIndexes( parentTable, err = sqlbase.GetTableDescFromID(ctx, txn, parentID) return err }); err != nil { - return errors.Wrapf(err, "fetching parent table %d", parentID) + return false, errors.Wrapf(err, "fetching parent table %d", parentID) } for _, index := range droppedIndexes { @@ -52,7 +53,7 @@ func gcIndexes( indexDesc := sqlbase.IndexDescriptor{ID: index.IndexID} if err := clearIndex(ctx, execCfg.DB, parentTable, indexDesc); err != nil { - return errors.Wrapf(err, "clearing index %d", indexDesc.ID) + return false, errors.Wrapf(err, "clearing index %d", indexDesc.ID) } // All the data chunks have been removed. 
Now also removed the @@ -60,15 +61,17 @@ func gcIndexes( if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return sql.RemoveIndexZoneConfigs(ctx, txn, execCfg, parentTable.GetID(), []sqlbase.IndexDescriptor{indexDesc}) }); err != nil { - return errors.Wrapf(err, "removing index %d zone configs", indexDesc.ID) + return false, errors.Wrapf(err, "removing index %d zone configs", indexDesc.ID) } if err := completeDroppedIndex(ctx, execCfg, parentTable, index.IndexID, progress); err != nil { - return err + return false, err } + + didGC = true } - return nil + return didGC, nil } // clearIndexes issues Clear Range requests over all specified indexes. diff --git a/pkg/sql/gcjob/refresh_statuses.go b/pkg/sql/gcjob/refresh_statuses.go index 35d50542ce5d..8dcdf2234d10 100644 --- a/pkg/sql/gcjob/refresh_statuses.go +++ b/pkg/sql/gcjob/refresh_statuses.go @@ -143,7 +143,6 @@ func updateTableStatus( progress *jobspb.SchemaChangeGCProgress, ) time.Time { deadline := timeutil.Unix(0, int64(math.MaxInt64)) - lifetime := ttlSeconds * time.Second.Nanoseconds() sp := table.TableSpan() for i, t := range progress.Tables { @@ -152,7 +151,7 @@ func updateTableStatus( continue } - deadlineNanos := tableDropTimes[t.ID] + lifetime + deadlineNanos := tableDropTimes[t.ID] + ttlSeconds*time.Second.Nanoseconds() deadline = timeutil.Unix(0, deadlineNanos) if isProtected(ctx, protectedtsCache, tableDropTimes[t.ID], sp) { log.Infof(ctx, "a timestamp protection delayed GC of table %d", t.ID) diff --git a/pkg/sql/gcjob/table_garbage_collection.go b/pkg/sql/gcjob/table_garbage_collection.go index f77e0a6df662..ae3f6654ce08 100644 --- a/pkg/sql/gcjob/table_garbage_collection.go +++ b/pkg/sql/gcjob/table_garbage_collection.go @@ -31,7 +31,8 @@ import ( // The job progress is updated in place, but needs to be persisted to the job. 
func gcTables( ctx context.Context, execCfg *sql.ExecutorConfig, progress *jobspb.SchemaChangeGCProgress, -) error { +) (bool, error) { + didGC := false if log.V(2) { log.Infof(ctx, "GC is being considered for tables: %+v", progress.Tables) } @@ -47,7 +48,7 @@ func gcTables( table, err = sqlbase.GetTableDescFromID(ctx, txn, droppedTable.ID) return err }); err != nil { - return errors.Wrapf(err, "fetching table %d", droppedTable.ID) + return false, errors.Wrapf(err, "fetching table %d", droppedTable.ID) } if !table.Dropped() { @@ -57,18 +58,19 @@ func gcTables( // First, delete all the table data. if err := clearTableData(ctx, execCfg.DB, execCfg.DistSender, table); err != nil { - return errors.Wrapf(err, "clearing data for table %d", table.ID) + return false, errors.Wrapf(err, "clearing data for table %d", table.ID) } // Finished deleting all the table data, now delete the table meta data. if err := dropTableDesc(ctx, execCfg.DB, table); err != nil { - return errors.Wrapf(err, "dropping table descriptor for table %d", table.ID) + return false, errors.Wrapf(err, "dropping table descriptor for table %d", table.ID) } // Update the details payload to indicate that the table was dropped. markTableGCed(ctx, table.ID, progress) + didGC = true } - return nil + return didGC, nil } // clearTableData deletes all of the data in the specified table. 
@@ -137,7 +139,7 @@ func clearTableData( }) log.VEventf(ctx, 2, "ClearRange %s - %s", lastKey, endKey) if err := db.Run(ctx, &b); err != nil { - return err + return errors.Wrapf(err, "clear range %s - %s", lastKey, endKey) } n = 0 lastKey = endKey diff --git a/pkg/sql/gcjob/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go similarity index 97% rename from pkg/sql/gcjob/gc_job_test.go rename to pkg/sql/gcjob_test/gc_job_test.go index e5e8acee8619..d6e44eb69887 100644 --- a/pkg/sql/gcjob/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package gcjob +package gcjob_test import ( "context" @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" @@ -38,10 +39,10 @@ func TestSchemaChangeGCJob(t *testing.T) { defer func(oldAdoptInterval, oldGCInterval time.Duration) { jobs.DefaultAdoptInterval = oldAdoptInterval - MaxSQLGCInterval = oldGCInterval - }(jobs.DefaultAdoptInterval, MaxSQLGCInterval) + gcjob.MaxSQLGCInterval = oldGCInterval + }(jobs.DefaultAdoptInterval, gcjob.MaxSQLGCInterval) jobs.DefaultAdoptInterval = 100 * time.Millisecond - MaxSQLGCInterval = 100 * time.Millisecond + gcjob.MaxSQLGCInterval = 500 * time.Millisecond type DropItem int const ( diff --git a/pkg/sql/gcjob/main_test.go b/pkg/sql/gcjob_test/main_test.go similarity index 97% rename from pkg/sql/gcjob/main_test.go rename to pkg/sql/gcjob_test/main_test.go index f94d78246416..9f986e156bb4 100644 --- a/pkg/sql/gcjob/main_test.go +++ b/pkg/sql/gcjob_test/main_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
-package gcjob +package gcjob_test import ( "os" diff --git a/pkg/sql/lease_test.go b/pkg/sql/lease_test.go index 1ee321eaa187..4ea8483c2ca3 100644 --- a/pkg/sql/lease_test.go +++ b/pkg/sql/lease_test.go @@ -526,8 +526,9 @@ func TestCantLeaseDeletedTable(testingT *testing.T) { defer mu.Unlock() return clearSchemaChangers }, - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, } t := newLeaseTest(testingT, params) @@ -616,8 +617,9 @@ func TestLeasesOnDeletedTableAreReleasedImmediately(t *testing.T) { defer mu.Unlock() return clearSchemaChangers }, - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, } s, db, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) @@ -908,11 +910,6 @@ CREATE TABLE t.foo (v INT); func TestTxnObeysTableModificationTime(t *testing.T) { defer leaktest.AfterTest(t)() params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(context.TODO()) @@ -1090,7 +1087,6 @@ INSERT INTO t.kv VALUES ('a', 'b'); tableSpan := tableDesc.TableSpan() tests.CheckKeyCount(t, kvDB, tableSpan, 4) - t.Skip("skipping last portion of test until schema change GC job is implemented") // Allow async schema change waiting for GC to complete (when dropping an // index) and clear the index keys. 
if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { diff --git a/pkg/sql/logictest/testdata/logic_test/alter_table b/pkg/sql/logictest/testdata/logic_test/alter_table index 28f98e76334f..445463d4cf9a 100644 --- a/pkg/sql/logictest/testdata/logic_test/alter_table +++ b/pkg/sql/logictest/testdata/logic_test/alter_table @@ -67,15 +67,15 @@ statement error pgcode 23505 violates unique constraint "bar" ALTER TABLE t ADD CONSTRAINT bar UNIQUE (c) # Test that rollback was successful -# TODO (lucy): Update once we have a separate GC job query TTTTTR SELECT job_type, regexp_replace(description, 'JOB \d+', 'JOB ...'), user_name, status, running_status, fraction_completed::decimal(10,2) FROM crdb_internal.jobs -WHERE job_type = 'SCHEMA CHANGE' +WHERE job_type = 'SCHEMA CHANGE' OR job_type = 'SCHEMA CHANGE GC' ORDER BY created DESC -LIMIT 1 +LIMIT 2 ---- -SCHEMA CHANGE ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root failed NULL 0.00 +SCHEMA CHANGE GC GC for ROLLBACK of ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root running NULL 0.00 +SCHEMA CHANGE ALTER TABLE test.public.t ADD CONSTRAINT bar UNIQUE (c) root failed NULL 0.00 query IIII colnames,rowsort SELECT * FROM t @@ -188,15 +188,15 @@ ALTER TABLE t DROP CONSTRAINT foo statement ok DROP INDEX foo CASCADE -# TODO (lucy): Update once we have a separate GC job query TTTTTRT SELECT job_type, description, user_name, status, running_status, fraction_completed, error FROM crdb_internal.jobs -WHERE job_type = 'SCHEMA CHANGE' +WHERE job_type = 'SCHEMA CHANGE' OR job_type = 'SCHEMA CHANGE GC' ORDER BY created DESC -LIMIT 1 +LIMIT 2 ---- -SCHEMA CHANGE DROP INDEX test.public.t@foo CASCADE root succeeded NULL 1 · +SCHEMA CHANGE GC GC for DROP INDEX test.public.t@foo CASCADE root running NULL 0 · +SCHEMA CHANGE DROP INDEX test.public.t@foo CASCADE root succeeded NULL 1 · query TTBITTBB colnames SHOW INDEXES FROM t @@ -273,15 +273,15 @@ INSERT INTO t (a, d, x, y, z) VALUES (33, 34, DECIMAL 
'2.0', DECIMAL '2.1', DECI statement ok DROP INDEX t@t_f_idx -# TODO (lucy): Update once we have a separate GC job query TTTTTRT SELECT job_type, description, user_name, status, running_status, fraction_completed, error FROM crdb_internal.jobs -WHERE job_type = 'SCHEMA CHANGE' +WHERE job_type = 'SCHEMA CHANGE' OR job_type = 'SCHEMA CHANGE GC' ORDER BY created DESC -LIMIT 1 +LIMIT 2 ---- -SCHEMA CHANGE DROP INDEX test.public.t@t_f_idx root succeeded NULL 1 · +SCHEMA CHANGE GC GC for DROP INDEX test.public.t@t_f_idx root running NULL 0 · +SCHEMA CHANGE DROP INDEX test.public.t@t_f_idx root succeeded NULL 1 · statement ok ALTER TABLE t DROP COLUMN f diff --git a/pkg/sql/logictest/testdata/logic_test/drop_database b/pkg/sql/logictest/testdata/logic_test/drop_database index 10b004e28e56..d42237b0ddc3 100644 --- a/pkg/sql/logictest/testdata/logic_test/drop_database +++ b/pkg/sql/logictest/testdata/logic_test/drop_database @@ -34,15 +34,15 @@ postgres system test -# TODO (lucy): Update this once we have GC jobs. # The "updating privileges" clause in the SELECT statement is for excluding jobs # run by an unrelated startup migration. # TODO (lucy): Update this if/when we decide to change how these jobs queued by # the startup migration are handled. query TT -SELECT status, running_status FROM [SHOW JOBS] WHERE description != 'updating privileges' +SELECT job_type, status FROM [SHOW JOBS] WHERE description != 'updating privileges' ---- -succeeded NULL +SCHEMA CHANGE succeeded +SCHEMA CHANGE GC running statement ok CREATE DATABASE "foo bar" diff --git a/pkg/sql/logictest/testdata/logic_test/drop_table b/pkg/sql/logictest/testdata/logic_test/drop_table index 17d7a8fdd2e6..ffaa39b64d26 100644 --- a/pkg/sql/logictest/testdata/logic_test/drop_table +++ b/pkg/sql/logictest/testdata/logic_test/drop_table @@ -25,15 +25,15 @@ SELECT * FROM a statement ok DROP TABLE a -# TODO (lucy): Update this once we have GC jobs. 
# The "updating privileges" clause in the SELECT statement is for excluding jobs # run by an unrelated startup migration. # TODO (lucy): Update this if/when we decide to change how these jobs queued by # the startup migration are handled. query TT -SELECT status, running_status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND description != 'updating privileges' +SELECT job_type, status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' OR (job_type = 'SCHEMA CHANGE' AND description != 'updating privileges') ---- -succeeded NULL +SCHEMA CHANGE succeeded +SCHEMA CHANGE GC running query T SHOW TABLES FROM test diff --git a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn index 478da67c0c80..57e8115409e0 100644 --- a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn +++ b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn @@ -771,14 +771,19 @@ k CHAR false NULL · {primary} false query error pq: index "j_idx" not found SELECT * FROM customers@j_idx -# TODO (lucy): Update once we have a separate GC job -query TTT +query TT SELECT status, - running_status, regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as description FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' ORDER BY job_id DESC LIMIT 1 ---- -failed NULL ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5;ALTER TABLE test.public.customers ADD COLUMN j INT8 DEFAULT 4;ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3;ALTER TABLE test.public.customers ADD COLUMN m CHAR;ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a';CREATE INDEX j_idx ON test.public.customers (j);CREATE INDEX l_idx ON test.public.customers (l);CREATE INDEX m_idx ON test.public.customers (m);CREATE UNIQUE INDEX i_idx ON test.public.customers (i);CREATE UNIQUE INDEX n_idx ON test.public.customers (n) +failed ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5;ALTER TABLE 
test.public.customers ADD COLUMN j INT8 DEFAULT 4;ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3;ALTER TABLE test.public.customers ADD COLUMN m CHAR;ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a';CREATE INDEX j_idx ON test.public.customers (j);CREATE INDEX l_idx ON test.public.customers (l);CREATE INDEX m_idx ON test.public.customers (m);CREATE UNIQUE INDEX i_idx ON test.public.customers (i);CREATE UNIQUE INDEX n_idx ON test.public.customers (n) + +query TT +SELECT status, + regexp_replace(description, 'ROLL BACK JOB \d+.*', 'ROLL BACK JOB') as description + FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' ORDER BY job_id DESC LIMIT 1 +---- +running GC for ROLLBACK of ALTER TABLE test.public.customers ADD COLUMN i INT8 DEFAULT 5;ALTER TABLE test.public.customers ADD COLUMN j INT8 DEFAULT 4;ALTER TABLE test.public.customers ADD COLUMN l INT8 DEFAULT 3;ALTER TABLE test.public.customers ADD COLUMN m CHAR;ALTER TABLE test.public.customers ADD COLUMN n CHAR DEFAULT 'a';CREATE INDEX j_idx ON test.public.customers (j);CREATE INDEX l_idx ON test.public.customers (l);CREATE INDEX m_idx ON test.public.customers (m);CREATE UNIQUE INDEX i_idx ON test.public.customers (i);CREATE UNIQUE INDEX n_idx ON test.public.customers (n) subtest add_multiple_computed_elements diff --git a/pkg/sql/logictest/testdata/logic_test/truncate b/pkg/sql/logictest/testdata/logic_test/truncate index f926f598d1a4..23e9ce456a0c 100644 --- a/pkg/sql/logictest/testdata/logic_test/truncate +++ b/pkg/sql/logictest/testdata/logic_test/truncate @@ -48,18 +48,22 @@ query II SELECT * FROM kview ---- -# TODO (lucy): update with GC job # The "updating privileges" clause in the SELECT statement is for excluding jobs # run by an unrelated startup migration. # TODO (lucy): Update this if/when we decide to change how these jobs queued by # the startup migration are handled. 
-query TT -SELECT status, running_status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND description != 'updating privileges' +query T +SELECT status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND description != 'updating privileges' +---- +succeeded +succeeded +succeeded +succeeded + +query T +SELECT status FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' ---- -succeeded NULL -succeeded NULL -succeeded NULL -succeeded NULL +running # Ensure that TRUNCATE works with a self referential FK. statement ok diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 5197cfe1aa17..8a74e4089776 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -17,10 +17,8 @@ import ( "strings" "time" - "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -46,15 +44,9 @@ import ( ) const ( - // RunningStatusDrainingNames is for jobs that are currently in progress and - // are draining names. - RunningStatusDrainingNames jobs.RunningStatus = "draining names" // RunningStatusWaitingGC is for jobs that are currently in progress and // are waiting for the GC interval to expire RunningStatusWaitingGC jobs.RunningStatus = "waiting for GC TTL" - // RunningStatusCompaction is for jobs that are currently in progress and - // undergoing RocksDB compaction - RunningStatusCompaction jobs.RunningStatus = "RocksDB compaction" // RunningStatusDeleteOnly is for jobs that are currently waiting on // the cluster to converge to seeing the schema element in the DELETE_ONLY // state. 
@@ -71,28 +63,14 @@ const ( RunningStatusValidation jobs.RunningStatus = "validating schema" ) -// TODO (lucy): After refactoring MaybeGCMutations to be in its own job, this -// will probably go away. For now, preserve it the way it is even though nothing -// is using it. -type droppedIndex struct { - indexID sqlbase.IndexID - //lint:ignore U1001 see above comment - dropTime int64 - deadline int64 -} - // SchemaChanger is used to change the schema on a table. type SchemaChanger struct { - tableID sqlbase.ID - mutationID sqlbase.MutationID - nodeID roachpb.NodeID - db *kv.DB - leaseMgr *LeaseManager - - // TODO (lucy): Replace this with job state once we have a GC job. This is - // only still here because the (currently unused) MaybeGCMutations depends on - // it. - dropIndexTimes []droppedIndex + tableID sqlbase.ID + mutationID sqlbase.MutationID + droppedDatabaseID sqlbase.ID + nodeID roachpb.NodeID + db *kv.DB + leaseMgr *LeaseManager testingKnobs *SchemaChangerTestingKnobs distSQLPlanner *DistSQLPlanner @@ -212,171 +190,6 @@ func (e errTableVersionMismatch) Error() string { return fmt.Sprintf("table version mismatch: %d, expected: %d", e.version, e.expected) } -// DropTableDesc removes a descriptor from the KV database. -func (sc *SchemaChanger) DropTableDesc( - ctx context.Context, tableDesc *sqlbase.TableDescriptor, traceKV bool, -) error { - descKey := sqlbase.MakeDescMetadataKey(tableDesc.ID) - zoneKeyPrefix := config.MakeZoneKeyPrefix(uint32(tableDesc.ID)) - - // Finished deleting all the table data, now delete the table meta data. - return sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - // Delete table descriptor - b := &kv.Batch{} - if traceKV { - log.VEventf(ctx, 2, "Del %s", descKey) - log.VEventf(ctx, 2, "DelRange %s", zoneKeyPrefix) - } - // Delete the descriptor. - b.Del(descKey) - // Delete the zone config entry for this table. 
- b.DelRange(zoneKeyPrefix, zoneKeyPrefix.PrefixEnd(), false /* returnKeys */) - if err := txn.SetSystemConfigTrigger(); err != nil { - return err - } - - if tableDesc.GetDropJobID() != 0 { - if err := sc.updateDropTableJob( - ctx, - txn, - tableDesc.GetDropJobID(), - tableDesc.ID, - jobspb.Status_DONE, - func(ctx context.Context, txn *kv.Txn, job *jobs.Job) error { - // Delete the zone config entry for the dropped database associated - // with the job, if it exists. - details := job.Details().(jobspb.SchemaChangeDetails) - if details.DroppedDatabaseID == sqlbase.InvalidID { - return nil - } - dbZoneKeyPrefix := config.MakeZoneKeyPrefix(uint32(details.DroppedDatabaseID)) - if traceKV { - log.VEventf(ctx, 2, "DelRange %s", zoneKeyPrefix) - } - b.DelRange(dbZoneKeyPrefix, dbZoneKeyPrefix.PrefixEnd(), false /* returnKeys */) - return nil - }); err != nil { - log.Warningf(ctx, "failed to update job status: %+v", err) - } - } - return txn.Run(ctx, b) - }) -} - -// truncateTable deletes all of the data in the specified table. -func (sc *SchemaChanger) truncateTable(ctx context.Context, table *sqlbase.TableDescriptor) error { - // If DropTime isn't set, assume this drop request is from a version - // 1.1 server and invoke legacy code that uses DeleteRange and range GC. - if table.DropTime == 0 { - return ClearTableDataInChunks(ctx, table, sc.db, false /* traceKV */) - } - - tableKey := roachpb.RKey(keys.MakeTablePrefix(uint32(table.ID))) - tableSpan := roachpb.RSpan{Key: tableKey, EndKey: tableKey.PrefixEnd()} - - // ClearRange requests lays down RocksDB range deletion tombstones that have - // serious performance implications (#24029). 
The logic below attempts to - // bound the number of tombstones in one store by sending the ClearRange - // requests to each range in the table in small, sequential batches rather - // than letting DistSender send them all in parallel, to hopefully give the - // compaction queue time to compact the range tombstones away in between - // requests. - // - // As written, this approach has several deficiencies. It does not actually - // wait for the compaction queue to compact the tombstones away before - // sending the next request. It is likely insufficient if multiple DROP - // TABLEs are in flight at once. It does not save its progress in case the - // coordinator goes down. These deficiences could be addressed, but this code - // was originally a stopgap to avoid the range tombstone performance hit. The - // RocksDB range tombstone implementation has since been improved and the - // performance implications of many range tombstones has been reduced - // dramatically making this simplistic throttling sufficient. - - // These numbers were chosen empirically for the clearrange roachtest and - // could certainly use more tuning. 
- const batchSize = 100 - const waitTime = 500 * time.Millisecond - - var n int - lastKey := tableSpan.Key - ri := kvcoord.NewRangeIterator(sc.execCfg.DistSender) - for ri.Seek(ctx, tableSpan.Key, kvcoord.Ascending); ; ri.Next(ctx) { - if !ri.Valid() { - return ri.Error().GoError() - } - - if n++; n >= batchSize || !ri.NeedAnother(tableSpan) { - endKey := ri.Desc().EndKey - if tableSpan.EndKey.Less(endKey) { - endKey = tableSpan.EndKey - } - var b kv.Batch - b.AddRawRequest(&roachpb.ClearRangeRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: lastKey.AsRawKey(), - EndKey: endKey.AsRawKey(), - }, - }) - log.VEventf(ctx, 2, "ClearRange %s - %s", lastKey, endKey) - if err := sc.db.Run(ctx, &b); err != nil { - return err - } - n = 0 - lastKey = endKey - time.Sleep(waitTime) - } - - if !ri.NeedAnother(tableSpan) { - break - } - } - - return nil -} - -// Silence unused warning. -var _ = (*SchemaChanger).maybeDropTable - -// maybe Drop a table. Return nil if successfully dropped. -func (sc *SchemaChanger) maybeDropTable(ctx context.Context, table *sqlbase.TableDescriptor) error { - if !table.Dropped() { - return nil - } - - // This can happen if a change other than the drop originally - // scheduled the changer for this table. If that's the case, - // we still need to wait for the deadline to expire. - if table.DropTime != 0 { - var timeRemaining time.Duration - if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - timeRemaining = 0 - _, zoneCfg, _, err := GetZoneConfigInTxn(ctx, txn, uint32(table.ID), - &sqlbase.IndexDescriptor{}, "", false /* getInheritedDefault */) - if err != nil { - return err - } - deadline := table.DropTime + int64(zoneCfg.GC.TTLSeconds)*time.Second.Nanoseconds() - timeRemaining = timeutil.Since(timeutil.Unix(0, deadline)) - return nil - }); err != nil { - return err - } - if timeRemaining < 0 { - return errNotHitGCTTLDeadline - } - } - - // Do all the hard work of deleting the table data and the table ID. 
- if err := sc.truncateTable(ctx, table); err != nil { - return err - } - - if err := sc.DropTableDesc(ctx, table, false /* traceKV */); err != nil { - return err - } - return nil -} - // maybe backfill a created table by executing the AS query. Return nil if // successfully backfilled. // @@ -513,133 +326,6 @@ func (sc *SchemaChanger) maybeMakeAddTablePublic( return nil } -// Silence unused warning. -var _ = (*SchemaChanger).maybeGCMutations - -func (sc *SchemaChanger) maybeGCMutations( - ctx context.Context, table *sqlbase.TableDescriptor, -) error { - if len(table.GCMutations) == 0 || len(sc.dropIndexTimes) == 0 { - return nil - } - - // Don't perform GC work if there are non-GC mutations waiting. - if len(table.Mutations) > 0 { - return nil - } - - // Find dropped index with earliest GC deadline. - dropped := sc.dropIndexTimes[0] - for i := 1; i < len(sc.dropIndexTimes); i++ { - if other := sc.dropIndexTimes[i]; other.deadline < dropped.deadline { - dropped = other - } - } - - var mutation sqlbase.TableDescriptor_GCDescriptorMutation - found := false - for _, gcm := range table.GCMutations { - if gcm.IndexID == sc.dropIndexTimes[0].indexID { - found = true - mutation = gcm - break - } - } - if !found { - return errors.AssertionFailedf("no GC mutation for index %d", errors.Safe(sc.dropIndexTimes[0].indexID)) - } - - // Check if the deadline for GC'd dropped index expired because - // a change other than the drop could have scheduled the changer - // for this table. - timeRemaining := timeutil.Since(timeutil.Unix(0, dropped.deadline)) - if timeRemaining < 0 { - // Return nil to allow other any mutations to make progress. 
- return nil - } - if err := sc.truncateIndexes(ctx, table.Version, []sqlbase.IndexDescriptor{{ID: mutation.IndexID}}); err != nil { - return err - } - - _, err := sc.leaseMgr.Publish( - ctx, - table.ID, - func(tbl *sqlbase.MutableTableDescriptor) error { - found := false - for i := 0; i < len(tbl.GCMutations); i++ { - if other := tbl.GCMutations[i]; other.IndexID == mutation.IndexID { - tbl.GCMutations = append(tbl.GCMutations[:i], tbl.GCMutations[i+1:]...) - found = true - break - } - } - - if !found { - return errDidntUpdateDescriptor - } - - return nil - }, - nil, - ) - - return err -} - -func (sc *SchemaChanger) updateDropTableJob( - ctx context.Context, - txn *kv.Txn, - jobID int64, - tableID sqlbase.ID, - status jobspb.Status, - onSuccess func(context.Context, *kv.Txn, *jobs.Job) error, -) error { - job, err := sc.jobRegistry.LoadJobWithTxn(ctx, jobID, txn) - if err != nil { - return err - } - - schemaDetails, ok := job.Details().(jobspb.SchemaChangeDetails) - if !ok { - return errors.AssertionFailedf("unexpected details for job: %T", job.Details()) - } - - lowestStatus := jobspb.Status_DONE - for i := range schemaDetails.DroppedTables { - if tableID == schemaDetails.DroppedTables[i].ID { - schemaDetails.DroppedTables[i].Status = status - } - - if lowestStatus > schemaDetails.DroppedTables[i].Status { - lowestStatus = schemaDetails.DroppedTables[i].Status - } - } - - var runningStatus jobs.RunningStatus - switch lowestStatus { - case jobspb.Status_DRAINING_NAMES: - runningStatus = RunningStatusDrainingNames - case jobspb.Status_WAIT_FOR_GC_INTERVAL: - runningStatus = RunningStatusWaitingGC - case jobspb.Status_ROCKSDB_COMPACTION: - runningStatus = RunningStatusCompaction - case jobspb.Status_DONE: - return job.WithTxn(txn).Succeeded(ctx, func(ctx context.Context, txn *kv.Txn) error { - return onSuccess(ctx, txn, job) - }) - default: - return errors.AssertionFailedf("unexpected dropped table status %d", errors.Safe(lowestStatus)) - } - - if err := 
job.WithTxn(txn).SetDetails(ctx, schemaDetails); err != nil { - return err - } - - return job.WithTxn(txn).RunningStatus(ctx, func(ctx context.Context, _ jobspb.Details) (jobs.RunningStatus, error) { - return runningStatus, nil - }) -} - // Drain old names from the cluster. func (sc *SchemaChanger) drainNames(ctx context.Context) error { // Publish a new version with all the names drained after everyone @@ -674,6 +360,62 @@ func (sc *SchemaChanger) drainNames(ctx context.Context) error { return err } +func startGCJob( + ctx context.Context, + db *kv.DB, + jobRegistry *jobs.Registry, + username string, + schemaChangeDescription string, + details jobspb.SchemaChangeGCDetails, +) error { + var sj *jobs.StartableJob + descriptorIDs := make([]sqlbase.ID, 0) + if len(details.Indexes) > 0 { + if len(descriptorIDs) == 0 { + descriptorIDs = []sqlbase.ID{details.ParentID} + } + } else if len(details.Tables) > 0 { + for _, table := range details.Tables { + descriptorIDs = append(descriptorIDs, table.ID) + } + } else { + // Nothing to GC. 
+ return nil + } + + if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + jobRecord := jobs.Record{ + Description: fmt.Sprintf("GC for %s", schemaChangeDescription), + Username: username, + DescriptorIDs: descriptorIDs, + Details: details, + Progress: jobspb.SchemaChangeGCProgress{}, + NonCancelable: true, + } + var err error + if sj, err = jobRegistry.CreateStartableJobWithTxn(ctx, jobRecord, txn, nil /* resultCh */); err != nil { + return err + } + return nil + }); err != nil { + return err + } + if _, err := sj.Start(ctx); err != nil { + return err + } + return nil +} + +func (sc *SchemaChanger) startGCJob( + ctx context.Context, details jobspb.SchemaChangeGCDetails, isRollback bool, +) error { + description := sc.job.Payload().Description + if isRollback { + description = "ROLLBACK of " + description + } + return startGCJob(ctx, sc.db, sc.jobRegistry, sc.job.Payload().Username, description, details) +} + // Execute the entire schema change in steps. // inSession is set to false when this is called from the asynchronous // schema change execution path. @@ -711,9 +453,24 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { } } - // TODO (lucy): Skip table GC for now so we can return results to the client - // immediately after draining names when a table is dropped. Eventually we - // will queue a separate job to do this. + if tableDesc.Dropped() && sc.droppedDatabaseID == sqlbase.InvalidID { + // We've dropped this table, let's kick off a GC job. 
+ dropTime := timeutil.Now().UnixNano() + if tableDesc.DropTime > 0 { + dropTime = tableDesc.DropTime + } + gcDetails := jobspb.SchemaChangeGCDetails{ + Tables: []jobspb.SchemaChangeGCDetails_DroppedID{ + { + ID: tableDesc.ID, + DropTime: dropTime, + }, + }, + } + if err := sc.startGCJob(ctx, gcDetails, false /* isRollback */); err != nil { + return err + } + } if err := sc.maybeBackfillCreateTableAs(ctx, tableDesc); err != nil { return err @@ -723,8 +480,6 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { return err } - // TODO (lucy): Skip index GC for now, see above - // Wait for the schema change to propagate to all nodes after this function // returns, so that the new schema is live everywhere. This is not needed for // correctness but is done to make the UI experience/tests predictable. @@ -762,6 +517,9 @@ func (sc *SchemaChanger) exec(ctx context.Context) error { // Run through mutation state machine and backfill. err = sc.runStateMachineAndBackfill(ctx) + if err != nil { + return err + } defer func() { if err := waitToUpdateLeases(err == nil /* refreshStats */); err != nil && !errors.Is(err, sqlbase.ErrDescriptorNotFound) { @@ -997,6 +755,7 @@ func (sc *SchemaChanger) waitToUpdateLeases(ctx context.Context, tableID sqlbase // done finalizes the mutations (adds new cols/indexes to the table). // It ensures that all nodes are on the current (pre-update) version of the // schema. +// It also kicks off GC jobs as needed. // Returns the updated descriptor. 
func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescriptor, error) { isRollback := false @@ -1080,7 +839,6 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr if indexDesc := mutation.GetIndex(); mutation.Direction == sqlbase.DescriptorMutation_DROP && indexDesc != nil { if canClearRangeForDrop(indexDesc) { - // We continue adding dropped indexes to GCMutations because that's // how we keep track of dropped index names (for, e.g., zone config // lookups), even though in the absence of a GC job there's nothing to // clean them up. @@ -1089,6 +847,20 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr sqlbase.TableDescriptor_GCDescriptorMutation{ IndexID: indexDesc.ID, }) + + dropTime := timeutil.Now().UnixNano() + indexGCDetails := jobspb.SchemaChangeGCDetails{ + Indexes: []jobspb.SchemaChangeGCDetails_DroppedIndex{ + { + IndexID: indexDesc.ID, + DropTime: dropTime, + }, + }, + ParentID: sc.tableID, + } + if err := sc.startGCJob(ctx, indexGCDetails, isRollback); err != nil { + return err + } } } if constraint := mutation.GetConstraint(); constraint != nil && @@ -1625,6 +1397,16 @@ func (sc *SchemaChanger) reverseMutation( return mutation, columns } +// GCJobTestingKnobs is for testing the Schema Changer GC job. +// Note that this is defined here for testing purposes to avoid cyclic +// dependencies. +type GCJobTestingKnobs struct { + RunBeforeResume func() +} + +// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. +func (*GCJobTestingKnobs) ModuleTestingKnobs() {} + // SchemaChangerTestingKnobs for testing the schema change execution path // through both the synchronous and asynchronous paths. type SchemaChangerTestingKnobs struct { @@ -1740,7 +1522,6 @@ type schemaChangeResumer struct { job *jobs.Job } -// Resume is part of the jobs.Resumer interface. 
func (r schemaChangeResumer) Resume( ctx context.Context, phs interface{}, resultsCh chan<- tree.Datums, ) error { @@ -1751,10 +1532,11 @@ func (r schemaChangeResumer) Resume( return nil } - execSchemaChange := func(tableID sqlbase.ID, mutationID sqlbase.MutationID) error { + execSchemaChange := func(tableID sqlbase.ID, mutationID sqlbase.MutationID, droppedDatabaseID sqlbase.ID) error { sc := SchemaChanger{ tableID: tableID, mutationID: mutationID, + droppedDatabaseID: droppedDatabaseID, nodeID: p.ExecCfg().NodeID.Get(), db: p.ExecCfg().DB, leaseMgr: p.ExecCfg().LeaseManager, @@ -1807,16 +1589,25 @@ func (r schemaChangeResumer) Resume( if details.DroppedDatabaseID != sqlbase.InvalidID { for i := range details.DroppedTables { droppedTable := &details.DroppedTables[i] - if err := execSchemaChange(droppedTable.ID, sqlbase.InvalidMutationID); err != nil { + if err := execSchemaChange(droppedTable.ID, sqlbase.InvalidMutationID, details.DroppedDatabaseID); err != nil { return err } } - return nil + dropTime := timeutil.Now().UnixNano() + tablesToGC := make([]jobspb.SchemaChangeGCDetails_DroppedID, len(details.DroppedTables)) + for i, table := range details.DroppedTables { + tablesToGC[i] = jobspb.SchemaChangeGCDetails_DroppedID{ID: table.ID, DropTime: dropTime} + } + databaseGCDetails := jobspb.SchemaChangeGCDetails{ + Tables: tablesToGC, + ParentID: details.DroppedDatabaseID, + } + return startGCJob(ctx, p.ExecCfg().DB, p.ExecCfg().JobRegistry, r.job.Payload().Username, r.job.Payload().Description, databaseGCDetails) } if details.TableID == sqlbase.InvalidID { return errors.AssertionFailedf("job has no database ID or table ID") } - return execSchemaChange(details.TableID, details.MutationID) + return execSchemaChange(details.TableID, details.MutationID, details.DroppedDatabaseID) } // OnFailOrCancel is part of the jobs.Resumer interface. 
diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 539ff838b389..7a87de217557 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/tests" @@ -454,9 +455,10 @@ func TestRaceWithBackfill(t *testing.T) { } params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { notifyBackfill() @@ -619,9 +621,10 @@ func TestDropWhileBackfill(t *testing.T) { } params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { if partialBackfillDone.Load().(bool) { @@ -730,11 +733,12 @@ func TestBackfillErrors(t *testing.T) { const numNodes, chunkSize, maxValue = 5, 100, 4000 params, _ := tests.CreateTestServerParams() + blockGC := make(chan struct{}) params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. 
BackfillChunkSize: chunkSize, }, + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }}, } tc := serverutils.StartTestCluster(t, numNodes, @@ -823,6 +827,7 @@ CREATE UNIQUE INDEX vidx ON t.test (v); if err := checkTableKeyCountExact(ctx, kvDB, keyCount); err != nil { t.Fatal(err) } + close(blockGC) } // Test aborting a schema change backfill transaction and check that the @@ -1507,6 +1512,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); // doesn't leave the DB in a bad state. func TestSchemaChangeFailureAfterCheckpointing(t *testing.T) { defer leaktest.AfterTest(t)() + defer gcjob.SetSmallMaxGCIntervalForTest()() params, _ := tests.CreateTestServerParams() const chunkSize = 200 attempts := 0 @@ -1514,14 +1520,16 @@ func TestSchemaChangeFailureAfterCheckpointing(t *testing.T) { // attempt 2: writing the third chunk returns a permanent failure // purge the schema change. expectedAttempts := 3 + blockGC := make(chan struct{}) params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. BackfillChunkSize: chunkSize, // Aggressively checkpoint, so that a schema change // failure happens after a checkpoint has been written. WriteCheckpointInterval: time.Nanosecond, }, + // Disable GC job. 
+ GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { attempts++ @@ -1588,6 +1596,7 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); if checks := tableDesc.AllActiveAndInactiveChecks(); len(checks) > 0 { t.Fatalf("found checks %+v", checks) } + close(blockGC) } // TestSchemaChangeReverseMutations tests that schema changes get reversed @@ -1803,7 +1812,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") // Add immediate GC TTL to allow index creation purge to complete. if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { t.Fatal(err) @@ -1825,13 +1833,18 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); } // State of jobs table + t.Skip("TODO(pbardea): The following fails due to causes seemingly unrelated to GC") runner := sqlutils.SQLRunner{DB: sqlDB} + // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. + // Maybe this test API should use an offset starting from the most recent job + // instead. + const migrationJobOffset = 4 for i, tc := range testCases { status := jobs.StatusSucceeded if tc.errString != "" { status = jobs.StatusFailed } - if err := jobutils.VerifySystemJob(t, &runner, i, jobspb.TypeSchemaChange, status, jobs.Record{ + if err := jobutils.VerifySystemJob(t, &runner, migrationJobOffset+i, jobspb.TypeSchemaChange, status, jobs.Record{ Username: security.RootUser, Description: tc.sql, DescriptorIDs: sqlbase.IDs{ @@ -2059,12 +2072,13 @@ func TestSchemaUniqueColumnDropFailure(t *testing.T) { const maxValue = (expectedColumnBackfillAttempts/2+1)*chunkSize + 1 params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Turn on knob to disable GC once the GC job is implemented. 
BackfillChunkSize: chunkSize, // Aggressively checkpoint, so that a schema change // failure happens after a checkpoint has been written. WriteCheckpointInterval: time.Nanosecond, }, + // Disable GC job. + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { select {} }}, DistSQL: &execinfra.TestingKnobs{ RunBeforeBackfillChunk: func(sp roachpb.Span) error { attempts++ @@ -2292,7 +2306,6 @@ func TestPrimaryKeyChangeWithPrecedingIndexCreation(t *testing.T) { wg.Wait() - t.Skip("skipping last portion of job until schema change GC job is implemented") // There should be 4 k/v pairs per row: // * the original rowid index. // * the old index on v. @@ -2347,7 +2360,6 @@ CREATE TABLE t.test (k INT NOT NULL, v INT, v2 INT NOT NULL)`); err != nil { t.Fatal(err) } - t.Skip("skipping last portion of job until schema change GC job is implemented") // There should be 4 k/v pairs per row: // * the original rowid index. // * the new primary index on v2. @@ -2822,12 +2834,10 @@ func TestPrimaryKeyIndexRewritesGetRemoved(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() + defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = 10 * time.Millisecond + params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. - }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) @@ -2844,7 +2854,7 @@ ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (v); `); err != nil { t.Fatal(err) } - t.Skip("skipping last portion of test until schema change GC job is implemented") + // Wait for the async schema changer to run. 
tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { @@ -3713,12 +3723,9 @@ func TestTruncateInternals(t *testing.T) { const maxValue = 2000 params, _ := tests.CreateTestServerParams() // Disable schema changes. + blockGC := make(chan struct{}) params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - SchemaChangeJobNoOp: func() bool { - return true - }, - }, + GCJob: &sql.GCJobTestingKnobs{RunBeforeResume: func() { <-blockGC }}, } s, sqlDB, kvDB := serverutils.StartServer(t, params) @@ -3801,21 +3808,30 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL DEFAULT (DECIMAL '3.14 if !droppedDesc.Dropped() { t.Fatalf("bad state = %s", droppedDesc.State) } - // TODO (lucy): Test that the GC job is correctly queued, once it's - // implemented. This test should actually just disable GC instead of disabling - // the initial schema change. + + close(blockGC) + + sqlRun := sqlutils.MakeSQLRunner(sqlDB) + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChangeGC, jobs.StatusRunning, jobs.Record{ + Description: "GC for TRUNCATE TABLE t.public.test", + Username: security.RootUser, + DescriptorIDs: sqlbase.IDs{tableDesc.ID}, + }) + }) } // Test that a table truncation completes properly. func TestTruncateCompletion(t *testing.T) { defer leaktest.AfterTest(t)() const maxValue = 2000 + + defer func(i time.Duration) { jobs.DefaultAdoptInterval = i }(jobs.DefaultAdoptInterval) + jobs.DefaultAdoptInterval = 10 * time.Millisecond + + defer gcjob.SetSmallMaxGCIntervalForTest()() + params, _ := tests.CreateTestServerParams() - params.Knobs = base.TestingKnobs{ - SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ - // TODO (lucy): Un-skip this test when the GC job is implemented. 
- }, - } s, sqlDB, kvDB := serverutils.StartServer(t, params) ctx := context.TODO() @@ -3851,7 +3867,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL REFERENCES t.pi (d) DE tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test") - t.Skip("skipping last portion of test until schema change GC job is implemented") // Add a zone config. var cfg zonepb.ZoneConfig cfg, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID) @@ -3938,7 +3953,12 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL REFERENCES t.pi (d) DE // Ensure that the job is marked as succeeded. sqlRun := sqlutils.MakeSQLRunner(sqlDB) - if err := jobutils.VerifySystemJob(t, sqlRun, 0, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ + + // TODO (lucy): The offset of +4 accounts for unrelated startup migrations. + // Maybe this test API should use an offset starting from the most recent job + // instead. + schemaChangeJobOffset := 4 + if err := jobutils.VerifySystemJob(t, sqlRun, schemaChangeJobOffset+2, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobs.Record{ Username: security.RootUser, Description: "TRUNCATE TABLE t.public.test", DescriptorIDs: sqlbase.IDs{ @@ -5639,7 +5659,6 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT8); atomic.StoreUint32(&enableAsyncSchemaChanges, 1) // Add immediate GC TTL to allow index creation purge to complete. - // TODO (lucy): if this test were't skipped we'd need to GC quickly here if _, err := addImmediateGCZoneConfig(sqlDB, tableDesc.ID); err != nil { t.Fatal(err) }