From 04aabef4c43433f83528aa716d302e80dc56f4f3 Mon Sep 17 00:00:00 2001
From: Ewan Chou
Date: Thu, 26 Apr 2018 16:00:00 +0800
Subject: [PATCH] tikv: log slow coprocessor task in detail (#6344)

---
 store/tikv/coprocessor.go | 65 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 62 insertions(+), 3 deletions(-)

diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go
index 59934a1d3a6dc..48f06906e1a66 100644
--- a/store/tikv/coprocessor.go
+++ b/store/tikv/coprocessor.go
@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"io"
 	"sort"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -622,6 +623,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 			IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel),
 			Priority:       kvPriorityToCommandPri(worker.req.Priority),
 			NotFillCache:   worker.req.NotFillCache,
+			HandleTime:     true,
 		},
 	}
 	startTime := time.Now()
@@ -629,14 +631,14 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
+	// Set task.storeAddr before logging so that both the slow task log and task.String() carry the store address.
+	task.storeAddr = sender.storeAddr
 	costTime := time.Since(startTime)
 	if costTime > minLogCopTaskTime {
-		log.Infof("[TIME_COP_TASK] %s%s %s", costTime, bo, task)
+		worker.logTimeCopTask(costTime, task, bo, resp)
 	}
 	metrics.TiKVCoprocessorCounter.WithLabelValues("handle_task").Inc()
 	metrics.TiKVCoprocessorHistogram.Observe(costTime.Seconds())
-	// Set task.storeAddr field so its task.String() method have the store address information.
-	task.storeAddr = sender.storeAddr
 
 	if task.cmdType == tikvrpc.CmdCopStream {
 		return worker.handleCopStreamResult(bo, resp.CopStream, task, ch)
@@ -646,6 +648,63 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
 	return worker.handleCopResponse(bo, resp.Cop, task, ch, nil)
 }
 
+// Thresholds in milliseconds; a detail is only appended to the slow coprocessor
+// task log when it exceeds its threshold.
+const (
+	minLogBackoffTime   = 100
+	minLogKVProcessTime = 100
+	minLogKVWaitTime    = 200
+)
+
+// logTimeCopTask logs a [TIME_COP_TASK] line for a slow coprocessor task. Backoff
+// and TiKV execution details are appended only when they exceed their thresholds.
+func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *tikvrpc.Response) {
+	logStr := fmt.Sprintf("[TIME_COP_TASK] resp_time:%s txn_start_ts:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.id, task.storeAddr)
+	if bo.totalSleep > minLogBackoffTime {
+		backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.types), " ", ",", -1)
+		logStr += fmt.Sprintf(" backoff_ms:%d backoff_types:%s", bo.totalSleep, backoffTypes)
+	}
+	// Logging must never panic the worker, so check every pointer on the way to
+	// ExecDetails.
+	// NOTE(review): assumes CopStreamResponse embeds *coprocessor.Response and that
+	// it may still be nil here (e.g. an empty stream) - confirm.
+	var detail *kvrpcpb.ExecDetails
+	if task.cmdType == tikvrpc.CmdCopStream {
+		if resp.CopStream != nil && resp.CopStream.Response != nil {
+			detail = resp.CopStream.ExecDetails
+		}
+	} else if resp.Cop != nil {
+		detail = resp.Cop.ExecDetails
+	}
+	if detail != nil && detail.HandleTime != nil {
+		processMs := detail.HandleTime.ProcessMs
+		waitMs := detail.HandleTime.WaitMs
+		if processMs > minLogKVProcessTime {
+			logStr += fmt.Sprintf(" kv_process_ms:%d", processMs)
+			// NOTE(review): the request only sets HandleTime; confirm TiKV fills
+			// ScanDetail without Context.ScanDetail, else this never logs.
+			if detail.ScanDetail != nil {
+				logStr = appendScanDetail(logStr, "write", detail.ScanDetail.Write)
+				logStr = appendScanDetail(logStr, "data", detail.ScanDetail.Data)
+				logStr = appendScanDetail(logStr, "lock", detail.ScanDetail.Lock)
+			}
+		}
+		if waitMs > minLogKVWaitTime {
+			logStr += fmt.Sprintf(" kv_wait_ms:%d", waitMs)
+		}
+	}
+	log.Info(logStr)
+}
+
+// appendScanDetail appends the total and processed counters of one column
+// family to logStr; a nil scanInfo leaves logStr unchanged.
+func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.ScanInfo) string {
+	if scanInfo == nil {
+		return logStr
+	}
+	return logStr + fmt.Sprintf(" scan_total_%s:%d scan_processed_%s:%d", columnFamily, scanInfo.Total, columnFamily, scanInfo.Processed)
+}
+
 func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
 	defer stream.Close()
 	var resp *coprocessor.Response