Skip to content

Commit

Permalink
*: add max/avg cop response time for TableReader, IndexReader and Ind…
Browse files Browse the repository at this point in the history
…exLookupReader. (pingcap#12003)
  • Loading branch information
lzmhhh123 committed Jan 17, 2020
1 parent 49b9bb0 commit c32ba61
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 37 deletions.
8 changes: 0 additions & 8 deletions cmd/explaintest/r/tpch.result
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,7 @@ Sort_6 2.94 root tpch.lineitem.l_returnflag:asc, tpch.lineitem.l_linestatus:asc
└─Projection_8 2.94 root tpch.lineitem.l_returnflag, tpch.lineitem.l_linestatus, 3_col_0, 3_col_1, 3_col_2, 3_col_3, 3_col_4, 3_col_5, 3_col_6, 3_col_7
└─HashAgg_14 2.94 root group by:col_13, col_14, funcs:sum(col_0), sum(col_1), sum(col_2), sum(col_3), avg(col_4, col_5), avg(col_6, col_7), avg(col_8, col_9), count(col_10), firstrow(col_11), firstrow(col_12)
└─TableReader_15 2.94 root data:HashAgg_9
<<<<<<< HEAD
└─HashAgg_9 2.94 cop group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_extendedprice), sum(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount))), sum(mul(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), plus(1, tpch.lineitem.l_tax))), avg(tpch.lineitem.l_quantity), avg(tpch.lineitem.l_extendedprice), avg(tpch.lineitem.l_discount), count(1), firstrow(tpch.lineitem.l_returnflag), firstrow(tpch.lineitem.l_linestatus)
=======
└─HashAgg_9 2.94 cop group by:tpch.lineitem.l_linestatus, tpch.lineitem.l_returnflag, funcs:sum(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_extendedprice), sum(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount))), sum(mul(mul(tpch.lineitem.l_extendedprice, minus(1, tpch.lineitem.l_discount)), plus(1, tpch.lineitem.l_tax))), count(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_quantity), count(tpch.lineitem.l_extendedprice), sum(tpch.lineitem.l_extendedprice), count(tpch.lineitem.l_discount), sum(tpch.lineitem.l_discount), count(1)
>>>>>>> b239f2f04... planner: split avg to count and sum for TableReader cop task (#11926)
└─Selection_13 293795345.00 cop le(tpch.lineitem.l_shipdate, 1998-08-15)
└─TableScan_12 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false
/*
Expand Down Expand Up @@ -986,11 +982,7 @@ Projection_16 1.00 root div(11_col_0, 7.0)
│ └─TableScan_31 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false
└─HashAgg_40 9943040.00 root group by:col_3, funcs:avg(col_0, col_1), firstrow(col_2)
└─TableReader_41 9943040.00 root data:HashAgg_36
<<<<<<< HEAD
└─HashAgg_36 9943040.00 cop group by:tpch.lineitem.l_partkey, funcs:avg(tpch.lineitem.l_quantity), firstrow(tpch.lineitem.l_partkey)
=======
└─HashAgg_36 9943040.00 cop group by:tpch.lineitem.l_partkey, funcs:count(tpch.lineitem.l_quantity), sum(tpch.lineitem.l_quantity)
>>>>>>> b239f2f04... planner: split avg to count and sum for TableReader cop task (#11926)
└─TableScan_39 300005811.00 cop table:lineitem, range:[-inf,+inf], keep order:false
/*
Q18 Large Volume Customer Query
Expand Down
12 changes: 6 additions & 6 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
// The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult,
// which can help selectResult to collect runtime stats.
func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (SelectResult, error) {
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (SelectResult, error) {
sr, err := Select(ctx, sctx, kvReq, fieldTypes, fb)
if err != nil {
return sr, err
}
if selectResult, ok := sr.(*selectResult); ok {
selectResult.copPlanIDs = copPlanIDs
if err == nil {
if selectResult, ok := sr.(*selectResult); ok {
selectResult.copPlanIDs = copPlanIDs
selectResult.rootPlanID = rootPlanID
}
}
return sr, err
}
Expand Down
5 changes: 4 additions & 1 deletion distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
idx := i
planIDFuncs = append(planIDFuncs, stringutil.StringerStr(planIDs[idx]))
}
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs)
response, err = SelectWithRuntimeStats(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false), planIDFuncs, stringutil.StringerStr("root_0"))
}

c.Assert(err, IsNil)
Expand Down Expand Up @@ -372,6 +372,9 @@ func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
// MemSize implements kv.ResultSubset interface.
func (r *mockResultSubset) MemSize() int64 { return int64(cap(r.data)) }

// RespTime implements kv.ResultSubset interface.
func (r *mockResultSubset) RespTime() time.Duration { return 0 }

func populateBuffer() []byte {
numCols := 4
numRows := 1024
Expand Down
6 changes: 4 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type selectResult struct {
// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
copPlanIDs []fmt.Stringer
rootPlanID fmt.Stringer

memTracker *memory.Tracker
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func (r *selectResult) getSelectResp() error {
for _, warning := range r.selectResp.Warnings {
sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
}
r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress)
r.updateCopRuntimeStats(re.result.GetExecDetails().CalleeAddress, re.result.RespTime())
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails(), nil)
Expand All @@ -202,7 +203,7 @@ func (r *selectResult) getSelectResp() error {
}
}

func (r *selectResult) updateCopRuntimeStats(callee string) {
func (r *selectResult) updateCopRuntimeStats(callee string, respTime time.Duration) {
if r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
return
}
Expand All @@ -214,6 +215,7 @@ func (r *selectResult) updateCopRuntimeStats(callee string) {
return
}

r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RecordOneReaderStats(r.rootPlanID.String(), respTime)
for i, detail := range r.selectResp.GetExecutionSummaries() {
if detail != nil && detail.TimeProcessedNs != nil &&
detail.NumProducedRows != nil && detail.NumIterations != nil {
Expand Down
56 changes: 56 additions & 0 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package distsql

import (
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tipb/go-tipb"
)

func (s *testSuite) TestUpdateCopRuntimeStats(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = new(stmtctx.StatementContext)
sr := selectResult{ctx: ctx}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, IsNil)
sr.rootPlanID = copPlan{}
sr.updateCopRuntimeStats("a", 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
t := uint64(1)
sr.selectResp = &tipb.SelectResponse{
ExecutionSummaries: []*tipb.ExecutorExecutionSummary{
{TimeProcessedNs: &t, NumProducedRows: &t, NumIterations: &t},
},
}
c.Assert(len(sr.selectResp.GetExecutionSummaries()) != len(sr.copPlanIDs), IsTrue)
sr.updateCopRuntimeStats("callee", 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats("callee"), IsFalse)

sr.copPlanIDs = []fmt.Stringer{copPlan{}}
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl, NotNil)
c.Assert(len(sr.selectResp.GetExecutionSummaries()), Equals, len(sr.copPlanIDs))
sr.updateCopRuntimeStats("callee", 0)
c.Assert(ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetCopStats("callee").String(), Equals, "time:1ns, loops:1, rows:1")
}

type copPlan struct{}

func (p copPlan) String() string {
return "callee"
}
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2148,7 +2148,7 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
return nil, err
}
e.resultHandler = &tableResultHandler{}
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
result, err := builder.SelectResult(ctx, builder.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.feedback.Invalidate()
return err
}
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
e.result, err = e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
e.feedback.Invalidate()
return err
Expand Down Expand Up @@ -487,7 +487,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
tps = e.idxColTps
}
// Since the first read only need handle information. So its returned col is only 1.
result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans))
result, err := distsql.SelectWithRuntimeStats(ctx, e.ctx, kvReq, tps, e.feedback, getPhysicalPlanIDs(e.idxPlans), e.id)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions executor/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor_test
import (
"context"
"fmt"
"strings"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -983,9 +984,9 @@ func (s *testSuite2) TestHashJoin(c *C) {
c.Assert(len(row), Equals, 7)
outerExecInfo := row[1][4].(string)
// FIXME: revert this result to 1 after TableReaderExecutor can handle initChunkSize.
c.Assert(outerExecInfo[len(outerExecInfo)-1:], Equals, "5")
c.Assert(outerExecInfo[strings.Index(outerExecInfo, "rows")+5:strings.Index(outerExecInfo, "rows")+6], Equals, "5")
innerExecInfo := row[4][4].(string)
c.Assert(innerExecInfo[len(innerExecInfo)-1:], Equals, "0")
c.Assert(innerExecInfo[strings.Index(innerExecInfo, "rows")+5:strings.Index(innerExecInfo, "rows")+6], Equals, "0")
}

func (s *testSuite2) TestJoinDifferentDecimals(c *C) {
Expand Down
6 changes: 3 additions & 3 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ type selectResultHook struct {
}

func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer) (distsql.SelectResult, error) {
fieldTypes []*types.FieldType, fb *statistics.QueryFeedback, copPlanIDs []fmt.Stringer, rootPlanID fmt.Stringer) (distsql.SelectResult, error) {
if sr.selectResultFunc == nil {
return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs, rootPlanID)
}
return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, fb, copPlanIDs)
}
Expand Down Expand Up @@ -178,7 +178,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
if err != nil {
return nil, err
}
result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans))
result, err := e.SelectResult(ctx, e.ctx, kvReq, retTypes(e), e.feedback, getPhysicalPlanIDs(e.plans), e.id)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ type ResultSubset interface {
GetExecDetails() *execdetails.ExecDetails
// MemSize returns how many bytes of memory this result use for tracing memory usage.
MemSize() int64
// RespTime returns the response time for the request.
RespTime() time.Duration
}

// Response represents the response returned from KV layer.
Expand Down
14 changes: 11 additions & 3 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,13 +630,21 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st
runtimeStatsColl := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl
// There maybe some mock information for cop task to let runtimeStatsColl.Exists(p.ExplainID()) is true.
// So check copTaskExecDetail first and print the real cop task information if it's not empty.
var analyzeInfo string
if runtimeStatsColl.ExistsCopStats(explainID) {
row = append(row, runtimeStatsColl.GetCopStats(explainID).String())
analyzeInfo = runtimeStatsColl.GetCopStats(explainID).String()
} else if runtimeStatsColl.ExistsRootStats(explainID) {
row = append(row, runtimeStatsColl.GetRootStats(explainID).String())
analyzeInfo = runtimeStatsColl.GetRootStats(explainID).String()
} else {
row = append(row, "time:0ns, loops:0, rows:0")
analyzeInfo = "time:0ns, loops:0, rows:0"
}
switch p.(type) {
case *PhysicalTableReader, *PhysicalIndexReader, *PhysicalIndexLookUpReader:
if s := runtimeStatsColl.GetReaderStats(explainID); s != nil && len(s.String()) > 0 {
analyzeInfo += ", " + s.String()
}
}
row = append(row, analyzeInfo)

tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if tracker != nil {
Expand Down
1 change: 1 addition & 0 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"

"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down
16 changes: 11 additions & 5 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ type copResponse struct {
startKey kv.Key
err error
respSize int64
respTime time.Duration
}

const (
Expand Down Expand Up @@ -465,6 +466,10 @@ func (rs *copResponse) MemSize() int64 {
return rs.respSize
}

func (rs *copResponse) RespTime() time.Duration {
return rs.respTime
}

const minLogCopTaskTime = 300 * time.Millisecond

// run is a worker function that get a copTask from channel, handle it and
Expand Down Expand Up @@ -706,11 +711,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
metrics.TiKVCoprocessorHistogram.Observe(costTime.Seconds())

if task.cmdType == tikvrpc.CmdCopStream {
return worker.handleCopStreamResult(bo, rpcCtx, resp.CopStream, task, ch)
return worker.handleCopStreamResult(bo, rpcCtx, resp.CopStream, task, ch, costTime)
}

// Handles the response for non-streaming copTask.
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Cop}, task, ch, nil)
return worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp.Cop}, task, ch, nil, costTime)
}

const (
Expand Down Expand Up @@ -762,7 +767,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RPCContext, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse, costTime time.Duration) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
Expand All @@ -772,7 +777,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP
return nil, nil
}
for {
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange)
remainedTasks, err := worker.handleCopResponse(bo, rpcCtx, &copResponse{pbResp: resp}, task, ch, lastRange, costTime)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
Expand Down Expand Up @@ -804,7 +809,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *RP
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCContext, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange, costTime time.Duration) ([]*copTask, error) {
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -848,6 +853,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
}
resp.respTime = costTime
if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
if handleTime := pbDetails.HandleTime; handleTime != nil {
resp.detail.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
Expand Down
Loading

0 comments on commit c32ba61

Please sign in to comment.