Skip to content

Commit

Permalink
This is an automated cherry-pick of #39394
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
wshwsh12 authored and ti-chi-bot committed Nov 29, 2022
1 parent 6b02a5d commit 34f9db4
Show file tree
Hide file tree
Showing 26 changed files with 337 additions and 87 deletions.
9 changes: 6 additions & 3 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ func TestSelectWithRuntimeStats(t *testing.T) {
}

func TestSelectResultRuntimeStats(t *testing.T) {
basic := &execdetails.BasicRuntimeStats{}
stmtStats := execdetails.NewRuntimeStatsColl(nil)
basic := stmtStats.GetBasicRuntimeStats(1)
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second, time.Millisecond},
Expand All @@ -119,8 +120,6 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}

s2 := *s1
stmtStats := execdetails.NewRuntimeStatsColl(nil)
stmtStats.RegisterStats(1, basic)
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
stats := stmtStats.GetRootStats(1)
Expand All @@ -135,7 +134,11 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
<<<<<<< HEAD
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 1ms}"
=======
expect = "time:0s, loops:0, cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
Expand Down
5 changes: 3 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,10 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
}

if r.stats == nil {
id := r.rootPlanID
r.stats = &selectResultRuntimeStats{
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
}
r.stats.mergeCopRuntimeStats(copStats, respTime)

Expand Down Expand Up @@ -461,6 +459,9 @@ func (r *selectResult) Close() error {
if respSize > 0 {
r.memConsume(-respSize)
}
if r.stats != nil {
defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
}
return r.resp.Close()
}

Expand Down
6 changes: 3 additions & 3 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestUpdateCopRuntimeStats(t *testing.T) {
require.Nil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)

sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "a"}}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(nil)
i := uint64(1)
Expand All @@ -47,13 +47,13 @@ func TestUpdateCopRuntimeStats(t *testing.T) {

require.NotEqual(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0)
require.False(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234))

sr.copPlanIDs = []int{sr.rootPlanID}
require.NotNil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)
require.Equal(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0)
require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String())
}
4 changes: 3 additions & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext,

