Skip to content

Commit

Permalink
*: picks some hot-region improvements to release 3.1 (#2342)
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
Signed-off-by: lhy1024 <[email protected]>
  • Loading branch information
nolouch authored Apr 10, 2020
1 parent d26bf96 commit 8b7475c
Show file tree
Hide file tree
Showing 21 changed files with 1,693 additions and 633 deletions.
18 changes: 3 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,20 @@ require (
github.com/coreos/go-semver v0.2.0
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf
github.com/docker/go-units v0.4.0
github.com/dustin/go-humanize v0.0.0-20180421182945-02af3965c54e // indirect
github.com/eknkc/amber v0.0.0-20171010120322-cdade1c07385 // indirect
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20181024230925-c65c006176ff // indirect
github.com/golang/protobuf v1.3.2
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v1.0.0
github.com/gorilla/mux v1.7.3
github.com/gorilla/websocket v1.2.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/json-iterator/go v1.1.9 // indirect
github.com/juju/ratelimit v1.0.1
github.com/leodido/go-urn v1.2.0 // indirect
github.com/mattn/go-isatty v0.0.11 // indirect
github.com/mattn/go-shellwords v1.0.3
github.com/montanaflynn/stats v0.0.0-20151014174947-eeaced052adb
github.com/onsi/gomega v1.4.2 // indirect
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/phf/go-queue v0.0.0-20170504031614-9abe38d0371d
github.com/pingcap-incubator/tidb-dashboard v0.0.0-20200218115603-7ab5f06db73d
Expand All @@ -44,17 +38,11 @@ require (
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.1
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d
github.com/tmc/grpc-websocket-proxy v0.0.0-20171017195756-830351dc03c6 // indirect
github.com/unrolled/render v0.0.0-20171102162132-65450fb6b2d3
github.com/urfave/negroni v0.3.0
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/goleak v0.10.0
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 // indirect
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1 // indirect
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2 // indirect
google.golang.org/grpc v1.25.1
gopkg.in/go-playground/validator.v9 v9.31.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.8 // indirect
)
65 changes: 26 additions & 39 deletions go.sum

Large diffs are not rendered by default.

72 changes: 70 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,14 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en
}

// AddLeaderRegionWithReadInfo adds region with specified leader, followers and read info.
func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, reportInterval uint64, followerIds ...uint64) {
func (mc *Cluster) AddLeaderRegionWithReadInfo(
regionID uint64, leaderID uint64,
readBytes, readKeys uint64,
reportInterval uint64,
followerIds []uint64) {
r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
r = r.Clone(core.SetReadBytes(readBytes))
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
items := mc.HotCache.CheckRead(r, mc.StoresStats)
for _, item := range items {
Expand All @@ -287,9 +292,14 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64,
}

// AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info.
func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, reportInterval uint64, followerIds ...uint64) {
func (mc *Cluster) AddLeaderRegionWithWriteInfo(
regionID uint64, leaderID uint64,
writtenBytes, writtenKeys uint64,
reportInterval uint64,
followerIds []uint64) {
r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
r = r.Clone(core.SetWrittenBytes(writtenBytes))
r = r.Clone(core.SetWrittenKeys(writtenKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
items := mc.HotCache.CheckWrite(r, mc.StoresStats)
for _, item := range items {
Expand Down Expand Up @@ -383,11 +393,40 @@ func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio
mc.PutStore(newStore)
}

// UpdateStorageWrittenStats updates the written bytes and written keys of the
// store identified by storeID.
//
// It clones the store's current stats, overwrites BytesWritten and KeysWritten,
// attaches a report interval that ends now and spans
// statistics.StoreHeartBeatReportInterval, then hands the new stats to mc.Set
// and the updated store to mc.PutStore.
func (mc *Cluster) UpdateStorageWrittenStats(storeID, bytesWritten, keysWritten uint64) {
	store := mc.GetStore(storeID)
	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	newStats.BytesWritten = bytesWritten
	newStats.KeysWritten = keysWritten
	// Use the Unix timestamp rather than time.Now().Second(): Second() is only
	// the second within the current minute (0-59), so subtracting the interval
	// could go negative and wrap around when converted to uint64.
	end := time.Now().Unix()
	start := end - int64(statistics.StoreHeartBeatReportInterval)
	newStats.Interval = &pdpb.TimeInterval{StartTimestamp: uint64(start), EndTimestamp: uint64(end)}
	newStore := store.Clone(core.SetStoreStats(newStats))
	mc.Set(storeID, newStats)
	mc.PutStore(newStore)
}

// UpdateStorageReadStats updates the read bytes and read keys of the store
// identified by storeID.
//
// It clones the store's current stats, overwrites BytesRead and KeysRead,
// attaches a report interval that ends now and spans
// statistics.StoreHeartBeatReportInterval, then hands the new stats to mc.Set
// and the updated store to mc.PutStore.
func (mc *Cluster) UpdateStorageReadStats(storeID, bytesRead, keysRead uint64) {
	store := mc.GetStore(storeID)
	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	newStats.BytesRead = bytesRead
	newStats.KeysRead = keysRead
	// Use the Unix timestamp rather than time.Now().Second(): Second() is only
	// the second within the current minute (0-59), so subtracting the interval
	// could go negative and wrap around when converted to uint64.
	end := time.Now().Unix()
	start := end - int64(statistics.StoreHeartBeatReportInterval)
	newStats.Interval = &pdpb.TimeInterval{StartTimestamp: uint64(start), EndTimestamp: uint64(end)}
	newStore := store.Clone(core.SetStoreStats(newStats))
	mc.Set(storeID, newStats)
	mc.PutStore(newStore)
}

// UpdateStorageWrittenBytes updates store written bytes.
func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesWritten = bytesWritten
newStats.KeysWritten = bytesWritten / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand All @@ -401,6 +440,35 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.BytesRead = bytesRead
newStats.KeysRead = bytesRead / 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}

// UpdateStorageWrittenKeys updates the written keys of the store identified by
// storeID.
//
// BytesWritten is derived as keysWritten * 100 so that both statistics stay
// populated. The stats are cloned from the store's current stats, given a
// report interval that ends now and spans
// statistics.StoreHeartBeatReportInterval, then handed to mc.Set, and the
// updated store is passed to mc.PutStore.
func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) {
	store := mc.GetStore(storeID)
	newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
	newStats.KeysWritten = keysWritten
	newStats.BytesWritten = keysWritten * 100
	// Use the Unix timestamp rather than time.Now().Second(): Second() is only
	// the second within the current minute (0-59), so subtracting the interval
	// could go negative and wrap around when converted to uint64.
	end := time.Now().Unix()
	start := end - int64(statistics.StoreHeartBeatReportInterval)
	newStats.Interval = &pdpb.TimeInterval{StartTimestamp: uint64(start), EndTimestamp: uint64(end)}
	newStore := store.Clone(core.SetStoreStats(newStats))
	mc.Set(storeID, newStats)
	mc.PutStore(newStore)
}

// UpdateStorageReadKeys updates store read keys.
func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysRead = keysRead
newStats.BytesRead = keysRead * 100
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand Down
41 changes: 40 additions & 1 deletion server/api/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,46 @@ func (s *testScheduleSuite) TestAPI(c *C) {
extraTestFunc func(name string, c *C)
}{
{name: "balance-leader-scheduler"},
{name: "balance-hot-region-scheduler"},
{
name: "balance-hot-region-scheduler",
extraTestFunc: func(name string, c *C) {
resp := make(map[string]interface{})
listURL := fmt.Sprintf("%s%s%s/%s/list", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
c.Assert(readJSON(listURL, &resp), IsNil)
expectMap := map[string]float64{
"min-hot-byte-rate": 100,
"min-hot-key-rate": 10,
"max-zombie-rounds": 3,
"max-peer-number": 1000,
"byte-rate-rank-step-ratio": 0.05,
"key-rate-rank-step-ratio": 0.05,
"count-rank-step-ratio": 0.01,
"great-dec-ratio": 0.95,
"minor-dec-ratio": 0.99,
}
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
dataMap := make(map[string]interface{})
dataMap["max-zombie-rounds"] = 5.0
expectMap["max-zombie-rounds"] = 5.0
updateURL := fmt.Sprintf("%s%s%s/%s/config", s.svr.GetAddr(), apiPrefix, server.SchedulerConfigHandlerPath, name)
body, err := json.Marshal(dataMap)
c.Assert(err, IsNil)
c.Assert(postJSON(updateURL, body), IsNil)
resp = make(map[string]interface{})
c.Assert(readJSON(listURL, &resp), IsNil)
for key := range expectMap {
c.Assert(resp[key], DeepEquals, expectMap[key])
}
// update again
err = postJSON(updateURL, body, func(res []byte, code int) {
c.Assert(string(res), Equals, "no changed")
c.Assert(code, Equals, 200)
})
c.Assert(err, IsNil)
},
},
{name: "balance-region-scheduler"},
{name: "shuffle-leader-scheduler"},
{name: "shuffle-region-scheduler"},
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
"go.uber.org/zap"
)

var backgroundJobInterval = time.Minute
var backgroundJobInterval = 10 * time.Second

const (
clientTimeout = 3 * time.Second
Expand Down
10 changes: 10 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,24 +402,30 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsPeer[storeID]
if ok {
	// Report the hot write statistics of this store in its role as a peer.
	hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(stat.TotalBytesRate)
	// The keys gauge must be fed from the key rate, not the byte rate,
	// matching how the as-leader gauges are populated.
	hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(stat.TotalKeysRate)
	hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(float64(stat.Count))
} else {
	// No entry for this store: reset the gauges to zero.
	hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_peer").Set(0)
	hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_peer").Set(0)
	hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_peer").Set(0)
}

stat, ok = status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_written_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_write_region_as_leader").Set(0)
}

infl := pendings[storeID]
// TODO: add to tidb-ansible after merging pending influence into operator influence.
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count)
}

// Collects hot read region metrics.
Expand All @@ -432,14 +438,18 @@ func (c *coordinator) collectHotSpotMetrics() {
stat, ok := status.AsLeader[storeID]
if ok {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(stat.TotalBytesRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(stat.TotalKeysRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(float64(stat.Count))
} else {
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_bytes_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "total_read_keys_as_leader").Set(0)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "hot_read_region_as_leader").Set(0)
}

infl := pendings[storeID]
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count)
}
}

Expand Down
Loading

0 comments on commit 8b7475c

Please sign in to comment.