Skip to content

Commit

Permalink
*: add max/avg cop response time for TableReader, IndexReader and Ind…
Browse files Browse the repository at this point in the history
…exLookupReader. (#12003)
  • Loading branch information
lzmhhh123 authored and sre-bot committed Sep 11, 2019
1 parent 57da569 commit 39e9c9f
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 27 deletions.
3 changes: 2 additions & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
// The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult,
// which can help selectResult to collect runtime stats.
func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (SelectResult, error) {
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (SelectResult, error) {
sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb)
if err == nil {
if selectResult, ok := sr.(*selectResult); ok {
selectResult.copPlanIDs = copPlanIDs
selectResult.rootPlanID = rootPlanID
}
}
return sr, err
Expand Down
5 changes: 4 additions & 1 deletion distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
idx := i
planIDFuncs = append(planIDFuncs, stringutil.StringerStr(planIDs[idx]))
}
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs)
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs, stringutil.StringerStr("root_0"))
}

c.Assert(err, IsNil)
Expand Down Expand Up @@ -404,6 +404,9 @@ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
// MemSize implements kv.ResultSubset interface.
func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

// RespTime implements kv.ResultSubset interface.
func (r *mockResultSubset) RespTime() time.Duration { return 0 }

func populateBuffer() []byte {
numCols := 4
numRows := 1024
Expand Down
6 changes: 4 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type selectResult struct {
// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
copPlanIDs []fmt.Stringer
rootPlanID fmt.Stringer

memTracker *memory.Tracker
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func (r *selectResult) getSelectResp() error {
for _, warning := range r.selectResp.Warnings {
sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
}
r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress)
r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress, re.result.RespTime())
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails(), nil)
Expand All @@ -202,7 +203,7 @@ func (r *selectResult) getSelectResp() error {
}
}

func (r *selectResult) updateCopRuntimeStats(callee string) {
func (r *selectResult) updateCopRuntimeStats(callee string, respTime time.Duration) {
if r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
Expand All @@ -214,6 +215,7 @@ func (r *selectResult) updateCopRuntimeStats(callee string) {
return
}

r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), respTime)
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
Expand Down
7 changes: 4 additions & 3 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext)
sr := selectResult{ctx: ctx}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.updateCopRuntimeStats("a")
sr.rootPlanID = copPlan{}
sr.updateCopRuntimeStats("a", 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
t := uint64(1)
Expand All @@ -38,13 +39,13 @@ func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
},
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats("callee")
sr.updateCopRuntimeStats("callee", 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse)

sr.copPlanIDs = []fmt.Stringer{copPlan{}}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats("callee")
sr.updateCopRuntimeStats("callee", 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1, rows:1")
}

Expand Down
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2106,7 +2106,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
e.resultHandler = &tableResultHandler{}
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.feedback.Invalidate()
return err
}
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
e.feedback.Invalidate()
return err
Expand Down Expand Up @@ -452,7 +452,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
tps = e.idxColTps
}
// Since the first read only need handle information. So its returned col is only 1.
result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans))
result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans), e.id)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor_test
import (
"context"
"fmt"
"strings"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -1065,9 +1066,9 @@ func (s *testSuite2) TestHashJoin(c *C) {
c.Assert(len(row), Equals, 7)
outerExecInfo := row[1][4].(string)
// FIXME: revert this result to 1 after TableReaderExecutor can handle initChunkSize.
c.Assert(outerExecInfo[len(outerExecInfo)-1:], Equals, "5")
c.Assert(outerExecInfo[strings.Index(outerExecInfo, "rows")+5:strings.Index(outerExecInfo, "rows")+6], Equals, "5")
innerExecInfo := row[4][4].(string)
c.Assert(innerExecInfo[len(innerExecInfo)-1:], Equals, "0")
c.Assert(innerExecInfo[strings.Index(innerExecInfo, "rows")+5:strings.Index(innerExecInfo, "rows")+6], Equals, "0")
}

func (s *testSuite2) TestJoinDifferentDecimals(c *C) {
Expand Down
6 changes: 3 additions & 3 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type selectResultHook struct {
}

func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (distsql.SelectResult, error) {
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (distsql.SelectResult, error) {
if sr.selectResultFunc == nil {
return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs, rootPlanID)
}
return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
return nil, err
}
e.kvRanges = append(e.kvRanges, kvReq.KeyRanges...)
result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"context"
"time"

"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -255,6 +256,8 @@ type ResultSubset interface {
GetExecDetails() *execdetails.ExecDetails
// MemSize returns how many bytes of memory this result use for tracing memory usage.
MemSize() int64
// RespTime returns the response time for the request.
RespTime() time.Duration
}

// Response represents the response returned from KV layer.
Expand Down
14 changes: 11 additions & 3 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,13 +636,21 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st
runtimeStatsColl := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl
// There maybe some mock information for cop task to let runtimeStatsColl.Exists(p.ExplainID()) is true.
// So check copTaskExecDetail first and print the real cop task information if it's not empty.
var analyzeInfo string
if runtimeStatsColl.ExistsCopStats(explainID) {
row = append(row, runtimeStatsColl.GetCopStats(explainID).String())
analyzeInfo = runtimeStatsColl.GetCopStats(explainID).String()
} else if runtimeStatsColl.ExistsRootStats(explainID) {
row = append(row, runtimeStatsColl.GetRootStats(explainID).String())
analyzeInfo = runtimeStatsColl.GetRootStats(explainID).String()
} else {
row = append(row, "time:0ns, loops:0, rows:0")
analyzeInfo = "time:0ns, loops:0, rows:0"
}
switch p.(type) {
case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader:
if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 {
analyzeInfo += ", " + s.String()
}
}
row = append(row, analyzeInfo)

tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if tracker != nil {
Expand Down
16 changes: 11 additions & 5 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ type copResponse struct {
startKey kv.Key
err error
respSize int64
respTime time.Duration
}

const (
Expand Down Expand Up @@ -429,6 +430,10 @@ func (rs *copResponse) MemSize() int64 {
return rs.respSize
}

func (rs *copResponse) RespTime() time.Duration {
return rs.respTime
}

const minLogCopTaskTime = 300 * time.Millisecond

// run is a worker function that get a copTask from channel, handle it and
Expand Down Expand Up @@ -663,11 +668,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
metrics.TiKVCoprocessorHistogram.Observe(costTime.Seconds())

if task.cmdType == tikvrpc.CmdCopStream {
return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch)
return worker.handleCopStreamResult(bo, rpcCtx, resp.Resp.(*tikvrpc.CopStreamResponse), task, ch, costTime)
}

// Handles the response for non-streaming copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, nil)
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Resp.(*coprocessor.Response)}, task, ch, nil, costTime)
}