// Close implements the Executor Close interface.
func (e *HashAggExec) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.isUnparallelExec {
var firstErr error
e.childResult = nil
Expand Down Expand Up @@ -1097,7 +1100,6 @@ func (e *HashAggExec) initRuntimeStats() {
stats.PartialStats = make([]*AggWorkerStat, 0, stats.PartialConcurrency)
stats.FinalStats = make([]*AggWorkerStat, 0, stats.FinalConcurrency)
e.stats = stats
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down
3 changes: 3 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ func MockNewCacheTableSnapShot(snapshot kv.Snapshot, memBuffer kv.MemBuffer) *ca

// Close implements the Executor interface.
func (e *BatchPointGetExec) Close() error {
if e.runtimeStats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
Expand Down
30 changes: 30 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4673,6 +4673,7 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
b.err = err
return nil
}
<<<<<<< HEAD
decoder := NewRowDecoder(b.ctx, plan.Schema(), plan.TblInfo)
e := &BatchPointGetExec{
baseExecutor: newBaseExecutor(b.ctx, plan.Schema(), plan.ID()),
Expand All @@ -4691,6 +4692,35 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
singlePart: plan.SinglePart,
partTblID: plan.PartTblID,
columns: plan.Columns,
=======
if e.ctx.GetSessionVars().IsReplicaReadClosestAdaptive() {
e.snapshot.SetOption(kv.ReplicaReadAdjuster, newReplicaReadAdjuster(e.ctx, plan.GetAvgRowSize()))
}
if e.runtimeStats != nil {
snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
}

if plan.IndexInfo != nil {
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexNames = append(sctx.IndexNames, plan.TblInfo.Name.O+":"+plan.IndexInfo.Name.O)
}

failpoint.Inject("assertBatchPointReplicaOption", func(val failpoint.Value) {
assertScope := val.(string)
if e.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && assertScope != b.readReplicaScope {
panic("batch point get replica option fail")
}
})

snapshotTS, err := b.getSnapshotTS()
if err != nil {
b.err = err
return nil
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
}
if plan.TblInfo.TableCacheStatusType == model.TableCacheStatusEnable {
e.cacheTable = b.getCacheTable(plan.TblInfo, startTS)
Expand Down
5 changes: 4 additions & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,9 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.kvRanges = e.kvRanges[:0]
if e.dummy {
return nil
Expand Down Expand Up @@ -858,7 +861,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, w.idxLookup.stats.indexScanBasicStats)
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID)
}
}
for {
Expand Down
3 changes: 1 addition & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id > 0 {
e.runtimeStats = &execdetails.BasicRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, e.runtimeStats)
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
}
}
if schema != nil {
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.finished.Store(false)
return nil
Expand Down Expand Up @@ -286,6 +285,9 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {

// Close implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.finished.Store(false)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.cancelFunc = nil
return nil
Expand Down Expand Up @@ -758,6 +757,9 @@ func (iw *innerWorker) hasNullInJoinKey(row chunk.Row) bool {

// Close implements the Executor interface.
func (e *IndexLookUpJoin) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down
3 changes: 3 additions & 0 deletions executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,9 @@ func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *loo

// Close implements the Executor interface.
func (e *IndexLookUpMergeJoin) Close() error {
if e.runtimeStats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.runtimeStats)
}
if e.cancelFunc != nil {
e.cancelFunc()
e.cancelFunc = nil
Expand Down
7 changes: 4 additions & 3 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,6 @@ func (e *IndexMergeReaderExecutor) initRuntimeStats() {
e.stats = &IndexMergeRuntimeStat{
Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down Expand Up @@ -694,6 +693,9 @@ func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context

// Close implements Exec Close interface.
func (e *IndexMergeReaderExecutor) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.finished == nil {
return nil
}
Expand Down Expand Up @@ -811,8 +813,7 @@ func (w *partialIndexWorker) fetchHandles(
var basicStats *execdetails.BasicRuntimeStats
if w.stats != nil {
if w.idxID != 0 {
basicStats = &execdetails.BasicRuntimeStats{}
w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(w.idxID, basicStats)
basicStats = w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID)
}
}
for {
Expand Down
7 changes: 7 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *InsertExec) Close() error {
<<<<<<< HEAD
=======
if e.runtimeStats != nil && e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
defer e.memTracker.ReplaceBytesUsed(0)
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{}
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = e.ctx.GetSessionVars().CurrInsertBatchExtraCols[0:0:0]
e.setMessage()
Expand Down
1 change: 0 additions & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,6 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
SnapshotRuntimeStats: snapshotStats,
AllocatorRuntimeStats: autoid.NewAllocatorRuntimeStats(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
Expand Down
17 changes: 15 additions & 2 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func (e *HashJoinExec) Close() error {
if e.stats != nil && e.rowContainer != nil {
e.stats.hashStat = *e.rowContainer.stat
}
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
err := e.baseExecutor.Close()
return err
}
Expand Down Expand Up @@ -185,7 +188,6 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
e.stats = &hashJoinRuntimeStats{
concurrent: cap(e.joiners),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return nil
}
Expand Down Expand Up @@ -843,7 +845,6 @@ func (e *NestedLoopApplyExec) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
if e.canUseCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
Expand All @@ -854,6 +855,7 @@ func (e *NestedLoopApplyExec) Close() error {
runtimeStats.setCacheInfo(false, 0)
}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return e.outerExec.Close()
}
Expand Down Expand Up @@ -1111,6 +1113,17 @@ func (e *joinRuntimeStats) Tp() int {
return execdetails.TpJoinRuntimeStats
}

func (e *joinRuntimeStats) Clone() execdetails.RuntimeStats {
newJRS := &joinRuntimeStats{
RuntimeStatsWithConcurrencyInfo: e.RuntimeStatsWithConcurrencyInfo,
applyCache: e.applyCache,
cache: e.cache,
hasHashStat: e.hasHashStat,
hashStat: e.hashStat,
}
return newJRS
}

type hashJoinRuntimeStats struct {
fetchAndBuildHashTable time.Duration
hashStat hashStatistic
Expand Down
3 changes: 3 additions & 0 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataInfo.stats)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/parallel_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (e *ParallelNestedLoopApplyExec) Close() error {

if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
if e.useCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
Expand All @@ -187,6 +186,7 @@ func (e *ParallelNestedLoopApplyExec) Close() error {
runtimeStats.setCacheInfo(false, 0)
}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", e.concurrency))
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return err
}
Expand Down
Loading

0 comments on commit 34f9db4

Please sign in to comment.