Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
72605: jobs: hang  the job registry off the server context r=miretskiy a=knz

Informs cockroachdb#58938.

Prior to this patch, log messages related to jobs were disconnected
from the server context and were thus missing the node ID and other
log tags.

This patch fixes it.

Release note: None

Co-authored-by: Raphael 'kena' Poss <[email protected]>
  • Loading branch information
craig[bot] and knz committed Nov 10, 2021
2 parents 598e241 + 3ae7174 commit a539b38
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 10 deletions.
8 changes: 5 additions & 3 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
}

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(&s.Settings.SV, s.TestingKnobs)) {
getWaitPeriod(ctx, &s.Settings.SV, s.TestingKnobs)) {
select {
case <-stopper.ShouldQuiesce():
return
Expand Down Expand Up @@ -421,7 +421,9 @@ const recheckEnabledAfterPeriod = 5 * time.Minute
var warnIfPaceTooLow = log.Every(time.Minute)

// Returns duration to wait before scanning system.scheduled_jobs.
func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Duration {
func getWaitPeriod(
ctx context.Context, sv *settings.Values, knobs base.ModuleTestingKnobs,
) time.Duration {
if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil {
return k.SchedulerDaemonScanDelay()
}
Expand All @@ -433,7 +435,7 @@ func getWaitPeriod(sv *settings.Values, knobs base.ModuleTestingKnobs) time.Dura
pace := schedulerPaceSetting.Get(sv)
if pace < minPacePeriod {
if warnIfPaceTooLow.ShouldLog() {
log.Warningf(context.Background(),
log.Warningf(ctx,
"job.scheduler.pace setting too low (%s < %s)", pace, minPacePeriod)
}
pace = minPacePeriod
Expand Down
6 changes: 3 additions & 3 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,17 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) {
schedulerEnabledSetting.Override(ctx, sv, false)

// When disabled, we wait 5 minutes before rechecking.
require.EqualValues(t, 5*time.Minute, getWaitPeriod(sv, nil))
require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, nil))
schedulerEnabledSetting.Override(ctx, sv, true)

// When pace is too low, we use something more reasonable.
schedulerPaceSetting.Override(ctx, sv, time.Nanosecond)
require.EqualValues(t, minPacePeriod, getWaitPeriod(sv, nil))
require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, nil))

// Otherwise, we use user specified setting.
pace := 42 * time.Second
schedulerPaceSetting.Override(ctx, sv, pace)
require.EqualValues(t, pace, getWaitPeriod(sv, nil))
require.EqualValues(t, pace, getWaitPeriod(ctx, sv, nil))
}

type recordScheduleExecutor struct {
Expand Down
12 changes: 8 additions & 4 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ const (
// node simply behaves as though its leniency period is 0. Epoch-based
// nodes will see time-based nodes delay the act of stealing a job.
type Registry struct {
serverCtx context.Context

ac log.AmbientContext
stopper *stop.Stopper
db *kv.DB
Expand Down Expand Up @@ -166,6 +168,7 @@ const PreventAdoptionFile = "DISABLE_STARTING_BACKGROUND_JOBS"
// sql.newInternalPlanner. It returns a sql.JobExecCtx, but must be
// coerced into that in the Resumer functions.
func MakeRegistry(
ctx context.Context,
ac log.AmbientContext,
stopper *stop.Stopper,
clock *hlc.Clock,
Expand All @@ -181,6 +184,7 @@ func MakeRegistry(
knobs *TestingKnobs,
) *Registry {
r := &Registry{
serverCtx: ctx,
ac: ac,
stopper: stopper,
clock: clock,
Expand Down Expand Up @@ -245,7 +249,7 @@ func (r *Registry) ID() base.SQLInstanceID {
// makeCtx returns a new context from r's ambient context and an associated
// cancel func.
func (r *Registry) makeCtx() (context.Context, func()) {
return context.WithCancel(r.ac.AnnotateCtx(context.Background()))
return context.WithCancel(r.ac.AnnotateCtx(r.serverCtx))
}

// MakeJobID generates a new job ID.
Expand Down Expand Up @@ -825,7 +829,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
}
})

if err := stopper.RunAsyncTask(context.Background(), "jobs/cancel", func(ctx context.Context) {
if err := stopper.RunAsyncTask(ctx, "jobs/cancel", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()

Expand All @@ -849,7 +853,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
}); err != nil {
return err
}
if err := stopper.RunAsyncTask(context.Background(), "jobs/gc", func(ctx context.Context) {
if err := stopper.RunAsyncTask(ctx, "jobs/gc", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()

Expand Down Expand Up @@ -882,7 +886,7 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
}); err != nil {
return err
}
return stopper.RunAsyncTask(context.Background(), "jobs/adopt", func(ctx context.Context) {
return stopper.RunAsyncTask(ctx, "jobs/adopt", func(ctx context.Context) {
ctx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()
lc, cleanup := makeLoopController(r.settings, adoptIntervalSetting, r.knobs.IntervalOverrides.Adopt)
Expand Down
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {

td := tracedumper.NewTraceDumper(ctx, cfg.InflightTraceDirName, cfg.Settings)
*jobRegistry = *jobs.MakeRegistry(
ctx,
cfg.AmbientCtx,
cfg.stopper,
cfg.clock,
Expand Down

0 comments on commit a539b38

Please sign in to comment.