Skip to content

Commit

Permalink
cluster: update the region cache when flow changed (#1759) (#1741) (#…
Browse files Browse the repository at this point in the history
…1641) (#2103)

Signed-off-by: nolouch <[email protected]>
  • Loading branch information
HunDunDM authored and sre-bot committed Jan 19, 2020
1 parent 488afe4 commit 5cf2711
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 2 deletions.
4 changes: 4 additions & 0 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type RegionInfo struct {
PendingPeers []*metapb.Peer `json:"pending_peers,omitempty"`
WrittenBytes uint64 `json:"written_bytes,omitempty"`
ReadBytes uint64 `json:"read_bytes,omitempty"`
WrittenKeys uint64 `json:"written_keys,omitempty"`
ReadKeys uint64 `json:"read_keys,omitempty"`
ApproximateSize int64 `json:"approximate_size,omitempty"`
ApproximateKeys int64 `json:"approximate_keys,omitempty"`
}
Expand All @@ -64,6 +66,8 @@ func InitRegion(r *core.RegionInfo, s *RegionInfo) *RegionInfo {
s.PendingPeers = r.GetPendingPeers()
s.WrittenBytes = r.GetBytesWritten()
s.ReadBytes = r.GetBytesRead()
s.WrittenKeys = r.GetKeysWritten()
s.ReadKeys = r.GetKeysRead()
s.ApproximateSize = r.GetApproximateSize()
s.ApproximateKeys = r.GetApproximateKeys()

Expand Down
11 changes: 11 additions & 0 deletions server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func newTestRegionInfo(regionID, storeID uint64, start, end []byte, opts ...core
newOpts := []core.RegionCreateOption{
core.SetApproximateKeys(10),
core.SetApproximateSize(10),
core.SetWrittenBytes(100 * 1024 * 1024),
core.SetWrittenKeys(1 * 1024 * 1024),
core.SetReadBytes(200 * 1024 * 1024),
core.SetReadKeys(2 * 1024 * 1024),
}
newOpts = append(newOpts, opts...)
region := core.NewRegionInfo(metaRegion, leader, newOpts...)
Expand All @@ -76,9 +80,16 @@ func (s *testRegionSuite) TestRegion(c *C) {
mustRegionHeartbeat(c, s.svr, r)
url := fmt.Sprintf("%s/region/id/%d", s.urlPrefix, r.GetID())
r1 := &RegionInfo{}
r1m := make(map[string]interface{})
err := readJSONWithURL(url, r1)
c.Assert(err, IsNil)
c.Assert(r1, DeepEquals, NewRegionInfo(r))
err = readJSONWithURL(url, &r1m)
c.Assert(err, IsNil)
c.Assert(r1m["written_bytes"].(float64), Equals, float64(r.GetBytesWritten()))
c.Assert(r1m["written_keys"].(float64), Equals, float64(r.GetKeysWritten()))
c.Assert(r1m["read_bytes"].(float64), Equals, float64(r.GetBytesRead()))
c.Assert(r1m["read_keys"].(float64), Equals, float64(r.GetKeysRead()))

url = fmt.Sprintf("%s/region/key/%s", s.urlPrefix, "a")
r2 := &RegionInfo{}
Expand Down
12 changes: 10 additions & 2 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,10 +586,16 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
if len(region.GetPeers()) != len(origin.GetPeers()) {
saveKV, saveCache = true, true
}
if region.GetApproximateSize() != origin.GetApproximateSize() {

if region.GetApproximateSize() != origin.GetApproximateSize() ||
region.GetApproximateKeys() != origin.GetApproximateKeys() {
saveCache = true
}
if region.GetApproximateKeys() != origin.GetApproximateKeys() {

if region.GetBytesWritten() != origin.GetBytesWritten() ||
region.GetBytesRead() != origin.GetBytesRead() ||
region.GetKeysWritten() != origin.GetKeysWritten() ||
region.GetKeysRead() != origin.GetKeysRead() {
saveCache = true
}
}
Expand All @@ -603,6 +609,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
zap.Stringer("region-meta", core.RegionToHexMeta(region.GetMeta())),
zap.Error(err))
}
regionEventCounter.WithLabelValues("update_kv").Inc()
select {
case c.changedRegions <- region:
default:
Expand Down Expand Up @@ -646,6 +653,7 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error {
for _, p := range region.GetPeers() {
c.updateStoreStatusLocked(p.GetStoreId())
}
regionEventCounter.WithLabelValues("update_cache").Inc()
}

if c.regionStats != nil {
Expand Down
43 changes: 43 additions & 0 deletions server/cluster_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ func (s *testRegionsInfoSuite) Test(c *C) {
}

func checkRegion(c *C, a *core.RegionInfo, b *core.RegionInfo) {
c.Assert(a, DeepEquals, b)
c.Assert(a.GetMeta(), DeepEquals, b.GetMeta())
c.Assert(a.GetLeader(), DeepEquals, b.GetLeader())
c.Assert(a.GetPeers(), DeepEquals, b.GetPeers())
Expand Down Expand Up @@ -427,6 +428,48 @@ func (s *testClusterInfoSuite) TestRegionHeartbeat(c *C) {
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
checkRegionsKV(c, cluster.kv, regions[:i+1])

// Change leader.
region = region.Clone(core.WithLeader(region.GetPeers()[1]))
regions[i] = region
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change ApproximateSize.
region = region.Clone(core.SetApproximateSize(144))
regions[i] = region
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change ApproximateKeys.
region = region.Clone(core.SetApproximateKeys(144000))
regions[i] = region
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change bytes written.
region = region.Clone(core.SetWrittenBytes(24000))
regions[i] = region
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys written.
region = region.Clone(core.SetWrittenKeys(240))
regions[i] = region
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change bytes read.
region = region.Clone(core.SetReadBytes(1080000))
regions[i] = region
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])

// Change keys read.
region = region.Clone(core.SetReadKeys(1080))
regions[i] = region
c.Assert(cluster.handleRegionHeartbeat(region), IsNil)
checkRegions(c, cluster.core.Regions, regions[:i+1])
}

regionCounts := make(map[uint64]int)
Expand Down
16 changes: 16 additions & 0 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type RegionInfo struct {
pendingPeers []*metapb.Peer
writtenBytes uint64
readBytes uint64
writtenKeys uint64
readKeys uint64
approximateSize int64
approximateKeys int64
}
Expand Down Expand Up @@ -91,6 +93,8 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest) *RegionInfo {
pendingPeers: heartbeat.GetPendingPeers(),
writtenBytes: heartbeat.GetBytesWritten(),
readBytes: heartbeat.GetBytesRead(),
writtenKeys: heartbeat.GetKeysWritten(),
readKeys: heartbeat.GetKeysRead(),
approximateSize: int64(regionSize),
approximateKeys: int64(heartbeat.GetApproximateKeys()),
}
Expand All @@ -117,6 +121,8 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
pendingPeers: pendingPeers,
writtenBytes: r.writtenBytes,
readBytes: r.readBytes,
writtenKeys: r.writtenKeys,
readKeys: r.readKeys,
approximateSize: r.approximateSize,
approximateKeys: r.approximateKeys,
}
Expand Down Expand Up @@ -332,6 +338,16 @@ func (r *RegionInfo) GetBytesWritten() uint64 {
return r.writtenBytes
}

// GetKeysRead returns the read keys of the region.
func (r *RegionInfo) GetKeysRead() uint64 {
return r.readKeys
}

// GetKeysWritten returns the written keys of the region.
func (r *RegionInfo) GetKeysWritten() uint64 {
return r.writtenKeys
}

// GetLeader returns the leader of the region.
func (r *RegionInfo) GetLeader() *metapb.Peer {
return r.leader
Expand Down
14 changes: 14 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,13 @@ func SetWrittenBytes(v uint64) RegionCreateOption {
}
}

// SetWrittenKeys sets the written keys for the region.
func SetWrittenKeys(v uint64) RegionCreateOption {
return func(region *RegionInfo) {
region.writtenKeys = v
}
}

// WithRemoveStorePeer removes the specified peer for the region.
func WithRemoveStorePeer(storeID uint64) RegionCreateOption {
return func(region *RegionInfo) {
Expand All @@ -166,6 +173,13 @@ func SetReadBytes(v uint64) RegionCreateOption {
}
}

// SetReadKeys sets the read keys for the region.
func SetReadKeys(v uint64) RegionCreateOption {
return func(region *RegionInfo) {
region.readKeys = v
}
}

// SetApproximateSize sets the approximate size for the region.
func SetApproximateSize(v int64) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
9 changes: 9 additions & 0 deletions server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ var (
Help: "Counter of region hearbeat.",
}, []string{"address", "store", "type", "status"})

regionEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pd",
Subsystem: "cluster",
Name: "region_event",
Help: "Counter of the region event",
}, []string{"event"})

regionHeartbeatLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "pd",
Expand Down Expand Up @@ -131,6 +139,7 @@ func init() {
prometheus.MustRegister(timeJumpBackCounter)
prometheus.MustRegister(schedulerStatusGauge)
prometheus.MustRegister(regionHeartbeatCounter)
prometheus.MustRegister(regionEventCounter)
prometheus.MustRegister(regionHeartbeatLatency)
prometheus.MustRegister(hotSpotStatusGauge)
prometheus.MustRegister(tsoCounter)
Expand Down

0 comments on commit 5cf2711

Please sign in to comment.