Skip to content

Commit

Permalink
Merge pull request #77313 from miretskiy/backport21.2-77306
Browse files Browse the repository at this point in the history
release-21.2: jobs: Ensure schedules are cancelled when scheduler disabled.
  • Loading branch information
miretskiy authored Mar 7, 2022
2 parents c72adc7 + 1d0d2e2 commit be313aa
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 4 deletions.
60 changes: 56 additions & 4 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
Expand Down Expand Up @@ -345,6 +346,54 @@ func (s *jobScheduler) executeSchedules(
return err
}

type syncCancelFunc struct {
syncutil.Mutex
context.CancelFunc
}

// newCancelWhenDisabled arranges for scheduler enabled setting callback to cancel
// currently executing context.
func newCancelWhenDisabled(sv *settings.Values) *syncCancelFunc {
sf := &syncCancelFunc{}
schedulerEnabledSetting.SetOnChange(sv, func(ctx context.Context) {
if !schedulerEnabledSetting.Get(sv) {
sf.Lock()
if sf.CancelFunc != nil {
sf.CancelFunc()
}
sf.Unlock()
}
})
return sf
}

// withCancelOnDisabled executes provided function with the context which will be canceled
// if scheduler is disabled.
func (sf *syncCancelFunc) withCancelOnDisabled(
ctx context.Context, sv *settings.Values, f func(ctx context.Context) error,
) error {
ctx, cancel := func() (context.Context, context.CancelFunc) {
sf.Lock()
defer sf.Unlock()

cancellableCtx, cancel := context.WithCancel(ctx)
sf.CancelFunc = cancel

if !schedulerEnabledSetting.Get(sv) {
cancel()
}

return cancellableCtx, func() {
sf.Lock()
defer sf.Unlock()
cancel()
sf.CancelFunc = nil
}
}()
defer cancel()
return f(ctx)
}

func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
_ = stopper.RunAsyncTask(ctx, "job-scheduler", func(ctx context.Context) {
initialDelay := getInitialScanDelay(s.TestingKnobs)
Expand All @@ -354,6 +403,8 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
log.Errorf(ctx, "error registering executor metrics: %+v", err)
}

whenDisabled := newCancelWhenDisabled(&s.Settings.SV)

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(ctx, &s.Settings.SV, jitter, s.TestingKnobs)) {
select {
Expand All @@ -366,10 +417,11 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
}

maxSchedules := schedulerMaxJobsPerIterationSetting.Get(&s.Settings.SV)
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
if err != nil {
if err := whenDisabled.withCancelOnDisabled(ctx, &s.Settings.SV, func(ctx context.Context) error {
return s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
return s.executeSchedules(ctx, maxSchedules, txn)
})
}); err != nil {
log.Errorf(ctx, "error executing schedules: %+v", err)
}

Expand Down
81 changes: 81 additions & 0 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,84 @@ INSERT INTO defaultdb.foo VALUES(1, 1)
updated := h.loadSchedule(t, schedule.ScheduleID())
require.Equal(t, "", updated.ScheduleStatus())
}

type blockUntilCancelledExecutor struct {
started, done chan struct{}
}

var _ ScheduledJobExecutor = (*blockUntilCancelledExecutor)(nil)

func (e *blockUntilCancelledExecutor) ExecuteJob(
ctx context.Context,
cfg *scheduledjobs.JobExecutionConfig,
env scheduledjobs.JobSchedulerEnv,
schedule *ScheduledJob,
txn *kv.Txn,
) error {
defer close(e.done)
close(e.started)
<-ctx.Done()
return ctx.Err()
}

func (e *blockUntilCancelledExecutor) NotifyJobTermination(
ctx context.Context,
jobID jobspb.JobID,
jobStatus Status,
details jobspb.Details,
env scheduledjobs.JobSchedulerEnv,
schedule *ScheduledJob,
ex sqlutil.InternalExecutor,
txn *kv.Txn,
) error {
return nil
}

func (e *blockUntilCancelledExecutor) Metrics() metric.Struct {
return nil
}

func (e *blockUntilCancelledExecutor) GetCreateScheduleStatement(
ctx context.Context,
env scheduledjobs.JobSchedulerEnv,
txn *kv.Txn,
sj *ScheduledJob,
ex sqlutil.InternalExecutor,
) (string, error) {
return "", errors.AssertionFailedf("unexpected GetCreateScheduleStatement call")
}

func TestDisablingSchedulerCancelsSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const executorName = "block-until-canceled-executor"
ex := &blockUntilCancelledExecutor{
started: make(chan struct{}),
done: make(chan struct{}),
}
defer registerScopedScheduledJobExecutor(executorName, ex)()

knobs := base.TestingKnobs{
JobsTestingKnobs: fastDaemonKnobs(overridePaceSetting(10 * time.Millisecond)),
}
ts, _, _ := serverutils.StartServer(t, base.TestServerArgs{Knobs: knobs})
defer ts.Stopper().Stop(context.Background())

// Create schedule which blocks until its context canceled due to disabled scheduler.
// We only need to create one schedule. This is because
// scheduler executes its batch of schedules sequentially, and so, creating more
// than one doesn't change anything since we block.
schedule := NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv)
schedule.SetScheduleLabel("test schedule")
schedule.SetOwner(security.TestUserName())
schedule.SetNextRun(timeutil.Now())
schedule.SetExecutionDetails(executorName, jobspb.ExecutionArguments{})
require.NoError(t, schedule.Create(
context.Background(), ts.InternalExecutor().(sqlutil.InternalExecutor), nil))

<-ex.started
// Disable scheduler and verify all running schedules were canceled.
schedulerEnabledSetting.Override(context.Background(), &ts.ClusterSettings().SV, false)
<-ex.done
}

0 comments on commit be313aa

Please sign in to comment.