Skip to content

Commit

Permalink
executor: enhance memory tracking for IndexLookup (#49336)
Browse files Browse the repository at this point in the history
close #45901
  • Loading branch information
solotzg authored Jan 31, 2024
1 parent d62ba73 commit d0d4eca
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 14 deletions.
47 changes: 33 additions & 14 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil/consistency"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -797,7 +798,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
}
}

func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (exec.Executor, error) {
func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookupTableTask) (*TableReaderExecutor, error) {
table := e.table
if e.partitionTableMode && task.partitionTable != nil {
table = task.partitionTable
Expand Down Expand Up @@ -1474,10 +1475,22 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
return w.compareData(ctx, task, tableReader)
}

task.memTracker = w.memTracker
memUsage := int64(cap(task.handles) * 8)
task.memUsage = memUsage
task.memTracker.Consume(memUsage)
{
task.memTracker = w.memTracker
memUsage := int64(cap(task.handles))*size.SizeOfInterface + tableReader.memUsage()
for _, h := range task.handles {
memUsage += int64(h.MemUsage())
}
if task.indexOrder != nil {
memUsage += task.indexOrder.MemUsage()
}
if task.duplicatedIndexOrder != nil {
memUsage += task.duplicatedIndexOrder.MemUsage()
}
memUsage += task.idxRows.MemoryUsage()
task.memUsage = memUsage
task.memTracker.Consume(memUsage)
}
handleCnt := len(task.handles)
task.rows = make([]chunk.Row, 0, handleCnt)
for {
Expand All @@ -1490,19 +1503,23 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
if chk.NumRows() == 0 {
break
}
memUsage = chk.MemoryUsage()
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
{
memUsage := chk.MemoryUsage()
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
}
iter := chunk.NewIterator4Chunk(chk)
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
task.rows = append(task.rows, row)
}
}

