Skip to content

Commit

Permalink
statistics: merge partition-level FMSketch to global-level FMSketch a…
Browse files Browse the repository at this point in the history
…nd update the column NDV (#22878)
  • Loading branch information
Reminiscent authored Feb 23, 2021
1 parent 3edbab3 commit 62354d0
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 21 deletions.
6 changes: 5 additions & 1 deletion statistics/fmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ func buildFMSketch(sc *stmtctx.StatementContext, values []types.Datum, maxSize i
return s, s.NDV(), nil
}

func (s *FMSketch) mergeFMSketch(rs *FMSketch) {
// MergeFMSketch merges the FM sketch rs into s.
func (s *FMSketch) MergeFMSketch(rs *FMSketch) {
if s == nil || rs == nil {
return
}
if s.mask < rs.mask {
s.mask = rs.mask
for key := range s.hashset {
Expand Down
4 changes: 2 additions & 2 deletions statistics/fmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (s *testStatisticsSuite) TestSketch(c *C) {
c.Check(err, IsNil)
c.Check(ndv, Equals, int64(100480))

sampleSketch.mergeFMSketch(pkSketch)
sampleSketch.mergeFMSketch(rcSketch)
sampleSketch.MergeFMSketch(pkSketch)
sampleSketch.MergeFMSketch(rcSketch)
c.Check(sampleSketch.NDV(), Equals, int64(100480))

maxSize = 2
Expand Down
29 changes: 24 additions & 5 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i
globalStats.Hg = make([]*statistics.Histogram, globalStats.Num)
globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num)
globalStats.TopN = make([]*statistics.TopN, globalStats.Num)
globalStats.Fms = make([]*statistics.FMSketch, globalStats.Num)

// The first dimension of the slice is the number of column or index stats in the globalStats.
// The second dimension of the slice is the number of partition tables.
Expand All @@ -349,7 +350,7 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i
}
tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
partitionStats, err = h.TableStatsFromStorage(tableInfo, partitionID, false, 0)
partitionStats, err = h.TableStatsFromStorage(tableInfo, partitionID, true, 0)
if err != nil {
return
}
Expand Down Expand Up @@ -409,10 +410,28 @@ func (h *Handle) MergePartitionStats2GlobalStats(sc *stmtctx.StatementContext, i
return
}

// Merge NDV
err = errors.Errorf("TODO: The merge function of the NDV has not been implemented yet")
if err != nil {
return
// Update NDV of global-level stats
if isIndex == 0 {
// For the column stats, we first merge the partition-level FMSketches, and then use the merged FMSketch to calculate the new NDV.
// merge FMSketch
globalStats.Fms[i] = allFms[i][0].Copy()
for j := uint64(1); j < partitionNum; j++ {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
}

// update the NDV
globalStatsNDV := globalStats.Fms[i].NDV()
if globalStatsNDV > globalStats.Count {
globalStatsNDV = globalStats.Count
}
globalStats.Hg[i].NDV = globalStatsNDV
} else {
// For the index stats, we get the final NDV by accumulating the NDV of each bucket in the index histogram.
globalStatsNDV := int64(0)
for _, bucket := range globalStats.Hg[i].Buckets {
globalStatsNDV += bucket.NDV
}
globalStats.Hg[i].NDV = globalStatsNDV
}
}
return
Expand Down
24 changes: 12 additions & 12 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,31 +705,31 @@ func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {

// Test the 'dynamic-only' mode
testKit.MustExec("set @@tidb_partition_prune_mode = 'dynamic-only';")
err := testKit.ExecToErr("analyze table t, t1;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV has not been implemented yet")
testKit.MustExec("analyze table t, t1;")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
c.Assert(result.Rows()[1][5], Equals, "2")
c.Assert(len(result.Rows()), Equals, 4)
c.Assert(result.Rows()[0][5], Equals, "5")
c.Assert(result.Rows()[1][5], Equals, "1")
c.Assert(result.Rows()[2][5], Equals, "2")
c.Assert(result.Rows()[3][5], Equals, "2")
result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort()
c.Assert(len(result.Rows()), Equals, 15)
c.Assert(len(result.Rows()), Equals, 20)

result = testKit.MustQuery("show stats_meta where table_name = 't1';").Sort()
c.Assert(len(result.Rows()), Equals, 1)
c.Assert(result.Rows()[0][5], Equals, "5")
result = testKit.MustQuery("show stats_histograms where table_name = 't1';").Sort()
c.Assert(len(result.Rows()), Equals, 1)

err = testKit.ExecToErr("analyze table t index idx_t_ab, idx_t_b;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the NDV has not been implemented yet")
testKit.MustExec("analyze table t index idx_t_ab, idx_t_b;")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
c.Assert(result.Rows()[1][5], Equals, "2")
c.Assert(len(result.Rows()), Equals, 4)
c.Assert(result.Rows()[0][5], Equals, "5")
c.Assert(result.Rows()[1][5], Equals, "1")
c.Assert(result.Rows()[2][5], Equals, "2")
c.Assert(result.Rows()[3][5], Equals, "2")
result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort()
c.Assert(len(result.Rows()), Equals, 15)
c.Assert(len(result.Rows()), Equals, 20)
}

func (s *testStatsSuite) TestExtendedStatsDefaultSwitch(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion statistics/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *SampleCollector) MergeSampleCollector(sc *stmtctx.StatementContext, rc
c.NullCount += rc.NullCount
c.Count += rc.Count
c.TotalSize += rc.TotalSize
c.FMSketch.mergeFMSketch(rc.FMSketch)
c.FMSketch.MergeFMSketch(rc.FMSketch)
if rc.CMSketch != nil {
err := c.CMSketch.MergeCMSketch(rc.CMSketch)
terror.Log(errors.Trace(err))
Expand Down

0 comments on commit 62354d0

Please sign in to comment.