Skip to content

Commit

Permalink
executor: show operators' memory consumption in results of `EXPLAIN A…
Browse files Browse the repository at this point in the history
…NALYZE`(#11334) (#11418)

All tests passed, auto merged by Bot
  • Loading branch information
qw4990 authored and sre-bot committed Jul 25, 2019
1 parent cfcd4db commit da97861
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 45 deletions.
4 changes: 3 additions & 1 deletion distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -42,7 +43,8 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")).
SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"),
s.sctx.GetSessionVars().MemQuotaDistSQL)).
Build()
c.Assert(err, IsNil)

Expand Down
8 changes: 2 additions & 6 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
package distsql

import (
"fmt"
"math"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
Expand All @@ -44,10 +42,8 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
}

// SetMemTracker sets a memTracker for this request.
func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label fmt.Stringer) *RequestBuilder {
t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL)
t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker)
builder.Request.MemTracker = t
func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBuilder {
builder.Request.MemTracker = tracker
return builder
}

Expand Down
19 changes: 11 additions & 8 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math"
"runtime"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -252,6 +253,8 @@ type IndexReaderExecutor struct {
colLens []int
plans []plannercore.PhysicalPlan

memTracker *memory.Tracker

selectResultHook // for testing
}

Expand Down Expand Up @@ -301,8 +304,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
return e.open(ctx, kvRanges)
}

var indexReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("IndexReaderDistSQLTracker")

func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
Expand All @@ -317,14 +318,16 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.dagPB.CollectExecutionSummaries = &collExec
}

e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, indexReaderDistSQLTrackerLabel).
SetMemTracker(e.memTracker).
Build()
if err != nil {
e.feedback.Invalidate()
Expand Down Expand Up @@ -455,14 +458,16 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
e.dagPB.CollectExecutionSummaries = &collExec
}

tracker := memory.NewTracker(stringutil.StringerStr("IndexWorker"), e.ctx.GetSessionVars().MemQuotaIndexLookupReader)
tracker.AttachTo(e.memTracker)
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, indexLookupDistSQLTrackerLabel).
SetMemTracker(tracker).
Build()
if err != nil {
return err
Expand Down Expand Up @@ -511,8 +516,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
return nil
}

var tableWorkerLabel fmt.Stringer = stringutil.StringerStr("tableWorker")

// startTableWorker launchs some background goroutines which pick tasks from workCh and execute the task.
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
Expand All @@ -526,7 +529,8 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
isCheckOp: e.isCheckOp,
memTracker: memory.NewTracker(tableWorkerLabel, -1),
memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }),
e.ctx.GetSessionVars().MemQuotaIndexLookupReader),
}
worker.memTracker.AttachTo(e.memTracker)
ctx1, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -571,7 +575,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.memTracker.Detach()
e.memTracker = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String())
Expand Down
49 changes: 49 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package executor_test

import (
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/parser/auth"
plannercore "github.com/pingcap/tidb/planner/core"
Expand Down Expand Up @@ -60,3 +62,50 @@ func (s *testSuite1) TestExplainPriviliges(c *C) {
err = tk1.ExecToErr("explain select * from v")
c.Assert(err.Error(), Equals, plannercore.ErrTableaccessDenied.GenWithStackByArgs("SELECT", "explain", "%", "v").Error())
}

func (s *testSuite1) TestExplainAnalyzeMemory(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (v int, k int, key(k))")
tk.MustExec("insert into t values (1, 1), (1, 1), (1, 1), (1, 1), (1, 1)")

s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v")
s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v limit 5")
s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_HJ(t1, t2) */ t1.k from t t1, t t2 where t1.v = t2.v+1")
s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_SMJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k+1")
s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_INLJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k and t1.v=1")
s.checkMemoryInfo(c, tk, "explain analyze select sum(k) from t group by v")
s.checkMemoryInfo(c, tk, "explain analyze select sum(v) from t group by k")
s.checkMemoryInfo(c, tk, "explain analyze select * from t")
s.checkMemoryInfo(c, tk, "explain analyze select k from t use index(k)")
s.checkMemoryInfo(c, tk, "explain analyze select * from t use index(k)")
}

func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) {
memCol := 5
ops := []string{"Join", "Reader", "Top", "Sort", "LookUp"}
rows := tk.MustQuery(sql).Rows()
for _, row := range rows {
strs := make([]string, len(row))
for i, c := range row {
strs[i] = c.(string)
}
if strings.Contains(strs[2], "cop") {
continue
}

shouldHasMem := false
for _, op := range ops {
if strings.Contains(strs[0], op) {
shouldHasMem = true
break
}
}

if shouldHasMem {
c.Assert(strs[memCol], Not(Equals), "N/A")
} else {
c.Assert(strs[memCol], Equals, "N/A")
}
}
}
4 changes: 0 additions & 4 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
return nil, nil
}

if e.task != nil {
e.task.memTracker.Detach()
}
e.task = task
return task, nil
}
Expand Down Expand Up @@ -655,7 +652,6 @@ func (e *IndexLookUpJoin) Close() error {
e.cancelFunc()
}
e.workerWg.Wait()
e.memTracker.Detach()
e.memTracker = nil
return e.children[0].Close()
}
2 changes: 0 additions & 2 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func (e *HashJoinExec) Close() error {
e.outerChkResourceCh = nil
e.joinChkResourceCh = nil
}
e.memTracker.Detach()
e.memTracker = nil

err := e.baseExecutor.Close()
Expand Down Expand Up @@ -638,7 +637,6 @@ type NestedLoopApplyExec struct {
func (e *NestedLoopApplyExec) Close() error {
e.innerRows = nil

e.memTracker.Detach()
e.memTracker = nil
return e.outerExec.Close()
}
Expand Down
1 change: 0 additions & 1 deletion executor/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ func (t *mergeJoinInnerTable) reallocReaderResult() {

// Close implements the Executor Close interface.
func (e *MergeJoinExec) Close() error {
e.memTracker.Detach()
e.childrenResults = nil
e.memTracker = nil

Expand Down
1 change: 0 additions & 1 deletion executor/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type SortExec struct {

// Close implements the Executor Close interface.
func (e *SortExec) Close() error {
e.memTracker.Detach()
e.memTracker = nil
return e.children[0].Close()
}
Expand Down
11 changes: 7 additions & 4 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)

Expand Down Expand Up @@ -74,11 +74,16 @@ type TableReaderExecutor struct {
corColInAccess bool
plans []plannercore.PhysicalPlan

memTracker *memory.Tracker

selectResultHook // for testing
}

// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
Expand Down Expand Up @@ -158,8 +163,6 @@ func (e *TableReaderExecutor) Close() error {
return err
}

var tableReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("TableReaderDistSQLTracker")

// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
Expand All @@ -170,7 +173,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, tableReaderDistSQLTrackerLabel).
SetMemTracker(e.memTracker).
Build()
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) {
rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1")
c.Assert(len(rs.Rows()), Equals, 10)
for _, row := range rs.Rows() {
c.Assert(len(row), Equals, 5)
c.Assert(len(row), Equals, 6)
execInfo := row[4].(string)
c.Assert(strings.Contains(execInfo, "time"), Equals, true)
c.Assert(strings.Contains(execInfo, "loops"), Equals, true)
Expand Down Expand Up @@ -977,7 +977,7 @@ func (s *testAnalyzeSuite) TestIssue9805(c *C) {
c.Assert(rs.Rows(), HasLen, 10)
hasIndexLookUp12 := false
for _, row := range rs.Rows() {
c.Assert(row, HasLen, 5)
c.Assert(row, HasLen, 6)
if strings.HasSuffix(row[0].(string), "IndexLookUp_12") {
hasIndexLookUp12 = true
c.Assert(row[4], Equals, "time:0ns, loops:0, rows:0")
Expand Down
9 changes: 8 additions & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func (e *Explain) prepareSchema() error {
case ast.ExplainFormatROW:
retFields := []string{"id", "count", "task", "operator info"}
if e.Analyze {
retFields = append(retFields, "execution info")
retFields = append(retFields, "execution info", "memory")
}
schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
for _, fieldName := range retFields {
Expand Down Expand Up @@ -628,6 +628,13 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st
} else {
row = append(row, "time:0ns, loops:0, rows:0")
}

tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if tracker != nil {
row = append(row, tracker.BytesToString(tracker.MaxConsumed()))
} else {
row = append(row, "N/A")
}
}
e.Rows = append(e.Rows, row)
}
Expand Down
36 changes: 21 additions & 15 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ func (t *Tracker) AttachTo(parent *Tracker) {
t.parent.Consume(t.BytesConsumed())
}

// Detach detaches this Tracker from its parent.
func (t *Tracker) Detach() {
t.parent.remove(t)
}

func (t *Tracker) remove(oldChild *Tracker) {
t.mu.Lock()
defer t.mu.Unlock()
Expand Down Expand Up @@ -144,17 +139,13 @@ func (t *Tracker) Consume(bytes int64) {
rootExceed = tracker
}

if tracker.parent == nil {
// since we only need a total memory usage during execution,
// we only record max consumed bytes in root(statement-level) for performance.
for {
maxNow := atomic.LoadInt64(&tracker.maxConsumed)
consumed := atomic.LoadInt64(&tracker.bytesConsumed)
if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) {
continue
}
break
for {
maxNow := atomic.LoadInt64(&tracker.maxConsumed)
consumed := atomic.LoadInt64(&tracker.bytesConsumed)
if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) {
continue
}
break
}
}
if rootExceed != nil {
Expand All @@ -172,6 +163,21 @@ func (t *Tracker) MaxConsumed() int64 {
return atomic.LoadInt64(&t.maxConsumed)
}

// SearchTracker searches the specific tracker under this tracker.
func (t *Tracker) SearchTracker(label string) *Tracker {
if t.label.String() == label {
return t
}
t.mu.Lock()
defer t.mu.Unlock()
for _, child := range t.mu.children {
if result := child.SearchTracker(label); result != nil {
return result
}
}
return nil
}

// String returns the string representation of this Tracker tree.
func (t *Tracker) String() string {
buffer := bytes.NewBufferString("\n")
Expand Down

0 comments on commit da97861

Please sign in to comment.