Skip to content

Commit

Permalink
util/execdetails: refactor execdetails information of runtime collect (
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Jul 15, 2020
1 parent 9e5e2a9 commit f0e5876
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 115 deletions.
4 changes: 3 additions & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,9 @@ func (e *HashAggExec) Close() error {
}
partialConcurrencyInfo := execdetails.NewConcurrencyInfo("PartialConcurrency", partialConcurrency)
finalConcurrencyInfo := execdetails.NewConcurrencyInfo("FinalConcurrency", finalConcurrency)
e.runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(partialConcurrencyInfo, finalConcurrencyInfo)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
Expand Down
16 changes: 1 addition & 15 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,6 @@ type IndexReaderExecutor struct {
func (e *IndexReaderExecutor) Close() error {
err := e.result.Close()
e.result = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
e.ctx.StoreQueryFeedback(e.feedback)
return err
}
Expand Down Expand Up @@ -501,20 +497,14 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
e.idxWorkerWg.Add(1)
go func() {
ctx1, cancel := context.WithCancel(ctx)
count, err := worker.fetchHandles(ctx1, result)
_, err := worker.fetchHandles(ctx1, result)
if err != nil {
e.feedback.Invalidate()
}
cancel()
if err := result.Close(); err != nil {
logutil.Logger(ctx).Error("close Select result failed", zap.Error(err))
}
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[len(e.idxPlans)-1].ExplainID().String())
copStats.SetRowNum(int64(count))
copStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.tblPlans[0].ExplainID().String())
copStats.SetRowNum(int64(count))
}
e.ctx.StoreQueryFeedback(e.feedback)
close(workCh)
close(e.resultCh)
Expand Down Expand Up @@ -586,10 +576,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.finished = nil
e.workerStarted = false
e.memTracker = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type baseExecutor struct {
maxChunkSize int
children []Executor
retFieldTypes []*types.FieldType
runtimeStats *execdetails.RuntimeStats
runtimeStats *execdetails.BasicRuntimeStats
}

const (
Expand Down Expand Up @@ -218,7 +218,8 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id fmt.S
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id != nil {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.id.String())
e.runtimeStats = &execdetails.BasicRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id.String(), e.runtimeStats)
}
}
if schema != nil {
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,9 @@ func (e *IndexNestedLoopHashJoin) Close() error {
}
if e.runtimeStats != nil {
concurrency := cap(e.joinChkResourceCh)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
for i := range e.joinChkResourceCh {
close(e.joinChkResourceCh[i])
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,9 @@ func (e *IndexLookUpJoin) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
concurrency := cap(e.resultCh)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
4 changes: 3 additions & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,9 @@ func (e *IndexLookUpMergeJoin) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
concurrency := cap(e.resultCh)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
67 changes: 63 additions & 4 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ func (e *HashJoinExec) Close() error {

if e.runtimeStats != nil {
concurrency := cap(e.joiners)
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
runtimeStats := newJoinRuntimeStats(e.runtimeStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
if e.rowContainer != nil {
e.runtimeStats.SetAdditionalInfo(e.rowContainer.stat.String())
runtimeStats.setHashStat(e.rowContainer.stat)
}
}
err := e.baseExecutor.Close()
Expand Down Expand Up @@ -765,14 +767,16 @@ func (e *NestedLoopApplyExec) Close() error {
e.innerRows = nil
e.memTracker = nil
if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats(e.runtimeStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
if e.canUseCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
hitRatio = float64(e.cacheHitCounter) / float64(e.cacheAccessCounter)
}
e.runtimeStats.SetCacheInfo(true, hitRatio)
runtimeStats.setCacheInfo(true, hitRatio)
} else {
e.runtimeStats.SetCacheInfo(false, 0)
runtimeStats.setCacheInfo(false, 0)
}
}
return e.outerExec.Close()
Expand Down Expand Up @@ -940,3 +944,58 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err e
}
}
}

// cacheInfo is used to save the concurrency information of the executor operator
type cacheInfo struct {
hitRatio float64
useCache bool
}

type joinRuntimeStats struct {
*execdetails.RuntimeStatsWithConcurrencyInfo

applyCache bool
cache cacheInfo
hasHashStat bool
hashStat hashStatistic
}

func newJoinRuntimeStats(basic *execdetails.BasicRuntimeStats) *joinRuntimeStats {
stats := &joinRuntimeStats{
RuntimeStatsWithConcurrencyInfo: &execdetails.RuntimeStatsWithConcurrencyInfo{
BasicRuntimeStats: basic,
},
}
return stats
}

// setCacheInfo sets the cache information. Only used for apply executor.
func (e *joinRuntimeStats) setCacheInfo(useCache bool, hitRatio float64) {
e.Lock()
e.applyCache = true
e.cache.useCache = useCache
e.cache.hitRatio = hitRatio
e.Unlock()
}

func (e *joinRuntimeStats) setHashStat(hashStat hashStatistic) {
e.Lock()
e.hasHashStat = true
e.hashStat = hashStat
e.Unlock()
}

func (e *joinRuntimeStats) String() string {
result := e.RuntimeStatsWithConcurrencyInfo.String()
if e.applyCache {
if e.cache.useCache {
result += fmt.Sprintf(", cache:ON, cacheHitRatio:%.3f%%", e.cache.hitRatio*100)
} else {
result += fmt.Sprintf(", cache:OFF")
}
}
if e.hasHashStat {
result += ", " + e.hashStat.String()
}
return result
}
10 changes: 7 additions & 3 deletions executor/projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,16 @@ func (e *ProjectionExec) Close() error {
e.drainOutputCh(w.outputCh)
}
}
if e.runtimeStats != nil {
if e.baseExecutor.runtimeStats != nil {
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{
BasicRuntimeStats: e.runtimeStats,
}
if e.isUnparallelExec() {
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
} else {
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers)))
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers)))
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}
return e.baseExecutor.Close()
}
Expand Down
4 changes: 3 additions & 1 deletion executor/shuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ func (e *ShuffleExec) Close() error {
e.executed = false

if e.runtimeStats != nil {
e.runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency))
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{BasicRuntimeStats: e.runtimeStats}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("ShuffleConcurrency", e.concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id.String(), runtimeStats)
}

err := e.dataSource.Close()
Expand Down
4 changes: 0 additions & 4 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ func (e *TableReaderExecutor) Close() error {
if e.resultHandler != nil {
err = e.resultHandler.Close()
}
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.plans[0].ExplainID().String())
copStats.SetRowNum(e.feedback.Actual())
}
e.ctx.StoreQueryFeedback(e.feedback)
return err
}
Expand Down
Loading

0 comments on commit f0e5876

Please sign in to comment.