This is an automated cherry-pick of tikv#6965
close tikv#6962

Signed-off-by: ti-chi-bot <[email protected]>
bufferflies authored and ti-chi-bot committed Sep 1, 2023
1 parent 71e8929 commit 62f8135
Showing 2 changed files with 74 additions and 56 deletions.
108 changes: 56 additions & 52 deletions server/schedule/region_scatterer.go
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"sync"
"time"

@@ -87,26 +88,6 @@ func (s *selectedStores) GetGroupDistribution(group string) (map[uint64]uint64,
return s.getDistributionByGroupLocked(group)
}

// TotalCountByStore counts the total count by store
func (s *selectedStores) TotalCountByStore(storeID uint64) uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
groups := s.groupDistribution.GetAllID()
totalCount := uint64(0)
for _, group := range groups {
storeDistribution, ok := s.getDistributionByGroupLocked(group)
if !ok {
continue
}
count, ok := storeDistribution[storeID]
if !ok {
continue
}
totalCount += count
}
return totalCount
}

// getDistributionByGroupLocked should be called with lock
func (s *selectedStores) getDistributionByGroupLocked(group string) (map[uint64]uint64, bool) {
if result, ok := s.groupDistribution.Get(group); ok {
@@ -315,6 +296,12 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
selectedStores := make(map[uint64]struct{}, len(region.GetPeers())) // selected StoreID set
leaderCandidateStores := make([]uint64, 0, len(region.GetPeers())) // StoreID allowed to become Leader
scatterWithSameEngine := func(peers map[uint64]*metapb.Peer, context engineContext) { // peers: StoreID -> Peer
filterLen := len(context.filterFuncs) + 2
filters := make([]filter.Filter, filterLen)
for i, filterFunc := range context.filterFuncs {
filters[i] = filterFunc()
}
filters[filterLen-2] = filter.NewExcludedFilter(r.name, nil, selectedStores)
for _, peer := range peers {
if _, ok := selectedStores[peer.GetStoreId()]; ok {
if allowLeader(oldFit, peer) {
@@ -323,9 +310,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// It is both sourcePeer and targetPeer itself, no need to select.
continue
}
sourceStore := r.cluster.GetStore(peer.GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", peer.GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
continue
}
filters[filterLen-1] = filter.NewPlacementSafeguard(r.name, r.cluster.GetSharedConfig(), r.cluster.GetBasicCluster(), r.cluster.GetRuleManager(), region, sourceStore, oldFit)
for {
candidates := r.selectCandidates(region, oldFit, peer.GetStoreId(), selectedStores, context)
newPeer := r.selectStore(group, peer, peer.GetStoreId(), candidates, context)
newPeer := r.selectNewPeer(context, group, peer, filters)
targetPeers[newPeer.GetStoreId()] = newPeer
selectedStores[newPeer.GetStoreId()] = struct{}{}
// If the selected peer is a peer other than origin peer in this region,
@@ -346,7 +338,7 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
// FIXME: target leader only considers the ordinary stores; maybe we need to consider the
// special engine stores if the engine supports becoming a leader. But for now there is only
// one engine, tiflash, which does not support the leader, so don't consider it for now.
targetLeader := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
targetLeader, leaderStorePickedCount := r.selectAvailableLeaderStore(group, region, leaderCandidateStores, r.ordinaryEngine)
if targetLeader == 0 {
scatterCounter.WithLabelValues("no-leader", "").Inc()
return nil
@@ -381,7 +373,13 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
if op != nil {
scatterCounter.WithLabelValues("success", "").Inc()
r.Put(targetPeers, targetLeader, group)
<<<<<<< HEAD:server/schedule/region_scatterer.go
op.SetPriorityLevel(core.High)
=======
op.AdditionalInfos["group"] = group
op.AdditionalInfos["leader-picked-count"] = strconv.FormatUint(leaderStorePickedCount, 10)
op.SetPriorityLevel(constant.High)
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
}
return op
}
@@ -418,6 +416,7 @@ func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.
return region.GetLeader().GetStoreId() == targetLeader
}

<<<<<<< HEAD:server/schedule/region_scatterer.go
func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *placement.RegionFit, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
sourceStore := r.cluster.GetStore(sourceStoreID)
if sourceStore == nil {
@@ -432,55 +431,59 @@ func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, oldFit *plac
filters = append(filters, filterFunc())
}
filters = append(filters, scoreGuard)
=======
// selectNewPeer returns the new peer that has the fewest picked count.
// It keeps the origin peer if the origin store's picked count already equals the fewest picked count.
// It can be divided into three steps:
// 1. find the max picked count and the min picked count.
// 2. if the max picked count equals the min picked count, all stores have been picked the same number of times, so return the origin peer.
// 3. otherwise, select the store whose picked count is the minimum and which passes all filters.
func (r *RegionScatterer) selectNewPeer(context engineContext, group string, peer *metapb.Peer, filters []filter.Filter) *metapb.Peer {
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
stores := r.cluster.GetStores()
candidates := make([]uint64, 0)
maxStoreTotalCount := uint64(0)
minStoreTotalCount := uint64(math.MaxUint64)
for _, store := range stores {
count := context.selectedPeer.TotalCountByStore(store.GetID())
count := context.selectedPeer.Get(store.GetID(), group)
if count > maxStoreTotalCount {
maxStoreTotalCount = count
}
if count < minStoreTotalCount {
minStoreTotalCount = count
}
}

var newPeer *metapb.Peer
minCount := uint64(math.MaxUint64)
originStorePickedCount := uint64(math.MaxUint64)
for _, store := range stores {
storeCount := context.selectedPeer.TotalCountByStore(store.GetID())
storeCount := context.selectedPeer.Get(store.GetID(), group)
if store.GetID() == peer.GetStoreId() {
originStorePickedCount = storeCount
}
// If storeCount is equal to maxStoreTotalCount, we should skip this store as a candidate.
// If the store counts are all the same across the whole cluster (maxStoreTotalCount == minStoreTotalCount),
// any store could be selected as a candidate.
if storeCount < maxStoreTotalCount || maxStoreTotalCount == minStoreTotalCount {
<<<<<<< HEAD:server/schedule/region_scatterer.go
if filter.Target(r.cluster.GetOpts(), store, filters) {
candidates = append(candidates, store.GetID())
=======
if filter.Target(r.cluster.GetSharedConfig(), store, filters) {
if storeCount < minCount {
minCount = storeCount
newPeer = &metapb.Peer{
StoreId: store.GetID(),
Role: peer.GetRole(),
}
}
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer.go
}
}
}
return candidates
}

func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceStoreID uint64, candidates []uint64, context engineContext) *metapb.Peer {
if len(candidates) < 1 {
if originStorePickedCount <= minCount {
return peer
}
var newPeer *metapb.Peer
minCount := uint64(math.MaxUint64)
for _, storeID := range candidates {
count := context.selectedPeer.Get(storeID, group)
if count < minCount {
minCount = count
newPeer = &metapb.Peer{
StoreId: storeID,
Role: peer.GetRole(),
}
}
}
// if the source store have the least count, we don't need to scatter this peer
for _, storeID := range candidates {
if storeID == sourceStoreID && context.selectedPeer.Get(sourceStoreID, group) <= minCount {
return peer
}
}
if newPeer == nil {
return peer
}
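In plain terms, the selectNewPeer logic above picks the store with the fewest previous picks for the group and keeps the origin store when it is already among the least picked. A minimal standalone sketch of that rule follows (illustrative only, not the PD code; pickStore, the count map, and the filter callback stand in for engineContext.selectedPeer, the cluster's store list, and the filter.Target call in the diff):

package main

import (
	"fmt"
	"math"
)

// pickStore keeps the origin store if it is already among the least-picked
// stores for the group; otherwise it moves the peer to the least-picked store
// that passes the filters. Stores already at the maximum pick count are
// skipped unless every store has been picked the same number of times.
func pickStore(pickCount map[uint64]uint64, originStore uint64, passesFilters func(uint64) bool) uint64 {
	maxCount, minCount := uint64(0), uint64(math.MaxUint64)
	for _, c := range pickCount {
		if c > maxCount {
			maxCount = c
		}
		if c < minCount {
			minCount = c
		}
	}

	best, bestCount := uint64(0), uint64(math.MaxUint64)
	for id, c := range pickCount {
		if (c < maxCount || maxCount == minCount) && passesFilters(id) && c < bestCount {
			best, bestCount = id, c
		}
	}

	// Keep the origin peer if its store is picked no more often than the best
	// alternative, or if no alternative passed the filters.
	if best == 0 || pickCount[originStore] <= bestCount {
		return originStore
	}
	return best
}

func main() {
	counts := map[uint64]uint64{1: 3, 2: 1, 3: 3} // hypothetical pick counts per store
	fmt.Println(pickStore(counts, 1, func(uint64) bool { return true })) // prints 2
}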
@@ -489,11 +492,12 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto

// selectAvailableLeaderStore selects the target leader store from the candidates. The candidates are collected from
// the stores of the existing peers, based on the leader counts at the group level. Please use this func before scattering special engines.
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo, leaderCandidateStores []uint64, context engineContext) uint64 {
func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.RegionInfo,
leaderCandidateStores []uint64, context engineContext) (leaderID uint64, leaderStorePickedCount uint64) {
sourceStore := r.cluster.GetStore(region.GetLeader().GetStoreId())
if sourceStore == nil {
log.Error("failed to get the store", zap.Uint64("store-id", region.GetLeader().GetStoreId()), errs.ZapError(errs.ErrGetSourceStore))
return 0
return 0, 0
}
minStoreGroupLeader := uint64(math.MaxUint64)
id := uint64(0)
@@ -508,7 +512,7 @@ func (r *RegionScatterer) selectAvailableLeaderStore(group string, region *core.
id = storeID
}
}
return id
return id, minStoreGroupLeader
}
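The leader side follows the same fewest-first idea: selectAvailableLeaderStore above picks, among the stores that already hold a peer of the region, the one with the fewest leaders recorded for the group, and now also returns that count so scatterRegion can record it as the operator's "leader-picked-count". A minimal, hypothetical sketch under the same caveats (pickLeaderStore and the count map are illustrative, not the PD API):

package main

import (
	"fmt"
	"math"
)

// pickLeaderStore picks, among the candidate stores that already hold a peer
// of the region, the one with the fewest leaders recorded for this group, and
// returns that count alongside the store ID. A zero store ID mirrors the
// "no-leader" branch in scatterRegion.
func pickLeaderStore(groupLeaderCount map[uint64]uint64, candidates []uint64) (storeID, pickedCount uint64) {
	minLeaders := uint64(math.MaxUint64)
	for _, id := range candidates {
		if c := groupLeaderCount[id]; c < minLeaders {
			minLeaders, storeID = c, id
		}
	}
	if storeID == 0 {
		return 0, 0
	}
	return storeID, minLeaders
}

func main() {
	counts := map[uint64]uint64{1: 4, 2: 2, 3: 5} // hypothetical per-group leader counts
	fmt.Println(pickLeaderStore(counts, []uint64{1, 2, 3})) // prints 2 2
}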

// Put puts the final distribution in the context no matter whether the operator was created
22 changes: 18 additions & 4 deletions server/schedule/region_scatterer_test.go
@@ -18,7 +18,6 @@ import (
"context"
"fmt"
"math"
"math/rand"
"strconv"
"sync"
"testing"
@@ -532,6 +531,7 @@ func TestSelectedStoreGC(t *testing.T) {
re.False(ok)
}

<<<<<<< HEAD:server/schedule/region_scatterer_test.go
// TestRegionFromDifferentGroups tests multiple regions, each region having its own group.
// After scattering, the distribution across the whole cluster should be even.
func TestRegionFromDifferentGroups(t *testing.T) {
@@ -570,11 +570,18 @@ func TestRegionFromDifferentGroups(t *testing.T) {
check(scatterer.ordinaryEngine.selectedPeer)
}

=======
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer_test.go
func TestRegionHasLearner(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
<<<<<<< HEAD:server/schedule/region_scatterer_test.go
opt := config.NewTestOptions()
=======
group := "group"
opt := mockconfig.NewTestOptions()
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer_test.go
tc := mockcluster.NewCluster(ctx, opt)
stream := hbstream.NewTestHeartbeatStreams(ctx, tc.ID, tc, false)
oc := NewOperatorController(ctx, tc, stream)
@@ -616,14 +623,18 @@ func TestRegionHasLearner(t *testing.T) {
scatterer := NewRegionScatterer(ctx, tc, oc)
regionCount := 50
for i := 1; i <= regionCount; i++ {
<<<<<<< HEAD:server/schedule/region_scatterer_test.go
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), "group")
=======
_, err := scatterer.Scatter(tc.AddRegionWithLearner(uint64(i), uint64(1), []uint64{uint64(2), uint64(3)}, []uint64{7}), group, false)
>>>>>>> 72a13c023 (Scatter: make peer scatter logic same with the leader (#6965)):pkg/schedule/scatter/region_scatterer_test.go
re.NoError(err)
}
check := func(ss *selectedStores) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= max; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
if count > max {
max = count
}
@@ -638,7 +649,7 @@ func TestRegionHasLearner(t *testing.T) {
max := uint64(0)
min := uint64(math.MaxUint64)
for i := uint64(1); i <= voterCount; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
if count > max {
max = count
}
@@ -649,7 +660,7 @@ func TestRegionHasLearner(t *testing.T) {
re.LessOrEqual(max-2, uint64(regionCount)/voterCount)
re.LessOrEqual(min-1, uint64(regionCount)/voterCount)
for i := voterCount + 1; i <= storeCount; i++ {
count := ss.TotalCountByStore(i)
count := ss.Get(i, group)
re.LessOrEqual(count, uint64(0))
}
}
@@ -690,6 +701,9 @@ func TestSelectedStoresTooFewPeers(t *testing.T) {
region := tc.AddLeaderRegion(i+200, i%3+2, (i+1)%3+2, (i+2)%3+2)
op := scatterer.scatterRegion(region, group)
re.False(isPeerCountChanged(op))
if op != nil {
re.Equal(group, op.AdditionalInfos["group"])
}
}
}
