Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/operator: add operator state #647

Merged
merged 11 commits into from
May 26, 2017
21 changes: 8 additions & 13 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,22 +222,26 @@ func (c *coordinator) runScheduler(s *scheduleController) {

func (c *coordinator) addOperator(op Operator) bool {
c.Lock()
defer c.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer won't work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the function removeOperator also need to access lock

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about add another func called removeOperatorWithLock?


regionID := op.GetRegionID()

if old, ok := c.operators[regionID]; ok {
if !isHigherPriorityOperator(op, old) {
c.Unlock()
return false
}
c.limiter.removeOperator(old)
c.Unlock()
old.SetState(OperatorReplaced)
c.removeOperator(old)
c.Lock()
log.Infof("coordinator: add operator %+v with higher priority, remove operator: %+v", op, old)
}

c.histories.add(regionID, op)
c.limiter.addOperator(op)
c.operators[regionID] = op
collectOperatorCounterMetrics(op)
c.Unlock()
return true
}

Expand All @@ -260,6 +264,7 @@ func (c *coordinator) removeOperator(op Operator) {
delete(c.operators, regionID)

c.histories.add(regionID, op)
collectOperatorCounterMetrics(op)
}

func (c *coordinator) getOperator(regionID uint64) Operator {
Expand Down Expand Up @@ -391,21 +396,11 @@ func (s *scheduleController) AllowSchedule() bool {
}

func collectOperatorCounterMetrics(op Operator) {
metrics := make(map[string]uint64)
regionOp, ok := op.(*regionOperator)
if !ok {
return
}
for _, op := range regionOp.Ops {
switch o := op.(type) {
case *changePeerOperator:
metrics[o.Name]++
case *transferLeaderOperator:
metrics[o.Name]++
}
}

for label, value := range metrics {
operatorCounter.WithLabelValues(label).Add(float64(value))
operatorCounter.WithLabelValues(op.GetName(), op.GetState().String()).Add(1)
}
}
6 changes: 5 additions & 1 deletion server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,20 @@ import (
type testOperator struct {
RegionID uint64
Kind ResourceKind
State OperatorState
}

func newTestOperator(regionID uint64, kind ResourceKind) Operator {
region := newRegionInfo(&metapb.Region{Id: regionID}, nil)
op := &testOperator{RegionID: regionID, Kind: kind}
op := &testOperator{RegionID: regionID, Kind: kind, State: OperatorRunning}
return newRegionOperator(region, kind, op)
}

func (op *testOperator) GetRegionID() uint64 { return op.RegionID }
func (op *testOperator) GetResourceKind() ResourceKind { return op.Kind }
func (op *testOperator) GetState() OperatorState { return op.State }
func (op *testOperator) SetState(state OperatorState) { op.State = state }
func (op *testOperator) GetName() string { return "test" }
func (op *testOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
return nil, false
}
Expand Down
12 changes: 12 additions & 0 deletions server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,18 @@ func (op *splitOperator) GetResourceKind() ResourceKind {
return OtherKind
}

func (op *splitOperator) GetState() OperatorState {
return OperatorFinished
}

func (op *splitOperator) SetState(state OperatorState) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/state/_/

return
}

func (op *splitOperator) GetName() string {
return op.Name
}

// Do implements Operator.Do interface.
func (op *splitOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
return nil, true
Expand Down
2 changes: 1 addition & 1 deletion server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
Subsystem: "schedule",
Name: "operators_count",
Help: "Counter of schedule operators.",
}, []string{"type"})
}, []string{"type", "state"})

clusterStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down
Loading