Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server/operator: add operator state #647

Merged
merged 11 commits into from
May 26, 2017
6 changes: 6 additions & 0 deletions server/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ func (c *coordinator) addOperator(op Operator) bool {
c.limiter.addOperator(op)
c.operators[regionID] = op
collectOperatorCounterMetrics(op)
collectOperatorStateCounterMetrics(op)
return true
}

Expand All @@ -260,6 +261,7 @@ func (c *coordinator) removeOperator(op Operator) {
delete(c.operators, regionID)

c.histories.add(regionID, op)
collectOperatorStateCounterMetrics(op)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the operator is replaced, its status will still be Doing.

}

func (c *coordinator) getOperator(regionID uint64) Operator {
Expand Down Expand Up @@ -409,3 +411,7 @@ func collectOperatorCounterMetrics(op Operator) {
operatorCounter.WithLabelValues(label).Add(float64(value))
}
}

func collectOperatorStateCounterMetrics(op Operator) {
operatorCounter.WithLabelValues(op.GetState().String()).Add(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better to use labels (operator, state) here.

}
4 changes: 3 additions & 1 deletion server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,18 @@ import (
type testOperator struct {
RegionID uint64
Kind ResourceKind
State OperatorState
}

func newTestOperator(regionID uint64, kind ResourceKind) Operator {
region := newRegionInfo(&metapb.Region{Id: regionID}, nil)
op := &testOperator{RegionID: regionID, Kind: kind}
op := &testOperator{RegionID: regionID, Kind: kind, State: OperatorDoing}
return newRegionOperator(region, kind, op)
}

func (op *testOperator) GetRegionID() uint64 { return op.RegionID }
func (op *testOperator) GetResourceKind() ResourceKind { return op.Kind }
func (op *testOperator) GetState() OperatorState { return op.State }
func (op *testOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
return nil, false
}
Expand Down
4 changes: 4 additions & 0 deletions server/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (op *splitOperator) GetResourceKind() ResourceKind {
return OtherKind
}

func (op *splitOperator) GetState() OperatorState {
return OperatorFinished
}

// Do implements Operator.Do interface.
func (op *splitOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
return nil, true
Expand Down
115 changes: 102 additions & 13 deletions server/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ package server

import (
"fmt"
"strconv"
"time"

log "github.com/Sirupsen/logrus"
"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
)
Expand Down Expand Up @@ -77,24 +79,82 @@ func ParseResourceKind(name string) ResourceKind {
return UnKnownKind
}

// OperatorState indicates state of the operator
type OperatorState int

const (
// OperatorUnKnownState indicates the unknown state
OperatorUnKnownState OperatorState = iota
// OperatorDoing indicates the doing state
OperatorDoing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/Doing/Running/s

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or Wait?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or Wait as the initial state, after the operator is sent to tikv, update to Running.

// OperatorFinished indicates the finished state
OperatorFinished
// OperatorTimeOut indicates the time_out state
OperatorTimeOut
)

var operatorStateToName = map[int]string{
0: "unknown",
1: "doing",
2: "finished",
3: "time_out",
}

var operatorStateNameToValue = map[string]OperatorState{
"unknown": OperatorUnKnownState,
"doing": OperatorDoing,
"finished": OperatorFinished,
"time_out": OperatorTimeOut,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout

}

func (o OperatorState) String() string {
s, ok := operatorStateToName[int(o)]
if ok {
return s
}
return operatorStateToName[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/0/OperatorUnKnownState/s

}

// MarshalJSON returns the state as a JSON string
func (o OperatorState) MarshalJSON() ([]byte, error) {
return []byte(strconv.Quote(o.String())), nil
}

// UnmarshalJSON parses a JSON string into the OperatorState
func (o *OperatorState) UnmarshalJSON(text []byte) error {
s, err := strconv.Unquote(string(text))
if err != nil {
return errors.Trace(err)
}
state, ok := operatorStateNameToValue[s]
if !ok {
*o = OperatorUnKnownState
}
*o = state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should in else closure.

return nil
}

// Operator is an interface to schedule region.
type Operator interface {
GetRegionID() uint64
GetResourceKind() ResourceKind
GetState() OperatorState
Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool)
}

type adminOperator struct {
Region *RegionInfo `json:"region"`
Start time.Time `json:"start"`
Ops []Operator `json:"ops"`
Region *RegionInfo `json:"region"`
Start time.Time `json:"start"`
Ops []Operator `json:"ops"`
State OperatorState `json:"state"`
}

func newAdminOperator(region *RegionInfo, ops ...Operator) *adminOperator {
return &adminOperator{
Region: region,
Start: time.Now(),
Ops: ops,
State: OperatorDoing,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why Doing here?

}
}

Expand All @@ -110,6 +170,10 @@ func (op *adminOperator) GetResourceKind() ResourceKind {
return AdminKind
}

func (op *adminOperator) GetState() OperatorState {
return op.State
}

func (op *adminOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
// Update region.
op.Region = region.clone()
Expand All @@ -122,16 +186,18 @@ func (op *adminOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse,
}

// Admin operator never ends, remove it from the API.
op.State = OperatorFinished
return nil, false
}

type regionOperator struct {
Region *RegionInfo `json:"region"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
Index int `json:"index"`
Ops []Operator `json:"ops"`
Kind ResourceKind `json:"kind"`
Region *RegionInfo `json:"region"`
Start time.Time `json:"start"`
End time.Time `json:"end"`
Index int `json:"index"`
Ops []Operator `json:"ops"`
Kind ResourceKind `json:"kind"`
State OperatorState `json:"state"`
}

func newRegionOperator(region *RegionInfo, kind ResourceKind, ops ...Operator) *regionOperator {
Expand All @@ -145,6 +211,7 @@ func newRegionOperator(region *RegionInfo, kind ResourceKind, ops ...Operator) *
Start: time.Now(),
Ops: ops,
Kind: kind,
State: OperatorDoing,
}
}

Expand All @@ -160,9 +227,14 @@ func (op *regionOperator) GetResourceKind() ResourceKind {
return op.Kind
}

func (op *regionOperator) GetState() OperatorState {
return op.State
}

func (op *regionOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
if time.Since(op.Start) > maxOperatorWaitTime {
log.Errorf("[region %d] Operator timeout:%s", region.GetId(), op)
op.State = OperatorTimeOut
return nil, true
}

Expand All @@ -177,13 +249,15 @@ func (op *regionOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse,
}

op.End = time.Now()
op.State = OperatorFinished
return nil, true
}

type changePeerOperator struct {
Name string `json:"name"`
RegionID uint64 `json:"region_id"`
ChangePeer *pdpb.ChangePeer `json:"change_peer"`
State OperatorState `json:"state"`
}

func newAddPeerOperator(regionID uint64, peer *metapb.Peer) *changePeerOperator {
Expand All @@ -195,6 +269,7 @@ func newAddPeerOperator(regionID uint64, peer *metapb.Peer) *changePeerOperator
ChangeType: pdpb.ConfChangeType_AddNode,
Peer: peer,
},
State: OperatorDoing,
}
}

Expand All @@ -207,6 +282,7 @@ func newRemovePeerOperator(regionID uint64, peer *metapb.Peer) *changePeerOperat
ChangeType: pdpb.ConfChangeType_RemoveNode,
Peer: peer,
},
State: OperatorDoing,
}
}

Expand All @@ -222,6 +298,10 @@ func (op *changePeerOperator) GetResourceKind() ResourceKind {
return RegionKind
}

func (op *changePeerOperator) GetState() OperatorState {
return op.State
}

func (op *changePeerOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
// Check if operator is finished.
peer := op.ChangePeer.GetPeer()
Expand All @@ -233,11 +313,13 @@ func (op *changePeerOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatRespo
}
if region.GetPeer(peer.GetId()) != nil {
// Peer is added and finished.
op.State = OperatorFinished
return nil, true
}
case pdpb.ConfChangeType_RemoveNode:
if region.GetPeer(peer.GetId()) == nil {
// Peer is removed.
op.State = OperatorFinished
return nil, true
}
}
Expand All @@ -251,10 +333,11 @@ func (op *changePeerOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatRespo
}

type transferLeaderOperator struct {
Name string `json:"name"`
RegionID uint64 `json:"region_id"`
OldLeader *metapb.Peer `json:"old_leader"`
NewLeader *metapb.Peer `json:"new_leader"`
Name string `json:"name"`
RegionID uint64 `json:"region_id"`
OldLeader *metapb.Peer `json:"old_leader"`
NewLeader *metapb.Peer `json:"new_leader"`
State OperatorState `json:"state"`
}

func newTransferLeaderOperator(regionID uint64, oldLeader, newLeader *metapb.Peer) *transferLeaderOperator {
Expand All @@ -263,6 +346,7 @@ func newTransferLeaderOperator(regionID uint64, oldLeader, newLeader *metapb.Pee
RegionID: regionID,
OldLeader: oldLeader,
NewLeader: newLeader,
State: OperatorDoing,
}
}

Expand All @@ -278,9 +362,14 @@ func (op *transferLeaderOperator) GetResourceKind() ResourceKind {
return LeaderKind
}

func (op *transferLeaderOperator) GetState() OperatorState {
return op.State
}

func (op *transferLeaderOperator) Do(region *RegionInfo) (*pdpb.RegionHeartbeatResponse, bool) {
// Check if operator is finished.
if region.Leader.GetId() == op.NewLeader.GetId() {
op.State = OperatorFinished
return nil, true
}

Expand Down
35 changes: 35 additions & 0 deletions server/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package server

import (
"encoding/json"

. "github.com/pingcap/check"
)

Expand All @@ -38,6 +40,7 @@ func (s *testResouceKindSuite) TestString(c *C) {
c.Assert(t.value.String(), Equals, t.name)
}
}

func (s *testResouceKindSuite) TestParseResouceKind(c *C) {
tbl := []struct {
name string
Expand All @@ -55,3 +58,35 @@ func (s *testResouceKindSuite) TestParseResouceKind(c *C) {
c.Assert(ParseResourceKind(t.name), Equals, t.value)
}
}

var _ = Suite(&testOperatorSuite{})

type testOperatorSuite struct{}

func (o *testOperatorSuite) TestOperatorStateString(c *C) {
tbl := []struct {
value OperatorState
name string
}{
{OperatorUnKnownState, "unknown"},
{OperatorDoing, "doing"},
{OperatorFinished, "finished"},
{OperatorTimeOut, "time_out"},
{OperatorState(404), "unknown"},
}
for _, t := range tbl {
c.Assert(t.value.String(), Equals, t.name)
}
}

func (o *testOperatorSuite) TestOperatorStateMarshal(c *C) {
states := []OperatorState{OperatorUnKnownState, OperatorDoing, OperatorFinished, OperatorTimeOut}
for _, s := range states {
data, err := json.Marshal(s)
c.Assert(err, IsNil)
var newState OperatorState
err = json.Unmarshal(data, &newState)
c.Assert(err, IsNil)
c.Assert(newState, Equals, s)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use an invalid marshaled data to check Unmarshal.