Skip to content

Commit

Permalink
util/execdetails: refactor execdetails information of runtime collect (
Browse files Browse the repository at this point in the history
…#18530) (#18592)

Signed-off-by: ti-srebot <[email protected]>
Signed-off-by: crazycs520 <[email protected]>
  • Loading branch information
ti-srebot authored Jul 28, 2020
1 parent 679012e commit 4669f68
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 90 deletions.
4 changes: 3 additions & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ func (e *HashAggExec) Close() error {
}
partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency)
finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency)
e.runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
Expand Down
16 changes: 1 addition & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,6 @@ type IndexReaderExecutor struct {
func (e *IndexReaderExecutor) Close() error {
err := e.result.Close()
e.result = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
e.ctx.StoreQueryFeedback(e.feedback)
return err
}
Expand Down Expand Up @@ -483,20 +479,14 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
e.idxWorkerWg.Add(1)
go func() {
ctx1, cancel := context.WithCancel(ctx)
count, err := worker.fetchHandles(ctx1, result)
_, err := worker.fetchHandles(ctx1, result)
if err != nil {
e.feedback.Invalidate()
}
cancel()
if err := result.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
}
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID().String())
copStats.SetRowNum(int64(count))
copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID().String())
copStats.SetRowNum(int64(count))
}
e.ctx.StoreQueryFeedback(e.feedback)
close(workCh)
close(e.resultCh)
Expand Down Expand Up @@ -568,10 +558,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.finished = nil
e.workerStarted = false
e.memTracker = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStats *execdetails.RuntimeStats
runtimeStats *execdetails.BasicRuntimeStats
}

// globalPanicOnExceed panics when GlobalDisTracker storage usage exceeds storage quota.
Expand Down Expand Up @@ -199,7 +199,8 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id fmt.S
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id != nil {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.id.String())
e.runtimeStats = &execdetails.BasicRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id.String(), e.runtimeStats)
}
}
if schema != nil {
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ func (e *IndexNestedLoopHashJoin) Close() error {
}
if e.runtimeStats != nil {
concurrency := cap(e.joinChkResourceCh)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
for i := range e.joinChkResourceCh {
close(e.joinChkResourceCh[i])
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,9 @@ func (e *IndexLookUpJoin) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
concurrency := cap(e.resultCh)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
4 changes: 3 additions & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,9 @@ func (e *IndexLookUpMergeJoin) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
concurrency := cap(e.resultCh)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
61 changes: 59 additions & 2 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,11 @@ func (e *HashJoinExec) Close() error {

if e.runtimeStats != nil {
concurrency := cap(e.joiners)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := newJoinRuntimeStats(e.runtimeStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
if e.rowContainer != nil {
e.runtimeStats.SetAdditionalInfo(e.rowContainer.stat.String())
runtimeStats.setHashStat(e.rowContainer.stat)
}
}
err := e.baseExecutor.Close()
Expand Down Expand Up @@ -880,3 +882,58 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err e
}
}
}

// cacheInfo is used to save the concurrency information of the executor operator
type cacheInfo struct {
hitRatio float64
useCache bool
}

type joinRuntimeStats struct {
*execdetails.RuntimeStatsWithConcurrencyInfo

applyCache bool
cache cacheInfo
hasHashStat bool
hashStat hashStatistic
}

func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats {
stats := &joinRuntimeStats{
RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{
BasicRuntimeStats: basic,
},
}
return stats
}

// setCacheInfo sets the cache information. Only used for apply executor.
func (e *joinRuntimeStats) setCacheInfo(useCache bool, hitRatio float64) {
e.Lock()
e.applyCache = true
e.cache.useCache = useCache
e.cache.hitRatio = hitRatio
e.Unlock()
}

func (e *joinRuntimeStats) setHashStat(hashStat hashStatistic) {
e.Lock()
e.hasHashStat = true
e.hashStat = hashStat
e.Unlock()
}

func (e *joinRuntimeStats) String() string {
result := e.RuntimeStatsWithConcurrencyInfo.String()
if e.applyCache {
if e.cache.useCache {
result += fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100)
} else {
result += fmt.Sprintf(", cache:OFF")
}
}
if e.hasHashStat {
result += ", " + e.hashStat.String()
}
return result
}
10 changes: 7 additions & 3 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,16 @@ func (e *ProjectionExec) Close() error {
e.drainOutputCh(w.outputCh)
}
}
if e.runtimeStats != nil {
if e.baseExecutor.runtimeStats != nil {
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{
BasicRuntimeStats: e.runtimeStats,
}
if e.isUnparallelExec() {
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
} else {
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers)))
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers)))
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
Expand Down
4 changes: 3 additions & 1 deletion executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ func (e *ShuffleExec) Close() error {
e.executed = false

if e.runtimeStats != nil {
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}

err := e.dataSource.Close()
Expand Down
4 changes: 0 additions & 4 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,6 @@ func (e *TableReaderExecutor) Close() error {
if e.resultHandler != nil {
err = e.resultHandler.Close()
}
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
e.ctx.StoreQueryFeedback(e.feedback)
return err
}
Expand Down
Loading

0 comments on commit 4669f68

Please sign in to comment.