diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 60b60aadfe6a..9dd00aa5db41 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1621,9 +1621,8 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro if err := sql.DescsTxn(ctx, r.execCfg, publishDescriptors); err != nil { return err } - if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil { - return err - } + + p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil { if err := fn(); err != nil { return err @@ -1728,9 +1727,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } // Reload the details as we may have updated the job. details = r.job.Details().(jobspb.RestoreDetails) - if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil { - return err - } + p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) if details.DescriptorCoverage == tree.AllDescriptors { // We restore the system tables from the main data bundle so late because it diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index ecfe30488f39..8e5ecb342120 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -192,7 +192,10 @@ func MakeRegistry( execCtx: execCtxFn, preventAdoptionFile: preventAdoptionFile, td: td, - adoptionCh: make(chan adoptionNotice), + // Use a non-zero buffer to allow queueing of notifications. + // The writing method will use a default case to avoid blocking + // if a notification is already queued. + adoptionCh: make(chan adoptionNotice, 1), } if knobs != nil { r.knobs = *knobs @@ -251,15 +254,11 @@ func (r *Registry) MakeJobID() jobspb.JobID { } // NotifyToAdoptJobs notifies the job adoption loop to start claimed jobs. -func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error { +func (r *Registry) NotifyToAdoptJobs(context.Context) { select { case r.adoptionCh <- resumeClaimedJobs: - case <-r.stopper.ShouldQuiesce(): - return stop.ErrUnavailable - case <-ctx.Done(): - return ctx.Err() + default: } - return nil } // WaitForJobs waits for a given list of jobs to reach some sort @@ -344,9 +343,7 @@ func (r *Registry) Run( return nil } log.Infof(ctx, "scheduled jobs %+v", jobs) - if err := r.NotifyToAdoptJobs(ctx); err != nil { - return err - } + r.NotifyToAdoptJobs(ctx) err := r.WaitForJobs(ctx, ex, jobs) if err != nil { return err diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index defe3b3f7cc5..9e46860f93d6 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -218,6 +218,6 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro if fn := m.knobs.ManagerCreatedJobInterceptor; fn != nil { fn(job) } - err := m.jr.NotifyToAdoptJobs(ctx) - return true, err + m.jr.NotifyToAdoptJobs(ctx) + return true, nil } diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 6561b8bffd71..56514a6769e7 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -502,7 +502,8 @@ func startGCJob( return err } log.Infof(ctx, "starting GC job %d", jobID) - return jobRegistry.NotifyToAdoptJobs(ctx) + jobRegistry.NotifyToAdoptJobs(ctx) + return nil } func (sc *SchemaChanger) execLogTags() *logtags.Buffer { @@ -895,7 +896,8 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er return err } log.Infof(ctx, "starting GC job %d", gcJobID) - return sc.jobRegistry.NotifyToAdoptJobs(ctx) + sc.jobRegistry.NotifyToAdoptJobs(ctx) + return nil } // RunStateMachineBeforeBackfill moves the state machine forward @@ -1350,9 +1352,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { return err } // Notify the job registry to start jobs, in case we started any. - if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil { - return err - } + sc.jobRegistry.NotifyToAdoptJobs(ctx) // If any operations was skipped because a mutation was made // redundant due to a column getting dropped later on then we should diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index 8f9b7b92a749..454eb68416af 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -138,10 +138,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{ }); err != nil { return err } - err := execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) - if err != nil { - return err - } + execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) } // If no stages exist, then execute a singe transaction @@ -159,10 +156,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{ if err != nil { return err } - err = execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) - if err != nil { - return err - } + execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) } return nil }