Skip to content

Commit

Permalink
This is an automated cherry-pick of tikv#6427
Browse files Browse the repository at this point in the history
close tikv#6426

Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
nolouch authored and ti-chi-bot committed Feb 18, 2024
1 parent b092996 commit 0423dfc
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 15 deletions.
13 changes: 13 additions & 0 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,20 @@ import (

var (
// DefaultMinResolvedTSPersistenceInterval is the default value of min resolved ts persistence interval.
<<<<<<< HEAD
DefaultMinResolvedTSPersistenceInterval = 10 * time.Second
=======
// If interval in config is zero, it means not to persist resolved ts and check config with this DefaultMinResolvedTSPersistenceInterval
DefaultMinResolvedTSPersistenceInterval = config.DefaultMinResolvedTSPersistenceInterval
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
regionUpdateCacheEventCounter = regionEventCounter.WithLabelValues("update_cache")
regionUpdateKVEventCounter = regionEventCounter.WithLabelValues("update_kv")
regionCacheMissCounter = bucketEventCounter.WithLabelValues("region_cache_miss")
versionNotMatchCounter = bucketEventCounter.WithLabelValues("version_not_match")
updateFailedCounter = bucketEventCounter.WithLabelValues("update_failed")

denySchedulersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("schedulers", "deny")
>>>>>>> 2e12b960a (checker: fix unhealth region skip the rule check (#6427))
)

// regionLabelGCInterval is the interval to run region-label's GC work.
Expand Down
25 changes: 25 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,9 +892,34 @@ func (s *scheduleController) Schedule() []*operator.Operator {
default:
}
cacheCluster := newCacheCluster(s.cluster)
<<<<<<< HEAD
// If we have schedule, reset interval to the minimal interval.
if ops := s.Scheduler.Schedule(cacheCluster); len(ops) > 0 {
=======
// we need only process diagnostic once in the retry loop
diagnosable = diagnosable && i == 0
ops, plans := s.Scheduler.Schedule(cacheCluster, diagnosable)
if diagnosable {
s.diagnosticRecorder.setResultFromPlans(ops, plans)
}
foundDisabled := false
for _, op := range ops {
if labelMgr := s.cluster.GetRegionLabeler(); labelMgr != nil {
if labelMgr.ScheduleDisabled(s.cluster.GetRegion(op.RegionID())) {
denySchedulersByLabelerCounter.Inc()
foundDisabled = true
break
}
}
}
if len(ops) > 0 {
// If we have schedule, reset interval to the minimal interval.
>>>>>>> 2e12b960a (checker: fix unhealth region skip the rule check (#6427))
s.nextInterval = s.Scheduler.GetMinInterval()
// try regenerating operators
if foundDisabled {
continue
}
return ops
}
}
Expand Down
67 changes: 67 additions & 0 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,11 +373,31 @@ func (s *testCoordinatorSuite) TestCheckRegionWithScheduleDeny(c *C) {
Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}},
})

<<<<<<< HEAD
c.Assert(labelerManager.ScheduleDisabled(region), IsTrue)
s.checkRegion(c, tc, co, 1, 0)
labelerManager.DeleteLabelRule("schedulelabel")
c.Assert(labelerManager.ScheduleDisabled(region), IsFalse)
s.checkRegion(c, tc, co, 1, 1)
=======
// should allow to do rule checker
re.True(labelerManager.ScheduleDisabled(region))
checkRegionAndOperator(re, tc, co, 1, 1)

// should not allow to merge
tc.opt.SetSplitMergeInterval(time.Duration(0))

re.NoError(tc.addLeaderRegion(2, 2, 3, 4))
re.NoError(tc.addLeaderRegion(3, 2, 3, 4))
region = tc.GetRegion(2)
re.True(labelerManager.ScheduleDisabled(region))
checkRegionAndOperator(re, tc, co, 2, 0)

// delete label rule, should allow to do merge
labelerManager.DeleteLabelRule("schedulelabel")
re.False(labelerManager.ScheduleDisabled(region))
checkRegionAndOperator(re, tc, co, 2, 2)
>>>>>>> 2e12b960a (checker: fix unhealth region skip the rule check (#6427))
}

func (s *testCoordinatorSuite) TestCheckerIsBusy(c *C) {
Expand Down Expand Up @@ -812,7 +832,54 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) {
c.Assert(co.schedulers, HasLen, 3)
}

<<<<<<< HEAD
func (s *testCoordinatorSuite) TestRemoveScheduler(c *C) {
=======
func TestDenyScheduler(t *testing.T) {
re := require.New(t)

tc, co, cleanup := prepare(nil, nil, func(co *coordinator) {
labelerManager := co.cluster.GetRegionLabeler()
labelerManager.SetLabelRule(&labeler.LabelRule{
ID: "schedulelabel",
Labels: []labeler.RegionLabel{{Key: "schedule", Value: "deny"}},
RuleType: labeler.KeyRange,
Data: []interface{}{map[string]interface{}{"start_key": "", "end_key": ""}},
})
co.run()
}, re)
defer cleanup()

re.Len(co.schedulers, len(config.DefaultSchedulers))

// Transfer peer from store 4 to store 1 if not set deny.
re.NoError(tc.addRegionStore(4, 40))
re.NoError(tc.addRegionStore(3, 30))
re.NoError(tc.addRegionStore(2, 20))
re.NoError(tc.addRegionStore(1, 10))
re.NoError(tc.addLeaderRegion(1, 2, 3, 4))

// Transfer leader from store 4 to store 2 if not set deny.
re.NoError(tc.updateLeaderCount(4, 1000))
re.NoError(tc.updateLeaderCount(3, 50))
re.NoError(tc.updateLeaderCount(2, 20))
re.NoError(tc.updateLeaderCount(1, 10))
re.NoError(tc.addLeaderRegion(2, 4, 3, 2))

// there should no balance leader/region operator
for i := 0; i < 10; i++ {
re.Nil(co.opController.GetOperator(1))
re.Nil(co.opController.GetOperator(2))
time.Sleep(10 * time.Millisecond)
}
}

func TestRemoveScheduler(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

>>>>>>> 2e12b960a (checker: fix unhealth region skip the rule check (#6427))
tc, co, cleanup := prepare(func(cfg *config.ScheduleConfig) {
cfg.ReplicaScheduleLimit = 0
}, nil, func(co *coordinator) { co.run() }, c)
Expand Down
18 changes: 11 additions & 7 deletions server/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
// DefaultCacheSize is the default length of waiting list.
const DefaultCacheSize = 1000

var denyCheckersByLabelerCounter = schedule.LabelerEventCounter.WithLabelValues("checkers", "deny")

// Controller is used to manage all checkers.
type Controller struct {
cluster schedule.Cluster
Expand Down Expand Up @@ -80,13 +82,6 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
return []*operator.Operator{op}
}

if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
return nil
}
}

if op := c.splitChecker.Check(region); op != nil {
return []*operator.Operator{op}
}
Expand All @@ -112,6 +107,15 @@ func (c *Controller) CheckRegion(region *core.RegionInfo) []*operator.Operator {
c.regionWaitingList.Put(region.GetID(), nil)
}
}
// skip the joint checker, split checker and rule checker when region label is set to "schedule=deny".
// those checkers is help to make region health, it's necessary to skip them when region is set to deny.
if cl, ok := c.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
denyCheckersByLabelerCounter.Inc()
return nil
}
}

if c.mergeChecker != nil {
allowed := opController.OperatorCount(operator.OpMerge) < c.opts.GetMergeScheduleLimit()
Expand Down
10 changes: 10 additions & 0 deletions server/schedule/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ var (
Name: "scatter_distribution",
Help: "Counter of the distribution in scatter.",
}, []string{"store", "is_leader", "engine"})

// LabelerEventCounter is a counter of the scheduler labeler system.
LabelerEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "schedule",
Name: "labeler_event_counter",
Help: "Counter of the scheduler label.",
}, []string{"type", "event"})
)

