store/copr: add log for buildBatchCopTasksConsistentHash #41101

Merged · 12 commits · Feb 9, 2023
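
The change instruments both consistent-hash task builders with per-phase timers and a single threshold-gated warning. A minimal sketch of that pattern, assuming a hypothetical buildTasks with splitKeys/fetchTopo stand-ins and the standard-library logger in place of logutil/zap:

package main

import (
	"log"
	"time"
)

// slowBuildThreshold mirrors the 500ms cutoff used in the PR.
const slowBuildThreshold = 500 * time.Millisecond

func buildTasks() {
	start := time.Now()

	// Time each phase separately so a slow build can be attributed.
	splitKeyStart := time.Now()
	splitKeys() // stand-in for splitting key ranges by region
	splitKeyElapsed := time.Since(splitKeyStart)

	fetchTopoStart := time.Now()
	fetchTopo() // stand-in for fetching the tiflash_compute topology
	fetchTopoElapsed := time.Since(fetchTopoStart)

	// Log only when the whole build is slow, with the full breakdown.
	if elapsed := time.Since(start); elapsed > slowBuildThreshold {
		log.Printf("buildTasks takes too much time: total=%v splitKey=%v fetchTopo=%v",
			elapsed, splitKeyElapsed, fetchTopoElapsed)
	}
}

func splitKeys() { time.Sleep(5 * time.Millisecond) }
func fetchTopo() { time.Sleep(5 * time.Millisecond) }

func main() { buildTasks() }

Gating the warning on a threshold keeps the hot path quiet while still surfacing slow builds with enough detail to tell which phase regressed.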
store/copr/batch_coprocessor.go (71 changes: 64 additions & 7 deletions)
@@ -607,13 +607,16 @@ func buildBatchCopTasksConsistentHash(
rangesForEachPhysicalTable []*KeyRanges,
storeType kv.StoreType,
ttl time.Duration) (res []*batchCopTask, err error) {
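// Track total build time; the warning at the end reports the phase breakdown when it exceeds 500ms.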
start := time.Now()
const cmdType = tikvrpc.CmdBatchCop
cache := kvStore.GetRegionCache()
fetchTopoBo := backoff.NewBackofferWithVars(ctx, fetchTopoMaxBackoff, nil)

- var retryNum int
- var rangesLen int
- var storesStr []string
+ var (
+     retryNum  int
+     rangesLen int
+     storesStr []string
+ )

tasks := make([]*copTask, 0)
regionIDs := make([]tikv.RegionVerID, 0)
@@ -635,7 +638,9 @@ func buildBatchCopTasksConsistentHash(
regionIDs = append(regionIDs, lo.Location.Region)
}
}
splitKeyElapsed := time.Since(start)

fetchTopoStart := time.Now()
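// Retry until the tiflash_compute topology is available; retryNum feeds the slow-build warning below.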
for {
retryNum++
// todo: use AssureAndGetTopo() after SNS is done.
@@ -654,6 +659,7 @@ }
}
break
}
fetchTopoElapsed := time.Since(fetchTopoStart)

rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr)
if err != nil {
@@ -688,6 +694,24 @@ }
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(storesStr)))

if log.GetLevel() <= zap.DebugLevel {
debugTaskMap := make(map[string]string, len(taskMap))
for s, b := range taskMap {
debugTaskMap[s] = fmt.Sprintf("addr: %s; regionInfos: %v", b.storeAddr, b.regionInfos)
}
logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHash", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", storesStr))
}
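// One Warn entry with the full phase breakdown, emitted only past the 500ms threshold.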

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasksConsistentHash takes too much time",
zap.Duration("total elapsed", elapsed),
zap.Int("retryNum", retryNum),
zap.Duration("splitKeyElapsed", splitKeyElapsed),
zap.Duration("fetchTopoElapsed", fetchTopoElapsed),
zap.Int("range len", rangesLen),
zap.Int("copTaskNum", len(tasks)),
zap.Int("batchCopTaskNum", len(res)))
}
failpointCheckForConsistentHash(res)
return res, nil
}
@@ -1185,15 +1209,23 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
storeType kv.StoreType,
ttl time.Duration) (res []*batchCopTask, err error) {
const cmdType = tikvrpc.CmdBatchCop
- var retryNum int
+ var (
+     retryNum        int
+     rangesLen       int
+     copTaskNum      int
+     splitKeyElapsed time.Duration
+     getStoreElapsed time.Duration
+ )
cache := kvStore.GetRegionCache()
start := time.Now()
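// Timers live outside the retry loop: splitKeyElapsed accumulates (+=) across attempts, while getStoreElapsed keeps only the latest attempt's value.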

for {
retryNum++
- var rangesLen int
+ rangesLen = 0
tasks := make([]*copTask, 0)
regionIDs := make([]tikv.RegionVerID, 0)

splitKeyStart := time.Now()
for i, ranges := range rangesForEachPhysicalTable {
rangesLen += ranges.Len()
locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit)
@@ -1211,7 +1243,9 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
regionIDs = append(regionIDs, lo.Location.Region)
}
}
splitKeyElapsed += time.Since(splitKeyStart)

getStoreStart := time.Now()
stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer())
if err != nil {
return nil, err
@@ -1220,13 +1254,14 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
if len(stores) == 0 {
return nil, errors.New("tiflash_compute node is unavailable")
}
getStoreElapsed = time.Since(getStoreStart)

rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores)
if err != nil {
return nil, err
}
if rpcCtxs == nil {
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum))
logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD retry because rcpCtx is nil", zap.Int("retryNum", retryNum))
err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer"))
if err != nil {
return nil, errors.Trace(err)
@@ -1236,6 +1271,7 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
if len(rpcCtxs) != len(tasks) {
return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks))
}
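// tasks is recreated on each retry, so capture its length now for the post-loop warning.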
copTaskNum = len(tasks)
taskMap := make(map[string]*batchCopTask)
for i, rpcCtx := range rpcCtxs {
regionInfo := RegionInfo{
@@ -1259,10 +1295,31 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer,
res = append(res, batchTask)
}
}
logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores)))
if log.GetLevel() <= zap.DebugLevel {
debugStores := make([]string, 0, len(stores))
for _, s := range stores {
debugStores = append(debugStores, s.GetAddr())
}
debugTaskMap := make(map[string]string, len(taskMap))
for s, b := range taskMap {
debugTaskMap[s] = fmt.Sprintf("addr: %s; regionInfos: %v", b.storeAddr, b.regionInfos)
}
logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHashForPD", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", debugStores))
}
break
}

if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
logutil.BgLogger().Warn("buildBatchCopTasksConsistentHashForPD takes too much time",
zap.Duration("total elapsed", elapsed),
zap.Int("retryNum", retryNum),
zap.Duration("splitKeyElapsed", splitKeyElapsed),
zap.Duration("getStoreElapsed", getStoreElapsed),
zap.Int("range len", rangesLen),
zap.Int("copTaskNum", copTaskNum),
zap.Int("batchCopTaskNum", len(res)))
}
failpointCheckForConsistentHash(res)
return res, nil
}