Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/operator: add operator state #647

Merged
merged 11 commits into from
May 26, 2017
21 changes: 8 additions & 13 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (c *coordinator) dispatch(region *RegionInfo) *pdpb.RegionHeartbeatResponse
if op := c.getOperator(region.GetId()); op != nil {
res, finished := op.Do(region)
if !finished {
collectOperatorCounterMetrics(op)
return res
}
c.removeOperator(op)
Expand Down Expand Up @@ -257,14 +258,14 @@ func (c *coordinator) runScheduler(s *scheduleController) {
func (c *coordinator) addOperator(op Operator) bool {
c.Lock()
defer c.Unlock()

regionID := op.GetRegionID()

if old, ok := c.operators[regionID]; ok {
if !isHigherPriorityOperator(op, old) {
return false
}
c.limiter.removeOperator(old)
old.SetState(OperatorReplaced)
c.removeOperatorLocked(old)
log.Infof("coordinator: add operator %+v with higher priority, remove operator: %+v", op, old)
}

Expand All @@ -288,12 +289,16 @@ func isHigherPriorityOperator(new Operator, old Operator) bool {
func (c *coordinator) removeOperator(op Operator) {
c.Lock()
defer c.Unlock()
c.removeOperatorLocked(op)
}

func (c *coordinator) removeOperatorLocked(op Operator) {
regionID := op.GetRegionID()
c.limiter.removeOperator(op)
delete(c.operators, regionID)

c.histories.add(regionID, op)
collectOperatorCounterMetrics(op)
}

func (c *coordinator) getOperator(regionID uint64) Operator {
Expand Down Expand Up @@ -425,21 +430,11 @@ func (s *scheduleController) AllowSchedule() bool {
}

func collectOperatorCounterMetrics(op Operator) {
metrics := make(map[string]uint64)
regionOp, ok := op.(*regionOperator)
if !ok {
return
}
for _, op := range regionOp.Ops {
switch o := op.(type) {
case *changePeerOperator:
metrics[o.Name]++
case *transferLeaderOperator:
metrics[o.Name]++
}
}

for label, value := range metrics {
operatorCounter.WithLabelValues(label).Add(float64(value))
operatorCounter.WithLabelValues(op.GetName(), op.GetState().String()).Add(1)
}
}
18 changes: 11 additions & 7 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,20 @@ import (
type testOperator struct {
RegionID uint64
Kind ResourceKind
State OperatorState
}

func newTestOperator(regionID uint64, kind ResourceKind) Operator {
region := newRegionInfo(&metapb.Region{Id: regionID}, nil)
op := &testOperator{RegionID: regionID, Kind: kind}
op := &testOperator{RegionID: regionID, Kind: kind, State: OperatorRunning}
return newRegionOperator(region, kind, op)
}

func (op *testOperator) GetRegionID() uint64 { return op.RegionID }
func (op *testOperator) GetResourceKind() ResourceKind { return op.Kind }
func (op *testOperator) GetState() OperatorState { return op.State }
func (op *testOperator) SetState(state OperatorState) { op.State = state }
func (op *testOperator) GetName() string { return "test" }
func (op *testOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
return nil, false
}
Expand Down Expand Up @@ -89,10 +93,10 @@ func (s *testCoordinatorSuite) TestDispatch(c *C) {
tc.addLeaderRegion(2, 4, 3, 2)

// Wait for schedule and turn off balance.
s.waitOperator(c, co, 1)
waitOperator(c, co, 1)
checkTransferPeer(c, co.getOperator(1), 4, 1)
c.Assert(co.removeScheduler("balance-region-scheduler"), IsNil)
s.waitOperator(c, co, 2)
waitOperator(c, co, 2)
checkTransferLeader(c, co.getOperator(2), 4, 2)
c.Assert(co.removeScheduler("balance-leader-scheduler"), IsNil)

Expand Down Expand Up @@ -185,7 +189,7 @@ func (s *testCoordinatorSuite) TestPeerState(c *C) {
tc.addLeaderRegion(1, 2, 3, 4)

// Wait for schedule.
s.waitOperator(c, co, 1)
waitOperator(c, co, 1)
checkTransferPeer(c, co.getOperator(1), 4, 1)

region := cluster.getRegion(1)
Expand Down Expand Up @@ -280,22 +284,22 @@ func (s *testCoordinatorSuite) TestAddScheduler(c *C) {
c.Assert(co.addScheduler(gls, minScheduleInterval), IsNil)

// Transfer all leaders to store 1.
s.waitOperator(c, co, 2)
waitOperator(c, co, 2)
region2 := cluster.getRegion(2)
checkTransferLeaderResp(c, co.dispatch(region2), 1)
region2.Leader = region2.GetStorePeer(1)
cluster.putRegion(region2)
c.Assert(co.dispatch(region2), IsNil)

s.waitOperator(c, co, 3)
waitOperator(c, co, 3)
region3 := cluster.getRegion(3)
checkTransferLeaderResp(c, co.dispatch(region3), 1)
region3.Leader = region3.GetStorePeer(1)
cluster.putRegion(region3)
c.Assert(co.dispatch(region3), IsNil)
}

func (s *testCoordinatorSuite) waitOperator(c *C, co *coordinator, regionID uint64) {
func waitOperator(c *C, co *coordinator, regionID uint64) {
for i := 0; i < 20; i++ {
if co.getOperator(regionID) != nil {
return
Expand Down
10 changes: 10 additions & 0 deletions server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ func (op *splitOperator) GetResourceKind() ResourceKind {
return OtherKind
}

func (op *splitOperator) GetState() OperatorState {
return OperatorFinished
}

func (op *splitOperator) SetState(_ OperatorState) {}

func (op *splitOperator) GetName() string {
return op.Name
}

// Do implements Operator.Do interface.
func (op *splitOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
return nil, true
Expand Down
2 changes: 1 addition & 1 deletion server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
Subsystem: "schedule",
Name: "operators_count",
Help: "Counter of schedule operators.",
}, []string{"type"})
}, []string{"type", "state"})

clusterStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down
Loading