Skip to content

Commit

Permalink
jobs: don't block on notify
Browse files Browse the repository at this point in the history
We have a method to notify the registry to go scan for jobs. There was no
reason for it to block. It did. This commit makes it not block without
meaningfully changing the semantics.

There are better improvements to be had by making the jobs subsystem much
more targeted, but this is some serious bang for its buck in terms of speedup
vs. lines changed.

Release note (performance improvement): Creating many schema changes in
parallel now runs faster due to improved concurrency notifying the jobs
subsystem.
  • Loading branch information
ajwerner committed Oct 25, 2021
1 parent 31ccb1c commit a41f506
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 31 deletions.
9 changes: 3 additions & 6 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1621,9 +1621,8 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
if err := sql.DescsTxn(ctx, r.execCfg, publishDescriptors); err != nil {
return err
}
if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}

p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil {
if err := fn(); err != nil {
return err
Expand Down Expand Up @@ -1728,9 +1727,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
}
// Reload the details as we may have updated the job.
details = r.job.Details().(jobspb.RestoreDetails)
if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}
p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)

if details.DescriptorCoverage == tree.AllDescriptors {
// We restore the system tables from the main data bundle so late because it
Expand Down
17 changes: 7 additions & 10 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ func MakeRegistry(
execCtx: execCtxFn,
preventAdoptionFile: preventAdoptionFile,
td: td,
adoptionCh: make(chan adoptionNotice),
// Use a non-zero buffer to allow queueing of notifications.
// The writing method will use a default case to avoid blocking
// if a notification is already queued.
adoptionCh: make(chan adoptionNotice, 1),
}
if knobs != nil {
r.knobs = *knobs
Expand Down Expand Up @@ -251,15 +254,11 @@ func (r *Registry) MakeJobID() jobspb.JobID {
}

// NotifyToAdoptJobs notifies the job adoption loop to start claimed jobs.
func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error {
func (r *Registry) NotifyToAdoptJobs(context.Context) {
select {
case r.adoptionCh <- resumeClaimedJobs:
case <-r.stopper.ShouldQuiesce():
return stop.ErrUnavailable
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
}

// WaitForJobs waits for a given list of jobs to reach some sort
Expand Down Expand Up @@ -344,9 +343,7 @@ func (r *Registry) Run(
return nil
}
log.Infof(ctx, "scheduled jobs %+v", jobs)
if err := r.NotifyToAdoptJobs(ctx); err != nil {
return err
}
r.NotifyToAdoptJobs(ctx)
err := r.WaitForJobs(ctx, ex, jobs)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/spanconfig/spanconfigmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,6 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro
if fn := m.knobs.ManagerCreatedJobInterceptor; fn != nil {
fn(job)
}
err := m.jr.NotifyToAdoptJobs(ctx)
return true, err
m.jr.NotifyToAdoptJobs(ctx)
return true, nil
}
10 changes: 5 additions & 5 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ func startGCJob(
return err
}
log.Infof(ctx, "starting GC job %d", jobID)
return jobRegistry.NotifyToAdoptJobs(ctx)
jobRegistry.NotifyToAdoptJobs(ctx)
return nil
}

func (sc *SchemaChanger) execLogTags() *logtags.Buffer {
Expand Down Expand Up @@ -895,7 +896,8 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er
return err
}
log.Infof(ctx, "starting GC job %d", gcJobID)
return sc.jobRegistry.NotifyToAdoptJobs(ctx)
sc.jobRegistry.NotifyToAdoptJobs(ctx)
return nil
}

// RunStateMachineBeforeBackfill moves the state machine forward
Expand Down Expand Up @@ -1350,9 +1352,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
return err
}
// Notify the job registry to start jobs, in case we started any.
if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil {
return err
}
sc.jobRegistry.NotifyToAdoptJobs(ctx)

// If any operations was skipped because a mutation was made
// redundant due to a column getting dropped later on then we should
Expand Down
10 changes: 2 additions & 8 deletions pkg/sql/schemachanger/scjob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{
}); err != nil {
return err
}
err := execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
if err != nil {
return err
}
execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
}

// If no stages exist, then execute a singe transaction
Expand All @@ -159,10 +156,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{
if err != nil {
return err
}
err = execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
if err != nil {
return err
}
execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx)
}
return nil
}
Expand Down

0 comments on commit a41f506

Please sign in to comment.