Skip to content

Commit

Permalink
statistics: dump FMSketch to KV only for partition table with dynamic prune mode (#24453)
Browse files: browse the repository at this point in the history
  • Loading branch information
rebelice authored May 11, 2021
1 parent 9b86513 commit 3fed33f
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 38 deletions.
7 changes: 4 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
var err1 error
if result.StatsVer == statistics.Version3 {
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, nil, result.TopNs[i], result.Fms[i], result.StatsVer, 1)
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, nil, result.TopNs[i], result.Fms[i], result.StatsVer, 1, result.TableID.IsPartitionTable() && needGlobalStats)
} else {
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, result.Cms[i], result.TopNs[i], result.Fms[i], result.StatsVer, 1)
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, result.Cms[i], result.TopNs[i], result.Fms[i], result.StatsVer, 1, result.TableID.IsPartitionTable() && needGlobalStats)
}
if err1 != nil {
err = err1
Expand Down Expand Up @@ -198,7 +198,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN, fms := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i], globalStats.Fms[i]
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, fms, info.statsVersion, 1)
// The FMSketch of global stats does not need to be dumped to KV.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, fms, info.statsVersion, 1, false)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
Expand Down
6 changes: 4 additions & 2 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
}
for i := 0; i < newColGlobalStats.Num; i++ {
hg, cms, topN, fms := newColGlobalStats.Hg[i], newColGlobalStats.Cms[i], newColGlobalStats.TopN[i], newColGlobalStats.Fms[i]
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, 0, hg, cms, topN, fms, 2, 1)
// The FMSketch of global stats does not need to be dumped to KV.
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, 0, hg, cms, topN, fms, 2, 1, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -141,7 +142,8 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
}
for i := 0; i < newIndexGlobalStats.Num; i++ {
hg, cms, topN, fms := newIndexGlobalStats.Hg[i], newIndexGlobalStats.Cms[i], newIndexGlobalStats.TopN[i], newIndexGlobalStats.Fms[i]
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, 1, hg, cms, topN, fms, 2, 1)
// The FMSketch of global stats does not need to be dumped to KV.
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, 1, hg, cms, topN, fms, 2, 1, false)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,15 @@ func (h *Handle) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64,
}

for _, col := range tbl.Columns {
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, &col.Histogram, col.CMSketch, col.TopN, col.FMSketch, int(col.StatsVer), 1)
// loadStatsFromJSON does not support partitioned tables yet, so the FMSketch is not dumped.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, &col.Histogram, col.CMSketch, col.TopN, col.FMSketch, int(col.StatsVer), 1, false)
if err != nil {
return errors.Trace(err)
}
}
for _, idx := range tbl.Indices {
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 1, &idx.Histogram, idx.CMSketch, idx.TopN, nil, int(idx.StatsVer), 1)
// loadStatsFromJSON does not support partitioned tables yet, so the FMSketch is not dumped.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 1, &idx.Histogram, idx.CMSketch, idx.TopN, nil, int(idx.StatsVer), 1, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (s *testStatsSuite) TestDumpCMSketchWithTopN(c *C) {
cms, _, _, _ := statistics.NewCMSketchAndTopN(5, 2048, fakeData, 20, 100)

stat := h.GetTableStats(tableInfo)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, nil, statistics.Version2, 1)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, nil, statistics.Version2, 1, false)
c.Assert(err, IsNil)
c.Assert(h.Update(is), IsNil)

Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func (h *Handle) extendedStatsFromStorage(reader *statsReader, table *statistics
}

// SaveStatsToStorage saves the stats to storage.
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64) (err error) {
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64, needDumpFMS bool) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.TODO()
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
return err
}
if fmSketch != nil {
if fmSketch != nil && needDumpFMS {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, isIndex, hg.ID, fmSketch); err != nil {
return err
}
Expand Down
85 changes: 62 additions & 23 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2332,41 +2332,45 @@ func (s *testStatsSuite) TestDuplicateFMSketch(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int, c int)")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
defer tk.MustExec("set @@tidb_partition_prune_mode='static'")
tk.MustExec("create table t(a int, b int, c int) partition by hash(a) partitions 3")
tk.MustExec("insert into t values (1, 1, 1)")
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("9"))
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("9"))

