Skip to content

Commit

Permalink
copr: cherry pick #28201 to release-5.0-20210923 (#28341)
Browse files Browse the repository at this point in the history
* ddl, executor: fix test race in terror (#25405) (#28198)

* cherry pick #28201 to release-5.0

Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
windtalker authored Sep 26, 2021
1 parent 5bda4a8 commit b917170
Showing 1 changed file with 53 additions and 53 deletions.
106 changes: 53 additions & 53 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ func (rs *batchCopResponse) RespTime() time.Duration {
// if there is only 1 available store, then put the region to the related store
// otherwise, use a greedy algorithm to put it into the store with highest weight
func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTasks []*batchCopTask, mppStoreLastFailTime map[string]time.Time, ttl time.Duration) []*batchCopTask {
if len(originalTasks) <= 1 {
isMPP := mppStoreLastFailTime != nil
// for mpp, we still need to detect the store availability
if len(originalTasks) <= 1 && !isMPP {
return originalTasks
}
isMPP := mppStoreLastFailTime != nil
cache := kvStore.GetRegionCache()
storeTaskMap := make(map[uint64]*batchCopTask)
// storeCandidateRegionMap stores all the possible store->region map. Its content is
Expand Down Expand Up @@ -223,16 +224,28 @@ func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTas
}
}
}
if totalRemainingRegionNum == 0 {
return originalTasks
}

avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if totalRemainingRegionNum > 0 {
avgStorePerRegion := float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
findNextStore := func(candidateStores []uint64) uint64 {
store := uint64(math.MaxUint64)
weightedRegionNum := math.MaxFloat64
if candidateStores != nil {
for _, storeID := range candidateStores {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue
}
Expand All @@ -242,57 +255,44 @@ func balanceBatchCopTask(ctx context.Context, kvStore *tikv.KVStore, originalTas
weightedRegionNum = num
}
}
if store != uint64(math.MaxUint64) {
return store
}
return store
}
for storeID := range storeTaskMap {
if _, validStore := storeCandidateRegionMap[storeID]; !validStore {
continue

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
num := float64(len(storeCandidateRegionMap[storeID]))/avgStorePerRegion + float64(len(storeTaskMap[storeID].regionInfos))
if num < weightedRegionNum {
store = storeID
weightedRegionNum = num
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
}
return store
}

store := findNextStore(nil)
for totalRemainingRegionNum > 0 {
if store == uint64(math.MaxUint64) {
break
}
var key string
var ri tikv.RegionInfo
for key, ri = range storeCandidateRegionMap[store] {
// get the first region
break
}
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
storeTaskMap[store].regionInfos = append(storeTaskMap[store].regionInfos, ri)
totalRemainingRegionNum--
for _, id := range ri.AllStores {
if _, ok := storeCandidateRegionMap[id]; ok {
delete(storeCandidateRegionMap[id], key)
totalRegionCandidateNum--
if len(storeCandidateRegionMap[id]) == 0 {
delete(storeCandidateRegionMap, id)
}
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal because we only check the stores that affected by this region, in fact in order
// to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think
// check only the affected stores is more simple and will get a good enough result
store = findNextStore(ri.AllStores)
}
}
if totalRemainingRegionNum > 0 {
avgStorePerRegion = float64(totalRegionCandidateNum) / float64(totalRemainingRegionNum)
// it is not optimal because we only check the stores that affected by this region, in fact in order
// to find out the store with the lowest weightedRegionNum, all stores should be checked, but I think
// check only the affected stores is more simple and will get a good enough result
store = findNextStore(ri.AllStores)
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}
}
if totalRemainingRegionNum > 0 {
logutil.BgLogger().Warn("Some regions are not used when trying to balance batch cop task, give up balancing")
return originalTasks
}

var ret []*batchCopTask
for _, task := range storeTaskMap {
Expand Down

0 comments on commit b917170

Please sign in to comment.