Skip to content

Commit

Permalink
statistics: merge partition-level TopN to global-level TopN (#22433)
Browse files Browse the repository at this point in the history
  • Loading branch information
Reminiscent authored Feb 19, 2021
1 parent c9e85ec commit 8842bbc
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 45 deletions.
6 changes: 3 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (e *AnalyzeIndexExec) buildStatsFromResult(result distsql.SelectResult, nee
if err := cms.MergeCMSketch(cm); err != nil {
return nil, nil, nil, err
}
statistics.MergeTopN(topn, tmpTopN, cms, uint32(e.opts[ast.AnalyzeOptNumTopN]), false)
statistics.MergeTopNAndUpdateCMSketch(topn, tmpTopN, cms, uint32(e.opts[ast.AnalyzeOptNumTopN]))
}
}
}
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *s
if err != nil {
return nil, nil, nil, err
}
statistics.MergeTopN(topN, curTopN, cmSketch, numTop, false)
statistics.MergeTopNAndUpdateCMSketch(topN, curTopN, cmSketch, numTop)
}
// Build Histogram.
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), idxInfo.ID, collector, types.NewFieldType(mysql.TypeBlob), rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
Expand Down Expand Up @@ -1312,7 +1312,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
cms.CalcDefaultValForAnalyze(uint64(hist.NDV))
}
if statsVer == statistics.Version2 {
poped := statistics.MergeTopN(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]), false)
poped := statistics.MergeTopNAndUpdateCMSketch(topN, idxExec.oldTopN, cms, uint32(idxExec.opts[ast.AnalyzeOptNumTopN]))
hist.AddIdxVals(poped)
}
result := analyzeResult{
Expand Down
100 changes: 64 additions & 36 deletions statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,43 +304,18 @@ func (c *CMSketch) queryHashValue(h1, h2 uint64) uint64 {
return uint64(res)
}

// MergeTopN merges the src TopN into the dst, and spilled values will be inserted into the CMSketch.
func MergeTopN(dst, src *TopN, c *CMSketch, numTop uint32, usingMax bool) []TopNMeta {
if dst.TotalCount()+src.TotalCount() == 0 {
return nil
}
popedTopNPair := make([]TopNMeta, 0, 4)
counter := make(map[hack.MutableString]uint64)
for _, meta := range dst.TopN {
counter[hack.String(meta.Encoded)] += meta.Count
}
for _, meta := range src.TopN {
if usingMax {
counter[hack.String(meta.Encoded)] = mathutil.MaxUint64(counter[hack.String(meta.Encoded)], meta.Count)
} else {
counter[hack.String(meta.Encoded)] += meta.Count
}
}
sorted := make([]uint64, len(counter))
for _, cnt := range counter {
sorted = append(sorted, cnt)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] > sorted[j]
})
numTop = mathutil.MinUint32(uint32(len(counter)), numTop)
lastTopCnt := sorted[numTop-1]
dst.TopN = make([]TopNMeta, 0, numTop)
for value, cnt := range counter {
data := hack.Slice(string(value))
if cnt >= lastTopCnt {
dst.AppendTopN(data, cnt)
} else {
popedTopNPair = append(popedTopNPair, TopNMeta{Encoded: data, Count: cnt})
c.InsertBytesByCount(data, cnt)
}
// MergeTopNAndUpdateCMSketch merges the src TopN into the dst, and spilled values will be inserted into the CMSketch.
func MergeTopNAndUpdateCMSketch(dst, src *TopN, c *CMSketch, numTop uint32) []TopNMeta {
topNs := []*TopN{src, dst}
mergedTopN, popedTopNPair := MergeTopN(topNs, numTop)
if mergedTopN == nil {
// mergedTopN == nil means the total count of the input TopN are equal to zero
return popedTopNPair
}
dst.TopN = mergedTopN.TopN
for _, topNMeta := range popedTopNPair {
c.InsertBytesByCount(topNMeta.Encoded, topNMeta.Count)
}
dst.Sort()
return popedTopNPair
}

Expand Down Expand Up @@ -652,3 +627,56 @@ func (c *TopN) RemoveVal(val []byte) {
func NewTopN(n int) *TopN {
return &TopN{TopN: make([]TopNMeta, 0, n)}
}

// MergeTopN is used to merge more TopN structures to generate a new TopN struct by the given size.
// The input parameters are multiple TopN structures to be merged and the size of the new TopN that will be generated.
// The output parameters are the newly generated TopN structure and the remaining numbers.
// Notice: The n can be 0. So n has no default value, we must explicitly specify this value.
func MergeTopN(topNs []*TopN, n uint32) (*TopN, []TopNMeta) {
totCnt := uint64(0)
for _, topN := range topNs {
totCnt += topN.TotalCount()
}
if totCnt == 0 {
return nil, nil
}
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]uint64)
for _, topN := range topNs {
if topN.TotalCount() == 0 {
continue
}
for _, val := range topN.TopN {
counter[hack.String(val.Encoded)] += val.Count
}
}

numTop := len(counter)
if numTop == 0 {
return nil, nil
}
sorted := make([]uint64, numTop)
for _, cnt := range counter {
sorted = append(sorted, cnt)
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i] > sorted[j]
})
n = mathutil.MinUint32(uint32(numTop), n)
// lastTopCnt is the smallest value in the new TopN structure
lastTopCnt := sorted[numTop-1]

