Skip to content

Commit

Permalink
schedulers/balance_leader: use binary search instead of store in inde…
Browse files Browse the repository at this point in the history
…xMap (#5188)

close #5187

Signed-off-by: shirly <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
  • Loading branch information
AndreMouche and ti-chi-bot authored Jun 24, 2022
1 parent 37cb7e4 commit e0084c5
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 86 deletions.
10 changes: 9 additions & 1 deletion pkg/typeutil/comparison.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package typeutil

import "time"
import (
"math"
"time"
)

// MinUint64 returns the min value between two variables whose type are uint64.
func MinUint64(a, b uint64) uint64 {
Expand Down Expand Up @@ -52,3 +55,8 @@ func StringsEqual(a, b []string) bool {
}
return true
}

// Float64Equal checks if two float64 are equal.
func Float64Equal(a, b float64) bool {
return math.Abs(a-b) <= 1e-6
}
9 changes: 9 additions & 0 deletions pkg/typeutil/comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package typeutil

import (
"math/rand"
"testing"
"time"

Expand Down Expand Up @@ -44,3 +45,11 @@ func TestMinDuration(t *testing.T) {
re.Equal(time.Second, MinDuration(time.Second, time.Minute))
re.Equal(time.Second, MinDuration(time.Second, time.Second))
}

func TestEqualFloat(t *testing.T) {
t.Parallel()
re := require.New(t)
f1 := rand.Float64()
re.True(Float64Equal(f1, f1*1.000))
re.True(Float64Equal(f1, f1/1.000))
}
128 changes: 76 additions & 52 deletions server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/reflectutil"
"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/filter"
Expand Down Expand Up @@ -249,20 +250,41 @@ func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) boo
return allowed
}

// candidateStores for balance_leader, order by `getStore` `asc`
type candidateStores struct {
stores []*core.StoreInfo
storeIndexMap map[uint64]int
index int
compareOption func([]*core.StoreInfo) func(int, int) bool
stores []*core.StoreInfo
getScore func(*core.StoreInfo) float64
index int
asc bool
}

func newCandidateStores(stores []*core.StoreInfo, compareOption func([]*core.StoreInfo) func(int, int) bool) *candidateStores {
cs := &candidateStores{stores: stores, compareOption: compareOption}
cs.storeIndexMap = map[uint64]int{}
cs.initSort()
func newCandidateStores(stores []*core.StoreInfo, asc bool, getScore func(*core.StoreInfo) float64) *candidateStores {
cs := &candidateStores{stores: stores, getScore: getScore, asc: asc}
sort.Slice(cs.stores, cs.sortFunc())
return cs
}

func (cs *candidateStores) sortFunc() (less func(int, int) bool) {
less = func(i, j int) bool {
scorei := cs.getScore(cs.stores[i])
scorej := cs.getScore(cs.stores[j])
return cs.less(cs.stores[i].GetID(), scorei, cs.stores[j].GetID(), scorej)
}
return less
}

func (cs *candidateStores) less(iID uint64, scorei float64, jID uint64, scorej float64) bool {
if typeutil.Float64Equal(scorei, scorej) {
// when the stores share the same score, returns the one with the bigger ID,
// Since we assume that the bigger storeID, the newer store(which would be scheduled as soon as possible).
return iID > jID
}
if cs.asc {
return scorei < scorej
}
return scorei > scorej
}

// hasStore returns returns true when there are leftover stores.
func (cs *candidateStores) hasStore() bool {
return cs.index < len(cs.stores)
Expand All @@ -276,23 +298,47 @@ func (cs *candidateStores) next() {
cs.index++
}

func (cs *candidateStores) initSort() {
sort.Slice(cs.stores, cs.compareOption(cs.stores))
for i := 0; i < len(cs.stores); i++ {
cs.storeIndexMap[cs.stores[i].GetID()] = i
func (cs *candidateStores) binarySearch(store *core.StoreInfo) (index int) {
score := cs.getScore(store)
searchFunc := func(i int) bool {
curScore := cs.getScore(cs.stores[i])
return !cs.less(cs.stores[i].GetID(), curScore, store.GetID(), score)
}
return sort.Search(len(cs.stores)-1, searchFunc)
}

func (cs *candidateStores) reSort(stores ...*core.StoreInfo) {
// return the slice of index for the searched stores.
func (cs *candidateStores) binarySearchStores(stores ...*core.StoreInfo) (offsets []int) {
if !cs.hasStore() {
return
}
for _, store := range stores {
index, ok := cs.storeIndexMap[store.GetID()]
if !ok {
continue
index := cs.binarySearch(store)
offsets = append(offsets, index)
}
return offsets
}

// resortStoreWithPos is used to sort stores again after creating an operator.
// It will repeatedly swap the specific store and next store if they are in wrong order.
// In general, it has very few swaps. In the worst case, the time complexity is O(n).
func (cs *candidateStores) resortStoreWithPos(pos int) {
swapper := func(i, j int) { cs.stores[i], cs.stores[j] = cs.stores[j], cs.stores[i] }
score := cs.getScore(cs.stores[pos])
storeID := cs.stores[pos].GetID()
for ; pos+1 < len(cs.stores); pos++ {
curScore := cs.getScore(cs.stores[pos+1])
if cs.less(storeID, score, cs.stores[pos+1].GetID(), curScore) {
break
}
resortStores(cs.stores, cs.storeIndexMap, index, cs.compareOption(cs.stores))
swapper(pos, pos+1)
}
for ; pos > 1; pos-- {
curScore := cs.getScore(cs.stores[pos-1])
if !cs.less(storeID, score, cs.stores[pos-1].GetID(), curScore) {
break
}
swapper(pos, pos-1)
}
}

Expand All @@ -308,24 +354,11 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
plan := newBalancePlan(kind, cluster, opInfluence)

stores := cluster.GetStores()
greaterOption := func(stores []*core.StoreInfo) func(int, int) bool {
return func(i, j int) bool {
iOp := plan.GetOpInfluence(stores[i].GetID())
jOp := plan.GetOpInfluence(stores[j].GetID())
return stores[i].LeaderScore(plan.kind.Policy, iOp) >
stores[j].LeaderScore(plan.kind.Policy, jOp)
}
}
lessOption := func(stores []*core.StoreInfo) func(int, int) bool {
return func(i, j int) bool {
iOp := plan.GetOpInfluence(stores[i].GetID())
jOp := plan.GetOpInfluence(stores[j].GetID())
return stores[i].LeaderScore(plan.kind.Policy, iOp) <
stores[j].LeaderScore(plan.kind.Policy, jOp)
}
scoreFunc := func(store *core.StoreInfo) float64 {
return store.LeaderScore(plan.kind.Policy, plan.GetOpInfluence(store.GetID()))
}
sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetOpts()), greaterOption)
targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetOpts()), lessOption)
sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetOpts()), false, scoreFunc)
targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetOpts()), true, scoreFunc)
usedRegions := make(map[uint64]struct{})

