This is an automated cherry-pick of #8147
ref #6493, close #8095

Signed-off-by: ti-chi-bot <[email protected]>
JmPotato authored and ti-chi-bot committed May 20, 2024
1 parent 5a3d21d commit 9d2a577
Showing 13 changed files with 714 additions and 28 deletions.
25 changes: 25 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
@@ -568,3 +568,28 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
func (c *Cluster) IsPrepared() bool {
return c.coordinator.GetPrepareChecker().IsPrepared()
}
<<<<<<< HEAD
=======

// SetPrepared sets the prepare checker to prepared. Only for test purposes.
func (c *Cluster) SetPrepared() {
c.coordinator.GetPrepareChecker().SetPrepared()
}

// DropCacheAllRegion removes all cached regions.
func (c *Cluster) DropCacheAllRegion() {
c.ResetRegionCache()
}

// DropCacheRegion removes a region from the cache.
func (c *Cluster) DropCacheRegion(id uint64) {
c.RemoveRegionIfExist(id)
}

// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the microservice scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
func (c *Cluster) IsSchedulingHalted() bool {
return c.persistConfig.IsSchedulingHalted()
}
>>>>>>> 740f15e65 (*: individually check the scheduling halt for online unsafe recovery (#8147))
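For orientation, the sketch below illustrates the calling pattern the new `IsSchedulingHalted` enables inside the scheduling microservice: callers ask the cluster itself whether scheduling is halted instead of reaching into the persist config. This is a minimal, self-contained sketch; `schedulingHalter`, `fakeCluster`, and `runPatrol` are hypothetical stand-ins, not code from this repository.

```go
package main

import "fmt"

// schedulingHalter is a hypothetical stand-in for the slice of *Cluster
// that a caller needs: just the halt check added in this commit.
type schedulingHalter interface {
	IsSchedulingHalted() bool
}

// fakeCluster mimics Cluster.IsSchedulingHalted, which in the real code
// delegates to persistConfig.IsSchedulingHalted().
type fakeCluster struct{ halted bool }

func (c *fakeCluster) IsSchedulingHalted() bool { return c.halted }

// runPatrol shows the calling pattern used elsewhere in this diff:
// skip a scheduling round whenever the cluster reports a halt.
func runPatrol(c schedulingHalter) {
	if c.IsSchedulingHalted() {
		fmt.Println("scheduling halted, skipping this round")
		return
	}
	fmt.Println("scheduling allowed, patrolling regions")
}

func main() {
	runPatrol(&fakeCluster{halted: true})
	runPatrol(&fakeCluster{halted: false})
}
```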
4 changes: 4 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -603,6 +603,10 @@ func (o *PersistConfig) SetSplitMergeInterval(splitMergeInterval time.Duration)
o.SetScheduleConfig(v)
}

// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
// TODO: support this metric for the scheduling service in the future.
func (*PersistConfig) SetSchedulingAllowanceStatus(bool, string) {}

// SetHaltScheduling set HaltScheduling.
func (o *PersistConfig) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
144 changes: 144 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service.go
@@ -191,6 +191,150 @@ func (s *Service) StoreHeartbeat(ctx context.Context, request *schedulingpb.Stor
return &schedulingpb.StoreHeartbeatResponse{Header: &schedulingpb.ResponseHeader{ClusterId: s.clusterID}}, nil
}

<<<<<<< HEAD
=======
// SplitRegions splits regions by the given split keys.
func (s *Service) SplitRegions(ctx context.Context, request *schedulingpb.SplitRegionsRequest) (*schedulingpb.SplitRegionsResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.SplitRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}
finishedPercentage, newRegionIDs := c.GetRegionSplitter().SplitRegions(ctx, request.GetSplitKeys(), int(request.GetRetryLimit()))
return &schedulingpb.SplitRegionsResponse{
Header: s.header(),
RegionsId: newRegionIDs,
FinishedPercentage: uint64(finishedPercentage),
}, nil
}

// ScatterRegions implements gRPC SchedulingServer.
func (s *Service) ScatterRegions(_ context.Context, request *schedulingpb.ScatterRegionsRequest) (*schedulingpb.ScatterRegionsResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.ScatterRegionsResponse{Header: s.notBootstrappedHeader()}, nil
}

opsCount, failures, err := c.GetRegionScatterer().ScatterRegionsByID(request.GetRegionsId(), request.GetGroup(), int(request.GetRetryLimit()), request.GetSkipStoreLimit())
if err != nil {
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
Message: err.Error(),
})
return &schedulingpb.ScatterRegionsResponse{Header: header}, nil
}
percentage := 100
if len(failures) > 0 {
percentage = 100 - 100*len(failures)/(opsCount+len(failures))
log.Debug("scatter regions", zap.Errors("failures", func() []error {
r := make([]error, 0, len(failures))
for _, err := range failures {
r = append(r, err)
}
return r
}()))
}
return &schedulingpb.ScatterRegionsResponse{
Header: s.header(),
FinishedPercentage: uint64(percentage),
}, nil
}

// GetOperator gets information about the operator belonging to the specified region.
func (s *Service) GetOperator(_ context.Context, request *schedulingpb.GetOperatorRequest) (*schedulingpb.GetOperatorResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.GetOperatorResponse{Header: s.notBootstrappedHeader()}, nil
}

opController := c.GetCoordinator().GetOperatorController()
requestID := request.GetRegionId()
r := opController.GetOperatorStatus(requestID)
if r == nil {
header := s.errorHeader(&schedulingpb.Error{
Type: schedulingpb.ErrorType_UNKNOWN,
Message: "region not found",
})
return &schedulingpb.GetOperatorResponse{Header: header}, nil
}

return &schedulingpb.GetOperatorResponse{
Header: s.header(),
RegionId: requestID,
Desc: []byte(r.Desc()),
Kind: []byte(r.Kind().String()),
Status: r.Status,
}, nil
}

// AskBatchSplit implements gRPC SchedulingServer.
func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatchSplitRequest) (*schedulingpb.AskBatchSplitResponse, error) {
c := s.GetCluster()
if c == nil {
return &schedulingpb.AskBatchSplitResponse{Header: s.notBootstrappedHeader()}, nil
}

if request.GetRegion() == nil {
return &schedulingpb.AskBatchSplitResponse{
Header: s.wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN,
"missing region for split"),
}, nil
}

if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.persistConfig.IsTikvRegionSplitEnabled() {
return nil, errs.ErrSchedulerTiKVSplitDisabled.FastGenByArgs()
}
reqRegion := request.GetRegion()
splitCount := request.GetSplitCount()
err := c.ValidRegion(reqRegion)
if err != nil {
return nil, err
}
splitIDs := make([]*pdpb.SplitID, 0, splitCount)
recordRegions := make([]uint64, 0, splitCount+1)

for i := 0; i < int(splitCount); i++ {
newRegionID, err := c.AllocID()
if err != nil {
return nil, errs.ErrSchedulerNotFound.FastGenByArgs()
}

peerIDs := make([]uint64, len(request.Region.Peers))
for i := 0; i < len(peerIDs); i++ {
if peerIDs[i], err = c.AllocID(); err != nil {
return nil, err
}
}

recordRegions = append(recordRegions, newRegionID)
splitIDs = append(splitIDs, &pdpb.SplitID{
NewRegionId: newRegionID,
NewPeerIds: peerIDs,
})

log.Info("alloc ids for region split", zap.Uint64("region-id", newRegionID), zap.Uint64s("peer-ids", peerIDs))
}

recordRegions = append(recordRegions, reqRegion.GetId())
if versioninfo.IsFeatureSupported(c.persistConfig.GetClusterVersion(), versioninfo.RegionMerge) {
// Disable merging of these regions for a period of time.
c.GetCoordinator().GetMergeChecker().RecordRegionSplit(recordRegions)
}

// If region splits during the scheduling process, regions with abnormal
// status may be left, and these regions need to be checked with higher
// priority.
c.GetCoordinator().GetCheckerController().AddSuspectRegions(recordRegions...)

return &schedulingpb.AskBatchSplitResponse{
Header: s.header(),
Ids: splitIDs,
}, nil
}

>>>>>>> 740f15e65 (*: individually check the scheduling halt for online unsafe recovery (#8147))
// RegisterGRPCService registers the service to gRPC server.
func (s *Service) RegisterGRPCService(g *grpc.Server) {
schedulingpb.RegisterSchedulingServer(g, s)
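The `AskBatchSplit` handler above checks the cluster-level halt before doing any work, then allocates one region ID plus one peer ID per peer for every requested split. The sketch below models that guard order and the ID bookkeeping with hypothetical stand-ins (`splitCluster`, `askBatchSplit`, `errSchedulingIsHalted`); it does not use the real `schedulingpb` or `errs` packages.

```go
package main

import (
	"errors"
	"fmt"
)

// errSchedulingIsHalted stands in for errs.ErrSchedulingIsHalted, which the
// real handler builds with FastGenByArgs().
var errSchedulingIsHalted = errors.New("scheduling is halted")

// splitCluster is a hypothetical subset of the scheduling Cluster used by
// AskBatchSplit: the halt check plus an ID allocator.
type splitCluster struct {
	halted bool
	nextID uint64
}

func (c *splitCluster) IsSchedulingHalted() bool { return c.halted }
func (c *splitCluster) AllocID() (uint64, error) { c.nextID++; return c.nextID, nil }

// askBatchSplit mirrors the guard order in the handler: refuse the split
// while scheduling is halted, then allocate a region ID and the peer IDs
// for each requested split.
func askBatchSplit(c *splitCluster, splitCount, peersPerRegion int) ([]uint64, error) {
	if c.IsSchedulingHalted() {
		return nil, errSchedulingIsHalted
	}
	ids := make([]uint64, 0, splitCount*(1+peersPerRegion))
	for i := 0; i < splitCount; i++ {
		regionID, err := c.AllocID()
		if err != nil {
			return nil, err
		}
		ids = append(ids, regionID)
		for j := 0; j < peersPerRegion; j++ {
			peerID, err := c.AllocID()
			if err != nil {
				return nil, err
			}
			ids = append(ids, peerID)
		}
	}
	return ids, nil
}

func main() {
	if _, err := askBatchSplit(&splitCluster{halted: true}, 2, 3); err != nil {
		fmt.Println("halted cluster:", err)
	}
	ids, _ := askBatchSplit(&splitCluster{}, 2, 3)
	fmt.Println("allocated ids:", ids)
}
```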
2 changes: 1 addition & 1 deletion pkg/schedule/config/config_provider.go
@@ -46,7 +46,7 @@ func IsSchedulerRegistered(name string) bool {
type SchedulerConfigProvider interface {
SharedConfigProvider

IsSchedulingHalted() bool
SetSchedulingAllowanceStatus(bool, string)
GetStoresLimit() map[uint64]StoreLimitConfig

IsSchedulerDisabled(string) bool
6 changes: 1 addition & 5 deletions pkg/schedule/coordinator.go
@@ -178,7 +178,7 @@ func (c *Coordinator) PatrolRegions() {
log.Info("patrol regions has been stopped")
return
}
if c.isSchedulingHalted() {
if c.cluster.IsSchedulingHalted() {
continue
}

@@ -207,10 +207,6 @@ func (c *Coordinator) PatrolRegions() {
}
}

func (c *Coordinator) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

func (c *Coordinator) checkRegions(startKey []byte) (key []byte, regions []*core.RegionInfo) {
regions = c.cluster.ScanRegions(startKey, nil, patrolScanRegionLimit)
if len(regions) == 0 {
1 change: 1 addition & 0 deletions pkg/schedule/core/cluster_informer.go
@@ -43,6 +43,7 @@ type SchedulerCluster interface {
GetSchedulerConfig() sc.SchedulerConfigProvider
GetRegionLabeler() *labeler.RegionLabeler
GetStoreConfig() sc.StoreConfigProvider
IsSchedulingHalted() bool
}

// CheckerCluster is an aggregate interface that wraps multiple interfaces
12 changes: 2 additions & 10 deletions pkg/schedule/schedulers/scheduler_controller.go
@@ -114,7 +114,7 @@ func (c *Controller) CollectSchedulerMetrics() {
var allowScheduler float64
// If the scheduler is not allowed to schedule, it will disappear in Grafana panel.
// See issue #1341.
if !s.IsPaused() && !c.isSchedulingHalted() {
if !s.IsPaused() && !c.cluster.IsSchedulingHalted() {
allowScheduler = 1
}
schedulerStatusGauge.WithLabelValues(s.Scheduler.GetName(), "allow").Set(allowScheduler)
@@ -130,10 +130,6 @@ func (c *Controller) CollectSchedulerMetrics() {
ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt))
}

func (c *Controller) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
func ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
@@ -518,7 +514,7 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
}
return false
}
if s.isSchedulingHalted() {
if s.cluster.IsSchedulingHalted() {
if diagnosable {
s.diagnosticRecorder.SetResultFromStatus(Halted)
}
@@ -533,10 +529,6 @@ func (s *ScheduleController) AllowSchedule(diagnosable bool) bool {
return true
}

func (s *ScheduleController) isSchedulingHalted() bool {
return s.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// IsPaused returns if a scheduler is paused.
func (s *ScheduleController) IsPaused() bool {
delayUntil := atomic.LoadInt64(&s.delayUntil)
9 changes: 4 additions & 5 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
@@ -492,12 +492,11 @@ func (u *Controller) GetStage() stage {
}

func (u *Controller) changeStage(stage stage) {
u.stage = stage
// Halt and resume the scheduling once the running state changed.
running := isRunning(stage)
if opt := u.cluster.GetSchedulerConfig(); opt.IsSchedulingHalted() != running {
opt.SetHaltScheduling(running, "online-unsafe-recovery")
// If the running stage changes, update the scheduling allowance status to add or remove "online-unsafe-recovery" halt.
if running := isRunning(stage); running != isRunning(u.stage) {
u.cluster.GetSchedulerConfig().SetSchedulingAllowanceStatus(running, "online-unsafe-recovery")
}
u.stage = stage

var output StageOutput
output.Time = time.Now().Format("2006-01-02 15:04:05.000")
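The rewritten `changeStage` above only touches the scheduling allowance status when the recovery's running state actually flips, and it no longer persists `HaltScheduling` at all. A minimal sketch of that edge-triggered pattern follows; the stage names are invented stand-ins, not the controller's real stages.

```go
package main

import "fmt"

// stage is a stand-in for the unsafe recovery controller's stage type.
type stage int

const (
	idle stage = iota
	collectReport
	forceLeader
	finished
)

// isRunning mirrors the helper used by changeStage: only some stages count
// as an in-progress recovery.
func isRunning(s stage) bool {
	return s == collectReport || s == forceLeader
}

// controller is a minimal stand-in for unsaferecovery.Controller.
type controller struct {
	stage stage
}

// changeStage follows the edge-triggered pattern in the hunk above: the
// allowance status is updated only when the running flag actually flips,
// not on every stage transition.
func (u *controller) changeStage(next stage) {
	if running := isRunning(next); running != isRunning(u.stage) {
		fmt.Printf("SetSchedulingAllowanceStatus(%v, %q)\n", running, "online-unsafe-recovery")
	}
	u.stage = next
}

func main() {
	u := &controller{stage: idle}
	u.changeStage(collectReport) // flips to running: status marked halted
	u.changeStage(forceLeader)   // still running: no status change
	u.changeStage(finished)      // flips back: halt source cleared
}
```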
9 changes: 9 additions & 0 deletions server/cluster/cluster.go
@@ -890,6 +890,7 @@ func (c *RaftCluster) SetPDServerConfig(cfg *config.PDServerConfig) {
c.opt.SetPDServerConfig(cfg)
}

<<<<<<< HEAD
// AddSuspectRegions adds regions to suspect list.
func (c *RaftCluster) AddSuspectRegions(regionIDs ...uint64) {
c.coordinator.GetCheckerController().AddSuspectRegions(regionIDs...)
@@ -918,6 +919,14 @@ func (c *RaftCluster) GetLabelStats() *statistics.LabelStatistics {
// RemoveSuspectRegion removes region from suspect list.
func (c *RaftCluster) RemoveSuspectRegion(id uint64) {
c.coordinator.GetCheckerController().RemoveSuspectRegion(id)
=======
// IsSchedulingHalted returns whether the scheduling is halted.
// Currently, the PD scheduling is halted when:
// - The `HaltScheduling` persist option is set to true.
// - Online unsafe recovery is running.
func (c *RaftCluster) IsSchedulingHalted() bool {
return c.opt.IsSchedulingHalted() || c.unsafeRecoveryController.IsRunning()
>>>>>>> 740f15e65 (*: individually check the scheduling halt for online unsafe recovery (#8147))
}

// GetUnsafeRecoveryController returns the unsafe recovery controller.
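With the change above, `RaftCluster.IsSchedulingHalted` derives the halt from two independent sources at read time instead of relying on the recovery controller to flip the persisted option. A minimal sketch of that derivation, with stand-in fields in place of the real persist options and recovery controller:

```go
package main

import "fmt"

// raftClusterHalt models the derivation in RaftCluster.IsSchedulingHalted:
// scheduling is halted if the persisted HaltScheduling option is set or an
// online unsafe recovery is currently running. Field names are stand-ins.
type raftClusterHalt struct {
	haltSchedulingOption bool // the persisted `HaltScheduling` option
	recoveryRunning      bool // unsafeRecoveryController.IsRunning()
}

func (c *raftClusterHalt) IsSchedulingHalted() bool {
	return c.haltSchedulingOption || c.recoveryRunning
}

func main() {
	c := &raftClusterHalt{}
	fmt.Println("idle:", c.IsSchedulingHalted()) // false

	// Entering recovery halts scheduling without touching the persisted option.
	c.recoveryRunning = true
	fmt.Println("recovery running:", c.IsSchedulingHalted()) // true

	// Leaving recovery restores scheduling, since the option was never flipped.
	c.recoveryRunning = false
	fmt.Println("recovery finished:", c.IsSchedulingHalted()) // false
}
```

Deriving the halt at read time means a recovery that finishes or aborts cannot leave the persisted option stuck in the halted state.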
7 changes: 5 additions & 2 deletions server/cluster/cluster_worker.go
@@ -43,7 +43,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error {

// HandleAskSplit handles the split request.
func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
@@ -86,6 +86,7 @@ func (c *RaftCluster) HandleAskSplit(request *pdpb.AskSplitRequest) (*pdpb.AskSp
return split, nil
}

<<<<<<< HEAD
func (c *RaftCluster) isSchedulingHalted() bool {
return c.opt.IsSchedulingHalted()
}
@@ -107,9 +108,11 @@ func (c *RaftCluster) ValidRequestRegion(reqRegion *metapb.Region) error {
return nil
}

=======
>>>>>>> 740f15e65 (*: individually check the scheduling halt for online unsafe recovery (#8147))
// HandleAskBatchSplit handles the batch split request.
func (c *RaftCluster) HandleAskBatchSplit(request *pdpb.AskBatchSplitRequest) (*pdpb.AskBatchSplitResponse, error) {
if c.isSchedulingHalted() {
if c.IsSchedulingHalted() {
return nil, errs.ErrSchedulingIsHalted.FastGenByArgs()
}
if !c.opt.IsTikvRegionSplitEnabled() {
15 changes: 10 additions & 5 deletions server/config/persist_options.go
@@ -973,11 +973,8 @@ func (o *PersistOptions) SetAllStoresLimitTTL(ctx context.Context, client *clien

var haltSchedulingStatus = schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
// SetSchedulingAllowanceStatus sets the scheduling allowance status to help distinguish the source of the halt.
func (*PersistOptions) SetSchedulingAllowanceStatus(halt bool, source string) {
if halt {
haltSchedulingStatus.Set(1)
schedulingAllowanceStatusGauge.WithLabelValues(source).Set(1)
@@ -987,6 +984,14 @@ func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
}
}

// SetHaltScheduling set HaltScheduling.
func (o *PersistOptions) SetHaltScheduling(halt bool, source string) {
v := o.GetScheduleConfig().Clone()
v.HaltScheduling = halt
o.SetScheduleConfig(v)
o.SetSchedulingAllowanceStatus(halt, source)
}

// IsSchedulingHalted returns if PD scheduling is halted.
func (o *PersistOptions) IsSchedulingHalted() bool {
if o == nil {
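`SetHaltScheduling` above now persists the option and then delegates the metric bookkeeping to `SetSchedulingAllowanceStatus`, which flips a per-source gauge alongside the overall halt gauge. The sketch below reproduces that bookkeeping pattern with prometheus/client_golang directly; the metric name, labels, and helper names are assumptions for illustration, not PD's actual metric definitions.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// schedulingAllowanceStatusGauge is an illustrative re-creation of the kind
// of gauge used to expose why scheduling is (dis)allowed; the name and label
// are assumed for this sketch.
var schedulingAllowanceStatusGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "pd",
		Subsystem: "scheduling",
		Name:      "allowance_status",
		Help:      "Status of the scheduling allowance, labeled by halt source.",
	},
	[]string{"kind"},
)

// setSchedulingAllowanceStatus mirrors the bookkeeping split out of
// SetHaltScheduling: it only updates the gauges, leaving persistence of the
// HaltScheduling option to the caller.
func setSchedulingAllowanceStatus(halt bool, source string) {
	value := 0.0
	if halt {
		value = 1.0
	}
	schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling").Set(value)
	schedulingAllowanceStatusGauge.WithLabelValues(source).Set(value)
}

func main() {
	prometheus.MustRegister(schedulingAllowanceStatusGauge)

	setSchedulingAllowanceStatus(true, "online-unsafe-recovery")
	fmt.Println("halt gauge:",
		testutil.ToFloat64(schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")))
	fmt.Println("source gauge:",
		testutil.ToFloat64(schedulingAllowanceStatusGauge.WithLabelValues("online-unsafe-recovery")))

	setSchedulingAllowanceStatus(false, "online-unsafe-recovery")
	fmt.Println("after resume:",
		testutil.ToFloat64(schedulingAllowanceStatusGauge.WithLabelValues("halt-scheduling")))
}
```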