diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 918d1a64faf4..84c13395435f 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -361,7 +361,7 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) { } for timer := time.NewTimer(initialDelay); ; timer.Reset( - getWaitPeriod(&s.Settings.SV, s.TestingKnobs)) { + getWaitPeriod(ctx, &s.Settings.SV, s.TestingKnobs)) { select { case <-stopper.ShouldQuiesce(): return @@ -421,7 +421,9 @@ const recheckEnabledAfterPeriod = 5 * time.Minute var warnIfPaceTooLow = log.Every(time.Minute) // Returns duration to wait before scanning system.scheduled_jobs. -func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Duration { +func getWaitPeriod( + ctx context.Context, sv *settings.Values, knobs base.ModuleTestingKnobs, +) time.Duration { if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil { return k.SchedulerDaemonScanDelay() } @@ -433,7 +435,7 @@ func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Dura pace := schedulerPaceSetting.Get(sv) if pace < minPacePeriod { if warnIfPaceTooLow.ShouldLog() { - log.Warningf(context.Background(), + log.Warningf(ctx, "job.scheduler.pace setting too low (%s < %s)", pace, minPacePeriod) } pace = minPacePeriod diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 89de07815a85..8936af22a2d6 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -217,17 +217,17 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) { schedulerEnabledSetting.Override(ctx, sv, false) // When disabled, we wait 5 minutes before rechecking. - require.EqualValues(t, 5*time.Minute, getWaitPeriod(sv, nil)) + require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, nil)) schedulerEnabledSetting.Override(ctx, sv, true) // When pace is too low, we use something more reasonable. schedulerPaceSetting.Override(ctx, sv, time.Nanosecond) - require.EqualValues(t, minPacePeriod, getWaitPeriod(sv, nil)) + require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, nil)) // Otherwise, we use user specified setting. pace := 42 * time.Second schedulerPaceSetting.Override(ctx, sv, pace) - require.EqualValues(t, pace, getWaitPeriod(sv, nil)) + require.EqualValues(t, pace, getWaitPeriod(ctx, sv, nil)) } type recordScheduleExecutor struct { diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 80249d35344c..991a93218d00 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -90,6 +90,8 @@ const ( // node simply behaves as though its leniency period is 0. Epoch-based // nodes will see time-based nodes delay the act of stealing a job. type Registry struct { + serverCtx context.Context + ac log.AmbientContext stopper *stop.Stopper db *kv.DB @@ -166,6 +168,7 @@ const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS" // sql.newInternalPlanner. It returns a sql.JobExecCtx, but must be // coerced into that in the Resumer functions. func MakeRegistry( + ctx context.Context, ac log.AmbientContext, stopper *stop.Stopper, clock *hlc.Clock, @@ -181,6 +184,7 @@ func MakeRegistry( knobs *TestingKnobs, ) *Registry { r := &Registry{ + serverCtx: ctx, ac: ac, stopper: stopper, clock: clock, @@ -245,7 +249,7 @@ func (r *Registry) ID() base.SQLInstanceID { // makeCtx returns a new context from r's ambient context and an associated // cancel func. func (r *Registry) makeCtx() (context.Context, func()) { - return context.WithCancel(r.ac.AnnotateCtx(context.Background())) + return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx)) } // MakeJobID generates a new job ID. @@ -825,7 +829,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { } }) - if err := stopper.RunAsyncTask(context.Background(), "jobs/cancel", func(ctx context.Context) { + if err := stopper.RunAsyncTask(ctx, "jobs/cancel", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -849,7 +853,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { }); err != nil { return err } - if err := stopper.RunAsyncTask(context.Background(), "jobs/gc", func(ctx context.Context) { + if err := stopper.RunAsyncTask(ctx, "jobs/gc", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -882,7 +886,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { }); err != nil { return err } - return stopper.RunAsyncTask(context.Background(), "jobs/adopt", func(ctx context.Context) { + return stopper.RunAsyncTask(ctx, "jobs/adopt", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() lc, cleanup := makeLoopController(r.settings, adoptIntervalSetting, r.knobs.IntervalOverrides.Adopt) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index e30402b757e0..052f202f89c4 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -375,6 +375,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { td := tracedumper.NewTraceDumper(ctx, cfg.InflightTraceDirName, cfg.Settings) *jobRegistry = *jobs.MakeRegistry( + ctx, cfg.AmbientCtx, cfg.stopper, cfg.clock,