Skip to content

Commit

Permalink
Merge branch 'master' into improve_trace_region_flow
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jun 4, 2021
2 parents e1293db + da4cdd4 commit e6f7eec
Show file tree
Hide file tree
Showing 40 changed files with 726 additions and 558 deletions.
5 changes: 5 additions & 0 deletions pkg/mock/mockcluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func (mc *Cluster) SetLocationLabels(v []string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.LocationLabels = v })
}

// SetIsolationLevel updates the IsolationLevel configuration.
func (mc *Cluster) SetIsolationLevel(v string) {
mc.updateReplicationConfig(func(r *config.ReplicationConfig) { r.IsolationLevel = v })
}

func (mc *Cluster) updateScheduleConfig(f func(*config.ScheduleConfig)) {
s := mc.GetScheduleConfig().Clone()
f(s)
Expand Down
21 changes: 3 additions & 18 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,12 +755,7 @@ func (mc *Cluster) CheckRegionRead(region *core.RegionInfo) []*statistics.HotPee
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
interval)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
Expand All @@ -777,12 +772,7 @@ func (mc *Cluster) CheckRegionWrite(region *core.RegionInfo) []*statistics.HotPe
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
interval)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckWritePeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
Expand All @@ -799,12 +789,7 @@ func (mc *Cluster) CheckRegionLeaderRead(region *core.RegionInfo) []*statistics.
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
peer := region.GetLeader()
peerInfo := core.NewPeerInfo(peer,
region.GetBytesWritten(),
region.GetKeysWritten(),
region.GetBytesRead(),
region.GetKeysRead(),
interval)
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := mc.HotCache.CheckReadPeerSync(peerInfo, region)
if item != nil {
items = append(items, item)
Expand Down
116 changes: 91 additions & 25 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,78 @@ import (
"go.uber.org/zap"
)

// MetaPeer is api compatible with *metapb.Peer.
type MetaPeer struct {
*metapb.Peer
// RoleName is `Role.String()`.
// Since Role is serialized as int by json by default,
// introducing it will make the output of pd-ctl easier to identify Role.
RoleName string `json:"role_name"`
// IsLearner is `Role == "Learner"`.
// Since IsLearner was changed to Role in kvproto in 5.0, this field was introduced to ensure api compatibility.
IsLearner bool `json:"is_learner,omitempty"`
}

// PDPeerStats is api compatible with *pdpb.PeerStats.
type PDPeerStats struct {
*pdpb.PeerStats
Peer MetaPeer `json:"peer"`
}

func fromPeer(peer *metapb.Peer) MetaPeer {
return MetaPeer{
Peer: peer,
RoleName: peer.GetRole().String(),
IsLearner: core.IsLearner(peer),
}
}

func fromPeerSlice(peers []*metapb.Peer) []MetaPeer {
if peers == nil {
return nil
}
slice := make([]MetaPeer, len(peers))
for i, peer := range peers {
slice[i] = fromPeer(peer)
}
return slice
}

func fromPeerStats(peer *pdpb.PeerStats) PDPeerStats {
return PDPeerStats{
PeerStats: peer,
Peer: fromPeer(peer.Peer),
}
}

func fromPeerStatsSlice(peers []*pdpb.PeerStats) []PDPeerStats {
if peers == nil {
return nil
}
slice := make([]PDPeerStats, len(peers))
for i, peer := range peers {
slice[i] = fromPeerStats(peer)
}
return slice
}

// RegionInfo records detail region info for api usage.
type RegionInfo struct {
ID uint64 `json:"id"`
StartKey string `json:"start_key"`
EndKey string `json:"end_key"`
RegionEpoch *metapb.RegionEpoch `json:"epoch,omitempty"`
Peers []*metapb.Peer `json:"peers,omitempty"`

Leader *metapb.Peer `json:"leader,omitempty"`
DownPeers []*pdpb.PeerStats `json:"down_peers,omitempty"`
PendingPeers []*metapb.Peer `json:"pending_peers,omitempty"`
WrittenBytes uint64 `json:"written_bytes"`
ReadBytes uint64 `json:"read_bytes"`
WrittenKeys uint64 `json:"written_keys"`
ReadKeys uint64 `json:"read_keys"`
ApproximateSize int64 `json:"approximate_size"`
ApproximateKeys int64 `json:"approximate_keys"`
Peers []MetaPeer `json:"peers,omitempty"`

Leader MetaPeer `json:"leader,omitempty"`
DownPeers []PDPeerStats `json:"down_peers,omitempty"`
PendingPeers []MetaPeer `json:"pending_peers,omitempty"`
WrittenBytes uint64 `json:"written_bytes"`
ReadBytes uint64 `json:"read_bytes"`
WrittenKeys uint64 `json:"written_keys"`
ReadKeys uint64 `json:"read_keys"`
ApproximateSize int64 `json:"approximate_size"`
ApproximateKeys int64 `json:"approximate_keys"`

ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"`
}
Expand All @@ -69,7 +124,7 @@ func fromPBReplicationStatus(s *replication_modepb.RegionReplicationStatus) *Rep
return nil
}
return &ReplicationStatus{
State: replication_modepb.RegionReplicationState_name[int32(s.GetState())],
State: s.GetState().String(),
StateID: s.GetStateId(),
}
}
Expand All @@ -89,10 +144,10 @@ func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo {
s.StartKey = core.HexRegionKeyStr(r.GetStartKey())
s.EndKey = core.HexRegionKeyStr(r.GetEndKey())
s.RegionEpoch = r.GetRegionEpoch()
s.Peers = r.GetPeers()
s.Leader = r.GetLeader()
s.DownPeers = r.GetDownPeers()
s.PendingPeers = r.GetPendingPeers()
s.Peers = fromPeerSlice(r.GetPeers())
s.Leader = fromPeer(r.GetLeader())
s.DownPeers = fromPeerStatsSlice(r.GetDownPeers())
s.PendingPeers = fromPeerSlice(r.GetPendingPeers())
s.WrittenBytes = r.GetBytesWritten()
s.WrittenKeys = r.GetKeysWritten()
s.ReadBytes = r.GetBytesRead()
Expand All @@ -104,10 +159,26 @@ func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo {
return s
}

// Adjust is only used in testing, in order to compare the data from json deserialization.
func (r *RegionInfo) Adjust() {
for _, peer := range r.DownPeers {
// Since api.PDPeerStats uses the api.MetaPeer type variable Peer to overwrite PeerStats.Peer,
// it needs to be restored after deserialization to be completely consistent with the original.
peer.PeerStats.Peer = peer.Peer.Peer
}
}

// RegionsInfo contains some regions with the detailed region info.
type RegionsInfo struct {
Count int `json:"count"`
Regions []*RegionInfo `json:"regions"`
Count int `json:"count"`
Regions []RegionInfo `json:"regions"`
}

// Adjust is only used in testing, in order to compare the data from json deserialization.
func (s *RegionsInfo) Adjust() {
for _, r := range s.Regions {
r.Adjust()
}
}

type regionHandler struct {
Expand Down Expand Up @@ -177,17 +248,12 @@ func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler {

func convertToAPIRegions(regions []*core.RegionInfo) *RegionsInfo {
regionInfos := make([]RegionInfo, len(regions))
regionInfosRefs := make([]*RegionInfo, len(regions))

for i := 0; i < len(regions); i++ {
regionInfosRefs[i] = &regionInfos[i]
}
for i, r := range regions {
regionInfosRefs[i] = InitRegion(r, regionInfosRefs[i])
InitRegion(r, &regionInfos[i])
}
return &RegionsInfo{
Count: len(regions),
Regions: regionInfosRefs,
Regions: regionInfos,
}
}

Expand Down
111 changes: 73 additions & 38 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,54 @@ import (
"github.com/tikv/pd/server/core"
)

var _ = Suite(&testRegionStructSuite{})

type testRegionStructSuite struct{}

func (s *testRegionStructSuite) TestPeer(c *C) {
peers := []*metapb.Peer{
{Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter},
{Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner},
{Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter},
{Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter},
}
// float64 is the default numeric type for JSON
expected := []map[string]interface{}{
{"id": float64(1), "store_id": float64(10), "role_name": "Voter"},
{"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true},
{"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"},
{"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"},
}

data, err := json.Marshal(fromPeerSlice(peers))
c.Assert(err, IsNil)
var ret []map[string]interface{}
c.Assert(json.Unmarshal(data, &ret), IsNil)
c.Assert(ret, DeepEquals, expected)
}

func (s *testRegionStructSuite) TestPeerStats(c *C) {
peers := []*pdpb.PeerStats{
{Peer: &metapb.Peer{Id: 1, StoreId: 10, Role: metapb.PeerRole_Voter}, DownSeconds: 0},
{Peer: &metapb.Peer{Id: 2, StoreId: 20, Role: metapb.PeerRole_Learner}, DownSeconds: 1},
{Peer: &metapb.Peer{Id: 3, StoreId: 30, Role: metapb.PeerRole_IncomingVoter}, DownSeconds: 2},
{Peer: &metapb.Peer{Id: 4, StoreId: 40, Role: metapb.PeerRole_DemotingVoter}, DownSeconds: 3},
}
// float64 is the default numeric type for JSON
expected := []map[string]interface{}{
{"peer": map[string]interface{}{"id": float64(1), "store_id": float64(10), "role_name": "Voter"}},
{"peer": map[string]interface{}{"id": float64(2), "store_id": float64(20), "role": float64(1), "role_name": "Learner", "is_learner": true}, "down_seconds": float64(1)},
{"peer": map[string]interface{}{"id": float64(3), "store_id": float64(30), "role": float64(2), "role_name": "IncomingVoter"}, "down_seconds": float64(2)},
{"peer": map[string]interface{}{"id": float64(4), "store_id": float64(40), "role": float64(3), "role_name": "DemotingVoter"}, "down_seconds": float64(3)},
}

data, err := json.Marshal(fromPeerStatsSlice(peers))
c.Assert(err, IsNil)
var ret []map[string]interface{}
c.Assert(json.Unmarshal(data, &ret), IsNil)
c.Assert(ret, DeepEquals, expected)
}

var _ = Suite(&testRegionSuite{})

type testRegionSuite struct {
Expand Down Expand Up @@ -84,20 +132,19 @@ func (s *testRegionSuite) TestRegion(c *C) {
url := fmt.Sprintf("%s/region/id/%d", s.urlPrefix, r.GetID())
r1 := &RegionInfo{}
r1m := make(map[string]interface{})
err := readJSON(testDialClient, url, r1)
c.Assert(err, IsNil)
c.Assert(readJSON(testDialClient, url, r1), IsNil)
r1.Adjust()
c.Assert(r1, DeepEquals, NewRegionInfo(r))
err = readJSON(testDialClient, url, &r1m)
c.Assert(err, IsNil)
c.Assert(readJSON(testDialClient, url, &r1m), IsNil)
c.Assert(r1m["written_bytes"].(float64), Equals, float64(r.GetBytesWritten()))
c.Assert(r1m["written_keys"].(float64), Equals, float64(r.GetKeysWritten()))
c.Assert(r1m["read_bytes"].(float64), Equals, float64(r.GetBytesRead()))
c.Assert(r1m["read_keys"].(float64), Equals, float64(r.GetKeysRead()))

url = fmt.Sprintf("%s/region/key/%s", s.urlPrefix, "a")
r2 := &RegionInfo{}
err = readJSON(testDialClient, url, r2)
c.Assert(err, IsNil)
c.Assert(readJSON(testDialClient, url, r2), IsNil)
r2.Adjust()
c.Assert(r2, DeepEquals, NewRegionInfo(r))
}

Expand All @@ -108,62 +155,50 @@ func (s *testRegionSuite) TestRegionCheck(c *C) {
mustRegionHeartbeat(c, s.svr, r)
url := fmt.Sprintf("%s/region/id/%d", s.urlPrefix, r.GetID())
r1 := &RegionInfo{}
err := readJSON(testDialClient, url, r1)
c.Assert(err, IsNil)
c.Assert(readJSON(testDialClient, url, r1), IsNil)
r1.Adjust()
c.Assert(r1, DeepEquals, NewRegionInfo(r))

url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "down-peer")
r2 := &RegionsInfo{}
err = readJSON(testDialClient, url, r2)
c.Assert(err, IsNil)
c.Assert(r2, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})
c.Assert(readJSON(testDialClient, url, r2), IsNil)
r2.Adjust()
c.Assert(r2, DeepEquals, &RegionsInfo{Count: 1, Regions: []RegionInfo{*NewRegionInfo(r)}})

url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "pending-peer")
r3 := &RegionsInfo{}
err = readJSON(testDialClient, url, r3)
c.Assert(err, IsNil)
c.Assert(r3, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})
c.Assert(readJSON(testDialClient, url, r3), IsNil)
r3.Adjust()
c.Assert(r3, DeepEquals, &RegionsInfo{Count: 1, Regions: []RegionInfo{*NewRegionInfo(r)}})

url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "offline-peer")
r4 := &RegionsInfo{}
err = readJSON(testDialClient, url, r4)
c.Assert(err, IsNil)
c.Assert(r4, DeepEquals, &RegionsInfo{Count: 0, Regions: []*RegionInfo{}})
c.Assert(readJSON(testDialClient, url, r4), IsNil)
r4.Adjust()
c.Assert(r4, DeepEquals, &RegionsInfo{Count: 0, Regions: []RegionInfo{}})

r = r.Clone(core.SetApproximateSize(1))
mustRegionHeartbeat(c, s.svr, r)
url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "empty-region")
r5 := &RegionsInfo{}
err = readJSON(testDialClient, url, r5)
c.Assert(err, IsNil)
c.Assert(r5, DeepEquals, &RegionsInfo{Count: 1, Regions: []*RegionInfo{NewRegionInfo(r)}})
c.Assert(readJSON(testDialClient, url, r5), IsNil)
r5.Adjust()
c.Assert(r5, DeepEquals, &RegionsInfo{Count: 1, Regions: []RegionInfo{*NewRegionInfo(r)}})

