Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv: log slow coprocessor task in detail #6344

Merged
merged 12 commits into from
Apr 26, 2018
54 changes: 51 additions & 3 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -604,21 +605,22 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel),
Priority: kvPriorityToCommandPri(worker.req.Priority),
NotFillCache: worker.req.NotFillCache,
HandleTime: true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like "EnableHandleTime" is better.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is defined in protobuf, hard to change.

},
}
startTime := time.Now()
resp, err := sender.SendReq(bo, req, task.region, ReadTimeoutMedium)
if err != nil {
return nil, errors.Trace(err)
}
// Set task.storeAddr field so its task.String() method have the store address information.
task.storeAddr = sender.storeAddr
costTime := time.Since(startTime)
if costTime > minLogCopTaskTime {
log.Infof("[TIME_COP_TASK] %s%s %s", costTime, bo, task)
worker.logTimeCopTask(costTime, task, bo, resp)
}
metrics.TiKVCoprocessorCounter.WithLabelValues("handle_task").Inc()
metrics.TiKVCoprocessorHistogram.Observe(costTime.Seconds())
// Set task.storeAddr field so its task.String() method have the store address information.
task.storeAddr = sender.storeAddr

if task.cmdType == tikvrpc.CmdCopStream {
return worker.handleCopStreamResult(bo, resp.CopStream, task, ch)
Expand All @@ -628,6 +630,52 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
return worker.handleCopResponse(bo, resp.Cop, task, ch, nil)
}

const (
minLogBackoffTime = 100
minLogKVProcessTime = 100
minLogKVWaitTime = 200
)

func (worker *copIteratorWorker) logTimeCopTask(costTime time.Duration, task *copTask, bo *Backoffer, resp *tikvrpc.Response) {
logStr := fmt.Sprintf("[TIME_COP_TASK] resp_time:%s txn_start_ts:%d region_id:%d store_addr:%s", costTime, worker.req.StartTs, task.region.id, task.storeAddr)
if bo.totalSleep > minLogBackoffTime {
backoffTypes := strings.Replace(fmt.Sprintf("%v", bo.types), " ", ",", -1)
logStr += fmt.Sprintf(" backoff_ms:%d backoff_types:%s", bo.totalSleep, backoffTypes)
}
var detail *kvrpcpb.ExecDetails
if task.cmdType == tikvrpc.CmdCopStream {
detail = resp.CopStream.ExecDetails
} else {
detail = resp.Cop.ExecDetails
}
if detail != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are so many if in if.. Could we log.Info through defer function, and return directly if detail==nil?

Copy link
Member Author

@coocood coocood Apr 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think defer is used for release resource.
It looks weird if the normal execution code is done in defer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with Yuxing.

if detail.HandleTime != nil {
processMs := detail.HandleTime.ProcessMs
waitMs := detail.HandleTime.WaitMs
if processMs > minLogKVProcessTime {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndreMouche Does coprocessor also has a threshold to fill HandleTime.ProcessMs and HandleTime.WaitMs for slow cop requests ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@coocood Maybe we just need to print HandleTime.ProcessMs and HandleTime.WaitMs once they are set, no need to use another thresholds like minLogKVProcessTime and minLogKVWaitTime

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After we set the HandleTime and ScanDetial to true, every request will return them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will we set ScanDetail to true ? @coocood

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ScanDetail is not set to true now. it's set only when the duration exceeds the threshold(1s) in TiKV.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we should remove these minXXXX thresholds defined in this file and do not check them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TiDB should control what to log, without depending on TiKV's implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zz-jason If we log everything, the most important ones will be obscure.

logStr += fmt.Sprintf(" kv_process_ms:%d", processMs)
if detail.ScanDetail != nil {
logStr = appendScanDetail(logStr, "write", detail.ScanDetail.Write)
logStr = appendScanDetail(logStr, "data", detail.ScanDetail.Data)
logStr = appendScanDetail(logStr, "lock", detail.ScanDetail.Lock)
}
}
if waitMs > minLogKVWaitTime {
logStr += fmt.Sprintf(" kv_wait_ms:%d", waitMs)
}
}
}
log.Info(logStr)
}

func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.ScanInfo) string {
if scanInfo != nil {
logStr += fmt.Sprintf(" scan_total_%s:%d", columnFamily, scanInfo.Total)
logStr += fmt.Sprintf(" scan_processed_%s:%d", columnFamily, scanInfo.Processed)
}
return logStr
}

func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
Expand Down