Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: dump FMSketch to KV only for partition table with dynamic prune mode #24453

Merged
merged 16 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
var err1 error
if result.StatsVer == statistics.Version3 {
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, nil, result.TopNs[i], result.Fms[i], result.StatsVer, 1)
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, nil, result.TopNs[i], result.Fms[i], result.StatsVer, 1, result.TableID.IsPartitionTable() && needGlobalStats)
} else {
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, result.Cms[i], result.TopNs[i], result.Fms[i], result.StatsVer, 1)
err1 = statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, result.Cms[i], result.TopNs[i], result.Fms[i], result.StatsVer, 1, result.TableID.IsPartitionTable() && needGlobalStats)
}
if err1 != nil {
err = err1
Expand Down Expand Up @@ -198,7 +198,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN, fms := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i], globalStats.Fms[i]
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, fms, info.statsVersion, 1)
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, fms, info.statsVersion, 1, false)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
Expand Down
6 changes: 4 additions & 2 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
}
for i := 0; i < newColGlobalStats.Num; i++ {
hg, cms, topN, fms := newColGlobalStats.Hg[i], newColGlobalStats.Cms[i], newColGlobalStats.TopN[i], newColGlobalStats.Fms[i]
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, 0, hg, cms, topN, fms, 2, 1)
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, 0, hg, cms, topN, fms, 2, 1, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -141,7 +142,8 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
}
for i := 0; i < newIndexGlobalStats.Num; i++ {
hg, cms, topN, fms := newIndexGlobalStats.Hg[i], newIndexGlobalStats.Cms[i], newIndexGlobalStats.TopN[i], newIndexGlobalStats.Fms[i]
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, 1, hg, cms, topN, fms, 2, 1)
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, 1, hg, cms, topN, fms, 2, 1, false)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,15 @@ func (h *Handle) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64,
}

for _, col := range tbl.Columns {
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, &col.Histogram, col.CMSketch, col.TopN, col.FMSketch, int(col.StatsVer), 1)
// loadStatsFromJSON doesn't support partition table now.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, &col.Histogram, col.CMSketch, col.TopN, col.FMSketch, int(col.StatsVer), 1, false)
if err != nil {
return errors.Trace(err)
}
}
for _, idx := range tbl.Indices {
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 1, &idx.Histogram, idx.CMSketch, idx.TopN, nil, int(idx.StatsVer), 1)
// loadStatsFromJSON doesn't support partition table now.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 1, &idx.Histogram, idx.CMSketch, idx.TopN, nil, int(idx.StatsVer), 1, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (s *testStatsSuite) TestDumpCMSketchWithTopN(c *C) {
cms, _, _, _ := statistics.NewCMSketchAndTopN(5, 2048, fakeData, 20, 100)

stat := h.GetTableStats(tableInfo)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, nil, statistics.Version2, 1)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, nil, statistics.Version2, 1, false)
c.Assert(err, IsNil)
c.Assert(h.Update(is), IsNil)

Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func (h *Handle) extendedStatsFromStorage(reader *statsReader, table *statistics
}

// SaveStatsToStorage saves the stats to storage.
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64) (err error) {
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64, needDumpFMS bool) (err error) {
h.mu.Lock()
defer h.mu.Unlock()
ctx := context.TODO()
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
if _, err := exec.ExecuteInternal(ctx, "delete from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, hg.ID); err != nil {
return err
}
if fmSketch != nil {
if fmSketch != nil && needDumpFMS {
if _, err = exec.ExecuteInternal(ctx, "insert into mysql.stats_fm_sketch (table_id, is_index, hist_id, value) values (%?, %?, %?, %?)", tableID, isIndex, hg.ID, fmSketch); err != nil {
return err
}
Expand Down
85 changes: 62 additions & 23 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2332,41 +2332,45 @@ func (s *testStatsSuite) TestDuplicateFMSketch(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int, c int)")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
defer tk.MustExec("set @@tidb_partition_prune_mode='static'")
tk.MustExec("create table t(a int, b int, c int) partition by hash(a) partitions 3")
tk.MustExec("insert into t values (1, 1, 1)")
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("9"))
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("3"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("9"))

tk.MustExec("alter table t drop column a")
tk.MustExec("alter table t drop column b")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), time.Duration(0)), IsNil)
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("2"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6"))
}