result := make([]*operator.Operator, 0, batch)
Expand Down Expand Up @@ -395,26 +428,17 @@ func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLea

func makeInfluence(op *operator.Operator, plan *balancePlan, usedRegions map[uint64]struct{}, candidates ...*candidateStores) {
usedRegions[op.RegionID()] = struct{}{}
schedule.AddOpInfluence(op, plan.opInfluence, plan.Cluster)
for _, candidate := range candidates {
candidate.reSort(plan.source, plan.target)
}
}

// resortStores is used to sort stores again after creating an operator.
// It will repeatedly swap the specific store and next store if they are in wrong order.
// In general, it has very few swaps. In the worst case, the time complexity is O(n).
func resortStores(stores []*core.StoreInfo, index map[uint64]int, pos int, less func(i, j int) bool) {
swapper := func(i, j int) { stores[i], stores[j] = stores[j], stores[i] }
for ; pos+1 < len(stores) && !less(pos, pos+1); pos++ {
swapper(pos, pos+1)
index[stores[pos].GetID()] = pos
candidateUpdateStores := make([][]int, len(candidates))
for id, candidate := range candidates {
storesIDs := candidate.binarySearchStores(plan.source, plan.target)
candidateUpdateStores[id] = storesIDs
}
for ; pos > 1 && less(pos, pos-1); pos-- {
swapper(pos, pos-1)
index[stores[pos].GetID()] = pos
schedule.AddOpInfluence(op, plan.opInfluence, plan.Cluster)
for id, candidate := range candidates {
for _, pos := range candidateUpdateStores[id] {
candidate.resortStoreWithPos(pos)
}
}
index[stores[pos].GetID()] = pos
}

// transferLeaderOut transfers leader from the source store.
Expand Down
39 changes: 39 additions & 0 deletions server/schedulers/balance_leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
package schedulers

import (
"context"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockcluster"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/core"
)

func TestBalanceLeaderSchedulerConfigClone(t *testing.T) {
Expand All @@ -36,3 +41,37 @@ func TestBalanceLeaderSchedulerConfigClone(t *testing.T) {
conf2.Ranges[1] = keyRanges2[1]
re.NotEqual(conf.Ranges, conf2.Ranges)
}

func BenchmarkCandidateStores(b *testing.B) {
ctx := context.Background()
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(ctx, opt)

for id := uint64(1); id < uint64(10000); id++ {
leaderCount := int(rand.Int31n(10000))
tc.AddLeaderStore(id, leaderCount)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
updateAndResortStoresInCandidateStores(tc)
}
}

func updateAndResortStoresInCandidateStores(tc *mockcluster.Cluster) {
deltaMap := make(map[uint64]int64)
getScore := func(store *core.StoreInfo) float64 {
return store.LeaderScore(0, deltaMap[store.GetID()])
}
cs := newCandidateStores(tc.GetStores(), false, getScore)
stores := tc.GetStores()
// update score for store and reorder
for id, store := range stores {
offsets := cs.binarySearchStores(store)
if id%2 == 1 {
deltaMap[store.GetID()] = int64(rand.Int31n(10000))
} else {
deltaMap[store.GetID()] = int64(-rand.Int31n(10000))
}
cs.resortStoreWithPos(offsets[0])
}
}
78 changes: 45 additions & 33 deletions server/schedulers/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,43 +620,55 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestReSortStores(c *C) {
s.tc.AddLeaderStore(5, 100)
s.tc.AddLeaderStore(6, 0)
stores := s.tc.Stores.GetStores()
sort.Slice(stores, func(i, j int) bool {
return stores[i].GetID() < stores[j].GetID()
})

deltaMap := make(map[uint64]int64)
less := func(i, j int) bool {
iOp := deltaMap[stores[i].GetID()]
jOp := deltaMap[stores[j].GetID()]
return stores[i].LeaderScore(0, iOp) > stores[j].LeaderScore(0, jOp)
getScore := func(store *core.StoreInfo) float64 {
return store.LeaderScore(0, deltaMap[store.GetID()])
}

sort.Slice(stores, less)
storeIndexMap := map[uint64]int{}
for i := 0; i < len(stores); i++ {
storeIndexMap[stores[i].GetID()] = i
}
c.Assert(stores[0].GetID(), Equals, uint64(1))
c.Assert(storeIndexMap[uint64(1)], Equals, 0)
deltaMap[1] = -1

resortStores(stores, storeIndexMap, storeIndexMap[uint64(1)], less)
c.Assert(stores[0].GetID(), Equals, uint64(1))
c.Assert(storeIndexMap[uint64(1)], Equals, 0)
candidateStores := make([]*core.StoreInfo, 0)
// order by score desc.
cs := newCandidateStores(append(candidateStores, stores...), false, getScore)
// in candidate,the order stores:1(104),5(100),4(100),6,3,2
// store 4 should in pos 2
c.Assert(cs.binarySearch(stores[3]), Equals, 2)

// store 1 should in pos 0
store1 := stores[0]
c.Assert(cs.binarySearch(store1), Equals, 0)
deltaMap[store1.GetID()] = -1 // store 1
cs.resortStoreWithPos(0)
// store 1 should still in pos 0.
c.Assert(cs.stores[0].GetID(), Equals, uint64(1))
curIndx := cs.binarySearch(store1)
c.Assert(0, Equals, curIndx)
deltaMap[1] = -4
resortStores(stores, storeIndexMap, storeIndexMap[uint64(1)], less)
c.Assert(stores[2].GetID(), Equals, uint64(1))
c.Assert(storeIndexMap[uint64(1)], Equals, 2)
topID := stores[0].GetID()
deltaMap[topID] = -1
resortStores(stores, storeIndexMap, storeIndexMap[topID], less)
c.Assert(stores[1].GetID(), Equals, uint64(1))
c.Assert(storeIndexMap[uint64(1)], Equals, 1)
c.Assert(stores[2].GetID(), Equals, topID)
c.Assert(storeIndexMap[topID], Equals, 2)

bottomID := stores[5].GetID()
deltaMap[bottomID] = 4
resortStores(stores, storeIndexMap, storeIndexMap[bottomID], less)
c.Assert(stores[3].GetID(), Equals, bottomID)
c.Assert(storeIndexMap[bottomID], Equals, 3)
// store 1 update the scores to 104-4=100
// the order stores should be:5(100),4(100),1(100),6,3,2
cs.resortStoreWithPos(curIndx)
c.Assert(cs.stores[2].GetID(), Equals, uint64(1))
c.Assert(cs.binarySearch(store1), Equals, 2)
// the top store is : 5(100)
topStore := cs.stores[0]
topStorePos := cs.binarySearch(topStore)
deltaMap[topStore.GetID()] = -1
cs.resortStoreWithPos(topStorePos)

// after recorder, the order stores should be: 4(100),1(100),5(99),6,3,2
c.Assert(cs.stores[1].GetID(), Equals, uint64(1))
c.Assert(cs.binarySearch(store1), Equals, 1)
c.Assert(cs.stores[2].GetID(), Equals, topStore.GetID())
c.Assert(cs.binarySearch(topStore), Equals, 2)

bottomStore := cs.stores[5]
deltaMap[bottomStore.GetID()] = 4
cs.resortStoreWithPos(5)

// the order stores should be: 4(100),1(100),5(99),2(5),6,3
c.Assert(cs.stores[3].GetID(), Equals, bottomStore.GetID())
c.Assert(cs.binarySearch(bottomStore), Equals, 3)
}

var _ = Suite(&testBalanceRegionSchedulerSuite{})
Expand Down

0 comments on commit e0084c5

Please sign in to comment.