Skip to content

Commit

Permalink
cherry pick pingcap#19948 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
lzmhhh123 authored and ti-srebot committed Sep 14, 2020
1 parent 51d365f commit 76882df
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 0 deletions.
26 changes: 26 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,32 @@ func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
c.Assert(err, IsNil)
}

<<<<<<< HEAD
=======
func (s *testSuite) TestSelectResultRuntimeStats(c *C) {
basic := &execdetails.BasicRuntimeStats{}
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second, time.Millisecond},
procKeys: []int64{100, 200},
backoffSleep: map[string]time.Duration{"RegionMiss": time.Millisecond},
totalProcessTime: time.Second,
totalWaitTime: time.Second,
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
s2 := *s1
stmtStats := execdetails.NewRuntimeStatsColl()
stmtStats.RegisterStats(1, basic)
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
stats := stmtStats.GetRootStats(1)
expect := "time:1s, loops:1, cop_task: {num: 4, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 2s, tot_wait: 2s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 2ms}"
c.Assert(stats.String(), Equals, expect)
// Test for idempotence.
c.Assert(stats.String(), Equals, expect)
}

>>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948)
func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
Expand Down
58 changes: 58 additions & 0 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ type selectResultRuntimeStats struct {
totalProcessTime time.Duration
totalWaitTime time.Duration
rpcStat tikv.RegionRequestRuntimeStats
CoprCacheHitNum int64
}

func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntimeStats, respTime time.Duration) {
Expand All @@ -338,8 +339,51 @@ func (s *selectResultRuntimeStats) mergeCopRuntimeStats(copStats *tikv.CopRuntim
s.totalProcessTime += copStats.ProcessTime
s.totalWaitTime += copStats.WaitTime
s.rpcStat.Merge(copStats.RegionRequestRuntimeStats)
if copStats.CoprCacheHit {
s.CoprCacheHitNum++
}
}

<<<<<<< HEAD
=======
func (s *selectResultRuntimeStats) Clone() execdetails.RuntimeStats {
newRs := selectResultRuntimeStats{
copRespTime: make([]time.Duration, 0, len(s.copRespTime)),
procKeys: make([]int64, 0, len(s.procKeys)),
backoffSleep: make(map[string]time.Duration, len(s.backoffSleep)),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
newRs.copRespTime = append(newRs.copRespTime, s.copRespTime...)
newRs.procKeys = append(newRs.procKeys, s.procKeys...)
for k, v := range s.backoffSleep {
newRs.backoffSleep[k] += v
}
newRs.totalProcessTime += s.totalProcessTime
newRs.totalWaitTime += s.totalWaitTime
for k, v := range s.rpcStat.Stats {
newRs.rpcStat.Stats[k] = v
}
return &newRs
}

func (s *selectResultRuntimeStats) Merge(rs execdetails.RuntimeStats) {
other, ok := rs.(*selectResultRuntimeStats)
if !ok {
return
}
s.copRespTime = append(s.copRespTime, other.copRespTime...)
s.procKeys = append(s.procKeys, other.procKeys...)

for k, v := range other.backoffSleep {
s.backoffSleep[k] += v
}
s.totalProcessTime += other.totalProcessTime
s.totalWaitTime += other.totalWaitTime
s.rpcStat.Merge(other.rpcStat)
s.CoprCacheHitNum += other.CoprCacheHitNum
}

>>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948)
func (s *selectResultRuntimeStats) String() string {
buf := bytes.NewBuffer(nil)
if s.RuntimeStats != nil {
Expand Down Expand Up @@ -383,6 +427,20 @@ func (s *selectResultRuntimeStats) String() string {
}
}
}
<<<<<<< HEAD
=======
copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
delete(s.rpcStat.Stats, tikvrpc.CmdCop)
buf.WriteString(", rpc_num: ")
buf.WriteString(strconv.FormatInt(copRPC.Count, 10))
buf.WriteString(", rpc_time: ")
buf.WriteString(time.Duration(copRPC.Consume).String())
}
buf.WriteString(fmt.Sprintf(", copr_cache_hit_ratio: %v",
strconv.FormatFloat(float64(s.CoprCacheHitNum)/float64(len(s.copRespTime)), 'f', 2, 64)))
buf.WriteString("}")
>>>>>>> db8df83... executor: add coprocessor cache hit ratio in explain analyze (#19948)
}
copRPC := s.rpcStat.Stats[tikvrpc.CmdCop]
if copRPC != nil && copRPC.Count > 0 {
Expand Down
3 changes: 3 additions & 0 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) {
execInfo := row[5].(string)
c.Assert(strings.Contains(execInfo, "time"), Equals, true)
c.Assert(strings.Contains(execInfo, "loops"), Equals, true)
if strings.Contains(row[0].(string), "Reader") || strings.Contains(row[0].(string), "IndexLookUp") {
c.Assert(strings.Contains(execInfo, "copr_cache_hit_ratio"), Equals, true)
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
data := make([]byte, len(cacheValue.Data))
copy(data, cacheValue.Data)
resp.pbResp.Data = data
resp.detail.CoprCacheHit = true
} else {
// Cache not hit or cache hit but not valid: update the cache if the response can be cached.
if cacheKey != nil && resp.pbResp.CanBeCached && resp.pbResp.CacheLastVersion > 0 {
Expand All @@ -1081,6 +1082,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *RPCCon
type CopRuntimeStats struct {
execdetails.ExecDetails
RegionRequestRuntimeStats

CoprCacheHit bool
}

func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
Expand Down

0 comments on commit 76882df

Please sign in to comment.