diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 99b7a807d52e2..273b97a0c2a13 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -607,13 +607,16 @@ func buildBatchCopTasksConsistentHash( rangesForEachPhysicalTable []*KeyRanges, storeType kv.StoreType, ttl time.Duration) (res []*batchCopTask, err error) { + start := time.Now() const cmdType = tikvrpc.CmdBatchCop cache := kvStore.GetRegionCache() fetchTopoBo := backoff.NewBackofferWithVars(ctx, fetchTopoMaxBackoff, nil) - var retryNum int - var rangesLen int - var storesStr []string + var ( + retryNum int + rangesLen int + storesStr []string + ) tasks := make([]*copTask, 0) regionIDs := make([]tikv.RegionVerID, 0) @@ -635,7 +638,9 @@ func buildBatchCopTasksConsistentHash( regionIDs = append(regionIDs, lo.Location.Region) } } + splitKeyElapsed := time.Since(start) + fetchTopoStart := time.Now() for { retryNum++ // todo: use AssureAndGetTopo() after SNS is done. @@ -654,6 +659,7 @@ func buildBatchCopTasksConsistentHash( } break } + fetchTopoElapsed := time.Since(fetchTopoStart) rpcCtxs, err := getTiFlashComputeRPCContextByConsistentHash(regionIDs, storesStr) if err != nil { @@ -688,6 +694,24 @@ func buildBatchCopTasksConsistentHash( } logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(storesStr))) + if log.GetLevel() <= zap.DebugLevel { + debugTaskMap := make(map[string]string, len(taskMap)) + for s, b := range taskMap { + debugTaskMap[s] = fmt.Sprintf("addr: %s; regionInfos: %v", b.storeAddr, b.regionInfos) + } + logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHash", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", storesStr)) + } + + if elapsed := time.Since(start); elapsed > time.Millisecond*500 { + logutil.BgLogger().Warn("buildBatchCopTasksConsistentHash takes too much time", + zap.Duration("total elapsed", elapsed), + zap.Int("retryNum", retryNum), + zap.Duration("splitKeyElapsed", splitKeyElapsed), + zap.Duration("fetchTopoElapsed", fetchTopoElapsed), + zap.Int("range len", rangesLen), + zap.Int("copTaskNum", len(tasks)), + zap.Int("batchCopTaskNum", len(res))) + } failpointCheckForConsistentHash(res) return res, nil } @@ -1185,15 +1209,23 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer, storeType kv.StoreType, ttl time.Duration) (res []*batchCopTask, err error) { const cmdType = tikvrpc.CmdBatchCop - var retryNum int + var ( + retryNum int + rangesLen int + copTaskNum int + splitKeyElapsed time.Duration + getStoreElapsed time.Duration + ) cache := kvStore.GetRegionCache() + start := time.Now() for { retryNum++ - var rangesLen int + rangesLen = 0 tasks := make([]*copTask, 0) regionIDs := make([]tikv.RegionVerID, 0) + splitKeyStart := time.Now() for i, ranges := range rangesForEachPhysicalTable { rangesLen += ranges.Len() locations, err := cache.SplitKeyRangesByLocations(bo, ranges, UnspecifiedLimit) @@ -1211,7 +1243,9 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer, regionIDs = append(regionIDs, lo.Location.Region) } } + splitKeyElapsed += time.Since(splitKeyStart) + getStoreStart := time.Now() stores, err := cache.GetTiFlashComputeStores(bo.TiKVBackoffer()) if err != nil { return nil, err @@ -1220,13 +1254,14 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer, if len(stores) == 0 { return nil, errors.New("tiflash_compute node is unavailable") } + getStoreElapsed = time.Since(getStoreStart) rpcCtxs, err := cache.GetTiFlashComputeRPCContextByConsistentHash(bo.TiKVBackoffer(), regionIDs, stores) if err != nil { return nil, err } if rpcCtxs == nil { - logutil.BgLogger().Info("buildBatchCopTasksConsistentHash retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) + logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD retry because rcpCtx is nil", zap.Int("retryNum", retryNum)) err := bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) if err != nil { return nil, errors.Trace(err) @@ -1236,6 +1271,7 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer, if len(rpcCtxs) != len(tasks) { return nil, errors.Errorf("length should be equal, len(rpcCtxs): %d, len(tasks): %d", len(rpcCtxs), len(tasks)) } + copTaskNum = len(tasks) taskMap := make(map[string]*batchCopTask) for i, rpcCtx := range rpcCtxs { regionInfo := RegionInfo{ @@ -1259,10 +1295,31 @@ func buildBatchCopTasksConsistentHashForPD(bo *backoff.Backoffer, res = append(res, batchTask) } } - logutil.BgLogger().Info("buildBatchCopTasksConsistentHash done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) + logutil.BgLogger().Info("buildBatchCopTasksConsistentHashForPD done", zap.Any("len(tasks)", len(taskMap)), zap.Any("len(tiflash_compute)", len(stores))) + if log.GetLevel() <= zap.DebugLevel { + debugStores := make([]string, 0, len(stores)) + for _, s := range stores { + debugStores = append(debugStores, s.GetAddr()) + } + debugTaskMap := make(map[string]string, len(taskMap)) + for s, b := range taskMap { + debugTaskMap[s] = fmt.Sprintf("addr: %s; regionInfos: %v", b.storeAddr, b.regionInfos) + } + logutil.BgLogger().Debug("detailed info buildBatchCopTasksConsistentHashForPD", zap.Any("taskMap", debugTaskMap), zap.Any("allStores", debugStores)) + } break } + if elapsed := time.Since(start); elapsed > time.Millisecond*500 { + logutil.BgLogger().Warn("buildBatchCopTasksConsistentHashForPD takes too much time", + zap.Duration("total elapsed", elapsed), + zap.Int("retryNum", retryNum), + zap.Duration("splitKeyElapsed", splitKeyElapsed), + zap.Duration("getStoreElapsed", getStoreElapsed), + zap.Int("range len", rangesLen), + zap.Int("copTaskNum", copTaskNum), + zap.Int("batchCopTaskNum", len(res))) + } failpointCheckForConsistentHash(res) return res, nil }