Skip to content

Commit

Permalink
server,jobs: remove the logic to run the scheduler on 1 node
Browse files Browse the repository at this point in the history
- The mechanism to limit the scheduler execution to 1 node
  was only effective on KV nodes, with direct access to
  the leaseholder information for the liveness table.

  However, we want uniform behavior in a multitenant environment, so a
  mechanism specific to KV nodes is not acceptable any more.

- Since this was first implemented, we've found the job scheduler
  not to be disruptive any more, so it's now safe to run it on
  every node.

Release note: None
  • Loading branch information
knz committed Dec 19, 2022
1 parent 93ed655 commit 388dee7
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 109 deletions.
38 changes: 3 additions & 35 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,30 +351,6 @@ func (s *jobScheduler) executeSchedules(ctx context.Context, maxSchedules int64)
return err
}

// An internal, safety valve setting to revert scheduler execution to distributed mode.
// This setting should be removed once scheduled job system no longer locks tables for excessive
// periods of time.
var schedulerRunsOnSingleNode = settings.RegisterBoolSetting(
settings.TenantReadOnly,
"jobs.scheduler.single_node_scheduler.enabled",
"execute scheduler on a single node in a cluster",
false,
)

func (s *jobScheduler) schedulerEnabledOnThisNode(ctx context.Context) bool {
if s.ShouldRunScheduler == nil || !schedulerRunsOnSingleNode.Get(&s.Settings.SV) {
return true
}

enabled, err := s.ShouldRunScheduler(ctx, s.DB.Clock().NowAsClockTimestamp())
if err != nil {
log.Errorf(ctx, "error determining if the scheduler enabled: %v; will recheck after %s",
err, recheckEnabledAfterPeriod)
return false
}
return enabled
}

