Skip to content

Commit

Permalink
executor: make the memory tracker of Jsonobjectagg more accurate (#23024
Browse files Browse the repository at this point in the history
)
  • Loading branch information
wshwsh12 authored Mar 4, 2021
1 parent 263155a commit 8761adc
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 19 deletions.
20 changes: 17 additions & 3 deletions executor/aggfuncs/func_json_objectagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/stringutil"
)

const (
// DefPartialResult4JsonObjectAgg is the size of partialResult4JsonObject
DefPartialResult4JsonObjectAgg = int64(unsafe.Sizeof(partialResult4JsonObjectAgg{}))
// DefMapStringInterfaceBucketSize = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(interface{}))+2*ptrSize
DefMapStringInterfaceBucketSize = 8*(1+16+16) + 16
)

type jsonObjectAgg struct {
Expand All @@ -35,17 +38,20 @@ type jsonObjectAgg struct {

type partialResult4JsonObjectAgg struct {
entries map[string]interface{}
bInMap int // indicate there are 2^bInMap buckets in entries.
}

func (e *jsonObjectAgg) AllocPartialResult() (pr PartialResult, memDelta int64) {
p := partialResult4JsonObjectAgg{}
p.entries = make(map[string]interface{})
return PartialResult(&p), DefPartialResult4JsonObjectAgg
p.bInMap = 0
return PartialResult(&p), DefPartialResult4JsonObjectAgg + (1<<p.bInMap)*DefMapStringInterfaceBucketSize
}

func (e *jsonObjectAgg) ResetPartialResult(pr PartialResult) {
p := (*partialResult4JsonObjectAgg)(pr)
p.entries = make(map[string]interface{})
p.bInMap = 0
}

func (e *jsonObjectAgg) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
Expand Down Expand Up @@ -105,8 +111,11 @@ func (e *jsonObjectAgg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup
switch x := realVal.(type) {
case nil, bool, int64, uint64, float64, string, json.BinaryJSON, *types.MyDecimal, []uint8, types.Time, types.Duration:
if _, ok := p.entries[keyString]; !ok {
memDelta += int64(len(keyString))
memDelta += getValMemDelta(realVal)
memDelta += int64(len(keyString)) + getValMemDelta(realVal)
if len(p.entries)+1 > (1<<p.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta += (1 << p.bInMap) * DefMapStringInterfaceBucketSize
p.bInMap++
}
}
p.entries[keyString] = realVal
default:
Expand Down Expand Up @@ -150,6 +159,11 @@ func (e *jsonObjectAgg) MergePartialResult(sctx sessionctx.Context, src, dst Par
// and only the last value encountered is used with that key in the returned object
for k, v := range p1.entries {
p2.entries[k] = v
memDelta += int64(len(k)) + getValMemDelta(v)
if len(p2.entries)+1 > (1<<p2.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta += (1 << p2.bInMap) * DefMapStringInterfaceBucketSize
p2.bInMap++
}
}
return 0, nil
}
4 changes: 2 additions & 2 deletions executor/aggfuncs/func_json_objectagg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (s *testSuite) TestMemJsonObjectagg(c *C) {
}

tests := []multiArgsAggMemTest{
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg, defaultMultiArgsMemDeltaGens, true),
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg, defaultMultiArgsMemDeltaGens, false),
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg+aggfuncs.DefMapStringInterfaceBucketSize, defaultMultiArgsMemDeltaGens, true),
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg+aggfuncs.DefMapStringInterfaceBucketSize, defaultMultiArgsMemDeltaGens, false),
}
for _, test := range tests {
s.testMultiArgsAggMemFunc(c, test)
Expand Down
9 changes: 3 additions & 6 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/set"
Expand All @@ -61,10 +62,6 @@ const (
// defBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(slice))+2*ptrSize
// The bucket size may be changed by golang implement in the future.
defBucketMemoryUsage = 8*(1+16+24) + 16
// Maximum average load of a bucket that triggers growth is 6.5.
// Represent as loadFactorNum/loadFactDen, to allow integer math.
loadFactorNum = 13
loadFactorDen = 2
)

func newBaseHashAggWorker(ctx sessionctx.Context, finishCh <-chan struct{}, aggFuncs []aggfuncs.AggFunc,
Expand Down Expand Up @@ -540,7 +537,7 @@ func (w *baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, group
mapper[string(groupKey[i])] = partialResults[i]
allMemDelta += int64(len(groupKey[i]))
// Map will expand when count > bucketNum * loadFactor. The memory usage will doubled.
if len(mapper) > (1<<w.BInMap)*loadFactorNum/loadFactorDen {
if len(mapper) > (1<<w.BInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
w.memTracker.Consume(defBucketMemoryUsage * (1 << w.BInMap))
w.BInMap++
}
Expand Down Expand Up @@ -914,7 +911,7 @@ func (e *HashAggExec) getPartialResults(groupKey string) []aggfuncs.PartialResul
e.partialResultMap[groupKey] = partialResults
allMemDelta += int64(len(groupKey))
// Map will expand when count > bucketNum * loadFactor. The memory usage will doubled.
if len(e.partialResultMap) > (1<<e.bInMap)*loadFactorNum/loadFactorDen {
if len(e.partialResultMap) > (1<<e.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
e.memTracker.Consume(defBucketMemoryUsage * (1 << e.bInMap))
e.bInMap++
}
Expand Down
10 changes: 10 additions & 0 deletions util/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ func Slice(s string) (b []byte) {
pbytes.Cap = pstring.Len
return
}

// LoadFactor is the maximum average load of a bucket that triggers growth is 6.5 in Golang Map.
// Represent as LoadFactorNum/LoadFactorDen, to allow integer math.
// They are from the golang definition. ref: https://github.com/golang/go/blob/go1.13.15/src/runtime/map.go#L68-L71
const (
// LoadFactorNum is the numerator of load factor
LoadFactorNum = 13
// LoadFactorDen is the denominator of load factor
LoadFactorDen = 2
)
17 changes: 9 additions & 8 deletions util/set/set_with_memory_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@

package set

import "unsafe"
import (
"unsafe"

"github.com/pingcap/tidb/util/hack"
)

const (
// DefStringSetBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(struct{}))+2*ptrSize
// ref https://github.com/golang/go/blob/go1.15.6/src/reflect/type.go#L2162.
DefStringSetBucketMemoryUsage = 8*(1+16+0) + 16
// DefFloat64SetBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(float64) + unsafe.Sizeof(struct{}))+2*ptrSize
DefFloat64SetBucketMemoryUsage = 8*(1+8+0) + 16
// DefInt64SetBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(int64) + unsafe.Sizeof(struct{}))+2*ptrSize
DefInt64SetBucketMemoryUsage = 8*(1+8+0) + 16
// Maximum average load of a bucket that triggers growth is 6.5.
// Represent as loadFactorNum/loadFactDen, to allow integer math.
loadFactorNum = 13
loadFactorDen = 2

// DefFloat64Size is the size of float64
DefFloat64Size = int64(unsafe.Sizeof(float64(0)))
Expand Down Expand Up @@ -56,7 +57,7 @@ func NewStringSetWithMemoryUsage(ss ...string) (setWithMemoryUsage StringSetWith
// Insert inserts `val` into `s` and return memDelta.
func (s *StringSetWithMemoryUsage) Insert(val string) (memDelta int64) {
s.StringSet.Insert(val)
if s.Count() > (1<<s.bInMap)*loadFactorNum/loadFactorDen {
if s.Count() > (1<<s.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta = DefStringSetBucketMemoryUsage * (1 << s.bInMap)
s.bInMap++
}
Expand Down Expand Up @@ -86,7 +87,7 @@ func NewFloat64SetWithMemoryUsage(ss ...float64) (setWithMemoryUsage Float64SetW
// Insert inserts `val` into `s` and return memDelta.
func (s *Float64SetWithMemoryUsage) Insert(val float64) (memDelta int64) {
s.Float64Set.Insert(val)
if s.Count() > (1<<s.bInMap)*loadFactorNum/loadFactorDen {
if s.Count() > (1<<s.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta = DefFloat64SetBucketMemoryUsage * (1 << s.bInMap)
s.bInMap++
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func NewInt64SetWithMemoryUsage(ss ...int64) (setWithMemoryUsage Int64SetWithMem
// Insert inserts `val` into `s` and return memDelta.
func (s *Int64SetWithMemoryUsage) Insert(val int64) (memDelta int64) {
s.Int64Set.Insert(val)
if s.Count() > (1<<s.bInMap)*loadFactorNum/loadFactorDen {
if s.Count() > (1<<s.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta = DefInt64SetBucketMemoryUsage * (1 << s.bInMap)
s.bInMap++
}
Expand Down

0 comments on commit 8761adc

Please sign in to comment.