r = r.Clone(core.SetApproximateSize(1))
mustRegionHeartbeat(c, s.svr, r)
url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "hist-size")
r6 := make([]*histItem, 1)
err = readJSON(testDialClient, url, &r6)
histSizes := make([]*histItem, 1)
histSize := &histItem{}
histSize.Start = 1
histSize.End = 1
histSize.Count = 1
histSizes[0] = histSize
c.Assert(err, IsNil)
c.Assert(readJSON(testDialClient, url, &r6), IsNil)
histSizes := []*histItem{{Start: 1, End: 1, Count: 1}}
c.Assert(r6, DeepEquals, histSizes)

r = r.Clone(core.SetApproximateKeys(1000))
mustRegionHeartbeat(c, s.svr, r)
url = fmt.Sprintf("%s/regions/check/%s", s.urlPrefix, "hist-keys")
r7 := make([]*histItem, 1)
err = readJSON(testDialClient, url, &r7)
histKeys := make([]*histItem, 1)
histKey := &histItem{}
histKey.Start = 1000
histKey.End = 1999
histKey.Count = 1
histKeys[0] = histKey
c.Assert(err, IsNil)
c.Assert(readJSON(testDialClient, url, &r7), IsNil)
histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
c.Assert(r7, DeepEquals, histKeys)
}

Expand All @@ -173,9 +208,9 @@ func (s *testRegionSuite) TestRegions(c *C) {
newTestRegionInfo(3, 1, []byte("b"), []byte("c")),
newTestRegionInfo(4, 2, []byte("c"), []byte("d")),
}
regions := make([]*RegionInfo, 0, len(rs))
regions := make([]RegionInfo, 0, len(rs))
for _, r := range rs {
regions = append(regions, NewRegionInfo(r))
regions = append(regions, *NewRegionInfo(r))
mustRegionHeartbeat(c, s.svr, r)
}
url := fmt.Sprintf("%s/regions", s.urlPrefix)
Expand Down
Loading

0 comments on commit e6f7eec

Please sign in to comment.