From a41f506b7f53545e7cc9e671463c7d11f3785a97 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Sun, 24 Oct 2021 21:17:11 -0400 Subject: [PATCH] jobs: don't block on notify We have a method to notify the registry to go scan for jobs. There was no reason for it to block. It did. This commit makes it not block without meaningfully changing the semantics. There are better improvements to be had by making the jobs subsystem much more targeted, but this is some serious bang for its buck in terms of speedup vs. lines changed. Release note (performance improvement): Creating many schema changes in parallel now runs faster due to improved concurrency when notifying the jobs subsystem. --- pkg/ccl/backupccl/restore_job.go | 9 +++------ pkg/jobs/registry.go | 17 +++++++---------- pkg/spanconfig/spanconfigmanager/manager.go | 4 ++-- pkg/sql/schema_changer.go | 10 +++++----- pkg/sql/schemachanger/scjob/job.go | 10 ++-------- 5 files changed, 19 insertions(+), 31 deletions(-) diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 60b60aadfe6a..9dd00aa5db41 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1621,9 +1621,8 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro if err := sql.DescsTxn(ctx, r.execCfg, publishDescriptors); err != nil { return err } - if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil { - return err - } + + p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) if fn := r.testingKnobs.afterPublishingDescriptors; fn != nil { if err := fn(); err != nil { return err @@ -1728,9 +1727,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } // Reload the details as we may have updated the job. 
details = r.job.Details().(jobspb.RestoreDetails) - if err := p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx); err != nil { - return err - } + p.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) if details.DescriptorCoverage == tree.AllDescriptors { // We restore the system tables from the main data bundle so late because it diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index ecfe30488f39..8e5ecb342120 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -192,7 +192,10 @@ func MakeRegistry( execCtx: execCtxFn, preventAdoptionFile: preventAdoptionFile, td: td, - adoptionCh: make(chan adoptionNotice), + // Use a non-zero buffer to allow queueing of notifications. + // The writing method will use a default case to avoid blocking + // if a notification is already queued. + adoptionCh: make(chan adoptionNotice, 1), } if knobs != nil { r.knobs = *knobs @@ -251,15 +254,11 @@ func (r *Registry) MakeJobID() jobspb.JobID { } // NotifyToAdoptJobs notifies the job adoption loop to start claimed jobs. 
-func (r *Registry) NotifyToAdoptJobs(ctx context.Context) error { +func (r *Registry) NotifyToAdoptJobs(context.Context) { select { case r.adoptionCh <- resumeClaimedJobs: - case <-r.stopper.ShouldQuiesce(): - return stop.ErrUnavailable - case <-ctx.Done(): - return ctx.Err() + default: } - return nil } // WaitForJobs waits for a given list of jobs to reach some sort @@ -344,9 +343,7 @@ func (r *Registry) Run( return nil } log.Infof(ctx, "scheduled jobs %+v", jobs) - if err := r.NotifyToAdoptJobs(ctx); err != nil { - return err - } + r.NotifyToAdoptJobs(ctx) err := r.WaitForJobs(ctx, ex, jobs) if err != nil { return err diff --git a/pkg/spanconfig/spanconfigmanager/manager.go b/pkg/spanconfig/spanconfigmanager/manager.go index defe3b3f7cc5..9e46860f93d6 100644 --- a/pkg/spanconfig/spanconfigmanager/manager.go +++ b/pkg/spanconfig/spanconfigmanager/manager.go @@ -218,6 +218,6 @@ func (m *Manager) createAndStartJobIfNoneExists(ctx context.Context) (bool, erro if fn := m.knobs.ManagerCreatedJobInterceptor; fn != nil { fn(job) } - err := m.jr.NotifyToAdoptJobs(ctx) - return true, err + m.jr.NotifyToAdoptJobs(ctx) + return true, nil } diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 6561b8bffd71..56514a6769e7 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -502,7 +502,8 @@ func startGCJob( return err } log.Infof(ctx, "starting GC job %d", jobID) - return jobRegistry.NotifyToAdoptJobs(ctx) + jobRegistry.NotifyToAdoptJobs(ctx) + return nil } func (sc *SchemaChanger) execLogTags() *logtags.Buffer { @@ -895,7 +896,8 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er return err } log.Infof(ctx, "starting GC job %d", gcJobID) - return sc.jobRegistry.NotifyToAdoptJobs(ctx) + sc.jobRegistry.NotifyToAdoptJobs(ctx) + return nil } // RunStateMachineBeforeBackfill moves the state machine forward @@ -1350,9 +1352,7 @@ func (sc *SchemaChanger) done(ctx context.Context) error { return err } // Notify 
the job registry to start jobs, in case we started any. - if err := sc.jobRegistry.NotifyToAdoptJobs(ctx); err != nil { - return err - } + sc.jobRegistry.NotifyToAdoptJobs(ctx) // If any operations was skipped because a mutation was made // redundant due to a column getting dropped later on then we should diff --git a/pkg/sql/schemachanger/scjob/job.go b/pkg/sql/schemachanger/scjob/job.go index 8f9b7b92a749..454eb68416af 100644 --- a/pkg/sql/schemachanger/scjob/job.go +++ b/pkg/sql/schemachanger/scjob/job.go @@ -138,10 +138,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{ }); err != nil { return err } - err := execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) - if err != nil { - return err - } + execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) } // If no stages exist, then execute a singe transaction @@ -159,10 +156,7 @@ func (n *newSchemaChangeResumer) Resume(ctx context.Context, execCtxI interface{ if err != nil { return err } - err = execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) - if err != nil { - return err - } + execCtx.ExecCfg().JobRegistry.NotifyToAdoptJobs(ctx) } return nil }