Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: make the memory tracker of Jsonobjectagg more accurate #23024

Merged
merged 11 commits into from
Mar 4, 2021
Merged
20 changes: 17 additions & 3 deletions executor/aggfuncs/func_json_objectagg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/types/json"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/stringutil"
)

const (
// DefPartialResult4JsonObjectAgg is the size of partialResult4JsonObject
DefPartialResult4JsonObjectAgg = int64(unsafe.Sizeof(partialResult4JsonObjectAgg{}))
// DefMapStringInterfaceBucketSize = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(interface{}))+2*ptrSize
DefMapStringInterfaceBucketSize = 8*(1+16+16) + 16
)

type jsonObjectAgg struct {
Expand All @@ -35,17 +38,20 @@ type jsonObjectAgg struct {

type partialResult4JsonObjectAgg struct {
entries map[string]interface{}
bInMap int // indicate there are 2^bInMap buckets in entries.
}

func (e *jsonObjectAgg) AllocPartialResult() (pr PartialResult, memDelta int64) {
p := partialResult4JsonObjectAgg{}
p.entries = make(map[string]interface{})
return PartialResult(&p), DefPartialResult4JsonObjectAgg
p.bInMap = 0
return PartialResult(&p), DefPartialResult4JsonObjectAgg + (1<<p.bInMap)*DefMapStringInterfaceBucketSize
}

func (e *jsonObjectAgg) ResetPartialResult(pr PartialResult) {
p := (*partialResult4JsonObjectAgg)(pr)
p.entries = make(map[string]interface{})
p.bInMap = 0
}

func (e *jsonObjectAgg) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
Expand Down Expand Up @@ -105,8 +111,11 @@ func (e *jsonObjectAgg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup
switch x := realVal.(type) {
case nil, bool, int64, uint64, float64, string, json.BinaryJSON, *types.MyDecimal, []uint8, types.Time, types.Duration:
if _, ok := p.entries[keyString]; !ok {
memDelta += int64(len(keyString))
memDelta += getValMemDelta(realVal)
memDelta += int64(len(keyString)) + getValMemDelta(realVal)
if len(p.entries)+1 > (1<<p.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta += (1 << p.bInMap) * DefMapStringInterfaceBucketSize
p.bInMap++
}
}
p.entries[keyString] = realVal
default:
Expand Down Expand Up @@ -150,6 +159,11 @@ func (e *jsonObjectAgg) MergePartialResult(sctx sessionctx.Context, src, dst Par
// and only the last value encountered is used with that key in the returned object
for k, v := range p1.entries {
p2.entries[k] = v
memDelta += int64(len(k)) + getValMemDelta(v)
if len(p2.entries)+1 > (1<<p2.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta += (1 << p2.bInMap) * DefMapStringInterfaceBucketSize
p2.bInMap++
}
}
return 0, nil
}
4 changes: 2 additions & 2 deletions executor/aggfuncs/func_json_objectagg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func (s *testSuite) TestMemJsonObjectagg(c *C) {
}

tests := []multiArgsAggMemTest{
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg, defaultMultiArgsMemDeltaGens, true),
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg, defaultMultiArgsMemDeltaGens, false),
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg+aggfuncs.DefMapStringInterfaceBucketSize, defaultMultiArgsMemDeltaGens, true),
buildMultiArgsAggMemTester(ast.AggFuncJsonObjectAgg, argTypes, mysql.TypeJSON, numRows, aggfuncs.DefPartialResult4JsonObjectAgg+aggfuncs.DefMapStringInterfaceBucketSize, defaultMultiArgsMemDeltaGens, false),
}
for _, test := range tests {
s.testMultiArgsAggMemFunc(c, test)
Expand Down
9 changes: 3 additions & 6 deletions executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/set"
Expand All @@ -61,10 +62,6 @@ const (
// defBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(slice))+2*ptrSize
// The bucket size may be changed by golang implement in the future.
defBucketMemoryUsage = 8*(1+16+24) + 16
// Maximum average load of a bucket that triggers growth is 6.5.
// Represent as loadFactorNum/loadFactDen, to allow integer math.
loadFactorNum = 13
loadFactorDen = 2
)

func newBaseHashAggWorker(ctx sessionctx.Context, finishCh <-chan struct{}, aggFuncs []aggfuncs.AggFunc,
Expand Down Expand Up @@ -540,7 +537,7 @@ func (w *baseHashAggWorker) getPartialResult(sc *stmtctx.StatementContext, group
mapper[string(groupKey[i])] = partialResults[i]
allMemDelta += int64(len(groupKey[i]))
// Map will expand when count > bucketNum * loadFactor. The memory usage will doubled.
if len(mapper) > (1<<w.BInMap)*loadFactorNum/loadFactorDen {
if len(mapper) > (1<<w.BInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
w.memTracker.Consume(defBucketMemoryUsage * (1 << w.BInMap))
w.BInMap++
}
Expand Down Expand Up @@ -914,7 +911,7 @@ func (e *HashAggExec) getPartialResults(groupKey string) []aggfuncs.PartialResul
e.partialResultMap[groupKey] = partialResults
allMemDelta += int64(len(groupKey))
// Map will expand when count > bucketNum * loadFactor. The memory usage will doubled.
if len(e.partialResultMap) > (1<<e.bInMap)*loadFactorNum/loadFactorDen {
if len(e.partialResultMap) > (1<<e.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
e.memTracker.Consume(defBucketMemoryUsage * (1 << e.bInMap))
e.bInMap++
}
Expand Down
10 changes: 10 additions & 0 deletions util/hack/hack.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,13 @@ func Slice(s string) (b []byte) {
pbytes.Cap = pstring.Len
return
}

// LoadFactor is the maximum average load of a bucket that triggers growth is 6.5 in Golang Map.
// Represent as LoadFactorNum/LoadFactorDen, to allow integer math.
// They are from the golang definition. ref: https://github.com/golang/go/blob/go1.13.15/src/runtime/map.go#L68-L71
const (
// LoadFactorNum is the numerator of load factor
LoadFactorNum = 13
// LoadFactorDen is the denominator of load factor
LoadFactorDen = 2
)
17 changes: 9 additions & 8 deletions util/set/set_with_memory_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@

package set

import "unsafe"
import (
"unsafe"

"github.com/pingcap/tidb/util/hack"
)

const (
// DefStringSetBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(string) + unsafe.Sizeof(struct{}))+2*ptrSize
// ref https://github.com/golang/go/blob/go1.15.6/src/reflect/type.go#L2162.
DefStringSetBucketMemoryUsage = 8*(1+16+0) + 16
// DefFloat64SetBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(float64) + unsafe.Sizeof(struct{}))+2*ptrSize
DefFloat64SetBucketMemoryUsage = 8*(1+8+0) + 16
// DefInt64SetBucketMemoryUsage = bucketSize*(1+unsafe.Sizeof(int64) + unsafe.Sizeof(struct{}))+2*ptrSize
DefInt64SetBucketMemoryUsage = 8*(1+8+0) + 16
// Maximum average load of a bucket that triggers growth is 6.5.
// Represent as loadFactorNum/loadFactDen, to allow integer math.
loadFactorNum = 13
loadFactorDen = 2

// DefFloat64Size is the size of float64
DefFloat64Size = int64(unsafe.Sizeof(float64(0)))
Expand Down Expand Up @@ -56,7 +57,7 @@ func NewStringSetWithMemoryUsage(ss ...string) (setWithMemoryUsage StringSetWith
// Insert inserts `val` into `s` and return memDelta.
func (s *StringSetWithMemoryUsage) Insert(val string) (memDelta int64) {
s.StringSet.Insert(val)
if s.Count() > (1<<s.bInMap)*loadFactorNum/loadFactorDen {
if s.Count() > (1<<s.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta = DefStringSetBucketMemoryUsage * (1 << s.bInMap)
s.bInMap++
}
Expand Down Expand Up @@ -86,7 +87,7 @@ func NewFloat64SetWithMemoryUsage(ss ...float64) (setWithMemoryUsage Float64SetW
// Insert inserts `val` into `s` and return memDelta.
func (s *Float64SetWithMemoryUsage) Insert(val float64) (memDelta int64) {
s.Float64Set.Insert(val)
if s.Count() > (1<<s.bInMap)*loadFactorNum/loadFactorDen {
if s.Count() > (1<<s.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta = DefFloat64SetBucketMemoryUsage * (1 << s.bInMap)
s.bInMap++
}
Expand Down Expand Up @@ -116,7 +117,7 @@ func NewInt64SetWithMemoryUsage(ss ...int64) (setWithMemoryUsage Int64SetWithMem
// Insert inserts `val` into `s` and return memDelta.
func (s *Int64SetWithMemoryUsage) Insert(val int64) (memDelta int64) {
s.Int64Set.Insert(val)
if s.Count() > (1<<s.bInMap)*loadFactorNum/loadFactorDen {
if s.Count() > (1<<s.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
memDelta = DefInt64SetBucketMemoryUsage * (1 << s.bInMap)
s.bInMap++
}
Expand Down