tk.MustExec("alter table t drop column a")
tk.MustExec("alter table t drop column b")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), time.Duration(0)), IsNil)
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("2"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6"))
}

func (s *testStatsSuite) TestIndexFMSketch(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, index ia(a), index ibc(b, c))")
tk.MustExec("create table t(a int, b int, c int, index ia(a), index ibc(b, c)) partition by hash(a) partitions 3")
tk.MustExec("insert into t values (1, 1, 1)")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
defer tk.MustExec("set @@tidb_partition_prune_mode='static'")
tk.MustExec("analyze table t index ia")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("1"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("3"))
tk.MustExec("analyze table t index ibc")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("2"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6"))
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("5"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("15"))
tk.MustExec("drop table if exists t")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), 0), IsNil)

// clustered index
tk.MustExec("drop table if exists t")
tk.MustExec("set @@tidb_enable_clustered_index=ON")
tk.MustExec("create table t (a datetime, b datetime, primary key (a))")
tk.MustExec("create table t (a datetime, b datetime, primary key (a)) partition by hash(year(a)) partitions 3")
tk.MustExec("insert into t values ('2000-01-01', '2000-01-01')")
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("2"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6"))
tk.MustExec("drop table if exists t")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), 0), IsNil)

Expand All @@ -2383,25 +2387,23 @@ func (s *testStatsSuite) TestIndexFMSketch(c *C) {
}

tk.MustExec("set @@tidb_enable_clustered_index=OFF")
tk.MustExec("create table t(a int, key(a))")
tk.MustExec("create table t(a int, key(a)) partition by hash(a) partitions 3")
tk.MustExec("insert into t values (1), (2), (2), (3)")
checkNDV(2, 3)
tk.MustExec("insert into t values (4), (5)")
checkNDV(2, 5)
checkNDV(6, 1)
tk.MustExec("insert into t values (4), (5), (6)")
checkNDV(6, 2)
tk.MustExec("insert into t values (2), (5)")
checkNDV(2, 5)
checkNDV(6, 2)
tk.MustExec("drop table if exists t")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), 0), IsNil)

// clustered index
tk.MustExec("set @@tidb_enable_clustered_index=ON")
tk.MustExec("create table t (a datetime, b datetime, primary key (a))")
tk.MustExec("insert into t values ('2000-01-01', '2000-01-01')")
checkNDV(2, 1)
tk.MustExec("insert into t values ('2020-01-01', '2020-01-01')")
checkNDV(2, 2)
tk.MustExec("insert into t values ('1999-01-01', '1999-01-01'), ('1999-01-02', '1999-01-02'), ('1999-01-03', '1999-01-03')")
checkNDV(2, 5)
tk.MustExec("create table t (a datetime, b datetime, primary key (a)) partition by hash(year(a)) partitions 3")
tk.MustExec("insert into t values ('2000-01-01', '2001-01-01'), ('2001-01-01', '2001-01-01'), ('2002-01-01', '2001-01-01')")
checkNDV(6, 1)
tk.MustExec("insert into t values ('1999-01-01', '1998-01-01'), ('1997-01-02', '1999-01-02'), ('1998-01-03', '1999-01-03')")
checkNDV(6, 2)
}

func (s *testStatsSuite) TestShowExtendedStats4DropColumn(c *C) {
Expand Down Expand Up @@ -2802,3 +2804,40 @@ func (s *testSerialStatsSuite) TestIssues24349(c *C) {
"test t global b 0 1 10 1 4 4 0",
))
}

