*: add hot region schedule #611
Conversation
server/region_cache.go
Outdated
@@ -297,3 +302,107 @@ func (c *fifoCache) len() int {

	return c.ll.Len()
}

type queueCache struct {

Can we abstract this into an LRUCache?

Can it be implemented with https://golang.org/pkg/container/heap/#example__intHeap?
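A minimal sketch of what the suggested LRU abstraction could look like, built on the standard `container/list` package. The names `lruCache`, `newLRUCache`, `Put`, and `Get` are hypothetical and are not taken from this PR; the sketch only illustrates the eviction idea.

```go
package main

import (
	"container/list"
	"sync"
)

// entry is the payload stored in each list element.
type entry struct {
	key   uint64
	value interface{}
}

// lruCache is a hypothetical fixed-capacity LRU cache, not the PR's code.
type lruCache struct {
	sync.Mutex
	maxLen int
	ll     *list.List               // front = most recently used
	index  map[uint64]*list.Element // key -> element in ll
}

func newLRUCache(maxLen int) *lruCache {
	return &lruCache{
		maxLen: maxLen,
		ll:     list.New(),
		index:  make(map[uint64]*list.Element),
	}
}

// Put inserts or updates key, then evicts the least recently used entry
// if the cache has grown past maxLen.
func (c *lruCache) Put(key uint64, value interface{}) {
	c.Lock()
	defer c.Unlock()
	if e, ok := c.index[key]; ok {
		e.Value.(*entry).value = value
		c.ll.MoveToFront(e)
		return
	}
	c.index[key] = c.ll.PushFront(&entry{key: key, value: value})
	if c.ll.Len() > c.maxLen {
		oldest := c.ll.Back()
		c.ll.Remove(oldest)
		delete(c.index, oldest.Value.(*entry).key)
	}
}

// Get returns the value for key and marks it as most recently used.
func (c *lruCache) Get(key uint64) (interface{}, bool) {
	c.Lock()
	defer c.Unlock()
	e, ok := c.index[key]
	if !ok {
		return nil, false
	}
	c.ll.MoveToFront(e)
	return e.Value.(*entry).value, true
}
```

With a capacity of 2, putting keys 1 and 2, reading key 1, and then putting key 3 evicts key 2, because key 1 was touched more recently.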
server/region_cache.go
Outdated
}

func (q *queueCache) Push(key interface{}, value interface{}) {
	q.Lock()

defer q.Unlock()
server/region_cache.go
Outdated
	return q.head == q.tail
}

func (q *queueCache) Push(key interface{}, value interface{}) {

Do we need to return a bool to indicate whether the push succeeded?
server/region_cache.go
Outdated
		return
	}
	v := kvItem{key: key, value: value}
	q.items[q.tail] = v

Do we need to check that q.tail is not out of range?

I check whether it is full first.
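The three Push comments above (use `defer`, return a bool, guard the index) can be sketched together on a small ring buffer. This is an illustration under assumptions: `ringQueue`, `newRingQueue`, and the spare-slot layout are hypothetical, and only `kvItem`, `head`, `tail`, and `items` echo names visible in the diff.

```go
package main

import "sync"

type kvItem struct {
	key   interface{}
	value interface{}
}

// ringQueue is a hypothetical stand-in for the PR's queueCache.
type ringQueue struct {
	sync.Mutex
	items []kvItem
	head  int // index of the oldest item
	tail  int // index of the next free slot
}

// newRingQueue allocates one spare slot so that head == tail always means
// "empty" and never "full".
func newRingQueue(capacity int) *ringQueue {
	return &ringQueue{items: make([]kvItem, capacity+1)}
}

// isFull must be called with the lock held.
func (q *ringQueue) isFull() bool {
	return (q.tail+1)%len(q.items) == q.head
}

// Push stores the item and reports whether it was accepted.
func (q *ringQueue) Push(key, value interface{}) bool {
	q.Lock()
	defer q.Unlock() // released on every return path
	if q.isFull() {
		return false
	}
	// tail is always kept in [0, len(items)) by the modulo below,
	// so this index cannot go out of range.
	q.items[q.tail] = kvItem{key: key, value: value}
	q.tail = (q.tail + 1) % len(q.items)
	return true
}
```

Because the full check and the wrap-around both happen under the same lock, the caller gets a definite answer from the return value instead of a silent drop.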
server/region_cache.go
Outdated
	return v.value
}

func (q *queueCache) Delete(key interface{}) {

Please add a bool return value to indicate whether the key was deleted or did not exist.
server/region_cache.go
Outdated
}

func (q *queueCache) Delete(key interface{}) {
	q.Lock()

defer q.Unlock()
server/region_cache.go
Outdated
func (q *queueCache) Delete(key interface{}) {
	q.Lock()
	id, ok := q.index[key]

s/id/idx
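Taken together, the Delete comments ask for a deferred unlock, a bool result, and the `idx` name. A minimal sketch under assumptions: `indexedStore` is a hypothetical, simplified container (a slice plus a key-to-slot map) and not the PR's `queueCache`.

```go
package main

import "sync"

// indexedStore is a hypothetical, simplified container: items live in a
// slice and index maps each key to its slot.
type indexedStore struct {
	sync.Mutex
	items []interface{}
	index map[uint64]int
}

// Delete removes key and reports whether it was present.
func (s *indexedStore) Delete(key uint64) bool {
	s.Lock()
	defer s.Unlock()
	idx, ok := s.index[key]
	if !ok {
		return false
	}
	s.items[idx] = nil // drop the reference so the value can be collected
	delete(s.index, key)
	return true
}
```

A second Delete of the same key returns false, which lets callers tell "removed" apart from "was never there".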
server/balancer.go
Outdated
func (m MetaRegionStatus) Less(i, j int) bool { return m[i].WriteBytes > m[j].WriteBytes }

type RegionHotStatus struct {
	TotalWriteBytes uint64 `json:"total_write"`

What is the difference between TotalWriteBytes and MetaStatus.WriteBytes?
server/balancer.go
Outdated
type balanceHotRegionScheduler struct {
	opt         *scheduleOption
	limit       uint64
	scoreStatus map[uint64]RegionHotStatus

Please add a comment for this map explaining what the key is.
func newBalanceHotRegionScheduler(opt *scheduleOption) *balanceHotRegionScheduler {
	return &balanceHotRegionScheduler{
		opt:   opt,
		limit: 1,

Can we modify the limit?

Use a const.
server/api/router.go
Outdated
@@ -60,6 +60,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
	router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
	router.HandleFunc("/api/v1/labels/stores", labelsHandler.GetStores).Methods("GET")

	hotStatusHandler := newHotStatusHandler(svr, rd)

hot or hotspot?

/api/v1/hotspot/host
/api/v1/hotspot/store?
server/balancer.go
Outdated
items := cluster.writeStatus.GetList()
for _, item := range items {
	r := item.value.(RegionStateValue)
	if r.UpdateTimes < 5 {

Make this 5 a const?
server/balancer.go
Outdated
type RegionStateValue struct {
	RegionID   uint64 `json:"region_id"`
	WriteBytes uint64 `json:"write_bytes"`

Written
server/balancer.go
Outdated
}

func (l *balanceHotRegionScheduler) clearScore() {
	for sid, status := range l.scoreStatus {

I prefer clearing the map directly.
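"Clearing the map directly" could mean replacing it with a fresh map rather than walking and resetting each entry. A minimal sketch, assuming the map is keyed by store ID (as the `sid` loop variable suggests); `hotStatus` and `scheduler` are hypothetical stand-ins for the PR's types.

```go
package main

// hotStatus is a hypothetical stand-in for the PR's RegionHotStatus.
type hotStatus struct {
	totalWriteBytes uint64
}

// scheduler is a hypothetical stand-in for balanceHotRegionScheduler.
type scheduler struct {
	// scoreStatus maps a store ID to that store's hot-region status.
	scoreStatus map[uint64]*hotStatus
}

// clearScore drops every entry by swapping in an empty map; the old map
// is garbage collected once nothing else references it.
func (s *scheduler) clearScore() {
	s.scoreStatus = make(map[uint64]*hotStatus)
}
```

One caveat of this approach: any other code still holding the old map keeps seeing the old entries, so it fits best when the map is only reached through the struct field.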
server/region_cache.go
Outdated
	return q.isFull()
}

func (q *hashCache) isFull() bool {

Why are there two isFull functions?

isFullLocked
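The `isFullLocked` suggestion follows a common Go naming habit: the plain method takes the lock itself, while the `Locked` variant documents that the caller must already hold it. A minimal sketch under assumptions; `boundedMap` and its fields are hypothetical and not the PR's `hashCache`.

```go
package main

import "sync"

// boundedMap is a hypothetical size-limited map.
type boundedMap struct {
	sync.RWMutex
	maxLen int
	items  map[uint64]interface{}
}

// IsFull is safe to call from anywhere; it takes the read lock itself.
func (m *boundedMap) IsFull() bool {
	m.RLock()
	defer m.RUnlock()
	return m.isFullLocked()
}

// isFullLocked assumes the caller already holds the lock.
func (m *boundedMap) isFullLocked() bool {
	return len(m.items) >= m.maxLen
}

// Put already holds the write lock, so it must use isFullLocked;
// calling IsFull here would try to lock twice and deadlock.
func (m *boundedMap) Put(key uint64, value interface{}) bool {
	m.Lock()
	defer m.Unlock()
	if _, exists := m.items[key]; !exists && m.isFullLocked() {
		return false
	}
	m.items[key] = value
	return true
}
```

The suffix makes the locking contract visible at each call site, which answers the "why two isFull" question without a comment.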
server/region.go
Outdated
@@ -50,6 +51,7 @@ func (r *RegionInfo) clone() *RegionInfo {
		Leader:       proto.Clone(r.Leader).(*metapb.Peer),
		DownPeers:    downPeers,
		PendingPeers: pendingPeers,
		WriteBytes:   r.WriteBytes,

WrittenBytes
server/coordinator.go
Outdated
Scheduler: s,
opt:       c.opt,
limiter:   c.limiter,
interval:  minInterval,

Why are there two intervals?

Please add comments.
server/cache.go
Outdated
res := make(map[uint64]uint64)
for _, s := range c.getStores() {
	res[s.GetId()] = s.status.GetBytesWritten()
	res[0] += s.status.GetBytesWritten()

Remove this line. If you need the total written bytes, please sum all the elements in the map.
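Summing on demand keeps the map free of a sentinel key 0 that could be mistaken for a real store. A minimal sketch; `totalWrittenBytes` is a hypothetical helper name.

```go
package main

// totalWrittenBytes adds up the per-store written-byte counters.
// The map is keyed by store ID and holds no sentinel entries.
func totalWrittenBytes(perStore map[uint64]uint64) uint64 {
	var total uint64
	for _, written := range perStore {
		total += written
	}
	return total
}
```

Ranging over a nil map is valid in Go, so the helper returns 0 when no stores have reported yet.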
server/balancer.go
Outdated
type RegionStateValue struct {
	RegionID    uint64 `json:"region_id"`
	WriteBytes  uint64 `json:"write_bytes"`
	UpdateTimes int    `json:"update_times"`

UpdateCount
server/balancer.go
Outdated
	UpdateTimes    int       `json:"update_times"`
	LastUpdateTime time.Time `json:"last_update_time"`
	StoreID        uint64    `json:"-"`
	antiTimes      int

What is antiTimes?
server/region_cache.go
Outdated
@@ -297,3 +297,70 @@ func (c *fifoCache) len() int {

	return c.ll.Len()
}

type hashCache struct {

Please use another PR and add tests for this structure.
server/region_cache.go
Outdated
q.Lock()
defer q.Unlock()
if q.isFull() {
	log.Info("Debug queue is full")

Better to return a "full" error.
By the way, what does "Debug" mean here?

I will remove those logs later; they are only there for observation during testing.
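Returning an error instead of logging lets the caller decide what a full cache means. A minimal sketch under assumptions: `errCacheFull`, `boundedCache`, and `newBoundedCache` are hypothetical names, not identifiers from this PR.

```go
package main

import (
	"errors"
	"sync"
)

// errCacheFull is a hypothetical sentinel error; the PR does not define it.
var errCacheFull = errors.New("cache is full")

// boundedCache is a hypothetical size-limited cache.
type boundedCache struct {
	sync.Mutex
	maxLen int
	items  map[uint64]interface{}
}

func newBoundedCache(maxLen int) *boundedCache {
	return &boundedCache{maxLen: maxLen, items: make(map[uint64]interface{})}
}

// Put stores value under key. It returns errCacheFull when adding a new
// key would exceed maxLen; updating an existing key always succeeds.
func (c *boundedCache) Put(key uint64, value interface{}) error {
	c.Lock()
	defer c.Unlock()
	if _, exists := c.items[key]; !exists && len(c.items) >= c.maxLen {
		return errCacheFull
	}
	c.items[key] = value
	return nil
}
```

A caller can compare the result against `errCacheFull` and choose to evict, retry, or skip the update, which a log line alone does not allow.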
server/region_cache.go
Outdated
	return item
}

func (q *hashCache) GetList() []interface{} {

s/GetList/List/
server/coordinator.go
Outdated
if !s.AllowSchedule() {
	continue
}
if op := s.Schedule(c.cluster); op != nil {
	c.addOperator(op)
}
if s.Scheduler.GetName() == "balance-hot-region-scheduler" {

Use a const.
LGTM.
PTAL @andelf @zhangjinpeng1987
LGTM
…cap/pd into hot-region-schedule
@@ -60,6 +60,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
	router.HandleFunc("/api/v1/labels", labelsHandler.Get).Methods("GET")
	router.HandleFunc("/api/v1/labels/stores", labelsHandler.GetStores).Methods("GET")

	hotStatusHandler := newHotStatusHandler(handler, rd)

Please let pd-ctl support this later.
server/balancer.go
Outdated
status, ok := h.scoreStatus[storeID]
if !ok {
	status = &StoreHotRegions{
		RegionsStat: make(RegionsStat, 0, 100),

Use a const for 100.
server/balancer.go
Outdated
// the hottest region in the store not move
// randomly pick a region from 1 .. length-1
// TODO: consider hot degree when pick
rr := h.r.Int31n(int32(length-1)) + 1

It seems you can use Intn(length-1) + 1.
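The suggested `Intn(length-1) + 1` yields an index in the range 1 to length-1, so index 0 (the hottest region) is never picked. A minimal sketch under assumptions: `regionPicker`, `newRegionPicker`, and `pickNonHottest` are hypothetical names, with the field `r` mirroring `h.r` in the diff; the length guard is an addition of this sketch, not something the diff shows.

```go
package main

import "math/rand"

// regionPicker is a hypothetical helper; r mirrors the h.r field in the diff.
type regionPicker struct {
	r *rand.Rand
}

func newRegionPicker(seed int64) *regionPicker {
	return &regionPicker{r: rand.New(rand.NewSource(seed))}
}

// pickNonHottest returns a random index in [1, length-1], never index 0
// (the hottest region). ok is false when no such index exists.
func (p *regionPicker) pickNonHottest(length int) (idx int, ok bool) {
	if length < 2 {
		return 0, false // rand.Intn panics when its argument is <= 0
	}
	return p.r.Intn(length-1) + 1, true
}
```

Using `Intn` avoids the `int32` round trip of `Int31n`, and the guard covers a store that holds only one hot region.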
server/balancer.go
Outdated
avgRegionCount := hotRegionTotalCount / float64(len(h.scoreStatus))
// Multiplied by 0.75 to avoid transfer back and forth
limit := uint64((float64(s.RegionsStat.Len()) - avgRegionCount) * 0.75)

Define a const for 0.75.

Rest LGTM
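The limit computation with a named constant might look like the sketch below. The names `hotRegionLimitFactor` and `hotRegionLimit` are hypothetical, and the guard for a non-positive surplus is an assumption added here: converting a negative floating-point value to `uint64` gives an implementation-dependent result in Go.

```go
package main

// hotRegionLimitFactor damps the computed limit so regions are not moved
// back and forth between stores. The name is hypothetical.
const hotRegionLimitFactor = 0.75

// hotRegionLimit returns how many hot regions may leave a store that holds
// regionCount of them when the per-store average is avgRegionCount.
func hotRegionLimit(regionCount int, avgRegionCount float64) uint64 {
	surplus := float64(regionCount) - avgRegionCount
	if surplus <= 0 {
		// Guard added in this sketch: a store at or below the average
		// should not give up any regions.
		return 0
	}
	return uint64(surplus * hotRegionLimitFactor)
}
```

For example, a store with 10 hot regions against an average of 6 has a surplus of 4, so the limit is 3; a store with 2 hot regions against the same average gets a limit of 0.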
server/coordinator.go
Outdated
writeStatLRUMaxLen        = 1000
storeHotRegionsDefaultLen = 100
hotRegionLimitCoeff       = 0.75

"factor" may be better than "coeff"?
For now, this collects each region's write status from the heartbeat, then randomly selects a hot region in the hottest write store and transfers it.
cc @siddontang @disksing @zhangjinpeng1987
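The selection step described above can be sketched roughly as follows. Everything here is an assumption for illustration: the type `regionStat`, the helpers `groupHotRegions` and `hottestStore`, the constant name `hotRegionMinUpdateCount`, and the reading of "hottest write store" as the store with the most hot regions. Only the threshold value 5 comes from the reviewed diff.

```go
package main

// hotRegionMinUpdateCount is a hypothetical name for the threshold of 5
// that appears as a literal in the reviewed code.
const hotRegionMinUpdateCount = 5

// regionStat is a hypothetical, trimmed-down per-region write statistic.
type regionStat struct {
	RegionID     uint64
	StoreID      uint64
	WrittenBytes uint64
	UpdateCount  int
}

// groupHotRegions drops regions reported hot fewer than
// hotRegionMinUpdateCount times and groups the rest by store ID.
func groupHotRegions(stats []regionStat) map[uint64][]regionStat {
	byStore := make(map[uint64][]regionStat)
	for _, s := range stats {
		if s.UpdateCount < hotRegionMinUpdateCount {
			continue
		}
		byStore[s.StoreID] = append(byStore[s.StoreID], s)
	}
	return byStore
}

// hottestStore returns the store holding the most hot regions. Ties go to
// the smaller store ID so the result does not depend on map iteration order.
func hottestStore(byStore map[uint64][]regionStat) (storeID uint64, ok bool) {
	best := 0
	for id, regions := range byStore {
		if len(regions) > best || (len(regions) == best && ok && id < storeID) {
			storeID, best, ok = id, len(regions), true
		}
	}
	return storeID, ok
}
```

After these two steps, a scheduler would pick one region from the chosen store and schedule its transfer; that part depends on PD internals and is left out of the sketch.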