const (
Expand Down Expand Up @@ -726,7 +731,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
Expand All @@ -736,7 +741,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP
return nil, nil
}
for {
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange)
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange, costTime)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
Expand Down Expand Up @@ -766,7 +771,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -810,6 +815,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
}
resp.respTime = costTime
if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
if handleTime := pbDetails.HandleTime; handleTime != nil {
resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
Expand Down
62 changes: 58 additions & 4 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,47 @@ func (crs *CopRuntimeStats) String() string {
procTimes[n-1], procTimes[0], procTimes[n*4/5], procTimes[n*19/20], totalRows, totalIters, totalTasks)
}

// ReaderRuntimeStats collects stats for TableReader, IndexReader and IndexLookupReader
type ReaderRuntimeStats struct {
sync.Mutex

copRespTime []time.Duration
}

// recordOneCopTask record once cop response time to update maxcopRespTime
func (rrs *ReaderRuntimeStats) recordOneCopTask(t time.Duration) {
rrs.Lock()
defer rrs.Unlock()
rrs.copRespTime = append(rrs.copRespTime, t)
}

func (rrs *ReaderRuntimeStats) String() string {
size := len(rrs.copRespTime)
if size == 0 {
return ""
}
if size == 1 {
return fmt.Sprintf("rpc time:%v", rrs.copRespTime[0])
}
sort.Slice(rrs.copRespTime, func(i, j int) bool {
return rrs.copRespTime[i] < rrs.copRespTime[j]
})
vMax, vMin := rrs.copRespTime[size-1], rrs.copRespTime[0]
vP80, vP95 := rrs.copRespTime[size*4/5], rrs.copRespTime[size*19/20]
sum := 0.0
for _, t := range rrs.copRespTime {
sum += float64(t)
}
vAvg := time.Duration(sum / float64(size))
return fmt.Sprintf("rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v", vMax, vMin, vAvg, vP80, vP95)
}

// RuntimeStatsColl collects executors's execution info.
type RuntimeStatsColl struct {
mu sync.Mutex
rootStats map[string]*RuntimeStats
copStats map[string]*CopRuntimeStats
mu sync.Mutex
rootStats map[string]*RuntimeStats
copStats map[string]*CopRuntimeStats
readerStats map[string]*ReaderRuntimeStats
}

// RuntimeStats collects one executor's execution info.
Expand All @@ -273,7 +309,7 @@ type RuntimeStats struct {
// NewRuntimeStatsColl creates new executor collector.
func NewRuntimeStatsColl() *RuntimeStatsColl {
return &RuntimeStatsColl{rootStats: make(map[string]*RuntimeStats),
copStats: make(map[string]*CopRuntimeStats)}
copStats: make(map[string]*CopRuntimeStats), readerStats: make(map[string]*ReaderRuntimeStats)}
}

// GetRootStats gets execStat for a executor.
Expand Down Expand Up @@ -306,6 +342,12 @@ func (e *RuntimeStatsColl) RecordOneCopTask(planID, address string, summary *tip
copStats.RecordOneCopTask(address, summary)
}

// RecordOneReaderStats records a specific stats for TableReader, IndexReader and IndexLookupReader.
func (e *RuntimeStatsColl) RecordOneReaderStats(planID string, copRespTime time.Duration) {
readerStats := e.GetReaderStats(planID)
readerStats.recordOneCopTask(copRespTime)
}

// ExistsRootStats checks if the planID exists in the rootStats collection.
func (e *RuntimeStatsColl) ExistsRootStats(planID string) bool {
e.mu.Lock()
Expand All @@ -322,6 +364,18 @@ func (e *RuntimeStatsColl) ExistsCopStats(planID string) bool {
return exists
}

// GetReaderStats gets the ReaderRuntimeStats specified by planID.
func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats {
e.mu.Lock()
defer e.mu.Unlock()
stats, exists := e.readerStats[planID]
if !exists {
stats = &ReaderRuntimeStats{copRespTime: make([]time.Duration, 0, 20)}
e.readerStats[planID] = stats
}
return stats
}

// Record records executor's execution.
func (e *RuntimeStats) Record(d time.Duration, rowNum int) {
atomic.AddInt32(&e.loop, 1)
Expand Down

0 comments on commit 39e9c9f

Please sign in to comment.