type syncCancelFunc struct {
syncutil.Mutex
context.CancelFunc
Expand Down Expand Up @@ -438,12 +414,12 @@ func (s *jobScheduler) runDaemon(ctx context.Context, stopper *stop.Stopper) {
whenDisabled := newCancelWhenDisabled(&s.Settings.SV)

for timer := time.NewTimer(initialDelay); ; timer.Reset(
getWaitPeriod(ctx, &s.Settings.SV, s.schedulerEnabledOnThisNode, jitter, s.TestingKnobs)) {
getWaitPeriod(ctx, &s.Settings.SV, jitter, s.TestingKnobs)) {
select {
case <-stopper.ShouldQuiesce():
return
case <-timer.C:
if !schedulerEnabledSetting.Get(&s.Settings.SV) || !s.schedulerEnabledOnThisNode(ctx) {
if !schedulerEnabledSetting.Get(&s.Settings.SV) {
continue
}

Expand Down Expand Up @@ -508,11 +484,7 @@ type jitterFn func(duration time.Duration) time.Duration

// Returns duration to wait before scanning system.scheduled_jobs.
func getWaitPeriod(
ctx context.Context,
sv *settings.Values,
enabledOnThisNode func(ctx context.Context) bool,
jitter jitterFn,
knobs base.ModuleTestingKnobs,
ctx context.Context, sv *settings.Values, jitter jitterFn, knobs base.ModuleTestingKnobs,
) time.Duration {
if k, ok := knobs.(*TestingKnobs); ok && k.SchedulerDaemonScanDelay != nil {
return k.SchedulerDaemonScanDelay()
Expand All @@ -522,10 +494,6 @@ func getWaitPeriod(
return recheckEnabledAfterPeriod
}

if enabledOnThisNode != nil && !enabledOnThisNode(ctx) {
return recheckEnabledAfterPeriod
}

pace := schedulerPaceSetting.Get(sv)
if pace < minPacePeriod {
if warnIfPaceTooLow.ShouldLog() {
Expand Down
72 changes: 4 additions & 68 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/types"
"github.com/robfig/cron/v3"
cron "github.com/robfig/cron/v3"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -215,23 +214,22 @@ func TestJobSchedulerDaemonGetWaitPeriod(t *testing.T) {
sv, cleanup := getScopedSettings()
defer cleanup()

var schedulerEnabled func(context.Context) bool
noJitter := func(d time.Duration) time.Duration { return d }

schedulerEnabledSetting.Override(ctx, sv, false)

// When disabled, we wait 5 minutes before rechecking.
require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
require.EqualValues(t, 5*time.Minute, getWaitPeriod(ctx, sv, noJitter, nil))
schedulerEnabledSetting.Override(ctx, sv, true)

// When pace is too low, we use something more reasonable.
schedulerPaceSetting.Override(ctx, sv, time.Nanosecond)
require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
require.EqualValues(t, minPacePeriod, getWaitPeriod(ctx, sv, noJitter, nil))

// Otherwise, we use user specified setting.
pace := 42 * time.Second
schedulerPaceSetting.Override(ctx, sv, pace)
require.EqualValues(t, pace, getWaitPeriod(ctx, sv, schedulerEnabled, noJitter, nil))
require.EqualValues(t, pace, getWaitPeriod(ctx, sv, noJitter, nil))
}

type recordScheduleExecutor struct {
Expand Down Expand Up @@ -755,68 +753,6 @@ INSERT INTO defaultdb.foo VALUES(1, 1)
require.Equal(t, "", updated.ScheduleStatus())
}

func TestSchedulerCanBeRestrictedToSingleNode(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numNodes = 3
for _, enableSingleNode := range []bool{true, false} {
t.Run(fmt.Sprintf("runs-on-single-node=%t", enableSingleNode), func(t *testing.T) {
schedulers := struct {
syncutil.Mutex
schedulers []*jobScheduler
}{}
knobs := &TestingKnobs{
CaptureJobScheduler: func(s interface{}) {
schedulers.Lock()
defer schedulers.Unlock()
schedulers.schedulers = append(schedulers.schedulers, s.(*jobScheduler))
},
}

args := base.TestServerArgs{
Knobs: base.TestingKnobs{JobsTestingKnobs: knobs},
}

tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ServerArgs: args})
defer tc.Stopper().Stop(context.Background())

testutils.SucceedsSoon(t, func() error {
schedulers.Lock()
defer schedulers.Unlock()
if len(schedulers.schedulers) == numNodes {
return nil
}
return errors.Newf("want %d schedules, got %d", numNodes, len(schedulers.schedulers))
})

sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))
sqlDB.Exec(t, "SET CLUSTER SETTING jobs.scheduler.single_node_scheduler.enabled=$1", enableSingleNode)

schedulers.Lock()
defer schedulers.Unlock()
expectedEnabled := numNodes
if enableSingleNode {
expectedEnabled = 1
}

testutils.SucceedsSoon(t, func() error {
numEnabled := 0
for _, s := range schedulers.schedulers {
if s.schedulerEnabledOnThisNode(context.Background()) {
numEnabled++
}
}
if numEnabled == expectedEnabled {
return nil
}
return errors.Newf("expecting %d enabled, found %d", expectedEnabled, numEnabled)
})

})
}
}

type blockUntilCancelledExecutor struct {
once sync.Once
started, done chan struct{}
Expand Down
6 changes: 0 additions & 6 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1511,12 +1511,6 @@ func (s *SQLServer) preStart(
sessiondatapb.SessionData{},
)
},
ShouldRunScheduler: func(ctx context.Context, ts hlc.ClockTimestamp) (bool, error) {
if s.execCfg.Codec.ForSystemTenant() {
return s.isMeta1Leaseholder(ctx, ts)
}
return true, nil
},
},
scheduledjobs.ProdJobSchedulerEnv,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ var retiredSettings = map[string]struct{}{
"sql.distsql.flow_scheduler_queueing.enabled": {},
"sql.distsql.drain.cancel_after_wait.enabled": {},
"changefeed.active_protected_timestamps.enabled": {},
"jobs.scheduler.single_node_scheduler.enabled": {},
}

// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
Expand Down

0 comments on commit 388dee7

Please sign in to comment.