diff --git a/executor/aggregate.go b/executor/aggregate.go index 82e11199601e6..db7ed555d46c7 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -246,7 +246,9 @@ func (e *HashAggExec) Close() error { } partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency) finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency) - e.runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo) + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats) } return e.baseExecutor.Close() } diff --git a/executor/distsql.go b/executor/distsql.go index 1a1f19b47e462..755cf664df991 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -242,10 +242,6 @@ type IndexReaderExecutor struct { func (e *IndexReaderExecutor) Close() error { err := e.result.Close() e.result = nil - if e.runtimeStats != nil { - copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String()) - copStats.SetRowNum(e.feedback.Actual()) - } e.ctx.StoreQueryFeedback(e.feedback) return err } @@ -483,7 +479,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k e.idxWorkerWg.Add(1) go func() { ctx1, cancel := context.WithCancel(ctx) - count, err := worker.fetchHandles(ctx1, result) + _, err := worker.fetchHandles(ctx1, result) if err != nil { e.feedback.Invalidate() } @@ -491,12 +487,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k if err := result.Close(); err != nil { logutil.Logger(ctx).Error("close Select result failed", zap.Error(err)) } - if e.runtimeStats != nil { - copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID().String()) - copStats.SetRowNum(int64(count)) - copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID().String()) - copStats.SetRowNum(int64(count)) - } e.ctx.StoreQueryFeedback(e.feedback) close(workCh) close(e.resultCh) @@ -568,10 +558,6 @@ func (e *IndexLookUpExecutor) Close() error { e.finished = nil e.workerStarted = false e.memTracker = nil - if e.runtimeStats != nil { - copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String()) - copStats.SetRowNum(e.feedback.Actual()) - } return nil } diff --git a/executor/executor.go b/executor/executor.go index 1421315314ef0..fca9826489360 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -98,7 +98,7 @@ type baseExecutor struct { maxChunkSize int children []Executor retFieldTypes []*types.FieldType - runtimeStats *execdetails.RuntimeStats + runtimeStats *execdetails.BasicRuntimeStats } // globalPanicOnExceed panics when GlobalDisTracker storage usage exceeds storage quota. @@ -199,7 +199,8 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id fmt.S } if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil { if e.id != nil { - e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.id.String()) + e.runtimeStats = &execdetails.BasicRuntimeStats{} + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id.String(), e.runtimeStats) } } if schema != nil { diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index c0aba642a8c56..40310d54742a6 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -298,7 +298,9 @@ func (e *IndexNestedLoopHashJoin) Close() error { } if e.runtimeStats != nil { concurrency := cap(e.joinChkResourceCh) - e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats) } for i := range e.joinChkResourceCh { close(e.joinChkResourceCh[i]) diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 14890aa716dc9..be31b762482c4 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -682,7 +682,9 @@ func (e *IndexLookUpJoin) Close() error { e.memTracker = nil if e.runtimeStats != nil { concurrency := cap(e.resultCh) - e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats) } return e.baseExecutor.Close() } diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index dd1d4ab3ba4b4..044a4216cdb43 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -728,7 +728,9 @@ func (e *IndexLookUpMergeJoin) Close() error { e.memTracker = nil if e.runtimeStats != nil { concurrency := cap(e.resultCh) - e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats) } return e.baseExecutor.Close() } diff --git a/executor/join.go b/executor/join.go index 03f357cc16523..8950ffce629fd 100644 --- a/executor/join.go +++ b/executor/join.go @@ -139,9 +139,11 @@ func (e *HashJoinExec) Close() error { if e.runtimeStats != nil { concurrency := cap(e.joiners) - e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + runtimeStats := newJoinRuntimeStats(e.runtimeStats) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats) + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) if e.rowContainer != nil { - e.runtimeStats.SetAdditionalInfo(e.rowContainer.stat.String()) + runtimeStats.setHashStat(e.rowContainer.stat) } } err := e.baseExecutor.Close() @@ -880,3 +882,58 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err e } } } + +// cacheInfo is used to save the concurrency information of the executor operator +type cacheInfo struct { + hitRatio float64 + useCache bool +} + +type joinRuntimeStats struct { + *execdetails.RuntimeStatsWithConcurrencyInfo + + applyCache bool + cache cacheInfo + hasHashStat bool + hashStat hashStatistic +} + +func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats { + stats := &joinRuntimeStats{ + RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{ + BasicRuntimeStats: basic, + }, + } + return stats +} + +// setCacheInfo sets the cache information. Only used for apply executor. +func (e *joinRuntimeStats) setCacheInfo(useCache bool, hitRatio float64) { + e.Lock() + e.applyCache = true + e.cache.useCache = useCache + e.cache.hitRatio = hitRatio + e.Unlock() +} + +func (e *joinRuntimeStats) setHashStat(hashStat hashStatistic) { + e.Lock() + e.hasHashStat = true + e.hashStat = hashStat + e.Unlock() +} + +func (e *joinRuntimeStats) String() string { + result := e.RuntimeStatsWithConcurrencyInfo.String() + if e.applyCache { + if e.cache.useCache { + result += fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100) + } else { + result += fmt.Sprintf(", cache:OFF") + } + } + if e.hasHashStat { + result += ", " + e.hashStat.String() + } + return result +} diff --git a/executor/projection.go b/executor/projection.go index 920d82f8d2e52..001cf7806e9cd 100644 --- a/executor/projection.go +++ b/executor/projection.go @@ -307,12 +307,16 @@ func (e *ProjectionExec) Close() error { e.drainOutputCh(w.outputCh) } } - if e.runtimeStats != nil { + if e.baseExecutor.runtimeStats != nil { + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{ + BasicRuntimeStats: e.runtimeStats, + } if e.isUnparallelExec() { - e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0)) + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0)) } else { - e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers))) + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers))) } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats) } return e.baseExecutor.Close() } diff --git a/executor/shuffle.go b/executor/shuffle.go index c8f93bafb4bf7..a42dbd9e3b225 100644 --- a/executor/shuffle.go +++ b/executor/shuffle.go @@ -145,7 +145,9 @@ func (e *ShuffleExec) Close() error { e.executed = false if e.runtimeStats != nil { - e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency)) + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats} + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats) } err := e.dataSource.Close() diff --git a/executor/table_reader.go b/executor/table_reader.go index 47555653fa294..02890fea5c510 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -182,10 +182,6 @@ func (e *TableReaderExecutor) Close() error { if e.resultHandler != nil { err = e.resultHandler.Close() } - if e.runtimeStats != nil { - copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String()) - copStats.SetRowNum(e.feedback.Actual()) - } e.ctx.StoreQueryFeedback(e.feedback) return err } diff --git a/util/execdetails/execdetails.go b/util/execdetails/execdetails.go index c2a516e3dbcec..ddedcfaf8fef3 100644 --- a/util/execdetails/execdetails.go +++ b/util/execdetails/execdetails.go @@ -260,7 +260,7 @@ type CopRuntimeStats struct { // have many region leaders, several coprocessor tasks can be sent to the // same tikv-server instance. We have to use a list to maintain all tasks // executed on each instance. - stats map[string][]*RuntimeStats + stats map[string][]*BasicRuntimeStats } // RecordOneCopTask records a specific cop tasks's execution detail. @@ -268,7 +268,7 @@ func (crs *CopRuntimeStats) RecordOneCopTask(address string, summary *tipb.Execu crs.Lock() defer crs.Unlock() crs.stats[address] = append(crs.stats[address], - &RuntimeStats{loop: int32(*summary.NumIterations), + &BasicRuntimeStats{loop: int32(*summary.NumIterations), consume: int64(*summary.TimeProcessedNs), rows: int64(*summary.NumProducedRows)}) } @@ -352,56 +352,72 @@ func (rrs *ReaderRuntimeStats) String() string { return fmt.Sprintf("rpc num: %v, rpc max:%v, min:%v, avg:%v, p80:%v, p95:%v, proc keys max:%v, p95:%v", size, vMax, vMin, vAvg, vP80, vP95, keyMax, keyP95) } -// RuntimeStatsColl collects executors's execution info. -type RuntimeStatsColl struct { - mu sync.Mutex - rootStats map[string]*RuntimeStats - copStats map[string]*CopRuntimeStats - readerStats map[string]*ReaderRuntimeStats +// RuntimeStats is used to express the executor runtime information. +type RuntimeStats interface { + GetActRows() int64 + String() string } -// ConcurrencyInfo is used to save the concurrency information of the executor operator -type ConcurrencyInfo struct { - concurrencyName string - concurrencyNum int -} - -// NewConcurrencyInfo creates new executor's concurrencyInfo. -func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo { - return &ConcurrencyInfo{name, num} -} - -// RuntimeStats collects one executor's execution info. -type RuntimeStats struct { +// BasicRuntimeStats is the basic runtime stats. +type BasicRuntimeStats struct { // executor's Next() called times. loop int32 // executor consume time. consume int64 // executor return row count. rows int64 +} - // protect concurrency - mu sync.Mutex - // executor concurrency information - concurrency []*ConcurrencyInfo +// GetActRows implements the RuntimeStats interface. +func (e *BasicRuntimeStats) GetActRows() int64 { + return e.rows +} + +// Record records executor's execution. +func (e *BasicRuntimeStats) Record(d time.Duration, rowNum int) { + atomic.AddInt32(&e.loop, 1) + atomic.AddInt64(&e.consume, int64(d)) + atomic.AddInt64(&e.rows, int64(rowNum)) +} - // additional information for executors - additionalInfo string +// SetRowNum sets the row num. +func (e *BasicRuntimeStats) SetRowNum(rowNum int64) { + atomic.StoreInt64(&e.rows, rowNum) +} + +// String implements the RuntimeStats interface. +func (e *BasicRuntimeStats) String() string { + return fmt.Sprintf("time:%v, loops:%d", time.Duration(e.consume), e.loop) +} + +// RuntimeStatsColl collects executors's execution info. +type RuntimeStatsColl struct { + mu sync.Mutex + rootStats map[string]RuntimeStats + copStats map[string]*CopRuntimeStats + readerStats map[string]*ReaderRuntimeStats } // NewRuntimeStatsColl creates new executor collector. func NewRuntimeStatsColl() *RuntimeStatsColl { - return &RuntimeStatsColl{rootStats: make(map[string]*RuntimeStats), + return &RuntimeStatsColl{rootStats: make(map[string]RuntimeStats), copStats: make(map[string]*CopRuntimeStats), readerStats: make(map[string]*ReaderRuntimeStats)} } +// RegisterStats register execStat for a executor. +func (e *RuntimeStatsColl) RegisterStats(planID string, info RuntimeStats) { + e.mu.Lock() + e.rootStats[planID] = info + e.mu.Unlock() +} + // GetRootStats gets execStat for a executor. -func (e *RuntimeStatsColl) GetRootStats(planID string) *RuntimeStats { +func (e *RuntimeStatsColl) GetRootStats(planID string) RuntimeStats { e.mu.Lock() defer e.mu.Unlock() runtimeStats, exists := e.rootStats[planID] if !exists { - runtimeStats = &RuntimeStats{} + runtimeStats = &BasicRuntimeStats{} e.rootStats[planID] = runtimeStats } return runtimeStats @@ -413,7 +429,7 @@ func (e *RuntimeStatsColl) GetCopStats(planID string) *CopRuntimeStats { defer e.mu.Unlock() copStats, ok := e.copStats[planID] if !ok { - copStats = &CopRuntimeStats{stats: make(map[string][]*RuntimeStats)} + copStats = &CopRuntimeStats{stats: make(map[string][]*BasicRuntimeStats)} e.copStats[planID] = copStats } return copStats @@ -459,44 +475,44 @@ func (e *RuntimeStatsColl) GetReaderStats(planID string) *ReaderRuntimeStats { return stats } -// Record records executor's execution. -func (e *RuntimeStats) Record(d time.Duration, rowNum int) { - atomic.AddInt32(&e.loop, 1) - atomic.AddInt64(&e.consume, int64(d)) - atomic.AddInt64(&e.rows, int64(rowNum)) +// ConcurrencyInfo is used to save the concurrency information of the executor operator +type ConcurrencyInfo struct { + concurrencyName string + concurrencyNum int } -// SetRowNum sets the row num. -func (e *RuntimeStats) SetRowNum(rowNum int64) { - atomic.StoreInt64(&e.rows, rowNum) +// NewConcurrencyInfo creates new executor's concurrencyInfo. +func NewConcurrencyInfo(name string, num int) *ConcurrencyInfo { + return &ConcurrencyInfo{name, num} +} + +// RuntimeStatsWithConcurrencyInfo is the BasicRuntimeStats with ConcurrencyInfo. +type RuntimeStatsWithConcurrencyInfo struct { + *BasicRuntimeStats + + // protect concurrency + sync.Mutex + // executor concurrency information + concurrency []*ConcurrencyInfo } // SetConcurrencyInfo sets the concurrency informations. // We must clear the concurrencyInfo first when we call the SetConcurrencyInfo. // When the num <= 0, it means the exector operator is not executed parallel. -func (e *RuntimeStats) SetConcurrencyInfo(infos ...*ConcurrencyInfo) { - e.mu.Lock() - defer e.mu.Unlock() +func (e *RuntimeStatsWithConcurrencyInfo) SetConcurrencyInfo(infos ...*ConcurrencyInfo) { + e.Lock() + defer e.Unlock() e.concurrency = e.concurrency[:0] for _, info := range infos { e.concurrency = append(e.concurrency, info) } } -// SetAdditionalInfo sets the additional information. -func (e *RuntimeStats) SetAdditionalInfo(info string) { - e.mu.Lock() - e.additionalInfo = info - e.mu.Unlock() -} - -// GetActRows return rows of CopRuntimeStats. -func (e *RuntimeStats) GetActRows() int64 { - return e.rows -} - -func (e *RuntimeStats) String() string { - result := fmt.Sprintf("time:%v, loops:%d", time.Duration(e.consume), e.loop) +func (e *RuntimeStatsWithConcurrencyInfo) String() string { + var result string + if e.BasicRuntimeStats != nil { + result = fmt.Sprintf("time:%v, loops:%d", time.Duration(e.consume), e.loop) + } if len(e.concurrency) > 0 { for _, concurrency := range e.concurrency { if concurrency.concurrencyNum > 0 { @@ -506,8 +522,5 @@ func (e *RuntimeStats) String() string { } } } - if len(e.additionalInfo) > 0 { - result += ", " + e.additionalInfo - } return result }