From 0c5bed9bfd3da8dbe867009fd466ad27b7f565d5 Mon Sep 17 00:00:00 2001
From: Lucy Zhang
Date: Fri, 19 Feb 2021 13:40:45 -0500
Subject: [PATCH] sql,backupccl: stop using StartableJobs when not needed

We were using `StartableJob`s in the schema change job and the restore
job to start more jobs. This was unnecessary because we didn't need the
ability that a `StartableJob` provides to get results from the job
resumer. It also caused problems, because creating a `StartableJob`
creates state that can't be cleaned up if the transaction is restarted.

This commit switches those jobs to normal jobs that are claimed and
started by the registry. To recover the behavior where jobs are started
"immediately," we notify the job registry's adoption loop.

Some schema change tests had to be skipped because they relied on the
schema change job's ability to manually start jobs.

Release note: None
---
 pkg/ccl/backupccl/restore_job.go      |  83 +++++--------
 .../restore_schema_change_creation.go |  31 +++--
 pkg/jobs/registry.go                  |  31 +++--
 pkg/sql/alter_column_type_test.go     |   3 +
 pkg/sql/schema_changer.go             | 109 ++++++------------
 5 files changed, 96 insertions(+), 161 deletions(-)

diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index a6a16f42a213..955459c130d4 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -1324,10 +1324,8 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
 	// public.
 	// TODO (lucy): Ideally we'd just create the database in the public state in
 	// the first place, as a special case.
-	var newDescriptorChangeJobs []*jobs.StartableJob
 	publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
-		newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details)
-		return err
+		return r.publishDescriptors(ctx, txn, descsCol, details)
 	}
 	if err := descs.Txn(
 		ctx, r.execCfg.Settings, r.execCfg.LeaseManager, r.execCfg.InternalExecutor,
 	); err != nil {
 		return err
 	}
-	// Start the schema change jobs we created.
-	for _, newJob := range newDescriptorChangeJobs {
-		if err := newJob.Start(ctx); err != nil {
-			return err
-		}
+	if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil {
+		return err
 	}
 	if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil {
 		if err := fn(); err != nil {
@@ -1383,9 +1378,8 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
 	if err := insertStats(ctx, r.job, p.ExecCfg(), latestStats); err != nil {
 		return errors.Wrap(err, "inserting table statistics")
 	}
-	var newDescriptorChangeJobs []*jobs.StartableJob
 	publishDescriptors := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) (err error) {
-		newDescriptorChangeJobs, err = r.publishDescriptors(ctx, txn, descsCol, details)
+		err = r.publishDescriptors(ctx, txn, descsCol, details)
 		return err
 	}
 	if err := descs.Txn(
 	); err != nil {
 	}
 	// Reload the details as we may have updated the job.
 	details = r.job.Details().(jobspb.RestoreDetails)
-
-	// Start the schema change jobs we created.
- for _, newJob := range newDescriptorChangeJobs { - if err := newJob.Start(ctx); err != nil { - return err - } + if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil { + return err } if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil { if err := fn(); err != nil { @@ -1507,24 +1497,13 @@ func insertStats( // with a new value even if this transaction does not commit. func (r *restoreResumer) publishDescriptors( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, details jobspb.RestoreDetails, -) (newDescriptorChangeJobs []*jobs.StartableJob, err error) { - defer func() { - if err == nil { - return - } - for _, j := range newDescriptorChangeJobs { - if cleanupErr := j.CleanupOnRollback(ctx); cleanupErr != nil { - log.Warningf(ctx, "failed to clean up job %d: %v", j.ID(), cleanupErr) - } - } - newDescriptorChangeJobs = nil - }() +) (err error) { if details.DescriptorsPublished { - return nil, nil + return nil } if fn := r.testingKnobs.beforePublishingDescriptors; fn != nil { if err := fn(); err != nil { - return nil, err + return err } } log.VEventf(ctx, 1, "making tables live") @@ -1551,10 +1530,10 @@ func (r *restoreResumer) publishDescriptors( for _, tbl := range details.TableDescs { mutTable, err := descsCol.GetMutableTableVersionByID(ctx, tbl.GetID(), txn) if err != nil { - return newDescriptorChangeJobs, err + return err } if err := checkVersion(mutTable, tbl.Version); err != nil { - return newDescriptorChangeJobs, err + return err } allMutDescs = append(allMutDescs, mutTable) newTables = append(newTables, mutTable.TableDesc()) @@ -1564,12 +1543,11 @@ func (r *restoreResumer) publishDescriptors( if details.DescriptorCoverage != tree.AllDescriptors { // Convert any mutations that were in progress on the table descriptor // when the backup was taken, and convert them to schema change jobs. - newJobs, err := createSchemaChangeJobsFromMutations(ctx, - r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable) - if err != nil { - return newDescriptorChangeJobs, err + if err := createSchemaChangeJobsFromMutations(ctx, + r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable, + ); err != nil { + return err } - newDescriptorChangeJobs = append(newDescriptorChangeJobs, newJobs...) 
} } // For all of the newly created types, make type schema change jobs for any @@ -1577,28 +1555,28 @@ func (r *restoreResumer) publishDescriptors( for _, typDesc := range details.TypeDescs { typ, err := descsCol.GetMutableTypeVersionByID(ctx, txn, typDesc.GetID()) if err != nil { - return newDescriptorChangeJobs, err + return err } if err := checkVersion(typ, typDesc.Version); err != nil { - return newDescriptorChangeJobs, err + return err } allMutDescs = append(allMutDescs, typ) newTypes = append(newTypes, typ.TypeDesc()) if typ.HasPendingSchemaChanges() && details.DescriptorCoverage != tree.AllDescriptors { - typJob, err := createTypeChangeJobFromDesc(ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), typ) - if err != nil { - return newDescriptorChangeJobs, err + if err := createTypeChangeJobFromDesc( + ctx, r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), typ, + ); err != nil { + return err } - newDescriptorChangeJobs = append(newDescriptorChangeJobs, typJob) } } for _, sc := range details.SchemaDescs { mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, sc.ID, txn) if err != nil { - return newDescriptorChangeJobs, err + return err } if err := checkVersion(mutDesc, sc.Version); err != nil { - return newDescriptorChangeJobs, err + return err } mutSchema := mutDesc.(*schemadesc.Mutable) allMutDescs = append(allMutDescs, mutSchema) @@ -1611,10 +1589,10 @@ func (r *restoreResumer) publishDescriptors( // field in the details? mutDesc, err := descsCol.GetMutableDescriptorByID(ctx, dbDesc.ID, txn) if err != nil { - return newDescriptorChangeJobs, err + return err } if err := checkVersion(mutDesc, dbDesc.Version); err != nil { - return newDescriptorChangeJobs, err + return err } mutDB := mutDesc.(*dbdesc.Mutable) // TODO(lucy,ajwerner): Remove this in 21.1. @@ -1631,17 +1609,17 @@ func (r *restoreResumer) publishDescriptors( if err := descsCol.WriteDescToBatch( ctx, false /* kvTrace */, desc, b, ); err != nil { - return newDescriptorChangeJobs, err + return err } } if err := txn.Run(ctx, b); err != nil { - return newDescriptorChangeJobs, errors.Wrap(err, "publishing tables") + return errors.Wrap(err, "publishing tables") } for _, tenant := range details.Tenants { if err := sql.ActivateTenant(ctx, r.execCfg, txn, tenant.ID); err != nil { - return newDescriptorChangeJobs, err + return err } } @@ -1652,11 +1630,10 @@ func (r *restoreResumer) publishDescriptors( details.SchemaDescs = newSchemas details.DatabaseDescs = newDBs if err := r.job.SetDetails(ctx, txn, details); err != nil { - return newDescriptorChangeJobs, errors.Wrap(err, + return errors.Wrap(err, "updating job details after publishing tables") } - - return newDescriptorChangeJobs, nil + return nil } // OnFailOrCancel is part of the jobs.Resumer interface. Removes KV data that diff --git a/pkg/ccl/backupccl/restore_schema_change_creation.go b/pkg/ccl/backupccl/restore_schema_change_creation.go index 6d02e94edbcf..e9523a6428f7 100644 --- a/pkg/ccl/backupccl/restore_schema_change_creation.go +++ b/pkg/ccl/backupccl/restore_schema_change_creation.go @@ -124,7 +124,7 @@ func createTypeChangeJobFromDesc( txn *kv.Txn, username security.SQLUsername, typ catalog.TypeDescriptor, -) (*jobs.StartableJob, error) { +) error { // Any non-public members in the type descriptor are accumulated as // "transitioning" and their promotion or removal will be handled in a // single job. 
@@ -146,17 +146,18 @@ func createTypeChangeJobFromDesc( // Type change jobs are not cancellable. NonCancelable: true, } - job, err := jr.CreateStartableJobWithTxn(ctx, record, txn) + job, err := jr.CreateJobWithTxn(ctx, record, txn) if err != nil { - return nil, err + return err } - return job, nil + log.Infof(ctx, "queued new type schema change job %d for type %d", *job.ID(), typ.GetID()) + return nil } -// createSchemaChangeJobsFromMutations creates and runs jobs for any mutations -// on the table descriptor. It also updates tableDesc's MutationJobs to -// reference the new jobs. This is only used to reconstruct a job based off a -// mutation, namely during RESTORE. +// createSchemaChangeJobsFromMutations creates jobs for any mutations on the +// table descriptor. It also updates tableDesc's MutationJobs to reference the +// new jobs. This is only used to reconstruct a job based off a mutation, namely +// during RESTORE. func createSchemaChangeJobsFromMutations( ctx context.Context, jr *jobs.Registry, @@ -164,9 +165,8 @@ func createSchemaChangeJobsFromMutations( txn *kv.Txn, username security.SQLUsername, tableDesc *tabledesc.Mutable, -) ([]*jobs.StartableJob, error) { +) error { mutationJobs := make([]descpb.TableDescriptor_MutationJob, 0, len(tableDesc.Mutations)) - newJobs := make([]*jobs.StartableJob, 0, len(tableDesc.Mutations)) seenMutations := make(map[descpb.MutationID]bool) for _, mutation := range tableDesc.Mutations { if seenMutations[mutation.MutationID] { @@ -178,7 +178,7 @@ func createSchemaChangeJobsFromMutations( seenMutations[mutationID] = true jobDesc, mutationCount, err := jobDescriptionFromMutationID(tableDesc.TableDesc(), mutationID) if err != nil { - return nil, err + return err } spanList := make([]jobspb.ResumeSpanList, mutationCount) for i := range spanList { @@ -201,20 +201,19 @@ func createSchemaChangeJobsFromMutations( }, Progress: jobspb.SchemaChangeProgress{}, } - newJob, err := jr.CreateStartableJobWithTxn(ctx, jobRecord, txn) + newJob, err := jr.CreateJobWithTxn(ctx, jobRecord, txn) if err != nil { - return nil, err + return err } newMutationJob := descpb.TableDescriptor_MutationJob{ MutationID: mutationID, JobID: *newJob.ID(), } mutationJobs = append(mutationJobs, newMutationJob) - newJobs = append(newJobs, newJob) log.Infof(ctx, "queued new schema change job %d for table %d, mutation %d", - newJob.ID(), tableDesc.ID, mutationID) + *newJob.ID(), tableDesc.ID, mutationID) } tableDesc.MutationJobs = mutationJobs - return newJobs, nil + return nil } diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 5cf53dc33c1f..1f22558588c0 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -306,6 +306,18 @@ func (r *Registry) CreateAndStartJob( return rj, nil } +// NotifyToAdoptJobs notifies the job adoption loop to start claimed jobs. +func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error { + select { + case r.adoptionCh <- resumeClaimedJobs: + case <-r.stopper.ShouldQuiesce(): + return stop.ErrUnavailable + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + // Run starts previously unstarted jobs from a list of scheduled // jobs. Canceling ctx interrupts the waiting but doesn't cancel the jobs. 
func (r *Registry) Run(ctx context.Context, ex sqlutil.InternalExecutor, jobs []int64) error { @@ -313,24 +325,11 @@ func (r *Registry) Run(ctx context.Context, ex sqlutil.InternalExecutor, jobs [] return nil } log.Infof(ctx, "scheduled jobs %+v", jobs) + if err := r.NotifyToAdoptJobs(ctx); err != nil { + return err + } buf := bytes.Buffer{} - usingSQLLiveness := r.startUsingSQLLivenessAdoption(ctx) for i, id := range jobs { - // In the pre-20.2 and mixed-version state, the adoption loop needs to be - // notified once per job (in the worst case) in order to ensure that all - // newly created jobs get adopted in a timely manner. In the sqlliveness - // world of 20.2 and later, we only need to notify the loop once as the - // newly created jobs are already claimed. The adoption loop will merely - // start all previously claimed jobs. - if !usingSQLLiveness || i == 0 { - select { - case r.adoptionCh <- resumeClaimedJobs: - case <-r.stopper.ShouldQuiesce(): - return stop.ErrUnavailable - case <-ctx.Done(): - return ctx.Err() - } - } if i > 0 { buf.WriteString(",") } diff --git a/pkg/sql/alter_column_type_test.go b/pkg/sql/alter_column_type_test.go index bc6386a7981f..1a8e45eda101 100644 --- a/pkg/sql/alter_column_type_test.go +++ b/pkg/sql/alter_column_type_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -35,6 +36,7 @@ import ( func TestInsertBeforeOldColumnIsDropped(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.WithIssue(t, 60801, "needs to be rewritten to not use RunBeforeChildJobs") params, _ := tests.CreateTestServerParams() childJobStartNotification := make(chan struct{}) @@ -97,6 +99,7 @@ ALTER TABLE test ALTER COLUMN x TYPE STRING;`) func TestInsertBeforeOldColumnIsDroppedUsingExpr(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) + skip.WithIssue(t, 60801, "needs to be rewritten to not use RunBeforeChildJobs") params, _ := tests.CreateTestServerParams() childJobStartNotification := make(chan struct{}) diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 9b9f69983c46..422c914f6895 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -471,22 +471,15 @@ func startGCJob( schemaChangeDescription string, details jobspb.SchemaChangeGCDetails, ) error { - var sj *jobs.StartableJob jobRecord := CreateGCJobRecord(schemaChangeDescription, username, details) if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - var err error - if sj, err = jobRegistry.CreateStartableJobWithTxn(ctx, jobRecord, txn); err != nil { - return err - } - return nil - }); err != nil { + _, err := jobRegistry.CreateJobWithTxn(ctx, jobRecord, txn) return err - } - log.Infof(ctx, "starting GC job %d", *sj.ID()) - if err := sj.Start(ctx); err != nil { + }); err != nil { return err } - return nil + // TODO (lucy): Add logging once we create the job ID outside the txn closure. + return jobRegistry.NotifyToAdoptJobs(ctx) } func (sc *SchemaChanger) execLogTags() *logtags.Buffer { @@ -839,7 +832,6 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er // Check if the target table needs to be cleaned up at all. 
If the target // table was in the ADD state and the schema change failed, then we need to // clean up the descriptor. - var cleanupJob *jobs.StartableJob if err := sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn) if err != nil { @@ -877,27 +869,15 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er }, }, ) - job, err := sc.jobRegistry.CreateStartableJobWithTxn(ctx, jobRecord, txn) - if err != nil { + if _, err := sc.jobRegistry.CreateJobWithTxn(ctx, jobRecord, txn); err != nil { return err } - cleanupJob = job return txn.Run(ctx, b) }); err != nil { - if cleanupJob != nil { - if rollbackErr := cleanupJob.CleanupOnRollback(ctx); rollbackErr != nil { - log.Warningf(ctx, "failed to clean up job: %v", rollbackErr) - } - } return err } - if cleanupJob != nil { - if err := cleanupJob.Start(ctx); err != nil { - log.Warningf(ctx, "starting job %d failed with error: %v", *cleanupJob.ID(), err) - } - log.VEventf(ctx, 2, "started job %d", *cleanupJob.ID()) - } - return nil + // TODO (lucy): Add logging once we create the job ID outside the txn closure. + return sc.jobRegistry.NotifyToAdoptJobs(ctx) } // RunStateMachineBeforeBackfill moves the state machine forward @@ -980,7 +960,7 @@ func (sc *SchemaChanger) RunStateMachineBeforeBackfill(ctx context.Context) erro func (sc *SchemaChanger) createIndexGCJob( ctx context.Context, index *descpb.IndexDescriptor, txn *kv.Txn, jobDesc string, -) (*jobs.StartableJob, error) { +) error { dropTime := timeutil.Now().UnixNano() indexGCDetails := jobspb.SchemaChangeGCDetails{ Indexes: []jobspb.SchemaChangeGCDetails_DroppedIndex{ @@ -993,12 +973,12 @@ func (sc *SchemaChanger) createIndexGCJob( } gcJobRecord := CreateGCJobRecord(jobDesc, sc.job.Payload().UsernameProto.Decode(), indexGCDetails) - indexGCJob, err := sc.jobRegistry.CreateStartableJobWithTxn(ctx, gcJobRecord, txn) + indexGCJob, err := sc.jobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn) if err != nil { - return nil, err + return err } - log.VEventf(ctx, 2, "created index GC job %d", *indexGCJob.ID()) - return indexGCJob, nil + log.Infof(ctx, "created index GC job %d", *indexGCJob.ID()) + return nil } // WaitToUpdateLeases until the entire cluster has been updated to the latest @@ -1040,13 +1020,10 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // Jobs (for GC, etc.) that need to be started immediately after the table // descriptor updates are published. - var childJobs []*jobs.StartableJob var didUpdate bool modified, err := sc.txnWithModified(ctx, func( ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, ) error { - childJobs = nil - scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn) if err != nil { return err @@ -1101,11 +1078,9 @@ func (sc *SchemaChanger) done(ctx context.Context) error { description = "ROLLBACK of " + description } - childJob, err := sc.createIndexGCJob(ctx, indexDesc, txn, description) - if err != nil { + if err := sc.createIndexGCJob(ctx, indexDesc, txn, description); err != nil { return err } - childJobs = append(childJobs, childJob) } } if constraint := mutation.GetConstraint(); constraint != nil && @@ -1211,34 +1186,26 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // existing indexes on the table. 
if mutation.Direction == descpb.DescriptorMutation_ADD { desc := fmt.Sprintf("REFRESH MATERIALIZED VIEW %q cleanup", scTable.Name) - pkJob, err := sc.createIndexGCJob(ctx, scTable.GetPrimaryIndex().IndexDesc(), txn, desc) - if err != nil { + if err := sc.createIndexGCJob(ctx, scTable.GetPrimaryIndex().IndexDesc(), txn, desc); err != nil { return err } - childJobs = append(childJobs, pkJob) for _, idx := range scTable.PublicNonPrimaryIndexes() { - idxJob, err := sc.createIndexGCJob(ctx, idx.IndexDesc(), txn, desc) - if err != nil { + if err := sc.createIndexGCJob(ctx, idx.IndexDesc(), txn, desc); err != nil { return err } - childJobs = append(childJobs, idxJob) } } else if mutation.Direction == descpb.DescriptorMutation_DROP { // Otherwise, the refresh job ran into an error and is being rolled // back. So, we need to GC all of the indexes that were going to be // created, in case any data was written to them. desc := fmt.Sprintf("ROLLBACK OF REFRESH MATERIALIZED VIEW %q", scTable.Name) - pkJob, err := sc.createIndexGCJob(ctx, &refresh.NewPrimaryIndex, txn, desc) - if err != nil { + if err := sc.createIndexGCJob(ctx, &refresh.NewPrimaryIndex, txn, desc); err != nil { return err } - childJobs = append(childJobs, pkJob) for i := range refresh.NewIndexes { - idxJob, err := sc.createIndexGCJob(ctx, &refresh.NewIndexes[i], txn, desc) - if err != nil { + if err := sc.createIndexGCJob(ctx, &refresh.NewIndexes[i], txn, desc); err != nil { return err } - childJobs = append(childJobs, idxJob) } } } @@ -1345,7 +1312,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { } // If we performed MakeMutationComplete on a PrimaryKeySwap mutation, then we need to start // a job for the index deletion mutations that the primary key swap mutation added, if any. - if childJobs, err = sc.queueCleanupJobs(ctx, scTable, txn, childJobs); err != nil { + if err := sc.queueCleanupJobs(ctx, scTable, txn); err != nil { return err } } @@ -1358,7 +1325,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { // If we performed MakeMutationComplete on a computed column swap, then // we need to start a job for the column deletion that the swap mutation // added if any. - if childJobs, err = sc.queueCleanupJobs(ctx, scTable, txn, childJobs); err != nil { + if err := sc.queueCleanupJobs(ctx, scTable, txn); err != nil { return err } } @@ -1444,25 +1411,9 @@ func (sc *SchemaChanger) done(ctx context.Context) error { sc.mutationID, info) }) - if fn := sc.testingKnobs.RunBeforeChildJobs; fn != nil { - if len(childJobs) != 0 { - fn() - } - } if err != nil { - for _, job := range childJobs { - if rollbackErr := job.CleanupOnRollback(ctx); rollbackErr != nil { - log.Warningf(ctx, "failed to clean up job: %v", rollbackErr) - } - } return err } - for _, job := range childJobs { - if err := job.Start(ctx); err != nil { - log.Warningf(ctx, "starting job %d failed with error: %v", *job.ID(), err) - } - log.VEventf(ctx, 2, "started job %d", *job.ID()) - } // Wait for the modified versions of tables other than the table we're // updating to have their leases updated. for _, desc := range modified { @@ -1474,6 +1425,10 @@ func (sc *SchemaChanger) done(ctx context.Context) error { return err } } + // Notify the job registry to start jobs, in case we started any. 
+ if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil { + return err + } return nil } @@ -2008,6 +1963,10 @@ type SchemaChangerTestingKnobs struct { // RunBeforeChildJobs is called just before child jobs are run to clean up // dropped schema elements after a mutation. + // Warning: This knob is currently unimplemented in the schema changer because + // we no longer have a way of guaranteeing that this knob runs before the jobs + // start. It remains in some skipped tests, which need to be reworked and then + // un-skipped. RunBeforeChildJobs func() // RunBeforeIndexValidation is called just before starting the index validation, @@ -2455,8 +2414,8 @@ func init() { // queueCleanupJobs checks if the completed schema change needs to start a // child job to clean up dropped schema elements. func (sc *SchemaChanger) queueCleanupJobs( - ctx context.Context, scDesc *tabledesc.Mutable, txn *kv.Txn, childJobs []*jobs.StartableJob, -) ([]*jobs.StartableJob, error) { + ctx context.Context, scDesc *tabledesc.Mutable, txn *kv.Txn, +) error { // Create jobs for dropped columns / indexes to be deleted. mutationID := scDesc.ClusterVersion.NextMutationID span := scDesc.PrimaryIndexSpan(sc.execCfg.Codec) @@ -2486,19 +2445,17 @@ func (sc *SchemaChanger) queueCleanupJobs( Progress: jobspb.SchemaChangeProgress{}, NonCancelable: true, } - job, err := sc.jobRegistry.CreateStartableJobWithTxn(ctx, jobRecord, txn) + job, err := sc.jobRegistry.CreateJobWithTxn(ctx, jobRecord, txn) if err != nil { - return nil, err + return err } - log.VEventf(ctx, 2, "created job %d to drop previous columns "+ - "and indexes.", *job.ID()) - childJobs = append(childJobs, job) + log.Infof(ctx, "created job %d to drop previous columns and indexes", *job.ID()) scDesc.MutationJobs = append(scDesc.MutationJobs, descpb.TableDescriptor_MutationJob{ MutationID: mutationID, JobID: *job.ID(), }) } - return childJobs, nil + return nil } // DeleteTableDescAndZoneConfig removes a table's descriptor and zone config from the KV database.
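
To make the new pattern concrete, the sketch below shows the shape of the create-then-notify flow this patch adopts in place of `StartableJob`. It is a minimal illustration rather than code from the patch: the helper name `createJobAndNotify`, its argument list, and the package clause are assumptions for the example; only `CreateJobWithTxn` and `NotifyToAdoptJobs` come from the change itself.

package sql // illustrative placement, not part of the patch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/kv"
)

// createJobAndNotify is a hypothetical helper sketching the pattern used
// throughout this patch: create and claim the job record inside the caller's
// transaction, then nudge the registry's adoption loop after commit.
func createJobAndNotify(
	ctx context.Context, db *kv.DB, jr *jobs.Registry, record jobs.Record,
) error {
	if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		// Unlike CreateStartableJobWithTxn, CreateJobWithTxn only writes and
		// claims the job row; it allocates no in-memory resumer state, so
		// there is nothing to clean up if the transaction restarts.
		_, err := jr.CreateJobWithTxn(ctx, record, txn)
		return err
	}); err != nil {
		return err
	}
	// After the transaction commits, wake the adoption loop so the claimed
	// job is resumed promptly instead of waiting for the next adoption cycle.
	return jr.NotifyToAdoptJobs(ctx)
}

The trade-off is that the caller no longer holds a handle to the running job, so it cannot block on the job's result or clean it up on rollback; that is also why the RunBeforeChildJobs testing knob can no longer be guaranteed to fire before the child jobs start, and the tests that depended on it are skipped.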