executor: add runtime information for point-get executor (#18666) #18817

Merged
merged 11 commits on Jul 31, 2020
4 changes: 0 additions & 4 deletions executor/builder.go
@@ -31,7 +31,6 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
@@ -98,9 +97,6 @@ type MockPhysicalPlan interface {
}

func (b *executorBuilder) build(p plannercore.Plan) Executor {
if config.GetGlobalConfig().EnableCollectExecutionInfo && b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil {
b.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
}
switch v := p.(type) {
case nil:
return nil
9 changes: 7 additions & 2 deletions executor/executor.go
@@ -1544,10 +1544,11 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
DiskTracker: disk.NewTracker(stringutil.MemoizeStr(s.Text), -1),
TaskID: stmtctx.AllocateTaskID(),
}
if config.GetGlobalConfig().OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
globalConfig := config.GetGlobalConfig()
if globalConfig.OOMUseTmpStorage && GlobalDiskUsageTracker != nil {
sc.DiskTracker.AttachToGlobalTracker(GlobalDiskUsageTracker)
}
switch config.GetGlobalConfig().OOMAction {
switch globalConfig.OOMAction {
case config.OOMActionCancel:
action := &memory.PanicOnExceed{ConnID: ctx.GetSessionVars().ConnectionID}
action.SetLogHook(domain.GetDomain(ctx).ExpensiveQueryHandle().LogOnQueryExceedMemQuota)
@@ -1671,6 +1672,10 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else if vars.StmtCtx.InSelectStmt {
sc.PrevAffectedRows = -1
}
if globalConfig.EnableCollectExecutionInfo {
sc.RuntimeStatsColl = execdetails.NewRuntimeStatsColl()
}

sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
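Note: together with the builder.go change above, this moves creation of the runtime-stats collector from executorBuilder.build into ResetContextOfStmt, so when EnableCollectExecutionInfo is set the collector exists for the whole statement before any executor is built, and executors register their stats by id. A minimal, self-contained sketch of that register-then-render flow is below; statsColl and fakeStats are stand-ins for execdetails.RuntimeStatsColl and an executor's runtime stats, not the real TiDB types.

package main

import (
	"fmt"
	"sync"
)

// statsColl is a stand-in for execdetails.RuntimeStatsColl: a per-statement,
// concurrency-safe registry keyed by executor id.
type statsColl struct {
	mu    sync.Mutex
	stats map[string]fmt.Stringer
}

func newStatsColl() *statsColl { return &statsColl{stats: map[string]fmt.Stringer{}} }

func (c *statsColl) RegisterStats(id string, s fmt.Stringer) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.stats[id] = s
}

// fakeStats stands in for an executor's runtime stats (the value is illustrative).
type fakeStats string

func (f fakeStats) String() string { return string(f) }

func main() {
	// One collector per statement (as ResetContextOfStmt now does when
	// EnableCollectExecutionInfo is on); executors register by their id.
	coll := newStatsColl()
	coll.RegisterStats("Point_Get_1", fakeStats("Get:{num_rpc:1, total_time:0.3ms}"))
	for id, s := range coll.stats {
		fmt.Printf("%s: %s\n", id, s)
	}
}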
38 changes: 38 additions & 0 deletions executor/point_get.go
@@ -21,12 +21,14 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/rowcodec"
)

@@ -80,6 +82,8 @@ type PointGetExecutor struct {

// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *pointGetRuntimeStats
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
@@ -117,6 +121,9 @@ func (e *PointGetExecutor) Open(context.Context) error {

// Close implements the Executor interface.
func (e *PointGetExecutor) Close() error {
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.DelOption(kv.CollectRuntimeStats)
}
return nil
}

@@ -146,6 +153,15 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
@@ -398,3 +414,25 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
}
return nil
}

type pointGetRuntimeStats struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
}

func (e *pointGetRuntimeStats) String() string {
var basic, rpcStatsStr string
if e.BasicRuntimeStats != nil {
basic = e.BasicRuntimeStats.String()
}
if e.SnapshotRuntimeStats != nil {
rpcStatsStr = e.SnapshotRuntimeStats.String()
}
if rpcStatsStr == "" {
return basic
}
if basic == "" {
return rpcStatsStr
}
return basic + ", " + rpcStatsStr
}
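For readers of the diff: pointGetRuntimeStats simply embeds the executor's basic stats and the snapshot's RPC/backoff stats and concatenates their String() output. A minimal sketch of the same composition pattern, using stand-in types instead of execdetails.BasicRuntimeStats and tikv.SnapshotRuntimeStats:

package main

import "fmt"

// basicStats and snapshotStats are illustrative stand-ins only.
type basicStats struct{ loops int }

func (b *basicStats) String() string { return fmt.Sprintf("loops:%d", b.loops) }

type snapshotStats struct{ numRPC int }

func (s *snapshotStats) String() string { return fmt.Sprintf("Get:{num_rpc:%d}", s.numRPC) }

// combined mirrors pointGetRuntimeStats: either part may be nil, and the two
// strings are joined with ", " only when both are non-empty.
type combined struct {
	*basicStats
	*snapshotStats
}

func (c *combined) String() string {
	var basic, rpc string
	if c.basicStats != nil {
		basic = c.basicStats.String()
	}
	if c.snapshotStats != nil {
		rpc = c.snapshotStats.String()
	}
	if rpc == "" {
		return basic
	}
	if basic == "" {
		return rpc
	}
	return basic + ", " + rpc
}

func main() {
	s := &combined{&basicStats{loops: 2}, &snapshotStats{numRPC: 1}}
	fmt.Println(s) // loops:2, Get:{num_rpc:1}
}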
2 changes: 2 additions & 0 deletions kv/kv.go
@@ -56,6 +56,8 @@ const (
ReplicaRead
// Set task ID
TaskID
// CollectRuntimeStats is used to enable collecting runtime stats.
CollectRuntimeStats
)

// Priority value for transaction priority.
19 changes: 19 additions & 0 deletions planner/core/integration_test.go
@@ -14,7 +14,9 @@
package core_test

import (
"bytes"
"fmt"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
@@ -1177,3 +1179,20 @@ func (s *testIntegrationSuite) TestIssue16935(c *C) {

tk.MustQuery("SELECT * FROM t0 LEFT JOIN v0 ON TRUE WHERE v0.c0 IS NULL;")
}

func (s *testIntegrationSerialSuite) TestExplainAnalyzePointGet(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b varchar(20))")
tk.MustExec("insert into t values (1,1)")

res := tk.MustQuery("explain analyze select * from t where a=1;")
resBuff := bytes.NewBufferString("")
for _, row := range res.Rows() {
fmt.Fprintf(resBuff, "%s\n", row)
}
explain := resBuff.String()
c.Assert(strings.Contains(explain, "Get:{num_rpc:"), IsTrue, Commentf("%s", explain))
c.Assert(strings.Contains(explain, "total_time:"), IsTrue, Commentf("%s", explain))
}
7 changes: 7 additions & 0 deletions store/tikv/coprocessor.go
@@ -821,13 +821,19 @@ type clientHelper struct {
*minCommitTSPushed
Client
resolveLite bool
stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
}

// ResolveLocks wraps the ResolveLocks function and store the resolved result.
func (ch *clientHelper) ResolveLocks(bo *Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) {
var err error
var resolvedLocks []uint64
var msBeforeTxnExpired int64
if ch.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(ch.stats, tikvrpc.CmdResolveLock, time.Since(start))
}(time.Now())
}
if ch.resolveLite {
msBeforeTxnExpired, resolvedLocks, err = ch.LockResolver.resolveLocksLite(bo, callerStartTS, locks)
} else {
@@ -849,6 +855,7 @@ func (ch *clientHelper) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, regionID
if len(directStoreAddr) > 0 {
sender.storeAddr = directStoreAddr
}
sender.stats = ch.stats
req.Context.ResolvedLocks = ch.minCommitTSPushed.Get()
resp, ctx, err := sender.SendReqCtx(bo, req, regionID, timeout, sType)
return resp, ctx, sender.storeAddr, err
33 changes: 32 additions & 1 deletion store/tikv/region_request.go
@@ -60,6 +60,14 @@ type RegionRequestSender struct {
storeAddr string
rpcError error
failStoreIDs map[uint64]struct{}
stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
}

// RegionRequestRuntimeStats records the runtime stats of sending region requests.
type RegionRequestRuntimeStats struct {
count int64
// Total time spent sending region requests.
consume int64
}

// RegionBatchRequestSender sends BatchCop requests to TiFlash server by stream way.
@@ -78,6 +86,11 @@ func (ss *RegionBatchRequestSender) sendReqToAddr(bo *Backoffer, ctxs []copTaskA
if e := tikvrpc.SetContext(req, ctx.Meta, ctx.Peer); e != nil {
return nil, false, errors.Trace(e)
}
if ss.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(ss.stats, req.Type, time.Since(start))
}(time.Now())
}
resp, err = ss.client.SendRequest(bo.ctx, ctx.Addr, req, timout)
if err != nil {
ss.rpcError = err
@@ -93,6 +106,19 @@
return
}

func recordRegionRequestRuntimeStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats, cmd tikvrpc.CmdType, d time.Duration) {
stat, ok := stats[cmd]
if !ok {
stats[cmd] = &RegionRequestRuntimeStats{
count: 1,
consume: int64(d),
}
return
}
stat.count++
stat.consume += int64(d)
}

func (ss *RegionBatchRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err error) error {
// If it failed because the context is cancelled by ourself, don't retry.
if errors.Cause(err) == context.Canceled || status.Code(errors.Cause(err)) == codes.Canceled {
@@ -263,8 +289,13 @@ func (s *RegionRequestSender) sendReqToRegion(bo *Backoffer, ctx *RPCContext, re
}
defer s.releaseStoreToken(ctx.Store)
}
resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout)
if s.stats != nil {
defer func(start time.Time) {
recordRegionRequestRuntimeStats(s.stats, req.Type, time.Since(start))
}(time.Now())
}

resp, err = s.client.SendRequest(bo.ctx, ctx.Addr, req, timeout)
if err != nil {
s.rpcError = err
if e := s.onSendFail(bo, ctx, err); e != nil {
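Two details in this file are worth calling out. First, the timing uses the defer-with-argument idiom: time.Now() is evaluated when the defer statement is reached, while time.Since(start) runs when the surrounding function returns, so the remainder of the call (including the actual SendRequest) is measured. Second, recordRegionRequestRuntimeStats accumulates a count plus a total duration per command type. A small self-contained sketch of both, using string keys as a stand-in for tikvrpc.CmdType:

package main

import (
	"fmt"
	"time"
)

// cmdStats mirrors RegionRequestRuntimeStats: a call count plus total time (ns).
type cmdStats struct {
	count   int64
	consume int64
}

// record mirrors recordRegionRequestRuntimeStats: insert on first use, accumulate after.
func record(stats map[string]*cmdStats, cmd string, d time.Duration) {
	s, ok := stats[cmd]
	if !ok {
		stats[cmd] = &cmdStats{count: 1, consume: int64(d)}
		return
	}
	s.count++
	s.consume += int64(d)
}

// sendRequest simulates one RPC. The deferred closure's start argument is
// captured when the defer is set up; the elapsed time is recorded on return.
func sendRequest(stats map[string]*cmdStats, cmd string) {
	if stats != nil {
		defer func(start time.Time) {
			record(stats, cmd, time.Since(start))
		}(time.Now())
	}
	time.Sleep(time.Millisecond) // stand-in for the real network call
}

func main() {
	stats := map[string]*cmdStats{}
	for i := 0; i < 3; i++ {
		sendRequest(stats, "Get")
	}
	for cmd, s := range stats {
		fmt.Printf("%s: num_rpc=%d, total_time=%s\n", cmd, s.count, time.Duration(s.consume))
	}
}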
85 changes: 84 additions & 1 deletion store/tikv/snapshot.go
@@ -73,6 +73,7 @@ type tikvSnapshot struct {
sync.RWMutex
cached map[string][]byte
}
stats *SnapshotRuntimeStats
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
@@ -140,6 +141,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string]
m[string(k)] = v
mu.Unlock()
})
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
}
@@ -233,6 +235,12 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
minCommitTSPushed: &s.minCommitTSPushed,
Client: s.store.client,
}
if s.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
}()
}

pending := batch.keys
for {
@@ -314,7 +322,9 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
}(time.Now())

ctx = context.WithValue(ctx, txnStartKey, s.version.Ver)
val, err := s.get(NewBackofferWithVars(ctx, getMaxBackoff, s.vars), k)
bo := NewBackofferWithVars(ctx, getMaxBackoff, s.vars)
val, err := s.get(bo, k)
s.recordBackoffInfo(bo)
if err != nil {
return nil, errors.Trace(err)
}
@@ -353,6 +363,12 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
Client: s.store.client,
resolveLite: true,
}
if s.stats != nil {
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
defer func() {
s.mergeRegionRequestStats(cli.stats)
}()
}

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet,
&pb.GetRequest{
@@ -431,6 +447,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
s.priority = kvPriorityToCommandPri(val.(int))
case kv.TaskID:
s.taskID = val.(uint64)
case kv.CollectRuntimeStats:
s.stats = val.(*SnapshotRuntimeStats)
}
}

@@ -439,6 +457,8 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) {
switch opt {
case kv.ReplicaRead:
s.replicaRead = kv.ReplicaReadLeader
case kv.CollectRuntimeStats:
s.stats = nil
}
}

@@ -544,3 +564,66 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) {
logutil.BgLogger().Error("error", zap.Error(err4))
}
}

func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
if s.stats == nil || bo.totalSleep == 0 {
return
}
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.backoffSleepMS == nil {
s.stats.backoffSleepMS = bo.backoffSleepMS
s.stats.backoffTimes = bo.backoffTimes
return
}
for k, v := range bo.backoffSleepMS {
s.stats.backoffSleepMS[k] += v
}
for k, v := range bo.backoffTimes {
s.stats.backoffTimes[k] += v
}
}

func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stats.rpcStats == nil {
s.stats.rpcStats = stats
return
}
for k, v := range stats {
stat, ok := s.stats.rpcStats[k]
if !ok {
s.stats.rpcStats[k] = v
continue
}
stat.count += v.count
stat.consume += v.consume
}
}

// SnapshotRuntimeStats records the runtime stats of a snapshot.
type SnapshotRuntimeStats struct {
rpcStats map[tikvrpc.CmdType]*RegionRequestRuntimeStats
backoffSleepMS map[backoffType]int
backoffTimes map[backoffType]int
}

// String implements fmt.Stringer interface.
func (rs *SnapshotRuntimeStats) String() string {
var buf bytes.Buffer
for k, v := range rs.rpcStats {
if buf.Len() > 0 {
buf.WriteByte(',')
}
buf.WriteString(fmt.Sprintf("%s:{num_rpc:%d, total_time:%s}", k.String(), v.count, time.Duration(v.consume)))
}
for k, v := range rs.backoffTimes {
if buf.Len() > 0 {
buf.WriteByte(',')
}
ms := rs.backoffSleepMS[k]
buf.WriteString(fmt.Sprintf("%s_backoff:{num:%d, total_time:%d ms}", k.String(), v, ms))
}
return buf.String()
}
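To make the rendered execution info concrete, the sketch below builds a string with the same shape as SnapshotRuntimeStats.String() (and thus the substrings asserted by TestExplainAnalyzePointGet above), using plain string keys in place of the internal tikvrpc.CmdType and backoffType; all numbers are placeholders:

package main

import (
	"bytes"
	"fmt"
	"time"
)

func main() {
	// Stand-ins for the per-command RPC stats and the per-type backoff stats.
	rpc := map[string]struct {
		count   int64
		consume time.Duration
	}{
		"Get": {count: 1, consume: 330 * time.Microsecond},
	}
	backoff := map[string]struct {
		times   int
		sleepMS int
	}{
		"regionMiss": {times: 1, sleepMS: 2},
	}

	var buf bytes.Buffer
	for k, v := range rpc {
		if buf.Len() > 0 {
			buf.WriteByte(',')
		}
		fmt.Fprintf(&buf, "%s:{num_rpc:%d, total_time:%s}", k, v.count, v.consume)
	}
	for k, v := range backoff {
		if buf.Len() > 0 {
			buf.WriteByte(',')
		}
		fmt.Fprintf(&buf, "%s_backoff:{num:%d, total_time:%d ms}", k, v.times, v.sleepMS)
	}
	fmt.Println(buf.String())
	// Prints, e.g.: Get:{num_rpc:1, total_time:330µs},regionMiss_backoff:{num:1, total_time:2 ms}
}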