func (s *testStatsSuite) TestIndexFMSketch(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, index ia(a), index ibc(b, c))")
tk.MustExec("create table t(a int, b int, c int, index ia(a), index ibc(b, c)) partition by hash(a) partitions 3")
tk.MustExec("insert into t values (1, 1, 1)")
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
defer tk.MustExec("set @@tidb_partition_prune_mode='static'")
tk.MustExec("analyze table t index ia")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("1"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("3"))
tk.MustExec("analyze table t index ibc")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("2"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6"))
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("5"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("15"))
tk.MustExec("drop table if exists t")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), 0), IsNil)

// clustered index
tk.MustExec("drop table if exists t")
tk.MustExec("set @@tidb_enable_clustered_index=ON")
tk.MustExec("create table t (a datetime, b datetime, primary key (a))")
tk.MustExec("create table t (a datetime, b datetime, primary key (a)) partition by hash(year(a)) partitions 3")
tk.MustExec("insert into t values ('2000-01-01', '2000-01-01')")
tk.MustExec("analyze table t")
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("2"))
tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6"))
tk.MustExec("drop table if exists t")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), 0), IsNil)

Expand All @@ -2383,25 +2387,23 @@ func (s *testStatsSuite) TestIndexFMSketch(c *C) {
}

tk.MustExec("set @@tidb_enable_clustered_index=OFF")
tk.MustExec("create table t(a int, key(a))")
tk.MustExec("create table t(a int, key(a)) partition by hash(a) partitions 3")
tk.MustExec("insert into t values (1), (2), (2), (3)")
checkNDV(2, 3)
tk.MustExec("insert into t values (4), (5)")
checkNDV(2, 5)
checkNDV(6, 1)
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
tk.MustExec("insert into t values (4), (5), (6)")
checkNDV(6, 2)
tk.MustExec("insert into t values (2), (5)")
checkNDV(2, 5)
checkNDV(6, 2)
tk.MustExec("drop table if exists t")
c.Assert(s.do.StatsHandle().GCStats(s.do.InfoSchema(), 0), IsNil)

// clustered index
tk.MustExec("set @@tidb_enable_clustered_index=ON")
tk.MustExec("create table t (a datetime, b datetime, primary key (a))")
tk.MustExec("insert into t values ('2000-01-01', '2000-01-01')")
checkNDV(2, 1)
tk.MustExec("insert into t values ('2020-01-01', '2020-01-01')")
checkNDV(2, 2)
tk.MustExec("insert into t values ('1999-01-01', '1999-01-01'), ('1999-01-02', '1999-01-02'), ('1999-01-03', '1999-01-03')")
checkNDV(2, 5)
tk.MustExec("create table t (a datetime, b datetime, primary key (a)) partition by hash(year(a)) partitions 3")
tk.MustExec("insert into t values ('2000-01-01', '2001-01-01'), ('2001-01-01', '2001-01-01'), ('2002-01-01', '2001-01-01')")
checkNDV(6, 1)
tk.MustExec("insert into t values ('1999-01-01', '1998-01-01'), ('1997-01-02', '1999-01-02'), ('1998-01-03', '1999-01-03')")
checkNDV(6, 2)
}

func (s *testStatsSuite) TestShowExtendedStats4DropColumn(c *C) {
Expand Down Expand Up @@ -2802,3 +2804,40 @@ func (s *testSerialStatsSuite) TestIssues24349(c *C) {
"test t global b 0 1 10 1 4 4 0",
))
}

