Skip to content

Commit

Permalink
Merge pull request #47144 from lucy-zhang/backport20.1-47136
Browse files Browse the repository at this point in the history
release-20.1: sqlmigrations: create GC jobs for failed import/restore jobs from 19.2
  • Loading branch information
lucy-zhang authored Apr 7, 2020
2 parents 95887d4 + 16b43ee commit a84b18f
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 28 deletions.
47 changes: 47 additions & 0 deletions pkg/sql/schema_change_migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,3 +835,50 @@ func TestMigrateSchemaChanges(t *testing.T) {
}
}
}

// TestGCJobCreated tests that the schema change job migration creates a GC job
// for a table descriptor that is in the DROP state with no running job
// associated with it, as could be left behind by a failed 19.2 IMPORT or
// RESTORE whose table data was never fully GC'ed.
func TestGCJobCreated(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer setTestJobsAdoptInterval()()
	params, _ := tests.CreateTestServerParams()
	// Force the job migration to run unconditionally so the test does not
	// depend on cluster-version gating.
	params.Knobs.SQLMigrationManager = &sqlmigrations.MigrationManagerTestingKnobs{
		AlwaysRunJobMigration: true,
	}
	// Declare the test context once, before the server starts, so the same
	// context is used for the stopper and for all operations below (the
	// original mixed context.TODO() and context.Background()).
	ctx := context.Background()
	s, sqlDB, kvDB := serverutils.StartServer(t, params)
	defer s.Stopper().Stop(ctx)
	sqlRunner := sqlutils.MakeSQLRunner(sqlDB)

	// Create a table and then force it to be in the DROP state, bypassing the
	// normal DROP TABLE path so that no schema change or GC job exists for it.
	if _, err := sqlDB.Exec(`CREATE DATABASE t; CREATE TABLE t.test();`); err != nil {
		t.Fatal(err)
	}
	tableDesc := sqlbase.GetTableDescriptor(kvDB, "t", "test")
	tableDesc.State = sqlbase.TableDescriptor_DROP
	tableDesc.Version++
	// A nonzero DropTime marks the table's data as eligible for GC.
	tableDesc.DropTime = 1
	if err := kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		if err := txn.SetSystemConfigTrigger(); err != nil {
			return err
		}
		// Remove the namespace entry, as a real drop would have done.
		if err := sqlbase.RemoveObjectNamespaceEntry(ctx, txn, tableDesc.ID, tableDesc.ParentID, tableDesc.Name, false /* kvTrace */); err != nil {
			return err
		}
		// Write back the mutated descriptor so the migration sees a dropped
		// table with no accompanying job.
		return kvDB.Put(ctx, sqlbase.MakeDescMetadataKey(tableDesc.GetID()), sqlbase.WrapDescriptor(tableDesc))
	}); err != nil {
		t.Fatal(err)
	}

	// Run the migration.
	migMgr := s.MigrationManager().(*sqlmigrations.Manager)
	if err := migMgr.StartSchemaChangeJobMigration(ctx); err != nil {
		t.Fatal(err)
	}

	// Check that a GC job was created and completed successfully. Retry
	// because the GC job runs asynchronously after the migration returns.
	sqlRunner.CheckQueryResultsRetry(t,
		"SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE GC' AND status = 'succeeded'",
		[][]string{{"1"}},
	)
}
95 changes: 67 additions & 28 deletions pkg/sqlmigrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,23 +803,28 @@ func migrateSchemaChangeJobs(ctx context.Context, r runner, registry *jobs.Regis

// Finally, we iterate through all table descriptors and jobs, and create jobs
// for any tables in the ADD state or that have draining names that don't
// already have jobs. We start by getting all descriptors and all running jobs
// in a single transaction. Each eligible table then gets a job created for
// it, each in a separate transaction; in each of those transactions, we write
// a table-specific KV with a key prefixed by schemaChangeJobMigrationKey to
// try to prevent more than one such job from being created for the table.
// already have jobs. We also create a GC job for all tables in the DROP state
// with no associated schema change or GC job, which can result from failed
// IMPORT and RESTORE jobs whose table data wasn't fully GC'ed.
//
// We start by getting all descriptors and all running jobs in a single
// transaction. Each eligible table then gets a job created for it, each in a
// separate transaction; in each of those transactions, we write a table-
// specific KV with a key prefixed by schemaChangeJobMigrationKey to try to
// prevent more than one such job from being created for the table.
//
// This process ensures that every table that entered into one of these
// intermediate states (being added, or having draining names) in 19.2 will
// have a schema change job created for it in 20.1, so that the table can
// finish being processed. It's not essential for only one job to be created
// for each table, since a redundant schema change job is a no-op, but we make
// an effort to do that anyway.
// intermediate states (being added/dropped, or having draining names) in 19.2
// will have a job created for it in 20.1, so that the table can finish being
// processed. It's not essential for only one job to be created for each
// table, since a redundant schema change job is a no-op, but we make an
// effort to do that anyway.
//
// There are probably more efficient ways to do this part of the migration,
// but the current approach seemed like the most straightforward.
var allDescs []sqlbase.DescriptorProto
jobsForDesc := make(map[sqlbase.ID][]int64)
schemaChangeJobsForDesc := make(map[sqlbase.ID][]int64)
gcJobsForDesc := make(map[sqlbase.ID][]int64)
if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
descs, err := sql.GetAllDescriptors(ctx, txn)
if err != nil {
Expand All @@ -842,15 +847,20 @@ func migrateSchemaChangeJobs(ctx context.Context, r runner, registry *jobs.Regis
if err != nil {
return err
}
details := payload.GetSchemaChange()
if details == nil || details.FormatVersion < jobspb.JobResumerFormatVersion {
continue
}
if details.TableID != sqlbase.InvalidID {
jobsForDesc[details.TableID] = append(jobsForDesc[details.TableID], jobID)
} else {
for _, t := range details.DroppedTables {
jobsForDesc[t.ID] = append(jobsForDesc[t.ID], jobID)
if details := payload.GetSchemaChange(); details != nil {
if details.FormatVersion < jobspb.JobResumerFormatVersion {
continue
}
if details.TableID != sqlbase.InvalidID {
schemaChangeJobsForDesc[details.TableID] = append(schemaChangeJobsForDesc[details.TableID], jobID)
} else {
for _, t := range details.DroppedTables {
schemaChangeJobsForDesc[t.ID] = append(schemaChangeJobsForDesc[t.ID], jobID)
}
}
} else if details := payload.GetSchemaChangeGC(); details != nil {
for _, t := range details.Tables {
gcJobsForDesc[t.ID] = append(gcJobsForDesc[t.ID], jobID)
}
}
}
Expand All @@ -870,7 +880,7 @@ func migrateSchemaChangeJobs(ctx context.Context, r runner, registry *jobs.Regis
// appropriate to do nothing without returning an error.
log.Warningf(
ctx,
"tried to add job for table %d which is neither being added nor has draining names",
"tried to add schema change job for table %d which is neither being added nor has draining names",
desc.ID,
)
return nil
Expand All @@ -890,21 +900,40 @@ func migrateSchemaChangeJobs(ctx context.Context, r runner, registry *jobs.Regis
if err != nil {
return err
}
log.Infof(ctx, "migration created new job %d: %s", *job.ID(), description)
log.Infof(ctx, "migration created new schema change job %d: %s", *job.ID(), description)
return nil
}

createGCJobForTable := func(txn *kv.Txn, desc *sqlbase.TableDescriptor) error {
record := sql.CreateGCJobRecord(
fmt.Sprintf("table %d", desc.ID),
security.NodeUser,
sqlbase.IDs{desc.ID},
jobspb.SchemaChangeGCDetails{
Tables: []jobspb.SchemaChangeGCDetails_DroppedID{{ID: desc.ID, DropTime: desc.DropTime}},
})
job, err := registry.CreateJobWithTxn(ctx, record, txn)
if err != nil {
return err
}
log.Infof(ctx, "migration created new GC job %d for table %d", *job.ID(), desc.ID)
return nil
}

log.Infof(ctx, "evaluating tables for creating jobs")
for _, desc := range allDescs {
switch desc := desc.(type) {
case *sqlbase.TableDescriptor:
if len(jobsForDesc[desc.ID]) > 0 {
log.VEventf(ctx, 3, "table %d has running jobs, skipping", desc.ID)
if scJobs := schemaChangeJobsForDesc[desc.ID]; len(scJobs) > 0 {
log.VEventf(ctx, 3, "table %d has running schema change jobs %v, skipping", desc.ID, scJobs)
continue
} else if gcJobs := gcJobsForDesc[desc.ID]; len(gcJobs) > 0 {
log.VEventf(ctx, 3, "table %d has running GC jobs %v, skipping", desc.ID, gcJobs)
continue
}
if !desc.Adding() && !desc.HasDrainingNames() {
if !desc.Adding() && !desc.Dropped() && !desc.HasDrainingNames() {
log.VEventf(ctx, 3,
"table %d is not being added and does not have draining names, skipping",
"table %d is not being added or dropped and does not have draining names, skipping",
desc.ID,
)
continue
Expand All @@ -919,8 +948,18 @@ func migrateSchemaChangeJobs(ctx context.Context, r runner, registry *jobs.Regis
log.VEventf(ctx, 3, "table %d already processed in migration", desc.ID)
return nil
}
if err := createSchemaChangeJobForTable(txn, desc); err != nil {
return err
if desc.Adding() || desc.HasDrainingNames() {
if err := createSchemaChangeJobForTable(txn, desc); err != nil {
return err
}
} else if desc.Dropped() {
// Note that a table can be both in the DROP state and have draining
// names. In that case it was enough to just create a schema change
// job, as in the case above, because that job will itself create a
// GC job.
if err := createGCJobForTable(txn, desc); err != nil {
return err
}
}
if err := txn.Put(ctx, key, startTime); err != nil {
return err
Expand Down

0 comments on commit a84b18f

Please sign in to comment.