Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: individually check the scheduling halt for online unsafe recovery (#8147) #8155

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,3 +615,10 @@ func (c *Cluster) DropCacheAllRegion() {
func (c *Cluster) DropCacheRegion(id uint64) {
// Delegates to RemoveRegionIfExist; judging by its name, an unknown id is
// presumably a no-op — TODO confirm against its implementation.
c.RemoveRegionIfExist(id)
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the microservice scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
//
// NOTE(review): unlike RaftCluster.IsSchedulingHalted, this check only consults
// the persist config and does not take online unsafe recovery into account.
func (c *Cluster) IsSchedulingHalted() bool {
return c.persistConfig.IsSchedulingHalted()
}
4 changes: 4 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ func (o *PersistConfig) SetSplitMergeInterval(splitMergeInterval time.Duration)
o.SetScheduleConfig(v)
}

// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
// It is currently a no-op for the scheduling service: both arguments are ignored.
// TODO: support these metrics for the scheduling service in the future.
func (*PersistConfig) SetSchedulingAllowanceStatus(bool, string) {}

// SetHaltScheduling sets HaltScheduling.
func (o *PersistConfig) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (s *Service) AskBatchSplit(ctx context.Context, request *schedulingpb.AskBa
}, nil
}

if c.persistConfig.IsSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.persistConfig.IsTikvRegionSplitEnabled() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func IsSchedulerRegistered(name string) bool {
type SchedulerConfigProvider interface {
SharedConfigProvider

IsSchedulingHalted() bool
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(string) bool
Expand Down
6 changes: 1 addition & 5 deletions pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (c *Coordinator) PatrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if c.isSchedulingHalted() {
if c.cluster.IsSchedulingHalted() {
continue
}

Expand Down Expand Up @@ -207,10 +207,6 @@ func (c *Coordinator) PatrolRegions() {
}
}

// isSchedulingHalted reports whether the scheduler config of the
// coordinator's cluster currently marks scheduling as halted.
func (c *Coordinator) isSchedulingHalted() bool {
	schedulerConfig := c.cluster.GetSchedulerConfig()
	return schedulerConfig.IsSchedulingHalted()
}

func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/core/cluster_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type SchedulerCluster interface {
GetSchedulerConfig() sc.SchedulerConfigProvider
GetRegionLabeler() *labeler.RegionLabeler
GetStoreConfig() sc.StoreConfigProvider
IsSchedulingHalted() bool
}

// CheckerCluster is an aggregate interface that wraps multiple interfaces
Expand Down
12 changes: 2 additions & 10 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (c *Controller) CollectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if !s.IsPaused() && !c.isSchedulingHalted() {
if !s.IsPaused() && !c.cluster.IsSchedulingHalted() {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
Expand All @@ -131,10 +131,6 @@ func (c *Controller) CollectSchedulerMetrics() {
ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt))
}

// isSchedulingHalted reports whether the scheduler config of the
// controller's cluster currently marks scheduling as halted.
func (c *Controller) isSchedulingHalted() bool {
	schedulerConfig := c.cluster.GetSchedulerConfig()
	return schedulerConfig.IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
func ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
Expand Down Expand Up @@ -526,7 +522,7 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
if s.isSchedulingHalted() {
if s.cluster.IsSchedulingHalted() {
if diagnosable {
s.diagnosticRecorder.SetResultFromStatus(Halted)
}
Expand All @@ -541,10 +537,6 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
return true
}

// isSchedulingHalted reports whether the scheduler config of the
// schedule controller's cluster currently marks scheduling as halted.
func (s *ScheduleController) isSchedulingHalted() bool {
	schedulerConfig := s.cluster.GetSchedulerConfig()
	return schedulerConfig.IsSchedulingHalted()
}

// IsPaused returns if a scheduler is paused.
func (s *ScheduleController) IsPaused() bool {
delayUntil := atomic.LoadInt64(&s.delayUntil)
Expand Down
9 changes: 4 additions & 5 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,12 +493,11 @@ func (u *Controller) GetStage() stage {
}

func (u *Controller) changeStage(stage stage) {
u.stage = stage
// Halt and resume the scheduling once the running state changed.
running := isRunning(stage)
if opt := u.cluster.GetSchedulerConfig(); opt.IsSchedulingHalted() != running {
opt.SetHaltScheduling(running, "online-unsafe-recovery")
// If the running stage changes, update the scheduling allowance status to add or remove "online-unsafe-recovery" halt.
if running := isRunning(stage); running != isRunning(u.stage) {
u.cluster.GetSchedulerConfig().SetSchedulingAllowanceStatus(running, "online-unsafe-recovery")
}
u.stage = stage

var output StageOutput
output.Time = time.Now().Format("2006-01-02 15:04:05.000")
Expand Down
8 changes: 8 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,14 @@ func (c *RaftCluster) SetPDServerConfig(cfg *config.PDServerConfig) {
c.opt.SetPDServerConfig(cfg)
}

// IsSchedulingHalted reports whether PD scheduling is currently halted.
// Scheduling counts as halted if either of the following holds:
//   - the `HaltScheduling` persist option is set to true;
//   - online unsafe recovery is running.
func (c *RaftCluster) IsSchedulingHalted() bool {
	// The persist option is checked first; the unsafe recovery controller is
	// only consulted when the option does not already halt scheduling.
	if c.opt.IsSchedulingHalted() {
		return true
	}
	return c.unsafeRecoveryController.IsRunning()
}

// GetUnsafeRecoveryController returns the unsafe recovery controller.
func (c *RaftCluster) GetUnsafeRecoveryController() *unsaferecovery.Controller {
return c.unsafeRecoveryController
Expand Down
8 changes: 2 additions & 6 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
Expand Down Expand Up @@ -97,13 +97,9 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
return split, nil
}

// isSchedulingHalted reports whether c.opt marks scheduling as halted.
// NOTE(review): unlike the exported RaftCluster.IsSchedulingHalted, this helper
// does not consider whether online unsafe recovery is running.
func (c *RaftCluster) isSchedulingHalted() bool {
return c.opt.IsSchedulingHalted()
}

// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
Expand Down
15 changes: 10 additions & 5 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -987,11 +987,8 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien

// haltSchedulingStatus is the schedulingAllowanceStatusGauge series labeled "halt-scheduling".
var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")

// SetHaltScheduling sets HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
func (*PersistOptions) SetSchedulingAllowanceStatus(halt bool, source string) {
if halt {
haltSchedulingStatus.Set(1)
schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1)
Expand All @@ -1001,6 +998,14 @@ func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
}
}

// SetHaltScheduling updates the HaltScheduling field of the schedule config
// and then records the change, together with its source, through
// SetSchedulingAllowanceStatus.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
	// Modify a clone rather than the config returned by GetScheduleConfig.
	updated := o.GetScheduleConfig().Clone()
	updated.HaltScheduling = halt
	o.SetScheduleConfig(updated)

	o.SetSchedulingAllowanceStatus(halt, source)
}

// IsSchedulingHalted returns if PD scheduling is halted.
func (o *PersistOptions) IsSchedulingHalted() bool {
if o == nil {
Expand Down
2 changes: 1 addition & 1 deletion server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func forwardRegionHeartbeatToScheduling(rc *cluster.RaftCluster, forwardStream s
return
}
// TODO: find a better way to halt scheduling immediately.
if rc.GetOpts().IsSchedulingHalted() {
if rc.IsSchedulingHalted() {
continue
}
// The error types defined for schedulingpb and pdpb are different, so we need to convert them.
Expand Down
3 changes: 3 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
}

// SetScheduleConfig sets the balance config information.
// This function is exported to be used by the API.
func (s *Server) SetScheduleConfig(cfg sc.ScheduleConfig) error {
if err := cfg.Validate(); err != nil {
return err
Expand All @@ -1061,6 +1062,8 @@ func (s *Server) SetScheduleConfig(cfg sc.ScheduleConfig) error {
errs.ZapError(err))
return err
}
// Update the scheduling halt status at the same time.
s.persistOptions.SetSchedulingAllowanceStatus(cfg.HaltScheduling, "manually")
log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
return nil
}
Expand Down
Loading