func init() {
Expand All @@ -94,4 +103,5 @@ func init() {
prometheus.MustRegister(scatterCounter)
prometheus.MustRegister(scatterDistributionCounter)
prometheus.MustRegister(operatorSizeHist)
prometheus.MustRegister(LabelerEventCounter)
}
15 changes: 7 additions & 8 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,19 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/errs"
<<<<<<< HEAD:server/schedule/operator_controller.go

Check failure on line 31 in server/schedule/operator_controller.go

View workflow job for this annotation

GitHub Actions / statics

expected 'STRING', found '<<'

Check failure on line 31 in server/schedule/operator_controller.go

View workflow job for this annotation

GitHub Actions / statics

expected 'STRING', found '<<'
"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/core/storelimit"
"github.com/tikv/pd/server/schedule/hbstream"
"github.com/tikv/pd/server/schedule/labeler"
"github.com/tikv/pd/server/schedule/operator"
=======
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/utils/syncutil"
"github.com/tikv/pd/pkg/versioninfo"
>>>>>>> 2e12b960a (checker: fix unhealth region skip the rule check (#6427)):pkg/schedule/operator_controller.go
"go.uber.org/zap"
)

Expand Down Expand Up @@ -421,14 +428,6 @@ func (oc *OperatorController) checkAddOperator(isPromoting bool, ops ...*operato
if op.SchedulerKind() == operator.OpAdmin || op.IsLeaveJointStateOperator() {
continue
}
if cl, ok := oc.cluster.(interface{ GetRegionLabeler() *labeler.RegionLabeler }); ok {
l := cl.GetRegionLabeler()
if l.ScheduleDisabled(region) {
log.Debug("schedule disabled", zap.Uint64("region-id", op.RegionID()))
operatorWaitCounter.WithLabelValues(op.Desc(), "schedule-disabled").Inc()
return false
}
}
}
expired := false
for _, op := range ops {
Expand Down
26 changes: 26 additions & 0 deletions server/schedule/operator_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {
Data: []interface{}{map[string]interface{}{"start_key": "1a", "end_key": "1b"}},
})

<<<<<<< HEAD:server/schedule/operator_controller_test.go
c.Assert(labelerManager.ScheduleDisabled(source), IsTrue)
// add operator should be failed since it is labeled with `schedule=deny`.
c.Assert(controller.AddWaitingOperator(ops...), Equals, 0)
Expand All @@ -771,4 +772,29 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {

// no space left, new operator can not be added.
c.Assert(controller.AddWaitingOperator(addPeerOp(0)), Equals, 0)
=======
suite.True(labelerManager.ScheduleDisabled(source))
// add operator should be success since it is not check in addWaitingOperator
suite.Equal(2, controller.AddWaitingOperator(ops...))
}

// issue #5279
func (suite *operatorControllerTestSuite) TestInvalidStoreId() {
opt := mockconfig.NewTestOptions()
tc := mockcluster.NewCluster(suite.ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */)
oc := NewOperatorController(suite.ctx, tc, stream)
// If PD and store 3 are gone, PD will not have info of store 3 after recreating it.
tc.AddRegionStore(1, 1)
tc.AddRegionStore(2, 1)
tc.AddRegionStore(4, 1)
tc.AddLeaderRegionWithRange(1, "", "", 1, 2, 3, 4)
steps := []operator.OpStep{
operator.RemovePeer{FromStore: 3, PeerID: 3, IsDownStore: false},
}
op := operator.NewTestOperator(1, &metapb.RegionEpoch{}, operator.OpRegion, steps...)
suite.True(oc.addOperatorLocked(op))
// Although store 3 does not exist in PD, PD can also send op to TiKV.
suite.Equal(pdpb.OperatorStatus_RUNNING, oc.GetOperatorStatus(1).Status)
>>>>>>> 2e12b960a (checker: fix unhealth region skip the rule check (#6427)):pkg/schedule/operator_controller_test.go
}

0 comments on commit 0423dfc

Please sign in to comment.