Skip to content

Commit

Permalink
jobs: Ensure schedules are cancelled when the scheduler is disabled.
Browse files Browse the repository at this point in the history
Ensure that currently executing schedules are cancelled immediately
when the jobs scheduler is disabled via the `jobs.scheduler.enabled` setting.

Fixes cockroachdb#77248

Release Note (enterprise change): Currently executing schedules are
cancelled immediately when the jobs scheduler is disabled.

Release Justification: stability improvement.
  • Loading branch information
Yevgeniy Miretskiy committed Mar 2, 2022
1 parent 54492ec commit 3661ab1
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 4 deletions.
60 changes: 56 additions & 4 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
Expand Down Expand Up @@ -375,6 +376,54 @@ func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool {
return enabled
}

// syncCancelFunc pairs a context.CancelFunc with the mutex that guards it.
// Both are embedded, so users call Lock/Unlock and CancelFunc directly on the
// value.
type syncCancelFunc struct {
// Mutex must be held while reading, invoking, or replacing CancelFunc.
syncutil.Mutex
// CancelFunc cancels the context of the function currently running under
// withCancelOnDisabled; it is nil when no such function is running.
context.CancelFunc
}

// newCancelWhenDisabled returns a syncCancelFunc wired to the scheduler
// enabled setting: whenever the setting changes and reads as disabled, the
// cancel function currently stored in the returned value (if any) is invoked.
func newCancelWhenDisabled(sv *settings.Values) *syncCancelFunc {
	sf := &syncCancelFunc{}

	onSettingChange := func(ctx context.Context) {
		// Nothing to do while the scheduler remains enabled.
		if schedulerEnabledSetting.Get(sv) {
			return
		}

		sf.Lock()
		defer sf.Unlock()
		// A nil CancelFunc means nothing is executing right now.
		if cancelRunning := sf.CancelFunc; cancelRunning != nil {
			cancelRunning()
		}
	}
	schedulerEnabledSetting.SetOnChange(sv, onSettingChange)

	return sf
}

// withCancelOnDisabled runs f with a context derived from ctx that is
// cancelled if the scheduler is disabled, and returns whatever f returns.
//
// The derived context's cancel function is published in sf (under the lock)
// for the duration of the call so that the settings callback can invoke it;
// it is cleared again before returning.
func (sf *syncCancelFunc) withCancelOnDisabled(
	ctx context.Context, sv *settings.Values, f func(ctx context.Context) error,
) error {
	var (
		runCtx    context.Context
		cancelRun context.CancelFunc
	)

	// Publish the cancel function, and handle the case where the scheduler
	// is already disabled, atomically with respect to the settings callback.
	func() {
		sf.Lock()
		defer sf.Unlock()

		runCtx, cancelRun = context.WithCancel(ctx)
		sf.CancelFunc = cancelRun

		if enabled := schedulerEnabledSetting.Get(sv); !enabled {
			cancelRun()
		}
	}()

	// On the way out, release the context's resources and un-publish the
	// cancel function so the callback does not call a stale one.
	defer func() {
		sf.Lock()
		defer sf.Unlock()
		cancelRun()
		sf.CancelFunc = nil
	}()

	return f(runCtx)
}

func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
_ = stopper.RunAsyncTask(ctx, "job-scheduler", func(ctx context.Context) {
initialDelay := getInitialScanDelay(s.TestingKnobs)
Expand All @@ -384,6 +433,8 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
log.Errorf(ctx, "error registering executor metrics: %+v", err)
}

whenDisabled := newCancelWhenDisabled(&s.Settings.SV)

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) {
select {
Expand All @@ -395,10 +446,11 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
}

maxSchedules := schedulerMaxJobsPerIterationSetting.Get(&s.Settings.SV)
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
if err != nil {
if err := whenDisabled.withCancelOnDisabled(ctx, &s.Settings.SV, func(ctx context.Context) error {
return s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
}); err != nil {
log.Errorf(ctx, "error executing schedules: %+v", err)
}

Expand Down
81 changes: 81 additions & 0 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,3 +826,84 @@ func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) {
})
}
}

