From 1ad073bf80cfddf8af8a2d4dc696a07ed0896740 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Wed, 24 Jul 2019 10:53:02 +0800 Subject: [PATCH] executor: show operators' memory consumption in results of `EXPLAIN ANALYZE` (#11334) --- distsql/distsql_test.go | 4 ++- distsql/request_builder.go | 8 ++---- executor/distsql.go | 19 ++++++++------ executor/explain_test.go | 49 +++++++++++++++++++++++++++++++++++ executor/index_lookup_join.go | 4 --- executor/join.go | 2 -- executor/merge_join.go | 1 - executor/sort.go | 1 - executor/table_reader.go | 11 +++++--- planner/core/cbo_test.go | 4 +-- planner/core/common_plans.go | 9 ++++++- util/memory/tracker.go | 36 ++++++++++++++----------- 12 files changed, 103 insertions(+), 45 deletions(-) diff --git a/distsql/distsql_test.go b/distsql/distsql_test.go index 470f2e78bab67..4a145717d0fce 100644 --- a/distsql/distsql_test.go +++ b/distsql/distsql_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -42,7 +43,8 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str SetDesc(false). SetKeepOrder(false). SetFromSessionVars(variable.NewSessionVars()). - SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")). + SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"), + s.sctx.GetSessionVars().MemQuotaDistSQL)). Build() c.Assert(err, IsNil) diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 9e1ef6c624f05..10c91e93598d1 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -14,12 +14,10 @@ package distsql import ( - "fmt" "math" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics" @@ -44,10 +42,8 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) { } // SetMemTracker sets a memTracker for this request. -func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label fmt.Stringer) *RequestBuilder { - t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL) - t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker) - builder.Request.MemTracker = t +func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBuilder { + builder.Request.MemTracker = tracker return builder } diff --git a/executor/distsql.go b/executor/distsql.go index fcfbc110fcad3..c4cf91124d696 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -19,6 +19,7 @@ import ( "math" "runtime" "sort" + "strconv" "sync" "sync/atomic" "time" @@ -220,6 +221,8 @@ type IndexReaderExecutor struct { colLens []int plans []plannercore.PhysicalPlan + memTracker *memory.Tracker + selectResultHook // for testing } @@ -261,8 +264,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error { return e.open(ctx, kvRanges) } -var indexReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("IndexReaderDistSQLTracker") - func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error { var err error if e.corColInFilter { @@ -277,6 +278,8 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.dagPB.CollectExecutionSummaries = &collExec } + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -284,7 +287,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, indexReaderDistSQLTrackerLabel). + SetMemTracker(e.memTracker). Build() if err != nil { e.feedback.Invalidate() @@ -415,6 +418,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k e.dagPB.CollectExecutionSummaries = &collExec } + tracker := memory.NewTracker(stringutil.StringerStr("IndexWorker"), e.ctx.GetSessionVars().MemQuotaIndexLookupReader) + tracker.AttachTo(e.memTracker) var builder distsql.RequestBuilder kvReq, err := builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). @@ -422,7 +427,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k SetKeepOrder(e.keepOrder). SetStreaming(e.indexStreaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, indexLookupDistSQLTrackerLabel). + SetMemTracker(tracker). Build() if err != nil { return err @@ -471,8 +476,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k return nil } -var tableWorkerLabel fmt.Stringer = stringutil.StringerStr("tableWorker") - // startTableWorker launchs some background goroutines which pick tasks from workCh and execute the task. func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) { lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency @@ -486,7 +489,8 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha keepOrder: e.keepOrder, handleIdx: e.handleIdx, isCheckOp: e.isCheckOp, - memTracker: memory.NewTracker(tableWorkerLabel, -1), + memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }), + e.ctx.GetSessionVars().MemQuotaIndexLookupReader), } worker.memTracker.AttachTo(e.memTracker) ctx1, cancel := context.WithCancel(ctx) @@ -531,7 +535,6 @@ func (e *IndexLookUpExecutor) Close() error { e.tblWorkerWg.Wait() e.finished = nil e.workerStarted = false - e.memTracker.Detach() e.memTracker = nil if e.runtimeStats != nil { copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String()) diff --git a/executor/explain_test.go b/executor/explain_test.go index c36179a283e52..47c634f3f454e 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -14,6 +14,8 @@ package executor_test import ( + "strings" + . "github.com/pingcap/check" "github.com/pingcap/parser/auth" plannercore "github.com/pingcap/tidb/planner/core" @@ -74,3 +76,50 @@ func (s *testSuite1) TestExplainWrite(c *C) { tk.MustExec("explain analyze insert into t select 1") tk.MustQuery("select * from t order by a").Check(testkit.Rows("1", "2")) } + +func (s *testSuite1) TestExplainAnalyzeMemory(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (v int, k int, key(k))") + tk.MustExec("insert into t values (1, 1), (1, 1), (1, 1), (1, 1), (1, 1)") + + s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v") + s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v limit 5") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_HJ(t1, t2) */ t1.k from t t1, t t2 where t1.v = t2.v+1") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_SMJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k+1") + s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_INLJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k and t1.v=1") + s.checkMemoryInfo(c, tk, "explain analyze select sum(k) from t group by v") + s.checkMemoryInfo(c, tk, "explain analyze select sum(v) from t group by k") + s.checkMemoryInfo(c, tk, "explain analyze select * from t") + s.checkMemoryInfo(c, tk, "explain analyze select k from t use index(k)") + s.checkMemoryInfo(c, tk, "explain analyze select * from t use index(k)") +} + +func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) { + memCol := 5 + ops := []string{"Join", "Reader", "Top", "Sort", "LookUp"} + rows := tk.MustQuery(sql).Rows() + for _, row := range rows { + strs := make([]string, len(row)) + for i, c := range row { + strs[i] = c.(string) + } + if strings.Contains(strs[2], "cop") { + continue + } + + shouldHasMem := false + for _, op := range ops { + if strings.Contains(strs[0], op) { + shouldHasMem = true + break + } + } + + if shouldHasMem { + c.Assert(strs[memCol], Not(Equals), "N/A") + } else { + c.Assert(strs[memCol], Equals, "N/A") + } + } +} diff --git a/executor/index_lookup_join.go b/executor/index_lookup_join.go index 7f90e85303fe5..5f596a69b168d 100644 --- a/executor/index_lookup_join.go +++ b/executor/index_lookup_join.go @@ -295,9 +295,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask, return nil, nil } - if e.task != nil { - e.task.memTracker.Detach() - } e.task = task return task, nil } @@ -650,7 +647,6 @@ func (e *IndexLookUpJoin) Close() error { e.cancelFunc() } e.workerWg.Wait() - e.memTracker.Detach() e.memTracker = nil return e.children[0].Close() } diff --git a/executor/join.go b/executor/join.go index 12cd377271ade..d6f8e9042ca4a 100644 --- a/executor/join.go +++ b/executor/join.go @@ -134,7 +134,6 @@ func (e *HashJoinExec) Close() error { e.outerChkResourceCh = nil e.joinChkResourceCh = nil } - e.memTracker.Detach() e.memTracker = nil err := e.baseExecutor.Close() @@ -633,7 +632,6 @@ type NestedLoopApplyExec struct { func (e *NestedLoopApplyExec) Close() error { e.innerRows = nil - e.memTracker.Detach() e.memTracker = nil return e.outerExec.Close() } diff --git a/executor/merge_join.go b/executor/merge_join.go index 9a7a09912d440..f7415d6bf1444 100644 --- a/executor/merge_join.go +++ b/executor/merge_join.go @@ -200,7 +200,6 @@ func (t *mergeJoinInnerTable) reallocReaderResult() { // Close implements the Executor Close interface. func (e *MergeJoinExec) Close() error { - e.memTracker.Detach() e.childrenResults = nil e.memTracker = nil diff --git a/executor/sort.go b/executor/sort.go index 4209dafd91d4c..aa895015b1eec 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -54,7 +54,6 @@ type SortExec struct { // Close implements the Executor Close interface. func (e *SortExec) Close() error { - e.memTracker.Detach() e.memTracker = nil return e.children[0].Close() } diff --git a/executor/table_reader.go b/executor/table_reader.go index 3f6bea659295a..019957e678247 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -26,8 +26,8 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" - "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tipb/go-tipb" ) @@ -72,11 +72,16 @@ type TableReaderExecutor struct { corColInAccess bool plans []plannercore.PhysicalPlan + memTracker *memory.Tracker + selectResultHook // for testing } // Open initialzes necessary variables for using this executor. func (e *TableReaderExecutor) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + var err error if e.corColInFilter { e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans) @@ -148,8 +153,6 @@ func (e *TableReaderExecutor) Close() error { return err } -var tableReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("TableReaderDistSQLTracker") - // buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee // to fetch all results. func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) { @@ -160,7 +163,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra SetKeepOrder(e.keepOrder). SetStreaming(e.streaming). SetFromSessionVars(e.ctx.GetSessionVars()). - SetMemTracker(e.ctx, tableReaderDistSQLTrackerLabel). + SetMemTracker(e.memTracker). Build() if err != nil { return nil, err diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index cdce14d6e8f73..222512c2419c0 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -80,7 +80,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) { rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1") c.Assert(len(rs.Rows()), Equals, 10) for _, row := range rs.Rows() { - c.Assert(len(row), Equals, 5) + c.Assert(len(row), Equals, 6) execInfo := row[4].(string) c.Assert(strings.Contains(execInfo, "time"), Equals, true) c.Assert(strings.Contains(execInfo, "loops"), Equals, true) @@ -977,7 +977,7 @@ func (s *testAnalyzeSuite) TestIssue9805(c *C) { c.Assert(rs.Rows(), HasLen, 10) hasIndexLookUp12 := false for _, row := range rs.Rows() { - c.Assert(row, HasLen, 5) + c.Assert(row, HasLen, 6) if strings.HasSuffix(row[0].(string), "IndexLookUp_12") { hasIndexLookUp12 = true c.Assert(row[4], Equals, "time:0ns, loops:0, rows:0") diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 96a33507287f2..c43f03bf67b50 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -563,7 +563,7 @@ func (e *Explain) prepareSchema() error { case ast.ExplainFormatROW: retFields := []string{"id", "count", "task", "operator info"} if e.Analyze { - retFields = append(retFields, "execution info") + retFields = append(retFields, "execution info", "memory") } schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...) for _, fieldName := range retFields { @@ -643,6 +643,13 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st } else { row = append(row, "time:0ns, loops:0, rows:0") } + + tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String()) + if tracker != nil { + row = append(row, tracker.BytesToString(tracker.MaxConsumed())) + } else { + row = append(row, "N/A") + } } e.Rows = append(e.Rows, row) } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index befbab5ce3e76..3b935360f0fce 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -87,11 +87,6 @@ func (t *Tracker) AttachTo(parent *Tracker) { t.parent.Consume(t.BytesConsumed()) } -// Detach detaches this Tracker from its parent. -func (t *Tracker) Detach() { - t.parent.remove(t) -} - func (t *Tracker) remove(oldChild *Tracker) { t.mu.Lock() defer t.mu.Unlock() @@ -144,17 +139,13 @@ func (t *Tracker) Consume(bytes int64) { rootExceed = tracker } - if tracker.parent == nil { - // since we only need a total memory usage during execution, - // we only record max consumed bytes in root(statement-level) for performance. - for { - maxNow := atomic.LoadInt64(&tracker.maxConsumed) - consumed := atomic.LoadInt64(&tracker.bytesConsumed) - if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) { - continue - } - break + for { + maxNow := atomic.LoadInt64(&tracker.maxConsumed) + consumed := atomic.LoadInt64(&tracker.bytesConsumed) + if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) { + continue } + break } } if rootExceed != nil { @@ -172,6 +163,21 @@ func (t *Tracker) MaxConsumed() int64 { return atomic.LoadInt64(&t.maxConsumed) } +// SearchTracker searches the specific tracker under this tracker. +func (t *Tracker) SearchTracker(label string) *Tracker { + if t.label.String() == label { + return t + } + t.mu.Lock() + defer t.mu.Unlock() + for _, child := range t.mu.children { + if result := child.SearchTracker(label); result != nil { + return result + } + } + return nil +} + // String returns the string representation of this Tracker tree. func (t *Tracker) String() string { buffer := bytes.NewBufferString("\n")