// TestIssues24401 checks which analyze runs leave rows in mysql.stats_fm_sketch:
// none for a normal table (static or dynamic prune mode), none for a partitioned
// table under static prune mode, and a fixed number for a partitioned table
// under dynamic prune mode. It also checks that re-analyzing the partitioned
// table after inserting more rows does not add further sketch rows.
func (s *testStatsSuite) TestIssues24401(c *C) {
	defer cleanEnv(c, s.store, s.do)
	testKit := testkit.NewTestKit(c, s.store)
	testKit.MustExec("use test")

	// normal table with static prune mode
	testKit.MustExec("set @@tidb_partition_prune_mode='static'")
	testKit.MustExec("create table t(a int, index(a))")
	testKit.MustExec("insert into t values (1), (2), (3)")
	testKit.MustExec("analyze table t")
	testKit.MustQuery("select * from mysql.stats_fm_sketch").Check(testkit.Rows())

	// partition table with static prune mode
	testKit.MustExec("create table tp(a int, index(a)) partition by hash(a) partitions 3")
	testKit.MustExec("insert into tp values (1), (2), (3)")
	testKit.MustExec("analyze table tp")
	testKit.MustQuery("select * from mysql.stats_fm_sketch").Check(testkit.Rows())

	// normal table with dynamic prune mode
	testKit.MustExec("set @@tidb_partition_prune_mode='dynamic'")
	// Restore the prune mode so it does not leak out of this test.
	defer testKit.MustExec("set @@tidb_partition_prune_mode='static'")
	testKit.MustExec("analyze table t")
	testKit.MustQuery("select * from mysql.stats_fm_sketch").Check(testkit.Rows())

	// partition table with dynamic prune mode
	// NOTE(review): 6 is presumably 3 partitions x (column a + index a) — confirm.
	testKit.MustExec("analyze table tp")
	rows := testKit.MustQuery("select * from mysql.stats_fm_sketch").Rows()
	lenRows := len(rows)
	c.Assert(lenRows, Equals, 6)

	// check fm-sketch won't increase infinitely
	// The new rows go into tp (the table that is re-analyzed below), so the
	// second analyze actually runs over changed data.
	testKit.MustExec("insert into tp values (10), (20), (30), (12), (23), (23), (4344)")
	testKit.MustExec("analyze table tp")
	rows = testKit.MustQuery("select * from mysql.stats_fm_sketch").Rows()
	c.Assert(len(rows), Equals, lenRows)
}
11 changes: 6 additions & 5 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,11 +750,11 @@ func (h *Handle) handleSingleHistogramUpdate(is infoschema.InfoSchema, rows []ch
return nil
}
var tbl *statistics.Table
if table.Meta().GetPartitionInfo() == nil || h.CurrentPruneMode() == variable.Dynamic {
tbl = h.GetTableStats(table.Meta())
} else {
tbl = h.GetPartitionStats(table.Meta(), physicalTableID)
// feedback for partition is not ready
if table.Meta().GetPartitionInfo() != nil {
return nil
}
tbl = h.GetTableStats(table.Meta())
var cms *statistics.CMSketch
var hist *statistics.Histogram
var topN *statistics.TopN
Expand Down Expand Up @@ -822,7 +822,8 @@ func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error {

// dumpStatsUpdateToKV applies the query feedback q to hist through
// statistics.UpdateHistogram and saves the updated histogram, together with
// cms, topN and fms, via SaveStatsToStorage. The call passes -1 as count and 0
// as isAnalyzed. The result is recorded in metrics.UpdateStatsCounter, labelled
// by metrics.RetLabel(err), and the error is returned wrapped by errors.Trace.
func (h *Handle) dumpStatsUpdateToKV(tableID, isIndex int64, q *statistics.QueryFeedback, hist *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int64) error {
hist = statistics.UpdateHistogram(hist, q, int(statsVersion))
// NOTE(review): the 9-argument call below is the pre-change side of this diff;
// the 10-argument call that follows is its replacement.
err := h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, topN, fms, int(statsVersion), 0)
// Feedback for partitioned tables is not ready yet, so the trailing
// needDumpFMS argument is false.
err := h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, topN, fms, int(statsVersion), 0, false)
metrics.UpdateStatsCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
return errors.Trace(err)
}
Expand Down