Skip to content

Commit

Permalink
planner: PhysicalPlan memory trace 7 (#38030)
Browse files Browse the repository at this point in the history
ref #37632
  • Loading branch information
fzzf678 authored Sep 23, 2022
1 parent 5fe9379 commit c4f6de1
Show file tree
Hide file tree
Showing 8 changed files with 135 additions and 3 deletions.
5 changes: 5 additions & 0 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (mp *mockDataPhysicalPlan) SelectBlockOffset() int {
return 0
}

// MemoryUsage of mockDataPhysicalPlan is only for testing
func (mp *mockDataPhysicalPlan) MemoryUsage() (sum int64) {
return
}

func buildMockDataPhysicalPlan(ctx sessionctx.Context, srcExec Executor) *mockDataPhysicalPlan {
return &mockDataPhysicalPlan{
schema: srcExec.Schema(),
Expand Down
4 changes: 2 additions & 2 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3965,9 +3965,9 @@ type mockPhysicalIndexReader struct {
e Executor
}

// MemoryUsage return the memory usage of mockPhysicalIndexReader
// MemoryUsage of mockPhysicalIndexReader is only for testing
func (p *mockPhysicalIndexReader) MemoryUsage() (sum int64) {
return // mock operator for testing only
return
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent,
Expand Down
5 changes: 5 additions & 0 deletions executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,11 @@ func (mp *mockPlan) Schema() *expression.Schema {
return mp.exec.Schema()
}

// MemoryUsage of mockPlan is only for testing
func (mp *mockPlan) MemoryUsage() (sum int64) {
return
}

func TestVecGroupCheckerDATARACE(t *testing.T) {
ctx := mock.NewContext()

Expand Down
6 changes: 5 additions & 1 deletion expression/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package expression
import (
"strings"
"unsafe"

"github.com/pingcap/tidb/util/size"
)

// KeyInfo stores the columns of one unique key or primary key.
Expand Down Expand Up @@ -216,17 +218,19 @@ func (s *Schema) MemoryUsage() (sum int64) {
return
}

sum = emptySchemaSize
sum = emptySchemaSize + int64(cap(s.Columns))*size.SizeOfPointer + int64(cap(s.Keys)+cap(s.UniqueKeys))*size.SizeOfSlice

for _, col := range s.Columns {
sum += col.MemoryUsage()
}
for _, cols := range s.Keys {
sum += int64(cap(cols)) * size.SizeOfPointer
for _, col := range cols {
sum += col.MemoryUsage()
}
}
for _, cols := range s.UniqueKeys {
sum += int64(cap(cols)) * size.SizeOfPointer
for _, col := range cols {
sum += col.MemoryUsage()
}
Expand Down
5 changes: 5 additions & 0 deletions planner/core/find_best_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ func (p *mockPhysicalPlan4Test) attach2Task(tasks ...task) task {
return t
}

// MemoryUsage of mockPhysicalPlan4Test is only for testing
func (p *mockPhysicalPlan4Test) MemoryUsage() (sum int64) {
return
}

func TestCostOverflow(t *testing.T) {
ctx := MockContext()
// Plan Tree: mockPlan -> mockDataSource
Expand Down
14 changes: 14 additions & 0 deletions planner/core/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/plancodec"
"github.com/pingcap/tidb/util/size"
)

// Init initializes LogicalAggregation.
Expand Down Expand Up @@ -472,6 +473,19 @@ func (p PhysicalTableSample) Init(ctx sessionctx.Context, offset int) *PhysicalT
return &p
}

// MemoryUsage return the memory usage of PhysicalTableSample
func (p *PhysicalTableSample) MemoryUsage() (sum int64) {
if p == nil {
return
}

sum = p.physicalSchemaProducer.MemoryUsage() + size.SizeOfInterface + size.SizeOfBool
if p.TableSampleInfo != nil {
sum += p.TableSampleInfo.MemoryUsage()
}
return
}

// Init initializes PhysicalIndexReader.
func (p PhysicalIndexReader) Init(ctx sessionctx.Context, offset int) *PhysicalIndexReader {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeIndexReader, &p, offset)
Expand Down
27 changes: 27 additions & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package core

import (
"math"
"unsafe"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/size"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -1926,6 +1928,31 @@ type CTEClass struct {
ColumnMap map[string]*expression.Column
}

const emptyCTEClassSize = int64(unsafe.Sizeof(CTEClass{}))

// MemoryUsage return the memory usage of CTEClass
func (cc *CTEClass) MemoryUsage() (sum int64) {
if cc == nil {
return
}

sum = emptyCTEClassSize
if cc.seedPartPhysicalPlan != nil {
sum += cc.seedPartPhysicalPlan.MemoryUsage()
}
if cc.recursivePartPhysicalPlan != nil {
sum += cc.recursivePartPhysicalPlan.MemoryUsage()
}

for _, expr := range cc.pushDownPredicates {
sum += expr.MemoryUsage()
}
for key, val := range cc.ColumnMap {
sum += size.SizeOfString + int64(len(key)) + size.SizeOfPointer + val.MemoryUsage()
}
return
}

// LogicalCTE is for CTE.
type LogicalCTE struct {
logicalSchemaProducer
Expand Down
72 changes: 72 additions & 0 deletions planner/core/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,15 @@ func (p *PhysicalUnionAll) Clone() (PhysicalPlan, error) {
return cloned, nil
}

// MemoryUsage return the memory usage of PhysicalUnionAll
func (p *PhysicalUnionAll) MemoryUsage() (sum int64) {
if p == nil {
return
}

return p.physicalSchemaProducer.MemoryUsage() + size.SizeOfBool
}

// AggMppRunMode defines the running mode of aggregation in MPP
type AggMppRunMode int

Expand Down Expand Up @@ -2070,6 +2079,22 @@ type TableSampleInfo struct {
Partitions []table.PartitionedTable
}

// MemoryUsage return the memory usage of TableSampleInfo
func (t *TableSampleInfo) MemoryUsage() (sum int64) {
if t == nil {
return
}

sum = size.SizeOfPointer*2 + size.SizeOfSlice + int64(cap(t.Partitions))*size.SizeOfInterface
if t.AstNode != nil {
sum += int64(unsafe.Sizeof(ast.TableSample{}))
}
if t.FullSchema != nil {
sum += t.FullSchema.MemoryUsage()
}
return
}

// NewTableSampleInfo creates a new TableSampleInfo.
func NewTableSampleInfo(node *ast.TableSample, fullSchema *expression.Schema, pt []table.PartitionedTable) *TableSampleInfo {
if node == nil {
Expand Down Expand Up @@ -2128,11 +2153,39 @@ func (p *PhysicalCTE) ExplainID() fmt.Stringer {
})
}

// MemoryUsage return the memory usage of PhysicalCTE
func (p *PhysicalCTE) MemoryUsage() (sum int64) {
if p == nil {
return
}

sum = p.physicalSchemaProducer.MemoryUsage() + p.cteAsName.MemoryUsage()
if p.SeedPlan != nil {
sum += p.SeedPlan.MemoryUsage()
}
if p.RecurPlan != nil {
sum += p.RecurPlan.MemoryUsage()
}
if p.CTE != nil {
sum += p.CTE.MemoryUsage()
}
return
}

// ExplainInfo overrides the ExplainInfo
func (p *PhysicalCTETable) ExplainInfo() string {
return "Scan on CTE_" + strconv.Itoa(p.IDForStorage)
}

// MemoryUsage return the memory usage of PhysicalCTETable
func (p *PhysicalCTETable) MemoryUsage() (sum int64) {
if p == nil {
return
}

return p.physicalSchemaProducer.MemoryUsage() + size.SizeOfInt
}

// CTEDefinition is CTE definition for explain.
type CTEDefinition PhysicalCTE

Expand All @@ -2157,6 +2210,25 @@ func (p *CTEDefinition) ExplainID() fmt.Stringer {
})
}

// MemoryUsage return the memory usage of CTEDefinition
func (p *CTEDefinition) MemoryUsage() (sum int64) {
if p == nil {
return
}

sum = p.physicalSchemaProducer.MemoryUsage() + p.cteAsName.MemoryUsage()
if p.SeedPlan != nil {
sum += p.SeedPlan.MemoryUsage()
}
if p.RecurPlan != nil {
sum += p.RecurPlan.MemoryUsage()
}
if p.CTE != nil {
sum += p.CTE.MemoryUsage()
}
return
}

func appendChildCandidate(origin PhysicalPlan, pp PhysicalPlan, op *physicalOptimizeOp) {
candidate := &tracing.CandidatePlanTrace{
PlanTrace: &tracing.PlanTrace{
Expand Down

0 comments on commit c4f6de1

Please sign in to comment.