Skip to content

Commit

Permalink
init scheduler when disabling coordinator
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Sep 5, 2023
1 parent d300c88 commit ca20f98
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 28 deletions.
27 changes: 16 additions & 11 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func (c *Coordinator) Run() {
}
}
log.Info("Coordinator starts to run schedulers")
c.initSchedulers()
c.InitSchedulers(true)

c.wg.Add(4)
// Starts to patrol regions.
Expand All @@ -391,7 +391,8 @@ func (c *Coordinator) Run() {
go c.driveSlowNodeScheduler()
}

func (c *Coordinator) initSchedulers() {
// InitSchedulers initializes schedulers.
func (c *Coordinator) InitSchedulers(needRun bool) {
var (
scheduleNames []string
configs []string
Expand All @@ -401,7 +402,7 @@ func (c *Coordinator) initSchedulers() {
scheduleNames, configs, err = c.cluster.GetStorage().LoadAllScheduleConfig()
select {
case <-c.ctx.Done():
log.Info("Coordinator stops running")
log.Info("init schedulers has been stopped")
return
default:
}
Expand Down Expand Up @@ -439,8 +440,10 @@ func (c *Coordinator) initSchedulers() {
continue
}
log.Info("create scheduler with independent configuration", zap.String("scheduler-name", s.GetName()))
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
if needRun {
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can not add scheduler with independent configuration", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", cfg.Args), errs.ZapError(err))
}
}
}

Expand All @@ -461,12 +464,14 @@ func (c *Coordinator) initSchedulers() {
}

log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args))
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
k++
if needRun {
if err = c.schedulers.AddScheduler(s, schedulerCfg.Args...); err != nil && !errors.ErrorEqual(err, errs.ErrSchedulerExisted.FastGenByArgs()) {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", schedulerCfg.Args), errs.ZapError(err))
} else {
// Only records the valid scheduler config.
scheduleCfg.Schedulers[k] = schedulerCfg
k++
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ func (c *RaftCluster) Start(s Server) error {
go c.runCoordinator()
go c.runStatsBackgroundJobs()
go c.runMetricsCollectionJob()
} else {
c.initSchedulers()
}
c.wg.Add(7)
go c.runNodeStateCheckJob()
Expand Down Expand Up @@ -849,6 +851,10 @@ func (c *RaftCluster) GetOpts() sc.ConfProvider {
return c.opt
}

func (c *RaftCluster) initSchedulers() {
c.coordinator.InitSchedulers(false)
}

// GetScheduleConfig returns scheduling configurations.
func (c *RaftCluster) GetScheduleConfig() *sc.ScheduleConfig {
return c.opt.GetScheduleConfig()
Expand Down
35 changes: 26 additions & 9 deletions server/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,18 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
return err
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args))
if err = c.AddScheduler(s, args...); err != nil {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
} else if err = h.opt.Persist(c.GetStorage()); err != nil {
if !h.s.IsAPIServiceMode() {
if err = c.AddScheduler(s, args...); err != nil {
log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Strings("scheduler-args", args), errs.ZapError(err))
return err
}
}
if err = h.opt.Persist(c.GetStorage()); err != nil {
log.Error("can not persist scheduler config", errs.ZapError(err))
} else {
log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args))
return err
}
return err
log.Info("add scheduler successfully", zap.String("scheduler-name", name), zap.Strings("scheduler-args", args))
return nil
}

// RemoveScheduler removes a scheduler by name.
Expand All @@ -252,10 +256,23 @@ func (h *Handler) RemoveScheduler(name string) error {
if err != nil {
return err
}
if err = c.RemoveScheduler(name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
if !h.s.IsAPIServiceMode() {
if err = c.RemoveScheduler(name); err != nil {
log.Error("can not remove scheduler", zap.String("scheduler-name", name), errs.ZapError(err))
} else {
log.Info("remove scheduler successfully", zap.String("scheduler-name", name))
}
} else {
log.Info("remove scheduler successfully", zap.String("scheduler-name", name))
conf := c.GetSchedulerConfig()
if err := conf.Persist(c.GetStorage()); err != nil {
log.Error("the option can not persist scheduler config", errs.ZapError(err))
return err
}

if err := c.GetStorage().RemoveScheduleConfig(name); err != nil {
log.Error("can not remove the scheduler config", errs.ZapError(err))
return err
}
}
return err
}
Expand Down
9 changes: 1 addition & 8 deletions tests/integrations/mcs/scheduling/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,10 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() {
re.NoError(err)
// Get all default scheduler names.
var (
schedulerNames []string
schedulerController = suite.pdLeaderServer.GetRaftCluster().GetCoordinator().GetSchedulersController()
schedulerNames, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllScheduleConfig()
)
testutil.Eventually(re, func() bool {
schedulerNames = schedulerController.GetSchedulerNames()
targetCount := len(sc.DefaultSchedulers)
// In the previous case, StoreConfig of raft-kv2 has been persisted. So, it might
// have EvictSlowTrendName.
if exists, _ := schedulerController.IsSchedulerExisted(schedulers.EvictSlowTrendName); exists {
targetCount += 1
}
return len(schedulerNames) == targetCount
})
// Check all default schedulers' configs.
Expand Down

0 comments on commit ca20f98

Please sign in to comment.