diff --git a/pkg/mock/mockcluster/config.go b/pkg/mock/mockcluster/config.go index 166cc9f4fe8..c41e45f5e77 100644 --- a/pkg/mock/mockcluster/config.go +++ b/pkg/mock/mockcluster/config.go @@ -128,6 +128,11 @@ func (mc *Cluster) SetLocationLabels(v []string) { mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.LocationLabels = v }) } +// SetIsolationLevel updates the IsolationLevel configuration. +func (mc *Cluster) SetIsolationLevel(v string) { + mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.IsolationLevel = v }) +} + func (mc *Cluster) updateScheduleConfig(f func(*config.ScheduleConfig)) { s := mc.GetScheduleConfig().Clone() f(s) diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 64a8918fac9..9cb1f95a7b8 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -755,12 +755,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, - region.GetBytesWritten(), - region.GetKeysWritten(), - region.GetBytesRead(), - region.GetKeysRead(), - interval) + peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) item := mc.HotCache.CheckReadPeerSync(peerInfo, region) if item != nil { items = append(items, item) @@ -777,12 +772,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, - region.GetBytesWritten(), - region.GetKeysWritten(), - region.GetBytesRead(), - region.GetKeysRead(), - interval) + peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) item := mc.HotCache.CheckWritePeerSync(peerInfo, region) if item != nil { items = append(items, item) @@ -799,12 +789,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics. reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() peer := region.GetLeader() - peerInfo := core.NewPeerInfo(peer, - region.GetBytesWritten(), - region.GetKeysWritten(), - region.GetBytesRead(), - region.GetKeysRead(), - interval) + peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) item := mc.HotCache.CheckReadPeerSync(peerInfo, region) if item != nil { items = append(items, item) diff --git a/server/api/region.go b/server/api/region.go index f2ad70c60e9..a1f0490d9d3 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -37,23 +37,78 @@ import ( "go.uber.org/zap" ) +// MetaPeer is api compatible with *metapb.Peer. +type MetaPeer struct { + *metapb.Peer + // RoleName is `Role.String()`. + // Since Role is serialized as int by json by default, + // introducing it will make the output of pd-ctl easier to identify Role. + RoleName string `json:"role_name"` + // IsLearner is `Role == "Learner"`. + // Since IsLearner was changed to Role in kvproto in 5.0, this field was introduced to ensure api compatibility. + IsLearner bool `json:"is_learner,omitempty"` +} + +// PDPeerStats is api compatible with *pdpb.PeerStats. 
+type PDPeerStats struct { + *pdpb.PeerStats + Peer MetaPeer `json:"peer"` +} + +func fromPeer(peer *metapb.Peer) MetaPeer { + return MetaPeer{ + Peer: peer, + RoleName: peer.GetRole().String(), + IsLearner: core.IsLearner(peer), + } +} + +func fromPeerSlice(peers []*metapb.Peer) []MetaPeer { + if peers == nil { + return nil + } + slice := make([]MetaPeer, len(peers)) + for i, peer := range peers { + slice[i] = fromPeer(peer) + } + return slice +} + +func fromPeerStats(peer *pdpb.PeerStats) PDPeerStats { + return PDPeerStats{ + PeerStats: peer, + Peer: fromPeer(peer.Peer), + } +} + +func fromPeerStatsSlice(peers []*pdpb.PeerStats) []PDPeerStats { + if peers == nil { + return nil + } + slice := make([]PDPeerStats, len(peers)) + for i, peer := range peers { + slice[i] = fromPeerStats(peer) + } + return slice +} + // RegionInfo records detail region info for api usage. type RegionInfo struct { ID uint64 `json:"id"` StartKey string `json:"start_key"` EndKey string `json:"end_key"` RegionEpoch *metapb.RegionEpoch `json:"epoch,omitempty"` - Peers []*metapb.Peer `json:"peers,omitempty"` - - Leader *metapb.Peer `json:"leader,omitempty"` - DownPeers []*pdpb.PeerStats `json:"down_peers,omitempty"` - PendingPeers []*metapb.Peer `json:"pending_peers,omitempty"` - WrittenBytes uint64 `json:"written_bytes"` - ReadBytes uint64 `json:"read_bytes"` - WrittenKeys uint64 `json:"written_keys"` - ReadKeys uint64 `json:"read_keys"` - ApproximateSize int64 `json:"approximate_size"` - ApproximateKeys int64 `json:"approximate_keys"` + Peers []MetaPeer `json:"peers,omitempty"` + + Leader MetaPeer `json:"leader,omitempty"` + DownPeers []PDPeerStats `json:"down_peers,omitempty"` + PendingPeers []MetaPeer `json:"pending_peers,omitempty"` + WrittenBytes uint64 `json:"written_bytes"` + ReadBytes uint64 `json:"read_bytes"` + WrittenKeys uint64 `json:"written_keys"` + ReadKeys uint64 `json:"read_keys"` + ApproximateSize int64 `json:"approximate_size"` + ApproximateKeys int64 `json:"approximate_keys"` ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"` } @@ -69,7 +124,7 @@ func fromPBReplicationStatus(s *replication_modepb.RegionReplicationStatus) *Rep return nil } return &ReplicationStatus{ - State: replication_modepb.RegionReplicationState_name[int32(s.GetState())], + State: s.GetState().String(), StateID: s.GetStateId(), } } @@ -89,10 +144,10 @@ func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo { s.StartKey = core.HexRegionKeyStr(r.GetStartKey()) s.EndKey = core.HexRegionKeyStr(r.GetEndKey()) s.RegionEpoch = r.GetRegionEpoch() - s.Peers = r.GetPeers() - s.Leader = r.GetLeader() - s.DownPeers = r.GetDownPeers() - s.PendingPeers = r.GetPendingPeers() + s.Peers = fromPeerSlice(r.GetPeers()) + s.Leader = fromPeer(r.GetLeader()) + s.DownPeers = fromPeerStatsSlice(r.GetDownPeers()) + s.PendingPeers = fromPeerSlice(r.GetPendingPeers()) s.WrittenBytes = r.GetBytesWritten() s.WrittenKeys = r.GetKeysWritten() s.ReadBytes = r.GetBytesRead() @@ -104,10 +159,26 @@ func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo { return s } +// Adjust is only used in testing, in order to compare the data from json deserialization. +func (r *RegionInfo) Adjust() { + for _, peer := range r.DownPeers { + // Since api.PDPeerStats uses the api.MetaPeer type variable Peer to overwrite PeerStats.Peer, + // it needs to be restored after deserialization to be completely consistent with the original. 
+		peer.PeerStats.Peer = peer.Peer.Peer
+	}
+}
+
 // RegionsInfo contains some regions with the detailed region info.
 type RegionsInfo struct {
-	Count   int           `json:"count"`
-	Regions []*RegionInfo `json:"regions"`
+	Count   int          `json:"count"`
+	Regions []RegionInfo `json:"regions"`
+}
+
+// Adjust is only used in testing, in order to compare the data from json deserialization.
+func (s *RegionsInfo) Adjust() {
+	for _, r := range s.Regions {
+		r.Adjust()
+	}
 }
 
 type regionHandler struct {
@@ -177,17 +248,12 @@ func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler {
 func convertToAPIRegions(regions []*core.RegionInfo) *RegionsInfo {
 	regionInfos := make([]RegionInfo, len(regions))
-	regionInfosRefs := make([]*RegionInfo, len(regions))
-
-	for i := 0; i < len(regions); i++ {
-		regionInfosRefs[i] = &regionInfos[i]
-	}
 	for i, r := range regions {
-		regionInfosRefs[i] = InitRegion(r, regionInfosRefs[i])
+		InitRegion(r, &regionInfos[i])
 	}
 	return &RegionsInfo{
 		Count:   len(regions),
-		Regions: regionInfosRefs,
+		Regions: regionInfos,
 	}
 }
diff --git a/server/api/region_test.go b/server/api/region_test.go
index ecec8dcf489..7920f88d7ae 100644
--- a/server/api/region_test.go
+++ b/server/api/region_test.go
@@ -31,6 +31,54 @@ import (
 	"github.com/tikv/pd/server/core"
 )
 
+var _ = Suite(&testRegionStructSuite{})
+
+type testRegionStructSuite struct{}
+
+func (s *testRegionStructSuite) TestPeer(c *C) {
+	peers := []*metapb.Peer{
+		{Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter},
+		{Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner},
+		{Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter},
+		{Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter},
+	}
+	// float64 is the default numeric type for JSON
+	expected := []map[string]interface{}{
+		{"id": float64(1), "store_id": float64(10), "role_name": "Voter"},
+		{"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true},
+		{"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"},
+		{"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"},
+	}
+
+	data, err := json.Marshal(fromPeerSlice(peers))
+	c.Assert(err, IsNil)
+	var ret []map[string]interface{}
+	c.Assert(json.Unmarshal(data, &ret), IsNil)
+	c.Assert(ret, DeepEquals, expected)
+}
+
+func (s *testRegionStructSuite) TestPeerStats(c *C) {
+	peers := []*pdpb.PeerStats{
+		{Peer: &metapb.Peer{Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter}, DownSeconds: 0},
+		{Peer: &metapb.Peer{Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner}, DownSeconds: 1},
+		{Peer: &metapb.Peer{Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter}, DownSeconds: 2},
+		{Peer: &metapb.Peer{Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter}, DownSeconds: 3},
+	}
+	// float64 is the default numeric type for JSON
+	expected := []map[string]interface{}{
+		{"peer": map[string]interface{}{"id": float64(1), "store_id": float64(10), "role_name": "Voter"}},
+		{"peer": map[string]interface{}{"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true}, "down_seconds": float64(1)},
+		{"peer": map[string]interface{}{"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"}, "down_seconds": float64(2)},
+		{"peer": map[string]interface{}{"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"}, "down_seconds": float64(3)},
+	}
+
+	data, err := json.Marshal(fromPeerStatsSlice(peers))
+	c.Assert(err, IsNil)
+	var ret []map[string]interface{}
+	c.Assert(json.Unmarshal(data, &ret), IsNil)
+	c.Assert(ret, DeepEquals, expected)
+}
+
 var _ = Suite(&testRegionSuite{})
 
 type testRegionSuite struct {
@@ -84,11 +132,10 @@ func (s *testRegionSuite) TestRegion(c *C) {
 	url := fmt.Sprintf("%s/region/id/%d", s.urlPrefix, r.GetID())
 	r1 := &RegionInfo{}
 	r1m := make(map[string]interface{})
-	err := readJSON(testDialClient, url, r1)
-	c.Assert(err, IsNil)
+	c.Assert(readJSON(testDialClient, url, r1), IsNil)
+	r1.Adjust()
 	c.Assert(r1, DeepEquals, NewRegionInfo(r))
-	err = readJSON(testDialClient, url, &r1m)
-	c.Assert(err, IsNil)
+	c.Assert(readJSON(testDialClient, url, &r1m), IsNil)
 	c.Assert(r1m["written_bytes"].(float64), Equals, float64(r.GetBytesWritten()))
 	c.Assert(r1m["written_keys"].(float64), Equals, float64(r.GetKeysWritten()))
 	c.Assert(r1m["read_bytes"].(float64), Equals, float64(r.GetBytesRead()))
@@ -96,8 +143,8 @@ func (s *testRegionSuite) TestRegion(c *C) {
 
 	url = fmt.Sprintf("%s/region/key/%s", s.urlPrefix, "a")
 	r2 := &RegionInfo{}
-	err = readJSON(testDialClient, url, r2)
-	c.Assert(err, IsNil)
+	c.Assert(readJSON(testDialClient, url, r2), IsNil)
+	r2.Adjust()
 	c.Assert(r2, DeepEquals, NewRegionInfo(r))
 }
 
@@ -108,62 +155,50 @@ func (s *testRegionSuite) TestRegionCheck(c *C) {
 	mustRegionHeartbeat(c, s.svr, r)
 	url := fmt.Sprintf("%s/region/id/%d", s.urlPrefix, r.GetID())
 	r1 := &RegionInfo{}
-	err := readJSON(testDialClient, url, r1)
-	c.Assert(err, IsNil)
+	c.Assert(readJSON(testDialClient, url, r1), IsNil)
+	r1.Adjust()
 	c.Assert(r1, DeepEquals, NewRegionInfo(r))
 
 	url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "down-peer")
 	r2 := &RegionsInfo{}
-	err = readJSON(testDialClient, url, r2)
-	c.Assert(err, IsNil)
-	c.Assert(r2, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})
+	c.Assert(readJSON(testDialClient, url, r2), IsNil)
+	r2.Adjust()
+	c.Assert(r2, DeepEquals, &RegionsInfo{Count: 1, Regions: []RegionInfo{*NewRegionInfo(r)}})
 
 	url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "pending-peer")
 	r3 := &RegionsInfo{}
-	err = readJSON(testDialClient, url, r3)
-	c.Assert(err, IsNil)
-	c.Assert(r3, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})
+	c.Assert(readJSON(testDialClient, url, r3), IsNil)
+	r3.Adjust()
+	c.Assert(r3, DeepEquals, &RegionsInfo{Count: 1, Regions: []RegionInfo{*NewRegionInfo(r)}})
 
 	url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "offline-peer")
 	r4 := &RegionsInfo{}
-	err = readJSON(testDialClient, url, r4)
-	c.Assert(err, IsNil)
-	c.Assert(r4, DeepEquals, &RegionsInfo{Count: 0, Regions: []*RegionInfo{}})
+	c.Assert(readJSON(testDialClient, url, r4), IsNil)
+	r4.Adjust()
+	c.Assert(r4, DeepEquals, &RegionsInfo{Count: 0, Regions: []RegionInfo{}})
 
 	r = r.Clone(core.SetApproximateSize(1))
 	mustRegionHeartbeat(c, s.svr, r)
 	url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "empty-region")
 	r5 := &RegionsInfo{}
-	err = readJSON(testDialClient, url, r5)
-	c.Assert(err, IsNil)
-	c.Assert(r5, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})
+	c.Assert(readJSON(testDialClient, url, r5), IsNil)
+	r5.Adjust()
+	c.Assert(r5, DeepEquals, &RegionsInfo{Count: 1, Regions: []RegionInfo{*NewRegionInfo(r)}})
 
 	r = r.Clone(core.SetApproximateSize(1))
 	mustRegionHeartbeat(c, s.svr, r)
 	url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "hist-size")
 	r6 := make([]*histItem, 1)
-	err = readJSON(testDialClient, url, &r6)
-	histSizes := make([]*histItem, 1)
-	histSize := &histItem{}
-	histSize.Start = 1
-	histSize.End = 1
-	histSize.Count = 1
-	histSizes[0] = histSize
-	c.Assert(err, IsNil)
+	c.Assert(readJSON(testDialClient, url, &r6), IsNil)
+	histSizes := []*histItem{{Start: 1, End: 1, Count: 1}}
 	c.Assert(r6, DeepEquals, histSizes)
 
 	r = r.Clone(core.SetApproximateKeys(1000))
 	mustRegionHeartbeat(c, s.svr, r)
 	url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "hist-keys")
 	r7 := make([]*histItem, 1)
-	err = readJSON(testDialClient, url, &r7)
-	histKeys := make([]*histItem, 1)
-	histKey := &histItem{}
-	histKey.Start = 1000
-	histKey.End = 1999
-	histKey.Count = 1
-	histKeys[0] = histKey
-	c.Assert(err, IsNil)
+	c.Assert(readJSON(testDialClient, url, &r7), IsNil)
+	histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
 	c.Assert(r7, DeepEquals, histKeys)
 }
 
@@ -173,9 +208,9 @@ func (s *testRegionSuite) TestRegions(c *C) {
 		newTestRegionInfo(3, 1, []byte("b"), []byte("c")),
 		newTestRegionInfo(4, 2, []byte("c"), []byte("d")),
 	}
-	regions := make([]*RegionInfo, 0, len(rs))
+	regions := make([]RegionInfo, 0, len(rs))
 	for _, r := range rs {
-		regions = append(regions, NewRegionInfo(r))
+		regions = append(regions, *NewRegionInfo(r))
 		mustRegionHeartbeat(c, s.svr, r)
 	}
 	url := fmt.Sprintf("%s/regions", s.urlPrefix)
diff --git a/server/api/trend.go b/server/api/trend.go
index 60357965004..23af6ef328f 100644
--- a/server/api/trend.go
+++ b/server/api/trend.go
@@ -140,21 +140,21 @@ func (h *trendHandler) getTrendStores() ([]trendStore, error) {
 			LastHeartbeatTS: info.Status.LastHeartbeatTS,
 			Uptime:          info.Status.Uptime,
 		}
-		s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, statistics.RegionReadBytes, store.GetID())
-		s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, statistics.RegionWriteBytes, store.GetID())
+		s.HotReadFlow, s.HotReadRegionFlows = h.getStoreFlow(readStats, store.GetID())
+		s.HotWriteFlow, s.HotWriteRegionFlows = h.getStoreFlow(writeStats, store.GetID())
 		trendStores = append(trendStores, s)
 	}
 	return trendStores, nil
 }
 
-func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, regionStatKind statistics.RegionStatKind, storeID uint64) (storeFlow float64, regionFlows []float64) {
+func (h *trendHandler) getStoreFlow(stats statistics.StoreHotPeersStat, storeID uint64) (storeByteFlow float64, regionByteFlows []float64) {
 	if stats == nil {
 		return
 	}
 	if stat, ok := stats[storeID]; ok {
-		storeFlow = stat.TotalLoads[regionStatKind]
+		storeByteFlow = stat.TotalBytesRate
 		for _, flow := range stat.Stats {
-			regionFlows = append(regionFlows, flow.GetLoad(regionStatKind))
+			regionByteFlows = append(regionByteFlows, flow.ByteRate)
 		}
 	}
 	return
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index b0f7c0b0ca7..9f1b05678ae 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -555,8 +555,13 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
 				zap.Uint64("store-id", storeID))
 			continue
 		}
-		peerInfo := core.NewPeerInfo(peer, 0, 0,
-			peerStat.GetReadBytes(), peerStat.GetReadKeys(), interval)
+		loads := []float64{
+			statistics.RegionReadBytes:  float64(peerStat.GetReadBytes()),
+			statistics.RegionReadKeys:   float64(peerStat.GetReadKeys()),
+			statistics.RegionWriteBytes: 0,
+			statistics.RegionWriteKeys:  0,
+		}
+		peerInfo := core.NewPeerInfo(peer, loads, interval)
 		item := statistics.NewPeerInfoItem(peerInfo, region)
 		c.hotStat.CheckReadAsync(item)
 	}
@@ -588,10 +593,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
 	reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, - region.GetBytesWritten(), region.GetKeysWritten(), - 0, 0, - interval) + peerInfo := core.NewPeerInfo(peer, region.GetWriteLoads(), interval) item := statistics.NewPeerInfoItem(peerInfo, region) c.hotStat.CheckWriteAsync(item) } diff --git a/server/core/kind.go b/server/core/kind.go index a35f6749870..c28de0624c8 100644 --- a/server/core/kind.go +++ b/server/core/kind.go @@ -129,11 +129,3 @@ func StringToKeyType(input string) KeyType { panic("invalid key type: " + input) } } - -// FlowStat indicates the stats of the flow -type FlowStat interface { - GetKeysWritten() uint64 - GetBytesWritten() uint64 - GetBytesRead() uint64 - GetKeysRead() uint64 -} diff --git a/server/core/peer.go b/server/core/peer.go index 0c27624e4e6..59864844913 100644 --- a/server/core/peer.go +++ b/server/core/peer.go @@ -76,52 +76,29 @@ func CountInJointState(peers ...*metapb.Peer) int { // PeerInfo provides peer information type PeerInfo struct { *metapb.Peer - writtenBytes uint64 - writtenKeys uint64 - readBytes uint64 - readKeys uint64 - interval uint64 + loads []float64 + interval uint64 } // NewPeerInfo creates PeerInfo -func NewPeerInfo(meta *metapb.Peer, - writtenBytes, writtenKeys, readBytes, readKeys uint64, - interval uint64) *PeerInfo { +func NewPeerInfo(meta *metapb.Peer, loads []float64, interval uint64) *PeerInfo { return &PeerInfo{ - Peer: meta, - writtenBytes: writtenBytes, - writtenKeys: writtenKeys, - readBytes: readBytes, - readKeys: readKeys, - interval: interval, + Peer: meta, + loads: loads, + interval: interval, } } -// GetKeysWritten provides peer written keys -func (p *PeerInfo) GetKeysWritten() uint64 { - return p.writtenKeys -} - -// GetBytesWritten provides peer written bytes -func (p *PeerInfo) GetBytesWritten() uint64 { - return p.writtenBytes -} - -// GetBytesRead provides peer read bytes -func (p *PeerInfo) GetBytesRead() uint64 { - return p.readBytes -} - -// GetKeysRead provides read keys -func (p *PeerInfo) GetKeysRead() uint64 { - return p.readKeys -} - // GetStoreID provides located storeID func (p *PeerInfo) GetStoreID() uint64 { return p.GetStoreId() } +// GetLoads provides loads +func (p *PeerInfo) GetLoads() []float64 { + return p.loads +} + // GetPeerID provides peer id func (p *PeerInfo) GetPeerID() uint64 { return p.GetId() diff --git a/server/core/region.go b/server/core/region.go index 20dcd92c8fe..81ed7a4bb64 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -459,154 +459,36 @@ func (r *RegionInfo) GetReplicationStatus() *replication_modepb.RegionReplicatio } // regionMap wraps a map[uint64]*core.RegionInfo and supports randomly pick a region. 
-type regionMap struct {
-	m         map[uint64]*RegionInfo
-	totalSize int64
-	totalKeys int64
-}
-
-func newRegionMap() *regionMap {
-	return &regionMap{
-		m: make(map[uint64]*RegionInfo),
-	}
-}
-
-func (rm *regionMap) Len() int {
-	if rm == nil {
-		return 0
-	}
-	return len(rm.m)
-}
-
-func (rm *regionMap) Get(id uint64) *RegionInfo {
-	if rm == nil {
-		return nil
-	}
-	if r, ok := rm.m[id]; ok {
-		return r
-	}
-	return nil
-}
-
-func (rm *regionMap) Put(region *RegionInfo) {
-	if old, ok := rm.m[region.GetID()]; ok {
-		rm.totalSize -= old.approximateSize
-		rm.totalKeys -= old.approximateKeys
-	}
-	rm.m[region.GetID()] = region
-	rm.totalSize += region.approximateSize
-	rm.totalKeys += region.approximateKeys
-}
-
-func (rm *regionMap) Delete(id uint64) {
-	if rm == nil {
-		return
-	}
-	if old, ok := rm.m[id]; ok {
-		delete(rm.m, id)
-		rm.totalSize -= old.approximateSize
-		rm.totalKeys -= old.approximateKeys
-	}
-}
+type regionMap map[uint64]*RegionInfo
 
-func (rm *regionMap) TotalSize() int64 {
-	if rm.Len() == 0 {
-		return 0
-	}
-	return rm.totalSize
+func newRegionMap() regionMap {
+	return make(map[uint64]*RegionInfo)
 }
 
-// regionSubTree is used to manager different types of regions.
-type regionSubTree struct {
-	*regionTree
-	totalSize int64
-	totalKeys int64
+func (rm regionMap) Len() int {
+	return len(rm)
 }
 
-func newRegionSubTree() *regionSubTree {
-	return &regionSubTree{
-		regionTree: newRegionTree(),
-		totalSize:  0,
-	}
+func (rm regionMap) Get(id uint64) *RegionInfo {
+	return rm[id]
 }
 
-func (rst *regionSubTree) TotalSize() int64 {
-	if rst.length() == 0 {
-		return 0
-	}
-	return rst.totalSize
+func (rm regionMap) Put(region *RegionInfo) {
+	rm[region.GetID()] = region
 }
 
-func (rst *regionSubTree) scanRanges() []*RegionInfo {
-	if rst.length() == 0 {
-		return nil
-	}
-	var res []*RegionInfo
-	rst.scanRange([]byte(""), func(region *RegionInfo) bool {
-		res = append(res, region)
-		return true
-	})
-	return res
-}
-
-func (rst *regionSubTree) update(region *RegionInfo) {
-	overlaps := rst.regionTree.update(region)
-	rst.totalSize += region.approximateSize
-	rst.totalKeys += region.approximateKeys
-	for _, r := range overlaps {
-		rst.totalSize -= r.approximateSize
-		rst.totalKeys -= r.approximateKeys
-	}
-}
-
-func (rst *regionSubTree) remove(region *RegionInfo) {
-	if rst.length() == 0 {
-		return
-	}
-	if rst.regionTree.remove(region) != nil {
-		rst.totalSize -= region.approximateSize
-		rst.totalKeys -= region.approximateKeys
-	}
-}
-
-func (rst *regionSubTree) length() int {
-	if rst == nil {
-		return 0
-	}
-	return rst.regionTree.length()
-}
-
-func (rst *regionSubTree) RandomRegion(ranges []KeyRange) *RegionInfo {
-	if rst.length() == 0 {
-		return nil
-	}
-
-	return rst.regionTree.RandomRegion(ranges)
-}
-
-func (rst *regionSubTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo {
-	if rst.length() == 0 {
-		return nil
-	}
-
-	regions := make([]*RegionInfo, 0, n)
-
-	for i := 0; i < n; i++ {
-		if region := rst.regionTree.RandomRegion(ranges); region != nil {
-			regions = append(regions, region)
-		}
-	}
-	return regions
+func (rm regionMap) Delete(id uint64) {
+	delete(rm, id)
 }
 
 // RegionsInfo for export
 type RegionsInfo struct {
 	tree         *regionTree
-	regions      *regionMap                // regionID -> regionInfo
-	leaders      map[uint64]*regionSubTree // storeID -> regionSubTree
-	followers    map[uint64]*regionSubTree // storeID -> regionSubTree
-	learners     map[uint64]*regionSubTree // storeID -> regionSubTree
-	pendingPeers map[uint64]*regionSubTree // storeID -> regionSubTree
+	regions      regionMap              // regionID -> regionInfo
+	leaders      map[uint64]*regionTree // storeID -> sub regionTree
+	followers    map[uint64]*regionTree // storeID -> sub regionTree
+	learners     map[uint64]*regionTree // storeID -> sub regionTree
+	pendingPeers map[uint64]*regionTree // storeID -> sub regionTree
 }
 
 // NewRegionsInfo creates RegionsInfo with tree, regions, leaders and followers
@@ -614,10 +496,10 @@ func NewRegionsInfo() *RegionsInfo {
 	return &RegionsInfo{
 		tree:         newRegionTree(),
 		regions:      newRegionMap(),
-		leaders:      make(map[uint64]*regionSubTree),
-		followers:    make(map[uint64]*regionSubTree),
-		learners:     make(map[uint64]*regionSubTree),
-		pendingPeers: make(map[uint64]*regionSubTree),
+		leaders:      make(map[uint64]*regionTree),
+		followers:    make(map[uint64]*regionTree),
+		learners:     make(map[uint64]*regionTree),
+		pendingPeers: make(map[uint64]*regionTree),
 	}
 }
 
@@ -643,13 +525,13 @@ func (r *RegionsInfo) SetRegion(region *RegionInfo) []*RegionInfo {
 	return r.AddRegion(region)
 }
 
-// Length returns the RegionsInfo length
-func (r *RegionsInfo) Length() int {
+// Len returns the RegionsInfo length
+func (r *RegionsInfo) Len() int {
 	return r.regions.Len()
 }
 
-// TreeLength returns the RegionsInfo tree length(now only used in test)
-func (r *RegionsInfo) TreeLength() int {
+// TreeLen returns the RegionsInfo tree length(now only used in test)
+func (r *RegionsInfo) TreeLen() int {
 	return r.tree.length()
 }
 
@@ -670,6 +552,7 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*RegionInfo {
 		if bytes.Equal(regionOld.region.GetStartKey(), region.GetStartKey()) &&
 			bytes.Equal(regionOld.region.GetEndKey(), region.GetEndKey()) &&
 			regionOld.region.GetID() == region.GetID() {
+			r.tree.updateStat(regionOld.region, region)
 			regionOld.region = region
 			treeNeedAdd = false
 		}
@@ -692,7 +575,7 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*RegionInfo {
 			// Add leader peer to leaders.
 			store, ok := r.leaders[storeID]
 			if !ok {
-				store = newRegionSubTree()
+				store = newRegionTree()
 				r.leaders[storeID] = store
 			}
 			store.update(region)
 		} else {
 			// Add follower peer to followers.
store, ok := r.followers[storeID] if !ok { - store = newRegionSubTree() + store = newRegionTree() r.followers[storeID] = store } store.update(region) @@ -712,7 +595,7 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*RegionInfo { storeID := peer.GetStoreId() store, ok := r.learners[storeID] if !ok { - store = newRegionSubTree() + store = newRegionTree() r.learners[storeID] = store } store.update(region) @@ -722,7 +605,7 @@ func (r *RegionsInfo) AddRegion(region *RegionInfo) []*RegionInfo { storeID := peer.GetStoreId() store, ok := r.pendingPeers[storeID] if !ok { - store = newRegionSubTree() + store = newRegionTree() r.pendingPeers[storeID] = store } store.update(region) @@ -841,7 +724,7 @@ func (r *RegionsInfo) SearchPrevRegion(regionKey []byte) *RegionInfo { // GetRegions gets all RegionInfo from regionMap func (r *RegionsInfo) GetRegions() []*RegionInfo { regions := make([]*RegionInfo, 0, r.regions.Len()) - for _, region := range r.regions.m { + for _, region := range r.regions { regions = append(regions, region) } return regions @@ -885,7 +768,7 @@ func (r *RegionsInfo) GetStoreRegionSize(storeID uint64) int64 { // GetMetaRegions gets a set of metapb.Region from regionMap func (r *RegionsInfo) GetMetaRegions() []*metapb.Region { regions := make([]*metapb.Region, 0, r.regions.Len()) - for _, region := range r.regions.m { + for _, region := range r.regions { regions = append(regions, proto.Clone(region.meta).(*metapb.Region)) } return regions @@ -977,6 +860,26 @@ func (r *RegionsInfo) GetFollower(storeID uint64, region *RegionInfo) *RegionInf return nil } +// GetLoads returns loads from region +func (r *RegionInfo) GetLoads() []float64 { + return []float64{ + float64(r.GetBytesRead()), + float64(r.GetKeysRead()), + float64(r.GetBytesWritten()), + float64(r.GetKeysWritten()), + } +} + +// GetWriteLoads returns write loads from region +func (r *RegionInfo) GetWriteLoads() []float64 { + return []float64{ + 0, + 0, + float64(r.GetBytesWritten()), + float64(r.GetKeysWritten()), + } +} + // ScanRange scans regions intersecting [start key, end key), returns at most // `limit` regions. limit <= 0 means no limit. func (r *RegionsInfo) ScanRange(startKey, endKey []byte, limit int) []*RegionInfo { @@ -1016,10 +919,10 @@ func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *Regi // GetAverageRegionSize returns the average region approximate size. func (r *RegionsInfo) GetAverageRegionSize() int64 { - if r.regions.Len() == 0 { + if r.tree.length() == 0 { return 0 } - return r.regions.TotalSize() / int64(r.regions.Len()) + return r.tree.TotalSize() / int64(r.tree.length()) } // DiffRegionPeersInfo return the difference of peers info between two RegionInfo diff --git a/server/core/region_test.go b/server/core/region_test.go index 759ab0f8849..7219bb36702 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -184,10 +184,6 @@ var _ = Suite(&testRegionMapSuite{}) type testRegionMapSuite struct{} func (s *testRegionMapSuite) TestRegionMap(c *C) { - var empty *regionMap - c.Assert(empty.Len(), Equals, 0) - c.Assert(empty.Get(1), IsNil) - rm := newRegionMap() s.check(c, rm) rm.Put(s.regionInfo(1)) @@ -219,7 +215,7 @@ func (s *testRegionMapSuite) regionInfo(id uint64) *RegionInfo { } } -func (s *testRegionMapSuite) check(c *C, rm *regionMap, ids ...uint64) { +func (s *testRegionMapSuite) check(c *C, rm regionMap, ids ...uint64) { // Check Get. 
for _, id := range ids { c.Assert(rm.Get(id).GetID(), Equals, id) @@ -232,16 +228,10 @@ func (s *testRegionMapSuite) check(c *C, rm *regionMap, ids ...uint64) { expect[id] = struct{}{} } set1 := make(map[uint64]struct{}) - for _, r := range rm.m { + for _, r := range rm { set1[r.GetID()] = struct{}{} } c.Assert(set1, DeepEquals, expect) - // Check region size. - var total int64 - for _, id := range ids { - total += int64(id) - } - c.Assert(rm.TotalSize(), Equals, total) } var _ = Suite(&testRegionKey{}) @@ -349,8 +339,7 @@ func (*testRegionKey) TestSetRegion(c *C) { c.Assert(regions.tree.length(), Equals, 96) c.Assert(len(regions.GetRegions()), Equals, 96) c.Assert(regions.GetRegion(201), NotNil) - c.Assert(regions.regions.totalKeys, Equals, int64(20)) - c.Assert(regions.regions.totalSize, Equals, int64(30)) + c.Assert(regions.tree.TotalSize(), Equals, int64(30)) } func (*testRegionKey) TestShouldRemoveFromSubTree(c *C) { diff --git a/server/core/region_tree.go b/server/core/region_tree.go index f3abef95c14..2e582bf034d 100644 --- a/server/core/region_tree.go +++ b/server/core/region_tree.go @@ -49,15 +49,21 @@ const ( type regionTree struct { tree *btree.BTree + // Statistics + totalSize int64 } func newRegionTree() *regionTree { return ®ionTree{ - tree: btree.New(defaultBTreeDegree), + tree: btree.New(defaultBTreeDegree), + totalSize: 0, } } func (t *regionTree) length() int { + if t == nil { + return 0 + } return t.tree.Len() } @@ -92,13 +98,16 @@ func (t *regionTree) getOverlaps(region *RegionInfo) []*RegionInfo { // It finds and deletes all the overlapped regions first, and then // insert the region. func (t *regionTree) update(region *RegionInfo) []*RegionInfo { + t.totalSize += region.approximateSize + overlaps := t.getOverlaps(region) - for _, item := range overlaps { + for _, old := range overlaps { log.Debug("overlapping region", - zap.Uint64("region-id", item.GetID()), - logutil.ZapRedactStringer("delete-region", RegionToHexMeta(item.GetMeta())), + zap.Uint64("region-id", old.GetID()), + logutil.ZapRedactStringer("delete-region", RegionToHexMeta(old.GetMeta())), logutil.ZapRedactStringer("update-region", RegionToHexMeta(region.GetMeta()))) - t.tree.Delete(®ionItem{item}) + t.tree.Delete(®ionItem{old}) + t.totalSize -= old.approximateSize } t.tree.ReplaceOrInsert(®ionItem{region: region}) @@ -106,6 +115,12 @@ func (t *regionTree) update(region *RegionInfo) []*RegionInfo { return overlaps } +// updateStat is used to update statistics when regionItem.region is directly replaced. +func (t *regionTree) updateStat(origin *RegionInfo, region *RegionInfo) { + t.totalSize += region.approximateSize + t.totalSize -= origin.approximateSize +} + // remove removes a region if the region is in the tree. // It will do nothing if it cannot find the region or the found region // is not the same with the region. 
@@ -118,6 +133,8 @@ func (t *regionTree) remove(region *RegionInfo) btree.Item { return nil } + t.totalSize -= region.approximateSize + return t.tree.Delete(result) } @@ -180,6 +197,18 @@ func (t *regionTree) scanRange(startKey []byte, f func(*RegionInfo) bool) { }) } +func (t *regionTree) scanRanges() []*RegionInfo { + if t.length() == 0 { + return nil + } + var res []*RegionInfo + t.scanRange([]byte(""), func(region *RegionInfo) bool { + res = append(res, region) + return true + }) + return res +} + func (t *regionTree) getAdjacentRegions(region *RegionInfo) (*regionItem, *regionItem) { item := ®ionItem{region: &RegionInfo{meta: &metapb.Region{StartKey: region.GetStartKey()}}} var prev, next *regionItem @@ -246,6 +275,28 @@ func (t *regionTree) RandomRegion(ranges []KeyRange) *RegionInfo { return nil } +func (t *regionTree) RandomRegions(n int, ranges []KeyRange) []*RegionInfo { + if t.length() == 0 { + return nil + } + + regions := make([]*RegionInfo, 0, n) + + for i := 0; i < n; i++ { + if region := t.RandomRegion(ranges); region != nil { + regions = append(regions, region) + } + } + return regions +} + +func (t *regionTree) TotalSize() int64 { + if t.length() == 0 { + return 0 + } + return t.totalSize +} + func init() { rand.Seed(time.Now().UnixNano()) } diff --git a/server/core/region_tree_test.go b/server/core/region_tree_test.go index 2169e852564..2a3a8ed20ae 100644 --- a/server/core/region_tree_test.go +++ b/server/core/region_tree_test.go @@ -120,36 +120,28 @@ func (s *testRegionSuite) newRegionWithStat(start, end string, size, keys int64) return region } -func (s *testRegionSuite) TestRegionSubTree(c *C) { - tree := newRegionSubTree() +func (s *testRegionSuite) TestRegionTreeStat(c *C) { + tree := newRegionTree() c.Assert(tree.totalSize, Equals, int64(0)) - c.Assert(tree.totalKeys, Equals, int64(0)) tree.update(s.newRegionWithStat("a", "b", 1, 2)) c.Assert(tree.totalSize, Equals, int64(1)) - c.Assert(tree.totalKeys, Equals, int64(2)) tree.update(s.newRegionWithStat("b", "c", 3, 4)) c.Assert(tree.totalSize, Equals, int64(4)) - c.Assert(tree.totalKeys, Equals, int64(6)) tree.update(s.newRegionWithStat("b", "e", 5, 6)) c.Assert(tree.totalSize, Equals, int64(6)) - c.Assert(tree.totalKeys, Equals, int64(8)) tree.remove(s.newRegionWithStat("a", "b", 1, 2)) c.Assert(tree.totalSize, Equals, int64(5)) - c.Assert(tree.totalKeys, Equals, int64(6)) tree.remove(s.newRegionWithStat("f", "g", 1, 2)) c.Assert(tree.totalSize, Equals, int64(5)) - c.Assert(tree.totalKeys, Equals, int64(6)) } -func (s *testRegionSuite) TestRegionSubTreeMerge(c *C) { - tree := newRegionSubTree() +func (s *testRegionSuite) TestRegionTreeMerge(c *C) { + tree := newRegionTree() tree.update(s.newRegionWithStat("a", "b", 1, 2)) tree.update(s.newRegionWithStat("b", "c", 3, 4)) c.Assert(tree.totalSize, Equals, int64(4)) - c.Assert(tree.totalKeys, Equals, int64(6)) tree.update(s.newRegionWithStat("a", "c", 5, 5)) c.Assert(tree.totalSize, Equals, int64(5)) - c.Assert(tree.totalKeys, Equals, int64(5)) } func (s *testRegionSuite) TestRegionTree(c *C) { diff --git a/server/grpc_service.go b/server/grpc_service.go index 8b9cca6729c..f033b35048b 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1311,8 +1311,17 @@ func (s *Server) SyncMaxTS(ctx context.Context, request *pdpb.SyncMaxTSRequest) } syncedDCs = append(syncedDCs, allocator.GetDCLocation()) } - // Found a bigger maxLocalTS, return it directly. 
-	if tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs()) > 0 {
+	// Found a bigger or equal maxLocalTS, return it directly.
+	cmpResult := tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs())
+	if cmpResult >= 0 {
+		// Found an equal maxLocalTS, so add 1 to its logical part before returning it.
+		// For example, suppose a Global TSO t1 and a Local TSO t2 have the same
+		// physical and logical parts. After being differentiated with the suffix,
+		// there will be (t1.logical << suffixNum + 0) < (t2.logical << suffixNum + N),
+		// where N is bigger than 0, which would make the Global TSO fall back behind
+		// the previous Local TSO.
+		if cmpResult == 0 {
+			maxLocalTS.Logical += 1
+		}
 		return &pdpb.SyncMaxTSResponse{
 			Header:     s.header(),
 			MaxLocalTs: maxLocalTS,
diff --git a/server/schedule/checker/replica_checker.go b/server/schedule/checker/replica_checker.go
index ff8227e1648..4f93058d905 100644
--- a/server/schedule/checker/replica_checker.go
+++ b/server/schedule/checker/replica_checker.go
@@ -237,7 +237,7 @@ func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status
 	}
 
 	regionStores := r.cluster.GetRegionStores(region)
-	target := r.strategy(region).SelectStoreToReplace(regionStores, storeID)
+	target := r.strategy(region).SelectStoreToFix(regionStores, storeID)
 	if target == 0 {
 		reason := fmt.Sprintf("no-store-%s", status)
 		checkerCounter.WithLabelValues("replica_checker", reason).Inc()
diff --git a/server/schedule/checker/replica_checker_test.go b/server/schedule/checker/replica_checker_test.go
index 38c5259cb6f..e3fa693ee01 100644
--- a/server/schedule/checker/replica_checker_test.go
+++ b/server/schedule/checker/replica_checker_test.go
@@ -523,3 +523,62 @@ func (s *testReplicaCheckerSuite) TestOpts(c *C) {
 	tc.SetEnableReplaceOfflineReplica(false)
 	c.Assert(rc.Check(region), IsNil)
 }
+
+// See issue: https://github.com/tikv/pd/issues/3705
+func (s *testReplicaCheckerSuite) TestFixDownPeer(c *C) {
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(s.ctx, opt)
+	tc.DisableFeature(versioninfo.JointConsensus)
+	tc.SetLocationLabels([]string{"zone"})
+	rc := NewReplicaChecker(tc, cache.NewDefaultCache(10))
+
+	tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
+	tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
+	tc.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
+	tc.AddLabelsStore(4, 1, map[string]string{"zone": "z3"})
+	tc.AddLabelsStore(5, 1, map[string]string{"zone": "z3"})
+
+	tc.AddLeaderRegion(1, 1, 3, 4)
+	region := tc.GetRegion(1)
+	c.Assert(rc.Check(region), IsNil)
+
+	tc.SetStoreDown(4)
+	region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{Peer: region.GetStorePeer(4), DownSeconds: 6000},
+	}))
+	testutil.CheckTransferPeer(c, rc.Check(region), operator.OpRegion, 4, 5)
+
+	tc.SetStoreDown(5)
+	testutil.CheckTransferPeer(c, rc.Check(region), operator.OpRegion, 4, 2)
+
+	tc.SetIsolationLevel("zone")
+	c.Assert(rc.Check(region), IsNil)
+}
+
+// See issue: https://github.com/tikv/pd/issues/3705
+func (s *testReplicaCheckerSuite) TestFixOfflinePeer(c *C) {
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(s.ctx, opt)
+	tc.DisableFeature(versioninfo.JointConsensus)
+	tc.SetLocationLabels([]string{"zone"})
+	rc := NewReplicaChecker(tc, cache.NewDefaultCache(10))
+
+	tc.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
+	tc.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
+	tc.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
+	tc.AddLabelsStore(4, 1, map[string]string{"zone": "z3"})
+	tc.AddLabelsStore(5, 1, map[string]string{"zone": "z3"})
+
+	tc.AddLeaderRegion(1, 1, 3, 4)
+	region := tc.GetRegion(1)
+	c.Assert(rc.Check(region), IsNil)
+
+	tc.SetStoreOffline(4)
+	testutil.CheckTransferPeer(c, rc.Check(region), operator.OpRegion, 4, 5)
+
+	tc.SetStoreOffline(5)
+	testutil.CheckTransferPeer(c, rc.Check(region), operator.OpRegion, 4, 2)
+
+	tc.SetIsolationLevel("zone")
+	c.Assert(rc.Check(region), IsNil)
+}
diff --git a/server/schedule/checker/replica_strategy.go b/server/schedule/checker/replica_strategy.go
index 52cc24cad4c..1adbf383ad6 100644
--- a/server/schedule/checker/replica_strategy.go
+++ b/server/schedule/checker/replica_strategy.go
@@ -80,13 +80,12 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e
 	return target.GetID()
 }
 
-// SelectStoreToReplace returns a store to replace oldStore. The location
-// placement after scheduling should be not worse than original.
-func (s *ReplicaStrategy) SelectStoreToReplace(coLocationStores []*core.StoreInfo, old uint64) uint64 {
+// SelectStoreToFix returns a store to replace a down/offline old peer. The location
+// placement after scheduling is allowed to be worse than the original.
+func (s *ReplicaStrategy) SelectStoreToFix(coLocationStores []*core.StoreInfo, old uint64) uint64 {
 	// trick to avoid creating a slice with `old` removed.
 	s.swapStoreToFirst(coLocationStores, old)
-	safeGuard := filter.NewLocationSafeguard(s.checkerName, s.locationLabels, coLocationStores, s.cluster.GetStore(old))
-	return s.SelectStoreToAdd(coLocationStores[1:], safeGuard)
+	return s.SelectStoreToAdd(coLocationStores[1:])
 }
 
 // SelectStoreToImprove returns a store to replace oldStore. The location
diff --git a/server/schedule/checker/rule_checker.go b/server/schedule/checker/rule_checker.go
index 0fb3ad7d3f0..fafaa8b376c 100644
--- a/server/schedule/checker/rule_checker.go
+++ b/server/schedule/checker/rule_checker.go
@@ -145,7 +145,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, rf *placement.RuleFit
 
 func (c *RuleChecker) replaceRulePeer(region *core.RegionInfo, rf *placement.RuleFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
 	ruleStores := c.getRuleFitStores(rf)
-	store := c.strategy(region, rf.Rule).SelectStoreToReplace(ruleStores, peer.GetStoreId())
+	store := c.strategy(region, rf.Rule).SelectStoreToFix(ruleStores, peer.GetStoreId())
 	if store == 0 {
 		checkerCounter.WithLabelValues("rule_checker", "no-store-replace").Inc()
 		c.regionWaitingList.Put(region.GetID(), nil)
diff --git a/server/schedule/checker/rule_checker_test.go b/server/schedule/checker/rule_checker_test.go
index a16a15c690b..9dfb04b695e 100644
--- a/server/schedule/checker/rule_checker_test.go
+++ b/server/schedule/checker/rule_checker_test.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/tikv/pd/pkg/cache"
 	"github.com/tikv/pd/pkg/mock/mockcluster"
+	"github.com/tikv/pd/pkg/testutil"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/core"
 	"github.com/tikv/pd/server/schedule/operator"
@@ -512,3 +513,72 @@ func (s *testRuleCheckerSuite) TestIssue3299(c *C) {
 		}
 	}
 }
+
+// See issue: https://github.com/tikv/pd/issues/3705
+func (s *testRuleCheckerSuite) TestFixDownPeer(c *C) {
+	s.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
+	s.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
+	s.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
+	s.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"})
+	s.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3"})
+	s.cluster.AddLeaderRegion(1, 1, 3, 4)
+	rule := &placement.Rule{
+		GroupID:        "pd",
+		ID:             "test",
+		Index:          100,
+		Override:       true,
+		Role:           placement.Voter,
+		Count:          3,
+		LocationLabels: []string{"zone"},
+	}
+	s.ruleManager.SetRule(rule)
+
+	region := s.cluster.GetRegion(1)
+	c.Assert(s.rc.Check(region), IsNil)
+
+	s.cluster.SetStoreDown(4)
+	region = region.Clone(core.WithDownPeers([]*pdpb.PeerStats{
+		{Peer: region.GetStorePeer(4), DownSeconds: 6000},
+	}))
+	testutil.CheckTransferPeer(c, s.rc.Check(region), operator.OpRegion, 4, 5)
+
+	s.cluster.SetStoreDown(5)
+	testutil.CheckTransferPeer(c, s.rc.Check(region), operator.OpRegion, 4, 2)
+
+	rule.IsolationLevel = "zone"
+	s.ruleManager.SetRule(rule)
+	c.Assert(s.rc.Check(region), IsNil)
+}
+
+// See issue: https://github.com/tikv/pd/issues/3705
+func (s *testRuleCheckerSuite) TestFixOfflinePeer(c *C) {
+	s.cluster.AddLabelsStore(1, 1, map[string]string{"zone": "z1"})
+	s.cluster.AddLabelsStore(2, 1, map[string]string{"zone": "z1"})
+	s.cluster.AddLabelsStore(3, 1, map[string]string{"zone": "z2"})
+	s.cluster.AddLabelsStore(4, 1, map[string]string{"zone": "z3"})
+	s.cluster.AddLabelsStore(5, 1, map[string]string{"zone": "z3"})
+	s.cluster.AddLeaderRegion(1, 1, 3, 4)
+	rule := &placement.Rule{
+		GroupID:        "pd",
+		ID:             "test",
+		Index:          100,
+		Override:       true,
+		Role:           placement.Voter,
+		Count:          3,
+		LocationLabels: []string{"zone"},
+	}
+	s.ruleManager.SetRule(rule)
+
+	region := s.cluster.GetRegion(1)
+	c.Assert(s.rc.Check(region), IsNil)
+
+	s.cluster.SetStoreOffline(4)
+	testutil.CheckTransferPeer(c, s.rc.Check(region), operator.OpRegion, 4, 5)
+
+	s.cluster.SetStoreOffline(5)
+	testutil.CheckTransferPeer(c, s.rc.Check(region), operator.OpRegion, 4, 2)
+
+	rule.IsolationLevel = "zone"
+	s.ruleManager.SetRule(rule)
+	c.Assert(s.rc.Check(region), IsNil)
+}
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index f2215d2df20..0a262bacd72 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -398,19 +398,55 @@ type storeLoadDetail struct {
 }
 
 func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat {
-	peers := make([]statistics.HotPeerStat, 0, len(li.HotPeers))
 	totalLoads := make([]float64, statistics.RegionStatCount)
+	if len(li.HotPeers) == 0 {
+		return &statistics.HotPeersStat{
+			TotalLoads:     totalLoads,
+			TotalBytesRate: 0.0,
+			TotalKeysRate:  0.0,
+			Count:          0,
+			Stats:          make([]statistics.HotPeerStatShow, 0),
+		}
+	}
+	kind := write
+	if li.HotPeers[0].Kind == statistics.ReadFlow {
+		kind = read
+	}
+
+	peers := make([]statistics.HotPeerStatShow, 0, len(li.HotPeers))
 	for _, peer := range li.HotPeers {
 		if peer.HotDegree > 0 {
-			peers = append(peers, *peer.Clone())
+			peers = append(peers, toHotPeerStatShow(peer, kind))
 			for i := range totalLoads {
 				totalLoads[i] += peer.GetLoad(statistics.RegionStatKind(i))
 			}
 		}
 	}
+
+	b, k := getRegionStatKind(kind, statistics.ByteDim), getRegionStatKind(kind, statistics.KeyDim)
+	byteRate := totalLoads[b]
+	keyRate := totalLoads[k]
+
 	return &statistics.HotPeersStat{
-		TotalLoads: totalLoads,
-		Count:      len(peers),
-		Stats:      peers,
+		TotalLoads:     totalLoads,
+		TotalBytesRate: byteRate,
+		TotalKeysRate:  keyRate,
+		Count:          len(peers),
+		Stats:          peers,
+	}
+}
+
+func toHotPeerStatShow(p *statistics.HotPeerStat, kind rwType) statistics.HotPeerStatShow {
+	b, k := getRegionStatKind(kind, statistics.ByteDim), getRegionStatKind(kind, statistics.KeyDim)
+	byteRate := p.Loads[b]
+	keyRate := p.Loads[k]
+	return statistics.HotPeerStatShow{
+		StoreID:        p.StoreID,
+		RegionID:       p.RegionID,
+		HotDegree:      p.HotDegree,
+		ByteRate:       byteRate,
+		KeyRate:        keyRate,
+		AntiCount:      p.AntiCount,
+		LastUpdateTime: p.LastUpdateTime,
 	}
 }
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index 5a510ae51f8..004dc299a3b 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -162,7 +162,7 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
 		return nil
 	}
 	storeID := peer.GetStoreID()
-	deltaLoads := getFlowDeltaLoads(peer)
+	deltaLoads := peer.GetLoads()
 	f.collectPeerMetrics(deltaLoads, interval)
 	loads := make([]float64, len(deltaLoads))
 	for i := range deltaLoads {
@@ -462,23 +462,6 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f
 	return newItem
 }
 
-func getFlowDeltaLoads(stat core.FlowStat) []float64 {
-	ret := make([]float64, RegionStatCount)
-	for k := RegionStatKind(0); k < RegionStatCount; k++ {
-		switch k {
-		case RegionReadBytes:
-			ret[k] = float64(stat.GetBytesRead())
-		case RegionReadKeys:
-			ret[k] = float64(stat.GetKeysRead())
-		case RegionWriteBytes:
-			ret[k] = float64(stat.GetBytesWritten())
-		case RegionWriteKeys:
-			ret[k] = float64(stat.GetKeysWritten())
-		}
-	}
-	return ret
-}
-
 func (f *hotPeerCache) putInheritItem(item *HotPeerStat) {
 	f.inheritItem[item.RegionID] = item
 }
diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go
index 324c9dd12ee..57208ad829e 100644
--- a/server/statistics/hot_peer_cache_test.go
+++ b/server/statistics/hot_peer_cache_test.go
@@ -110,12 +110,7 @@ func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, expect i
 	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
 	res = append(res, cache.CollectExpiredItems(region)...)
 	for _, peer := range region.GetPeers() {
-		peerInfo := core.NewPeerInfo(peer,
-			region.GetBytesWritten(),
-			region.GetKeysWritten(),
-			region.GetBytesRead(),
-			region.GetKeysRead(),
-			interval)
+		peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
 		item := cache.CheckPeerFlow(peerInfo, region)
 		if item != nil {
 			res = append(res, item)
@@ -345,12 +340,7 @@ func BenchmarkCheckRegionFlow(b *testing.B) {
 		core.SetReadKeys(300000*10))
 	peerInfos := make([]*core.PeerInfo, 0)
 	for _, peer := range newRegion.GetPeers() {
-		peerInfo := core.NewPeerInfo(peer,
-			region.GetBytesWritten(),
-			region.GetKeysWritten(),
-			region.GetBytesRead(),
-			region.GetKeysRead(),
-			10)
+		peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10)
 		peerInfos = append(peerInfos, peerInfo)
 	}
 	b.ResetTimer()
diff --git a/server/statistics/hot_regions_stat.go b/server/statistics/hot_regions_stat.go
index b48afb28357..314470fad1c 100644
--- a/server/statistics/hot_regions_stat.go
+++ b/server/statistics/hot_regions_stat.go
@@ -13,9 +13,26 @@
 
 package statistics
 
+import (
+	"time"
+)
+
 // HotPeersStat records all hot regions statistics
 type HotPeersStat struct {
-	TotalLoads []float64     `json:"total_loads"`
-	Count      int           `json:"regions_count"`
-	Stats      []HotPeerStat `json:"statistics"`
+	TotalLoads     []float64         `json:"-"`
+	TotalBytesRate float64           `json:"total_flow_bytes"`
+	TotalKeysRate  float64           `json:"total_flow_keys"`
+	Count          int               `json:"regions_count"`
+	Stats          []HotPeerStatShow `json:"statistics"`
+}
+
+// HotPeerStatShow records the hot region statistics for output
+type HotPeerStatShow struct {
+	StoreID        uint64    `json:"store_id"`
+	RegionID       uint64    `json:"region_id"`
+	HotDegree      int       `json:"hot_degree"`
+	ByteRate       float64   `json:"flow_bytes"`
+	KeyRate        float64   `json:"flow_keys"`
+	AntiCount      int       `json:"anti_count"`
+	LastUpdateTime time.Time `json:"last_update_time"`
 }
diff --git a/server/statistics/kind_test.go b/server/statistics/kind_test.go
new file mode 100644
index 00000000000..64104f749c2
--- /dev/null
+++ b/server/statistics/kind_test.go
@@ -0,0 +1,46 @@
+// Copyright 2021 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+import (
+	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/tikv/pd/server/core"
+)
+
+var _ = Suite(&testRegionInfoSuite{})
+
+type testRegionInfoSuite struct{}
+
+func (s *testRegionInfoSuite) TestGetLoads(c *C) {
+	regionA := core.NewRegionInfo(&metapb.Region{Id: 100, Peers: []*metapb.Peer{}}, nil,
+		core.SetReadBytes(1),
+		core.SetReadKeys(2),
+		core.SetWrittenBytes(3),
+		core.SetWrittenKeys(4))
+	loads := regionA.GetLoads()
+	c.Assert(loads, HasLen, int(RegionStatCount))
+	c.Assert(float64(regionA.GetBytesRead()), Equals, loads[RegionReadBytes])
+	c.Assert(float64(regionA.GetKeysRead()), Equals, loads[RegionReadKeys])
+	c.Assert(float64(regionA.GetBytesWritten()), Equals, loads[RegionWriteBytes])
+	c.Assert(float64(regionA.GetKeysWritten()), Equals, loads[RegionWriteKeys])
+
+	loads = regionA.GetWriteLoads()
+	c.Assert(loads, HasLen, int(RegionStatCount))
+	c.Assert(0.0, Equals, loads[RegionReadBytes])
+	c.Assert(0.0, Equals, loads[RegionReadKeys])
+	c.Assert(float64(regionA.GetBytesWritten()), Equals, loads[RegionWriteBytes])
+	c.Assert(float64(regionA.GetKeysWritten()), Equals, loads[RegionWriteKeys])
+
+}
diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go
index 8adb9bb2515..797b1cbf185 100644
--- a/server/tso/global_allocator.go
+++ b/server/tso/global_allocator.go
@@ -178,7 +178,7 @@ func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error)
 	)
 	for i := 0; i < maxRetryCount; i++ {
 		// TODO: add a switch to control whether to enable the MaxTSO estimation.
-		// 1. Estimate a MaxTS among all Local TSO Allocator leaders according to the RTT if enableGlobalTSOEstimation.
+		// 1. Estimate a MaxTS among all Local TSO Allocator leaders according to the RTT.
 		estimatedMaxTSO, shouldRetry, err = gta.estimateMaxTS(count, suffixBits)
 		if err != nil {
 			return pdpb.Timestamp{}, err
diff --git a/tests/dashboard/service_test.go b/tests/dashboard/service_test.go
index 70bc5e11da1..24ff453065e 100644
--- a/tests/dashboard/service_test.go
+++ b/tests/dashboard/service_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/tests"
 	"github.com/tikv/pd/tests/pdctl"
+	pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl"
 
 	// Register schedulers.
_ "github.com/tikv/pd/server/schedulers" @@ -135,7 +136,7 @@ func (s *dashboardTestSuite) testDashboard(c *C, internalProxy bool) { err = cluster.RunInitialServers() c.Assert(err, IsNil) - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() cluster.WaitLeader() servers := cluster.GetServers() diff --git a/tests/pdctl/cluster/cluster_test.go b/tests/pdctl/cluster/cluster_test.go index 6589f512661..637e9ac8dfd 100644 --- a/tests/pdctl/cluster/cluster_test.go +++ b/tests/pdctl/cluster/cluster_test.go @@ -26,6 +26,7 @@ import ( clusterpkg "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -53,7 +54,7 @@ func (s *clusterTestSuite) TestClusterAndPing(c *C) { pdAddr := cluster.GetConfig().GetClientURL() i := strings.Index(pdAddr, "//") pdAddr = pdAddr[i+2:] - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() defer cluster.Destroy() // cluster @@ -63,8 +64,6 @@ func (s *clusterTestSuite) TestClusterAndPing(c *C) { ci := &metapb.Cluster{} c.Assert(json.Unmarshal(output, ci), IsNil) c.Assert(ci, DeepEquals, cluster.GetCluster()) - echo := pdctl.GetEcho([]string{"-u", pdAddr, "--cacert=ca.pem", "cluster"}) - c.Assert(strings.Contains(echo, "no such file or directory"), IsTrue) // cluster info args = []string{"-u", pdAddr, "cluster"} @@ -94,4 +93,9 @@ func (s *clusterTestSuite) TestClusterAndPing(c *C) { output, err = pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) c.Assert(output, NotNil) + + // does not exist + args = []string{"-u", pdAddr, "--cacert=ca.pem", "cluster"} + _, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, ErrorMatches, ".*no such file or directory.*") } diff --git a/tests/pdctl/completion/completion_test.go b/tests/pdctl/completion/completion_test.go index 120cc3f1e87..8aa6f43bec9 100644 --- a/tests/pdctl/completion/completion_test.go +++ b/tests/pdctl/completion/completion_test.go @@ -18,6 +18,7 @@ import ( . 
"github.com/pingcap/check" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -29,7 +30,7 @@ var _ = Suite(&completionTestSuite{}) type completionTestSuite struct{} func (s *completionTestSuite) TestCompletion(c *C) { - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() // completion command args := []string{"completion", "bash"} diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 64ee95ab258..97aa40d6ad7 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/server/schedule/placement" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -68,7 +69,7 @@ func (s *configTestSuite) TestConfig(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, @@ -245,7 +246,7 @@ func (s *configTestSuite) TestPlacementRules(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, @@ -333,7 +334,7 @@ func (s *configTestSuite) TestPlacementRuleGroups(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, @@ -397,7 +398,7 @@ func (s *configTestSuite) TestPlacementRuleBundle(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, @@ -535,7 +536,7 @@ func (s *configTestSuite) TestReplicationMode(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, @@ -591,7 +592,7 @@ func (s *configTestSuite) TestUpdateDefaultReplicaConfig(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, diff --git a/tests/pdctl/health/health_test.go b/tests/pdctl/health/health_test.go index cd09615eb2c..4efa57ea43f 100644 --- a/tests/pdctl/health/health_test.go +++ b/tests/pdctl/health/health_test.go @@ -24,6 +24,7 @@ import ( "github.com/tikv/pd/server/cluster" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -49,7 +50,7 @@ func (s *healthTestSuite) TestHealth(c *C) { leaderServer := tc.GetServer(tc.GetLeader()) c.Assert(leaderServer.BootstrapCluster(), IsNil) pdAddr := tc.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() defer tc.Destroy() client := tc.GetEtcdClient() diff --git a/tests/pdctl/helper.go b/tests/pdctl/helper.go index e8edf92070e..6a93ff23403 100644 --- a/tests/pdctl/helper.go +++ b/tests/pdctl/helper.go @@ -17,8 +17,6 @@ import ( "bytes" "context" "fmt" - "os" - "path/filepath" "sort" "github.com/gogo/protobuf/proto" @@ -31,46 +29,13 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/versioninfo" "github.com/tikv/pd/tests" - "github.com/tikv/pd/tools/pd-ctl/pdctl" - "github.com/tikv/pd/tools/pd-ctl/pdctl/command" ) -// InitCommand is 
used to initialize command. -func InitCommand() *cobra.Command { - commandFlags := pdctl.CommandFlags{} - rootCmd := &cobra.Command{} - rootCmd.PersistentFlags().StringVarP(&commandFlags.URL, "pd", "u", "", "") - rootCmd.Flags().StringVar(&commandFlags.CAPath, "cacert", "", "") - rootCmd.Flags().StringVar(&commandFlags.CertPath, "cert", "", "") - rootCmd.Flags().StringVar(&commandFlags.KeyPath, "key", "", "") - rootCmd.AddCommand( - command.NewConfigCommand(), - command.NewRegionCommand(), - command.NewStoreCommand(), - command.NewStoresCommand(), - command.NewMemberCommand(), - command.NewExitCommand(), - command.NewLabelCommand(), - command.NewPingCommand(), - command.NewOperatorCommand(), - command.NewSchedulerCommand(), - command.NewTSOCommand(), - command.NewHotSpotCommand(), - command.NewClusterCommand(), - command.NewHealthCommand(), - command.NewLogCommand(), - command.NewPluginCommand(), - command.NewCompletionCommand(), - ) - return rootCmd -} - // ExecuteCommand is used for test purpose. func ExecuteCommand(root *cobra.Command, args ...string) (output []byte, err error) { buf := new(bytes.Buffer) root.SetOutput(buf) root.SetArgs(args) - err = root.Execute() return buf.Bytes(), err } @@ -93,8 +58,15 @@ func CheckStoresInfo(c *check.C, stores []*api.StoreInfo, want []*metapb.Store) } } +// CheckRegionInfo is used to check the test results. +func CheckRegionInfo(c *check.C, output *api.RegionInfo, expected *core.RegionInfo) { + region := api.NewRegionInfo(expected) + output.Adjust() + c.Assert(output, check.DeepEquals, region) +} + // CheckRegionsInfo is used to check the test results. -func CheckRegionsInfo(c *check.C, output api.RegionsInfo, expected []*core.RegionInfo) { +func CheckRegionsInfo(c *check.C, output *api.RegionsInfo, expected []*core.RegionInfo) { c.Assert(output.Count, check.Equals, len(expected)) got := output.Regions sort.Slice(got, func(i, j int) bool { @@ -104,7 +76,7 @@ func CheckRegionsInfo(c *check.C, output api.RegionsInfo, expected []*core.Regio return expected[i].GetID() < expected[j].GetID() }) for i, region := range expected { - c.Assert(api.NewRegionInfo(region), check.DeepEquals, got[i]) + CheckRegionInfo(c, &got[i], region) } } @@ -138,17 +110,3 @@ func MustPutRegion(c *check.C, cluster *tests.TestCluster, regionID, storeID uin c.Assert(err, check.IsNil) return r } - -// GetEcho is used to get echo from stdout. 
-func GetEcho(args []string) string { - filename := filepath.Join(os.TempDir(), "stdout") - old := os.Stdout - temp, _ := os.Create(filename) - os.Stdout = temp - pdctl.Start(args) - temp.Close() - os.Stdout = old - out, _ := os.ReadFile(filename) - _ = os.Remove(filename) - return string(out) -} diff --git a/tests/pdctl/hot/hot_test.go b/tests/pdctl/hot/hot_test.go index 6eeff42cecf..8aca38c0910 100644 --- a/tests/pdctl/hot/hot_test.go +++ b/tests/pdctl/hot/hot_test.go @@ -29,6 +29,7 @@ import ( "github.com/tikv/pd/server/statistics" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -53,7 +54,7 @@ func (s *hotTestSuite) TestHot(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, diff --git a/tests/pdctl/label/label_test.go b/tests/pdctl/label/label_test.go index bf2491cc668..204cdcd14df 100644 --- a/tests/pdctl/label/label_test.go +++ b/tests/pdctl/label/label_test.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -50,7 +51,7 @@ func (s *labelTestSuite) TestLabel(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() stores := []*metapb.Store{ { diff --git a/tests/pdctl/log/log_test.go b/tests/pdctl/log/log_test.go index 0bb9e5670de..9bc5711ea50 100644 --- a/tests/pdctl/log/log_test.go +++ b/tests/pdctl/log/log_test.go @@ -23,6 +23,7 @@ import ( "github.com/tikv/pd/server" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -46,7 +47,7 @@ func (s *logTestSuite) TestLog(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, diff --git a/tests/pdctl/member/member_test.go b/tests/pdctl/member/member_test.go index 06450f40ba4..15c2af0b42d 100644 --- a/tests/pdctl/member/member_test.go +++ b/tests/pdctl/member/member_test.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/server" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -53,7 +54,7 @@ func (s *memberTestSuite) TestMember(c *C) { c.Assert(leaderServer.BootstrapCluster(), IsNil) pdAddr := cluster.GetConfig().GetClientURL() c.Assert(err, IsNil) - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() svr := cluster.GetServer("pd2") id := svr.GetServerID() name := svr.GetServer().Name() diff --git a/tests/pdctl/operator/operator_test.go b/tests/pdctl/operator/operator_test.go index 2f6dfce6bc9..2cb1b21ad0e 100644 --- a/tests/pdctl/operator/operator_test.go +++ b/tests/pdctl/operator/operator_test.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -59,7 +60,7 @@ func (s *operatorTestSuite) TestOperator(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() stores := []*metapb.Store{ { @@ -219,8 +220,9 @@ 
func (s *operatorTestSuite) TestOperator(c *C) { output, err = pdctl.ExecuteCommand(cmd, "operator", "add", "transfer-region", "1", "2", "leader", "3", "follower") c.Assert(err, IsNil) c.Assert(strings.Contains(string(output), "Success!"), IsTrue) - echo := pdctl.GetEcho([]string{"-u", pdAddr, "operator", "remove", "1"}) - c.Assert(strings.Contains(echo, "Success!"), IsTrue) + output, err = pdctl.ExecuteCommand(cmd, "-u", pdAddr, "operator", "remove", "1") + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "Success!"), IsTrue) _, err = pdctl.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "false") c.Assert(err, IsNil) @@ -237,7 +239,7 @@ func (s *operatorTestSuite) TestOperator(c *C) { c.Assert(strings.Contains(string(output), "scatter-region"), IsTrue) // test echo, as the scatter region result is random, both region 1 and region 3 can be the region to be scattered - echo1 := pdctl.GetEcho([]string{"-u", pdAddr, "operator", "remove", "1"}) - echo2 := pdctl.GetEcho([]string{"-u", pdAddr, "operator", "remove", "3"}) - c.Assert(strings.Contains(echo1, "Success!") || strings.Contains(echo2, "Success!"), IsTrue) + output1, _ := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "operator", "remove", "1") + output2, _ := pdctl.ExecuteCommand(cmd, "-u", pdAddr, "operator", "remove", "3") + c.Assert(strings.Contains(string(output1), "Success!") || strings.Contains(string(output2), "Success!"), IsTrue) } diff --git a/tests/pdctl/region/region_test.go b/tests/pdctl/region/region_test.go index 9786fe0560d..d6abe1b1cdc 100644 --- a/tests/pdctl/region/region_test.go +++ b/tests/pdctl/region/region_test.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/pd/server/core" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -60,8 +61,10 @@ func (s *regionTestSuite) TestRegionKeyFormat(c *C) { c.Assert(leaderServer.BootstrapCluster(), IsNil) pdctl.MustPutStore(c, leaderServer.GetServer(), store) - echo := pdctl.GetEcho([]string{"-u", url, "region", "key", "--format=raw", " "}) - c.Assert(strings.Contains(echo, "unknown flag"), IsFalse) + cmd := pdctlCmd.GetRootCmd() + output, e := pdctl.ExecuteCommand(cmd, "-u", url, "region", "key", "--format=raw", " ") + c.Assert(e, IsNil) + c.Assert(strings.Contains(string(output), "unknown flag"), IsFalse) } func (s *regionTestSuite) TestRegion(c *C) { @@ -73,7 +76,7 @@ func (s *regionTestSuite) TestRegion(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() store := &metapb.Store{ Id: 1, @@ -150,31 +153,31 @@ func (s *regionTestSuite) TestRegion(c *C) { args := append([]string{"-u", pdAddr}, testCase.args...) output, e := pdctl.ExecuteCommand(cmd, args...) 
c.Assert(e, IsNil) - regionsInfo := api.RegionsInfo{} - c.Assert(json.Unmarshal(output, &regionsInfo), IsNil) - pdctl.CheckRegionsInfo(c, regionsInfo, testCase.expect) + regions := &api.RegionsInfo{} + c.Assert(json.Unmarshal(output, regions), IsNil) + pdctl.CheckRegionsInfo(c, regions, testCase.expect) } var testRegionCases = []struct { args []string - expect *api.RegionInfo + expect *core.RegionInfo }{ // region command - {[]string{"region", "1"}, api.NewRegionInfo(leaderServer.GetRegionInfoByID(1))}, + {[]string{"region", "1"}, leaderServer.GetRegionInfoByID(1)}, // region key --format=raw command - {[]string{"region", "key", "--format=raw", "b"}, api.NewRegionInfo(r2)}, + {[]string{"region", "key", "--format=raw", "b"}, r2}, // region key --format=hex command - {[]string{"region", "key", "--format=hex", "62"}, api.NewRegionInfo(r2)}, + {[]string{"region", "key", "--format=hex", "62"}, r2}, // issue #2351 - {[]string{"region", "key", "--format=hex", "622f62"}, api.NewRegionInfo(r2)}, + {[]string{"region", "key", "--format=hex", "622f62"}, r2}, } for _, testCase := range testRegionCases { args := append([]string{"-u", pdAddr}, testCase.args...) output, e := pdctl.ExecuteCommand(cmd, args...) c.Assert(e, IsNil) - regionInfo := api.RegionInfo{} - c.Assert(json.Unmarshal(output, &regionInfo), IsNil) - c.Assert(&regionInfo, DeepEquals, testCase.expect) + region := &api.RegionInfo{} + c.Assert(json.Unmarshal(output, region), IsNil) + pdctl.CheckRegionInfo(c, region, testCase.expect) } } diff --git a/tests/pdctl/scheduler/scheduler_test.go b/tests/pdctl/scheduler/scheduler_test.go index 89925d2b9b6..ef3f9b172d1 100644 --- a/tests/pdctl/scheduler/scheduler_test.go +++ b/tests/pdctl/scheduler/scheduler_test.go @@ -26,6 +26,7 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -49,7 +50,7 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() stores := []*metapb.Store{ { @@ -74,13 +75,14 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { }, } - mustExec := func(args []string, v interface{}) { + mustExec := func(args []string, v interface{}) string { output, err := pdctl.ExecuteCommand(cmd, args...)
c.Assert(err, IsNil) if v == nil { - return + return string(output) } c.Assert(json.Unmarshal(output, v), IsNil) + return "" } checkSchedulerCommand := func(args []string, expected map[string]bool) { @@ -239,22 +241,22 @@ func (s *schedulerTestSuite) TestScheduler(c *C) { mustExec([]string{"-u", pdAddr, "scheduler", "config", "shuffle-region-scheduler"}, &roles) c.Assert(roles, DeepEquals, []string{"learner"}) - // test echo - echo := pdctl.GetEcho([]string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}) + // test balance region config + echo := mustExec([]string{"-u", pdAddr, "scheduler", "add", "balance-region-scheduler"}, nil) c.Assert(strings.Contains(echo, "Success!"), IsTrue) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}) + echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) c.Assert(strings.Contains(echo, "Success!"), IsTrue) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}) + echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "balance-region-scheduler"}, nil) c.Assert(strings.Contains(echo, "Success!"), IsFalse) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}) + echo = mustExec([]string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}, nil) c.Assert(strings.Contains(echo, "Success!"), IsTrue) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}) + echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) c.Assert(strings.Contains(echo, "Success!"), IsTrue) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}) + echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler-1"}, nil) c.Assert(strings.Contains(echo, "404"), IsTrue) // test hot region config - echo = pdctl.GetEcho([]string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}) + echo = mustExec([]string{"-u", pdAddr, "scheduler", "config", "evict-leader-scheduler"}, nil) c.Assert(strings.Contains(echo, "[404] scheduler not found"), IsTrue) var conf map[string]interface{} mustExec([]string{"-u", pdAddr, "scheduler", "config", "balance-hot-region-scheduler", "list"}, &conf) diff --git a/tests/pdctl/store/store_test.go b/tests/pdctl/store/store_test.go index 49d08573fe2..cdd21a2c29c 100644 --- a/tests/pdctl/store/store_test.go +++ b/tests/pdctl/store/store_test.go @@ -27,6 +27,7 @@ import ( "github.com/tikv/pd/server/core/storelimit" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/pdctl" + cmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -50,7 +51,7 @@ func (s *storeTestSuite) TestStore(c *C) { c.Assert(err, IsNil) cluster.WaitLeader() pdAddr := cluster.GetConfig().GetClientURL() - cmd := pdctl.InitCommand() + cmd := cmd.GetRootCmd() stores := []*metapb.Store{ { @@ -223,17 +224,23 @@ func (s *storeTestSuite) TestStore(c *C) { c.Assert(strings.Contains(string(output), "rate should be a number that > 0"), IsTrue) // store limit - echo := pdctl.GetEcho([]string{"-u", pdAddr, "store", "limit"}) + args = []string{"-u", pdAddr, "store", "limit"} + output, err = pdctl.ExecuteCommand(cmd, args...) 
+ c.Assert(err, IsNil) + allAddPeerLimit := make(map[string]map[string]interface{}) - json.Unmarshal([]byte(echo), &allAddPeerLimit) + json.Unmarshal([]byte(output), &allAddPeerLimit) c.Assert(allAddPeerLimit["1"]["add-peer"].(float64), Equals, float64(20)) c.Assert(allAddPeerLimit["3"]["add-peer"].(float64), Equals, float64(20)) _, ok := allAddPeerLimit["2"]["add-peer"] c.Assert(ok, Equals, false) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "store", "limit", "remove-peer"}) + args = []string{"-u", pdAddr, "store", "limit", "remove-peer"} + output, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + allRemovePeerLimit := make(map[string]map[string]interface{}) - json.Unmarshal([]byte(echo), &allRemovePeerLimit) + json.Unmarshal([]byte(output), &allRemovePeerLimit) c.Assert(allRemovePeerLimit["1"]["remove-peer"].(float64), Equals, float64(20)) c.Assert(allRemovePeerLimit["3"]["remove-peer"].(float64), Equals, float64(25)) _, ok = allRemovePeerLimit["2"]["add-peer"] @@ -273,12 +280,20 @@ func (s *storeTestSuite) TestStore(c *C) { c.Assert(len([]*api.StoreInfo{storeInfo}), Equals, 1) // It should be called after stores remove-tombstone. - echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit"}) - c.Assert(strings.Contains(echo, "PANIC"), IsFalse) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit", "remove-peer"}) - c.Assert(strings.Contains(echo, "PANIC"), IsFalse) - echo = pdctl.GetEcho([]string{"-u", pdAddr, "stores", "show", "limit", "add-peer"}) - c.Assert(strings.Contains(echo, "PANIC"), IsFalse) + args = []string{"-u", pdAddr, "stores", "show", "limit"} + output, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "PANIC"), IsFalse) + + args = []string{"-u", pdAddr, "stores", "show", "limit", "remove-peer"} + output, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "PANIC"), IsFalse) + + args = []string{"-u", pdAddr, "stores", "show", "limit", "add-peer"} + output, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + c.Assert(strings.Contains(string(output), "PANIC"), IsFalse) // store limit-scene args = []string{"-u", pdAddr, "store", "limit-scene"} output, err = pdctl.ExecuteCommand(cmd, args...) diff --git a/tests/pdctl/tso/tso_test.go b/tests/pdctl/tso/tso_test.go index d8a537a1b78..f1368773d67 100644 --- a/tests/pdctl/tso/tso_test.go +++ b/tests/pdctl/tso/tso_test.go @@ -22,6 +22,7 @@ import ( . "github.com/pingcap/check" "github.com/tikv/pd/server" "github.com/tikv/pd/tests/pdctl" + pdctlCmd "github.com/tikv/pd/tools/pd-ctl/pdctl" ) func Test(t *testing.T) { @@ -37,7 +38,7 @@ func (s *tsoTestSuite) SetUpSuite(c *C) { } func (s *tsoTestSuite) TestTSO(c *C) { - cmd := pdctl.InitCommand() + cmd := pdctlCmd.GetRootCmd() const ( physicalShiftBits = 18 diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index 105965ac8cc..88bbf318519 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -26,24 +26,7 @@ import ( "github.com/tikv/pd/tools/pd-ctl/pdctl/command" ) -// CommandFlags are flags that used in all Commands -type CommandFlags struct { - URL string - CAPath string - CertPath string - KeyPath string - Help bool - // Deprecated: the default mode is detach mode now. 
- Detach bool - Interact bool - Version bool -} - var ( - commandFlags = CommandFlags{ - URL: "http://127.0.0.1:2379", - } - readlineCompleter *readline.PrefixCompleter ) @@ -51,30 +34,17 @@ func init() { cobra.EnablePrefixMatching = true } -func pdctlRun(cmd *cobra.Command, args []string) { - if commandFlags.Version { - server.PrintPDInfo() - return - } - if commandFlags.Interact { - loop() - } -} - -func getBasicCmd() *cobra.Command { +// GetRootCmd is exposed for integration tests. But it can be embedded into another suite, too. +func GetRootCmd() *cobra.Command { rootCmd := &cobra.Command{ Use: "pd-ctl", Short: "Placement Driver control", } - rootCmd.PersistentFlags().BoolVarP(&commandFlags.Detach, "detach", "d", true, "Run pdctl without readline.") - rootCmd.PersistentFlags().BoolVarP(&commandFlags.Interact, "interact", "i", false, "Run pdctl with readline.") - rootCmd.PersistentFlags().BoolVarP(&commandFlags.Version, "version", "V", false, "Print version information and exit.") - rootCmd.PersistentFlags().StringVarP(&commandFlags.URL, "pd", "u", commandFlags.URL, "address of pd") - rootCmd.PersistentFlags().StringVar(&commandFlags.CAPath, "cacert", commandFlags.CAPath, "path of file that contains list of trusted SSL CAs") - rootCmd.PersistentFlags().StringVar(&commandFlags.CertPath, "cert", commandFlags.CertPath, "path of file that contains X509 certificate in PEM format") - rootCmd.PersistentFlags().StringVar(&commandFlags.KeyPath, "key", commandFlags.KeyPath, "path of file that contains X509 key in PEM format") - rootCmd.PersistentFlags().BoolVarP(&commandFlags.Help, "help", "h", false, "help message") + rootCmd.PersistentFlags().StringP("pd", "u", "http://127.0.0.1:2379", "address of pd") + rootCmd.PersistentFlags().String("cacert", "", "path of file that contains list of trusted SSL CAs") + rootCmd.PersistentFlags().String("cert", "", "path of file that contains X509 certificate in PEM format") + rootCmd.PersistentFlags().String("key", "", "path of file that contains X509 key in PEM format") rootCmd.AddCommand( command.NewConfigCommand(), @@ -100,70 +70,59 @@ func getBasicCmd() *cobra.Command { rootCmd.Flags().ParseErrorsWhitelist.UnknownFlags = true rootCmd.SilenceErrors = true - return rootCmd -} - -func getInteractCmd(args []string) *cobra.Command { - rootCmd := getBasicCmd() - - rootCmd.SetArgs(args) - rootCmd.ParseFlags(args) - rootCmd.SetOutput(os.Stdout) - hiddenFlag(rootCmd) - - return rootCmd -} + rootCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error { + CAPath, err := cmd.Flags().GetString("cacert") + if err == nil && len(CAPath) != 0 { + certPath, err := cmd.Flags().GetString("cert") + if err != nil { + return err + } -func getMainCmd(args []string) *cobra.Command { - rootCmd := getBasicCmd() - rootCmd.Run = pdctlRun + keyPath, err := cmd.Flags().GetString("key") + if err != nil { + return err + } - rootCmd.SetArgs(args) - rootCmd.ParseFlags(args) - rootCmd.SetOutput(os.Stdout) + if err := command.InitHTTPSClient(CAPath, certPath, keyPath); err != nil { + rootCmd.Println(err) + return err + } + } + return nil + } - readlineCompleter = readline.NewPrefixCompleter(genCompleter(rootCmd)...) - rootCmd.LocalFlags().MarkHidden("detach") return rootCmd } -// Hide the flags in help and usage messages. 
-func hiddenFlag(cmd *cobra.Command) { - cmd.LocalFlags().MarkHidden("pd") - cmd.LocalFlags().MarkHidden("cacert") - cmd.LocalFlags().MarkHidden("cert") - cmd.LocalFlags().MarkHidden("key") - cmd.LocalFlags().MarkHidden("detach") - cmd.LocalFlags().MarkHidden("interact") - cmd.LocalFlags().MarkHidden("version") -} - // MainStart start main command func MainStart(args []string) { - if err := startCmd(getMainCmd, args); err != nil { - os.Exit(1) - } -} + rootCmd := GetRootCmd() -// Start start interact command -func Start(args []string) { - _ = startCmd(getInteractCmd, args) -} + rootCmd.Flags().BoolP("interact", "i", false, "Run pdctl with readline.") + rootCmd.Flags().BoolP("version", "V", false, "Print version information and exit.") + // TODO: deprecated + rootCmd.Flags().BoolP("detach", "d", true, "Run pdctl without readline.") -func startCmd(getCmd func([]string) *cobra.Command, args []string) error { - rootCmd := getCmd(args) - if len(commandFlags.CAPath) != 0 { - if err := command.InitHTTPSClient(commandFlags.CAPath, commandFlags.CertPath, commandFlags.KeyPath); err != nil { - rootCmd.Println(err) - return err + rootCmd.Run = func(cmd *cobra.Command, args []string) { + if v, err := cmd.Flags().GetBool("version"); err == nil && v { + server.PrintPDInfo() + return + } + if v, err := cmd.Flags().GetBool("interact"); err == nil && v { + loop() } } + rootCmd.SetArgs(args) + rootCmd.ParseFlags(args) + rootCmd.SetOutput(os.Stdout) + + readlineCompleter = readline.NewPrefixCompleter(genCompleter(rootCmd)...) + if err := rootCmd.Execute(); err != nil { rootCmd.Println(err) - return err + os.Exit(1) } - return nil } func loop() { @@ -180,6 +139,9 @@ func loop() { } defer l.Close() + rootCmd := GetRootCmd() + rootCmd.SetOutput(os.Stdout) + for { line, err := l.Readline() if err != nil { @@ -198,7 +160,14 @@ func loop() { fmt.Printf("parse command err: %v\n", err) continue } - Start(args) + + rootCmd.SetArgs(args) + rootCmd.ParseFlags(args) + + if err := rootCmd.Execute(); err != nil { + rootCmd.Println(err) + os.Exit(1) + } } }
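Note on the output-capture change above: the removed GetEcho helper swapped os.Stdout for a temp file around pdctl.Start, which mutates process-global state and cannot run concurrently; the tests now route everything through ExecuteCommand, which points the cobra command at an in-memory buffer. A minimal, self-contained sketch of that pattern (the demo/ping command names are illustrative, not part of PD):

package main

import (
	"bytes"
	"fmt"

	"github.com/spf13/cobra"
)

// execute mirrors the shape of tests/pdctl.ExecuteCommand: run a cobra
// command and return whatever it printed, without touching os.Stdout.
func execute(root *cobra.Command, args ...string) (string, error) {
	buf := new(bytes.Buffer)
	root.SetOutput(buf) // all cmd.Print* output lands in buf
	root.SetArgs(args)
	err := root.Execute()
	return buf.String(), err
}

func main() {
	root := &cobra.Command{Use: "demo"}
	root.AddCommand(&cobra.Command{
		Use: "ping",
		Run: func(cmd *cobra.Command, _ []string) {
			cmd.Println("Success!") // printed via the command, so it is captured
		},
	})

	out, err := execute(root, "ping")
	fmt.Printf("err=%v, output=%q\n", err, out) // err=<nil>, output="Success!\n"
}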
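The ctl.go rewrite moves TLS setup from the package-level commandFlags struct into a PersistentPreRunE hook that reads the flag values directly, so one root command serves the shell, the tests, and the readline loop without shared mutable state. A rough sketch of the hook pattern, with a print standing in for command.InitHTTPSClient and a hypothetical /tmp/ca.pem path:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	root := &cobra.Command{Use: "demo"}
	root.PersistentFlags().String("cacert", "", "path of file that contains list of trusted SSL CAs")

	// Runs before every subcommand, whether invoked from the shell, a test,
	// or an interactive loop, so flag-driven setup happens in one place.
	root.PersistentPreRunE = func(cmd *cobra.Command, _ []string) error {
		caPath, err := cmd.Flags().GetString("cacert")
		if err != nil {
			return err
		}
		if len(caPath) != 0 {
			// stand-in for command.InitHTTPSClient(caPath, certPath, keyPath)
			fmt.Println("would build an HTTPS client trusting", caPath)
		}
		return nil
	}

	root.AddCommand(&cobra.Command{
		Use: "ping",
		Run: func(cmd *cobra.Command, _ []string) { cmd.Println("pong") },
	})

	root.SetArgs([]string{"--cacert", "/tmp/ca.pem", "ping"}) // hypothetical path
	if err := root.Execute(); err != nil {
		fmt.Println(err)
	}
}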
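Finally, the interactive loop now builds the root command once and re-executes it for each input line. A cobra command can be Execute()'d repeatedly after SetArgs, with the caveat that parsed flag values persist between runs, so each invocation should pass its flags explicitly. A short sketch, where newRoot stands in for any function returning a fully wired root command such as GetRootCmd:

package main

import (
	"os"

	"github.com/spf13/cobra"
)

func newRoot() *cobra.Command {
	root := &cobra.Command{Use: "demo"}
	root.AddCommand(&cobra.Command{
		Use: "ping",
		Run: func(cmd *cobra.Command, _ []string) { cmd.Println("pong") },
	})
	return root
}

func main() {
	root := newRoot() // built once, outside the loop
	root.SetOutput(os.Stdout)

	// Re-execute the same command tree for each "input line".
	for _, args := range [][]string{{"ping"}, {"ping"}} {
		root.SetArgs(args)
		if err := root.Execute(); err != nil {
			root.Println(err)
		}
	}
}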