defer trace.StartRegion(ctx, "IndexLookUpTableCompute").End()
memUsage = int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{}))
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
{
memUsage := int64(cap(task.rows)) * int64(unsafe.Sizeof(chunk.Row{}))
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
}
if w.keepOrder {
task.rowIdx = make([]int, 0, len(task.rows))
for i := range task.rows {
Expand All @@ -1513,9 +1530,11 @@ func (w *tableWorker) executeTask(ctx context.Context, task *lookupTableTask) er
rowIdx, _ := task.indexOrder.Get(handle)
task.rowIdx = append(task.rowIdx, rowIdx.(int))
}
memUsage = int64(cap(task.rowIdx) * 4)
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
{
memUsage := int64(cap(task.rowIdx) * int(size.SizeOfInt))
task.memUsage += memUsage
task.memTracker.Consume(memUsage)
}
sort.Sort(task)
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"slices"
"time"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/distsql"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tidb/pkg/util/stringutil"
"github.com/pingcap/tidb/pkg/util/tracing"
"github.com/pingcap/tipb/go-tipb"
Expand Down Expand Up @@ -139,6 +141,20 @@ func (e *TableReaderExecutor) setDummy() {
e.dummy = true
}

func (e *TableReaderExecutor) memUsage() int64 {
const sizeofTableReaderExecutor = int64(unsafe.Sizeof(*(*TableReaderExecutor)(nil)))

res := sizeofTableReaderExecutor
res += size.SizeOfPointer * int64(cap(e.ranges))
for _, v := range e.ranges {
res += v.MemUsage()
}
res += kv.KeyRangeSliceMemUsage(e.kvRanges)
res += int64(e.dagPB.Size())
// TODO: add more statistics
return res
}

// Open initializes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
r, ctx := tracing.StartRegionEx(ctx, "TableReaderExecutor.Open")
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/set",
"//pkg/util/size",
"//pkg/util/tiflash",
"//pkg/util/tiflashcompute",
"//pkg/util/trxevents",
Expand Down Expand Up @@ -87,6 +88,7 @@ go_test(
"//pkg/testkit/testutil",
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/size",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/deadlock",
Expand Down
48 changes: 48 additions & 0 deletions pkg/kv/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"fmt"
"strconv"
"strings"
"unsafe"

"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/set"
"github.com/pingcap/tidb/pkg/util/size"
)

// Key represents high-level Key type.
Expand Down Expand Up @@ -102,6 +104,18 @@ type KeyRange struct {
XXXsizecache int32
}

// KeyRangeSliceMemUsage return the memory usage of []KeyRange
func KeyRangeSliceMemUsage(k []KeyRange) int64 {
const sizeofKeyRange = int64(unsafe.Sizeof(*(*KeyRange)(nil)))

res := sizeofKeyRange * int64(cap(k))
for _, m := range k {
res += int64(cap(m.StartKey)) + int64(cap(m.EndKey)) + int64(cap(m.XXXunrecognized))
}

return res
}

// IsPoint checks if the key range represents a point.
func (r *KeyRange) IsPoint() bool {
if len(r.StartKey) != len(r.EndKey) {
Expand Down Expand Up @@ -404,6 +418,12 @@ type strHandleVal struct {
val any
}

// SizeofHandleMap presents the memory size of struct HandleMap
const SizeofHandleMap = int64(unsafe.Sizeof(*(*HandleMap)(nil)))

// SizeofStrHandleVal presents the memory size of struct strHandleVal
const SizeofStrHandleVal = int64(unsafe.Sizeof(*(*strHandleVal)(nil)))

// NewHandleMap creates a new map for handle.
func NewHandleMap() *HandleMap {
return &HandleMap{
Expand Down Expand Up @@ -436,6 +456,34 @@ func (m *HandleMap) Get(h Handle) (v any, ok bool) {
return
}

func calcStrsMemUsage(strs map[string]strHandleVal) int64 {
res := int64(0)
for key := range strs {
res += size.SizeOfString + int64(len(key)) + SizeofStrHandleVal
}
return res
}

func calcIntsMemUsage(ints map[int64]interface{}) int64 {
return int64(len(ints)) * (size.SizeOfInt64 + size.SizeOfInterface)
}

// MemUsage gets the memory usage.
func (m *HandleMap) MemUsage() int64 {
res := SizeofHandleMap
res += int64(len(m.partitionInts)) * (size.SizeOfInt64 + size.SizeOfMap)
for _, v := range m.partitionInts {
res += calcIntsMemUsage(v)
}
res += int64(len(m.partitionStrs)) * (size.SizeOfInt64 + size.SizeOfMap)
for _, v := range m.partitionStrs {
res += calcStrsMemUsage(v)
}
res += calcIntsMemUsage(m.ints)
res += calcStrsMemUsage(m.strs)
return res
}

// Set sets a value with a Handle.
func (m *HandleMap) Set(h Handle, val any) {
ints, strs := m.ints, m.strs
Expand Down
22 changes: 22 additions & 0 deletions pkg/kv/key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/testkit/testutil"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -167,22 +168,34 @@ func TestHandleMap(t *testing.T) {
m := NewHandleMap()
h := IntHandle(1)

assert.Equal(t, SizeofHandleMap, m.MemUsage())

m.Set(h, 1)
v, ok := m.Get(h)
assert.True(t, ok)
assert.Equal(t, 1, v)

assert.Equal(t, SizeofHandleMap+size.SizeOfInt64+size.SizeOfInterface, m.MemUsage())

m.Delete(h)
v, ok = m.Get(h)
assert.False(t, ok)
assert.Nil(t, v)

assert.Equal(t, SizeofHandleMap, m.MemUsage())

ch := testutil.MustNewCommonHandle(t, 100, "abc")
m.Set(ch, "a")
v, ok = m.Get(ch)
assert.True(t, ok)
assert.Equal(t, "a", v)

{
key := string(ch.Encoded())
sz := size.SizeOfString + int64(len(key)) + SizeofStrHandleVal
assert.Equal(t, SizeofHandleMap+sz, m.MemUsage())
}

m.Delete(ch)
v, ok = m.Get(ch)
assert.False(t, ok)
Expand Down Expand Up @@ -324,6 +337,15 @@ func TestKeyRangeDefinition(t *testing.T) {
// And same default value.
require.Equal(t, (*coprocessor.KeyRange)(unsafe.Pointer(&r1)), &r2)
require.Equal(t, &r1, (*KeyRange)(unsafe.Pointer(&r2)))

s := []KeyRange{{
StartKey: []byte("s1"),
EndKey: []byte("e1"),
}, {
StartKey: []byte("s2"),
EndKey: []byte("e2"),
}}
require.Equal(t, int64(168), KeyRangeSliceMemUsage(s))
}

func BenchmarkIsPoint(b *testing.B) {
Expand Down

0 comments on commit d0d4eca

Please sign in to comment.