From d63a5fd33dee1c6eef907ca7fae16972cc403557 Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Tue, 19 Apr 2022 23:28:03 +0800 Subject: [PATCH] executor: add some memory tracker in HashJoin (#33918) ref pingcap/tidb#33877 --- executor/aggregate.go | 17 ++++------- executor/concurrent_map.go | 22 ++++++++------ executor/concurrent_map_test.go | 35 ++++++++++++++++++++++ executor/hash_table.go | 52 +++++++++++++++++++++++++++++---- executor/hash_table_test.go | 23 +++++++++++++-- util/chunk/row_container.go | 3 +- util/hack/hack.go | 9 ++++++ 7 files changed, 132 insertions(+), 29 deletions(-) diff --git a/executor/aggregate.go b/executor/aggregate.go index 8a6b83d089e58..7cf5c9bd0c04a 100644 --- a/executor/aggregate.go +++ b/executor/aggregate.go @@ -62,13 +62,6 @@ type baseHashAggWorker struct { BInMap int // indicate there are 2^BInMap buckets in Golang Map. } -const ( - // ref https://github.com/golang/go/blob/go1.15.6/src/reflect/type.go#L2162. - // defBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(slice))+2*ptrSize - // The bucket size may be changed by golang implement in the future. - defBucketMemoryUsage = 8*(1+16+24) + 16 -) - func newBaseHashAggWorker(ctx sessionctx.Context, finishCh <-chan struct{}, aggFuncs []aggfuncs.AggFunc, maxChunkSize int, memTrack *memory.Tracker) baseHashAggWorker { baseWorker := baseHashAggWorker{ @@ -332,7 +325,7 @@ func (e *HashAggExec) initForUnparallelExec() { e.partialResultMap = make(aggPartialResultMapper) e.bInMap = 0 failpoint.Inject("ConsumeRandomPanic", nil) - e.memTracker.Consume(defBucketMemoryUsage*(1< bucketNum * loadFactor. The memory usage will doubled. if len(mapper) > (1< bucketNum * loadFactor. The memory usage will doubled. if len(e.partialResultMap) > (1< (1<= cap(slice) { @@ -263,6 +269,7 @@ func (es *entryStore) GetStore() (e *entry) { es.slices = append(es.slices, slice) sliceIdx++ es.cursor = 0 + memDelta = int64(unsafe.Sizeof(entry{})) * int64(size) } e = &es.slices[sliceIdx][es.cursor] es.cursor++ @@ -273,6 +280,9 @@ type baseHashTable interface { Put(hashKey uint64, rowPtr chunk.RowPtr) Get(hashKey uint64) (rowPtrs []chunk.RowPtr) Len() uint64 + // GetAndCleanMemoryDelta gets and cleans the memDelta of the baseHashTable. Memory delta will be cleared after each fetch. + // It indicates the memory delta of the baseHashTable since the last calling GetAndCleanMemoryDelta(). + GetAndCleanMemoryDelta() int64 } // TODO (fangzhuhe) remove unsafeHashTable later if it not used anymore @@ -283,6 +293,9 @@ type unsafeHashTable struct { hashMap map[uint64]*entry entryStore *entryStore length uint64 + + bInMap int64 // indicate there are 2^bInMap buckets in hashMap + memDelta int64 // the memory delta of the unsafeHashTable since the last calling GetAndCleanMemoryDelta() } // newUnsafeHashTable creates a new unsafeHashTable. estCount means the estimated size of the hashMap. @@ -297,11 +310,16 @@ func newUnsafeHashTable(estCount int) *unsafeHashTable { // Put puts the key/rowPtr pairs to the unsafeHashTable, multiple rowPtrs are stored in a list. func (ht *unsafeHashTable) Put(hashKey uint64, rowPtr chunk.RowPtr) { oldEntry := ht.hashMap[hashKey] - newEntry := ht.entryStore.GetStore() + newEntry, memDelta := ht.entryStore.GetStore() newEntry.ptr = rowPtr newEntry.next = oldEntry ht.hashMap[hashKey] = newEntry + if len(ht.hashMap) > (1< 0) + require.Equal(t, spill, rowContainer.rowContainer.GetMemTracker().BytesConsumed() == 0) + require.Equal(t, !spill, rowContainer.rowContainer.GetMemTracker().BytesConsumed() > 0) + require.True(t, rowContainer.GetMemTracker().BytesConsumed() > 0) // hashtable need memory if rowContainer.alreadySpilledSafeForTest() { require.NotNil(t, rowContainer.GetDiskTracker()) require.True(t, rowContainer.GetDiskTracker().BytesConsumed() > 0) @@ -162,3 +165,19 @@ func testHashRowContainer(t *testing.T, hashFunc func() hash.Hash64, spill bool) require.Equal(t, chk1.GetRow(1).GetDatumRow(colTypes), matched[1].GetDatumRow(colTypes)) return rowContainer, copiedRC } + +func TestConcurrentMapHashTableMemoryUsage(t *testing.T) { + m := newConcurrentMapHashTable() + const iterations = 1024 * hack.LoadFactorNum / hack.LoadFactorDen // 6656 + wg := &sync.WaitGroup{} + wg.Add(2) + // Note: Now concurrentMapHashTable doesn't support inserting in parallel. + for i := 0; i < iterations; i++ { + // Add entry to map. + m.Put(uint64(i*ShardCount), chunk.RowPtr{ChkIdx: uint32(i), RowIdx: uint32(i)}) + } + mapMemoryExpected := int64(1024) * hack.DefBucketMemoryUsageForMapIntToPtr + entryMemoryExpected := 16 * int64(64+128+256+512+1024+2048+4096) + require.Equal(t, mapMemoryExpected+entryMemoryExpected, m.GetAndCleanMemoryDelta()) + require.Equal(t, int64(0), m.GetAndCleanMemoryDelta()) +} diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 3407022f4718f..e2104c52b45dc 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -99,9 +99,10 @@ func NewRowContainer(fieldType []*types.FieldType, chunkSize int) *RowContainer }, fieldType: fieldType, chunkSize: chunkSize, - memTracker: li.memTracker, + memTracker: memory.NewTracker(memory.LabelForRowContainer, -1), diskTracker: disk.NewTracker(memory.LabelForRowContainer, -1), } + li.GetMemTracker().AttachTo(rc.GetMemTracker()) return rc } diff --git a/util/hack/hack.go b/util/hack/hack.go index 60ad6aab9adf4..8ccf1c59efc80 100644 --- a/util/hack/hack.go +++ b/util/hack/hack.go @@ -56,3 +56,12 @@ const ( // LoadFactorDen is the denominator of load factor LoadFactorDen = 2 ) + +const ( + // DefBucketMemoryUsageForMapStrToSlice = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(slice))+2*ptrSize + // ref https://github.com/golang/go/blob/go1.15.6/src/reflect/type.go#L2162. + // The bucket size may be changed by golang implement in the future. + DefBucketMemoryUsageForMapStrToSlice = 8*(1+16+24) + 16 + // DefBucketMemoryUsageForMapIntToPtr = bucketSize*(1+unsafe.Sizeof(uint64) + unsafe.Sizeof(pointer))+2*ptrSize + DefBucketMemoryUsageForMapIntToPtr = 8*(1+8+8) + 16 +)