// blockUntilCancelledExecutor is a test-only ScheduledJobExecutor whose
// ExecuteJob blocks until the context it was given is done.
type blockUntilCancelledExecutor struct {
// started is closed when ExecuteJob begins running; done is closed when
// ExecuteJob returns.
started, done chan struct{}
}

// Compile-time assertion that the executor satisfies ScheduledJobExecutor.
var _ ScheduledJobExecutor = (*blockUntilCancelledExecutor)(nil)

// ExecuteJob implements the ScheduledJobExecutor interface.
//
// It closes e.started to announce that execution has begun, blocks until ctx
// is done, and then closes e.done and returns ctx.Err().
//
// Only the first invocation signals on the channels. Closing an already
// closed channel panics, so a repeated invocation on the same executor
// (for example if the surrounding transaction closure is retried) just waits
// for its context to be done and returns.
//
// NOTE(review): the "already started" check is not safe against concurrent
// invocations; it assumes calls arrive sequentially, as the test comment
// about the scheduler executing its batch sequentially suggests.
func (e *blockUntilCancelledExecutor) ExecuteJob(
	ctx context.Context,
	cfg *scheduledjobs.JobExecutionConfig,
	env scheduledjobs.JobSchedulerEnv,
	schedule *ScheduledJob,
	txn *kv.Txn,
) error {
	select {
	case <-e.started:
		// An earlier invocation already signalled start (and owns closing
		// e.done); do not close either channel a second time.
	default:
		close(e.started)
		defer close(e.done)
	}

	<-ctx.Done()
	return ctx.Err()
}

// NotifyJobTermination implements the ScheduledJobExecutor interface.
// It ignores all of its arguments and always returns nil.
func (e *blockUntilCancelledExecutor) NotifyJobTermination(
ctx context.Context,
jobID jobspb.JobID,
jobStatus Status,
details jobspb.Details,
env scheduledjobs.JobSchedulerEnv,
schedule *ScheduledJob,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
return nil
}

// Metrics implements the ScheduledJobExecutor interface.
// This test executor exposes no metrics, so it returns nil.
func (e *blockUntilCancelledExecutor) Metrics() metric.Struct {
return nil
}

// GetCreateScheduleStatement implements the ScheduledJobExecutor interface.
// It always returns an empty string together with an assertion-failure
// error saying the call was unexpected.
func (e *blockUntilCancelledExecutor) GetCreateScheduleStatement(
ctx context.Context,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
sj *ScheduledJob,
ex sqlutil.InternalExecutor,
) (string, error) {
return "", errors.AssertionFailedf("unexpected GetCreateScheduleStatement call")
}

// TestDisablingSchedulerCancelsSchedules verifies that a schedule which is
// executing at the moment the scheduler is disabled has its context
// cancelled, allowing the executor to return.
func TestDisablingSchedulerCancelsSchedules(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	const executorName = "block-until-cancelled-executor"
	blocker := &blockUntilCancelledExecutor{
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}
	defer registerScopedScheduledJobExecutor(executorName, blocker)()

	srv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			JobsTestingKnobs: fastDaemonKnobs(overridePaceSetting(10 * time.Millisecond)),
		},
	})
	defer srv.Stopper().Stop(ctx)

	// A single schedule is enough: the scheduler runs the schedules in a
	// batch one after another, and this one blocks until its context is
	// cancelled by disabling the scheduler, so any additional schedules
	// would not change what is being tested.
	sj := NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv)
	sj.SetScheduleLabel("test schedule")
	sj.SetOwner(security.TestUserName())
	sj.SetNextRun(timeutil.Now())
	sj.SetExecutionDetails(executorName, jobspb.ExecutionArguments{})

	ie := srv.InternalExecutor().(sqlutil.InternalExecutor)
	require.NoError(t, sj.Create(ctx, ie, nil))

	// Wait for the executor to start running the schedule.
	<-blocker.started

	// Disabling the scheduler must cancel the running schedule, which lets
	// the executor return and close its done channel.
	schedulerEnabledSetting.Override(ctx, &srv.ClusterSettings().SV, false)
	<-blocker.done
}

0 comments on commit 3661ab1

Please sign in to comment.