// TestIssues24401 checks which ANALYZE runs leave rows in
// mysql.stats_fm_sketch. It asserts that the table stays empty for a normal
// table (in both prune modes) and for a partitioned table in static prune
// mode, and that only a partitioned table analyzed in dynamic prune mode
// produces FMSketch rows. It finally re-analyzes the partitioned table after
// inserting more data and asserts that the row count does not grow.
func (s *testStatsSuite) TestIssues24401(c *C) {
	defer cleanEnv(c, s.store, s.do)
	testKit := testkit.NewTestKit(c, s.store)
	testKit.MustExec("use test")

	// normal table with static prune mode
	testKit.MustExec("set @@tidb_partition_prune_mode='static'")
	testKit.MustExec("create table t(a int, index(a))")
	testKit.MustExec("insert into t values (1), (2), (3)")
	testKit.MustExec("analyze table t")
	testKit.MustQuery("select * from mysql.stats_fm_sketch").Check(testkit.Rows())

	// partition table with static prune mode
	testKit.MustExec("create table tp(a int, index(a)) partition by hash(a) partitions 3")
	testKit.MustExec("insert into tp values (1), (2), (3)")
	testKit.MustExec("analyze table tp")
	testKit.MustQuery("select * from mysql.stats_fm_sketch").Check(testkit.Rows())

	// normal table with dynamic prune mode
	testKit.MustExec("set @@tidb_partition_prune_mode='dynamic'")
	// Restore the prune mode on exit so later tests using the same store are not affected.
	defer testKit.MustExec("set @@tidb_partition_prune_mode='static'")
	testKit.MustExec("analyze table t")
	testKit.MustQuery("select * from mysql.stats_fm_sketch").Check(testkit.Rows())

	// partition table with dynamic prune mode
	testKit.MustExec("analyze table tp")
	rows := testKit.MustQuery("select * from mysql.stats_fm_sketch").Rows()
	lenRows := len(rows)
	// NOTE(review): 6 presumably equals 3 partitions x (column a + index a) — confirm.
	c.Assert(lenRows, Equals, 6)

	// check fm-sketch won't increase infinitely
	// The new rows go into tp (the table that is re-analyzed below) so that the
	// second ANALYZE actually runs over changed data rather than identical input.
	testKit.MustExec("insert into tp values (10), (20), (30), (12), (23), (23), (4344)")
	testKit.MustExec("analyze table tp")
	rows = testKit.MustQuery("select * from mysql.stats_fm_sketch").Rows()
	c.Assert(len(rows), Equals, lenRows)
}
11 changes: 6 additions & 5 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,11 +750,11 @@ func (h *Handle) handleSingleHistogramUpdate(is infoschema.InfoSchema, rows []ch
return nil
}
var tbl *statistics.Table
if table.Meta().GetPartitionInfo() == nil || h.CurrentPruneMode() == variable.Dynamic {
tbl = h.GetTableStats(table.Meta())
} else {
tbl = h.GetPartitionStats(table.Meta(), physicalTableID)
// Feedback for partitioned tables is not supported yet, so skip them.
if table.Meta().GetPartitionInfo() != nil {
return nil
}
tbl = h.GetTableStats(table.Meta())
var cms *statistics.CMSketch
var hist *statistics.Histogram
var topN *statistics.TopN
Expand Down Expand Up @@ -822,7 +822,8 @@ func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error {

func (h *Handle) dumpStatsUpdateToKV(tableID, isIndex int64, q *statistics.QueryFeedback, hist *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int64) error {
hist = statistics.UpdateHistogram(hist, q, int(statsVersion))
err := h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, topN, fms, int(statsVersion), 0)
// Feedback for partitioned tables is not supported yet, so the FMSketch is never dumped here.
err := h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, topN, fms, int(statsVersion), 0, false)
metrics.UpdateStatsCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
return errors.Trace(err)
}
Expand Down

0 comments on commit 3fed33f

Please sign in to comment.