var finalTopN TopN
finalTopN.TopN = make([]TopNMeta, 0, n)
popedTopNPair := make([]TopNMeta, 0, uint32(numTop)-n)
for value, cnt := range counter {
data := hack.Slice(string(value))
if cnt >= lastTopCnt {
finalTopN.AppendTopN(data, cnt)
} else {
popedTopNPair = append(popedTopNPair, TopNMeta{Encoded: data, Count: cnt})
}
}
finalTopN.Sort()
return &finalTopN, popedTopNPair
}
87 changes: 87 additions & 0 deletions statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"fmt"
"math"
"math/rand"
"strconv"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -303,3 +304,89 @@ func (s *testStatisticsSuite) TestCMSketchCodingTopN(c *C) {
// do not panic
DecodeCMSketchAndTopN([]byte{}, rows)
}

func (s *testStatisticsSuite) TestMergeTopN(c *C) {
tests := []struct {
topnNum int
n int
maxTopNVal int
maxTopNCnt int
}{
{
topnNum: 10,
n: 5,
maxTopNVal: 50,
maxTopNCnt: 100,
},
{
topnNum: 1,
n: 5,
maxTopNVal: 50,
maxTopNCnt: 100,
},
{
topnNum: 5,
n: 5,
maxTopNVal: 5,
maxTopNCnt: 100,
},
{
topnNum: 5,
n: 5,
maxTopNVal: 10,
maxTopNCnt: 100,
},
}
for _, t := range tests {
topnNum, n := t.topnNum, t.n
maxTopNVal, maxTopNCnt := t.maxTopNVal, t.maxTopNCnt

// the number of maxTopNVal should be bigger than n.
ok := maxTopNVal >= n
c.Assert(ok, Equals, true)

topNs := make([]*TopN, 0, topnNum)
res := make(map[int]uint64)
rand.Seed(time.Now().Unix())
for i := 0; i < topnNum; i++ {
topN := NewTopN(n)
occur := make(map[int]bool)
for j := 0; j < n; j++ {
// The range of numbers in the topn structure is in [0, maxTopNVal)
// But there cannot be repeated occurrences of value in a topN structure.
randNum := rand.Intn(maxTopNVal)
for occur[randNum] {
randNum = rand.Intn(maxTopNVal)
}
occur[randNum] = true
tString := []byte(fmt.Sprintf("%d", randNum))
// The range of the number of occurrences in the topn structure is in [0, maxTopNCnt)
randCnt := uint64(rand.Intn(maxTopNCnt))
res[randNum] += randCnt
topNMeta := TopNMeta{tString, randCnt}
topN.TopN = append(topN.TopN, topNMeta)
}
topNs = append(topNs, topN)
}
topN, remainTopN := MergeTopN(topNs, uint32(n))
cnt := len(topN.TopN)
var minTopNCnt uint64
for _, topNMeta := range topN.TopN {
val, err := strconv.Atoi(string(topNMeta.Encoded))
c.Assert(err, IsNil)
c.Assert(topNMeta.Count, Equals, res[val])
minTopNCnt = topNMeta.Count
}
if remainTopN != nil {
cnt += len(remainTopN)
for _, remainTopNMeta := range remainTopN {
val, err := strconv.Atoi(string(remainTopNMeta.Encoded))
c.Assert(err, IsNil)
c.Assert(remainTopNMeta.Count, Equals, res[val])
ok = minTopNCnt > remainTopNMeta.Count
c.Assert(ok, Equals, true)
}
}
c.Assert(cnt, Equals, len(res))
}
}
16 changes: 12 additions & 4 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,18 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i

// Merge topN. We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These left numbers should be inserted into the histogram.
err = errors.Errorf("TODO: The merge function of the topN structure has not been implemented yet")
if err != nil {
return
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
var popedTopN []statistics.TopNMeta
n := uint32(0)
for _, topN := range allTopN[i] {
if topN == nil {
continue
}
n = mathutil.MaxUint32(n, uint32(len(topN.TopN)))
}
globalStats.TopN[i], popedTopN = statistics.MergeTopN(allTopN[i], n)
if len(popedTopN) != 0 {
// TODO: use the popedTopN as a bucket for later histogram merging.
}

// Merge histogram
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {
// Test the 'dynamic-only' mode
testKit.MustExec("set @@tidb_partition_prune_mode = 'dynamic-only';")
err := testKit.ExecToErr("analyze table t, t1;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
Expand All @@ -722,7 +722,7 @@ func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {
c.Assert(len(result.Rows()), Equals, 1)

err = testKit.ExecToErr("analyze table t index idx_t_ab, idx_t_b;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
Expand Down

0 comments on commit 8842bbc

Please sign in to comment.