Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: merge the partition-level stats to global-level stats #22667

Merged
merged 15 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,26 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
close(taskCh)
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
panicCnt := 0

pruneMode := variable.PartitionPruneMode(e.ctx.GetSessionVars().PartitionPruneMode.Load())
// needGlobalStats used to indicate whether we should merge the partition-level stats to global-level stats.
needGlobalStats := pruneMode == variable.DynamicOnly || pruneMode == variable.StaticButPrepareDynamic
type globalStatsKey struct {
tableID int64
indexID int64
}
type globalStatsInfo struct {
isIndex int
// When the `isIndex == 0`, the idxID will be the column ID.
// Otherwise, the idxID will be the index ID.
idxID int64
statsVersion int
}
// globalStatsMap is a map used to store which partition tables and the corresponding indexes need global-level stats.
// The meaning of key in map is the structure that used to store the tableID and indexID.
// The meaning of value in map is some additional information needed to build global-level stats.
globalStatsMap := make(map[globalStatsKey]globalStatsInfo)

for panicCnt < concurrency {
result, ok := <-resultCh
if !ok {
Expand All @@ -113,6 +133,17 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
statisticsID := result.TableID.GetStatisticsID()
for i, hg := range result.Hist {
if result.TableID.IsPartitionTable() && needGlobalStats {
// If it does not belong to the statistics of index, we need to set it to -1 to distinguish.
idxID := int64(-1)
if result.IsIndex != 0 {
idxID = hg.ID
}
globalStatsID := globalStatsKey{result.TableID.TableID, idxID}
if _, ok := globalStatsMap[globalStatsID]; !ok {
globalStatsMap[globalStatsID] = globalStatsInfo{result.IsIndex, hg.ID, result.StatsVer}
}
}
err1 := statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, result.Cms[i], result.TopNs[i], result.StatsVer, 1)
if err1 != nil {
err = err1
Expand All @@ -135,6 +166,21 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if needGlobalStats {
for globalStatsID, info := range globalStatsMap {
globalStats, err := statsHandle.MergePartitionStats2GlobalStats(infoschema.GetInfoSchema(e.ctx), globalStatsID.tableID, info.isIndex, info.idxID)
if err != nil {
return err
}
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, info.statsVersion, 1)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
}
}
}
return statsHandle.Update(infoschema.GetInfoSchema(e.ctx))
}

Expand Down
29 changes: 29 additions & 0 deletions executor/show_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func (e *ShowExec) fetchShowStatsMeta() error {
pi := tbl.GetPartitionInfo()
if pi == nil || e.ctx.GetSessionVars().UseDynamicPartitionPrune() {
e.appendTableForStatsMeta(db.Name.O, tbl.Name.O, "", h.GetTableStats(tbl))
if pi != nil {
for _, def := range pi.Definitions {
e.appendTableForStatsMeta(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID))
}
}
} else {
for _, def := range pi.Definitions {
e.appendTableForStatsMeta(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID))
Expand Down Expand Up @@ -67,6 +72,11 @@ func (e *ShowExec) fetchShowStatsHistogram() error {
pi := tbl.GetPartitionInfo()
if pi == nil || e.ctx.GetSessionVars().UseDynamicPartitionPrune() {
e.appendTableForStatsHistograms(db.Name.O, tbl.Name.O, "", h.GetTableStats(tbl))
if pi != nil {
for _, def := range pi.Definitions {
e.appendTableForStatsHistograms(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID))
}
}
} else {
for _, def := range pi.Definitions {
e.appendTableForStatsHistograms(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID))
Expand Down Expand Up @@ -124,6 +134,13 @@ func (e *ShowExec) fetchShowStatsBuckets() error {
if err := e.appendTableForStatsBuckets(db.Name.O, tbl.Name.O, "", h.GetTableStats(tbl)); err != nil {
return err
}
if pi != nil {
for _, def := range pi.Definitions {
if err := e.appendTableForStatsBuckets(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID)); err != nil {
return err
}
}
}
} else {
for _, def := range pi.Definitions {
if err := e.appendTableForStatsBuckets(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID)); err != nil {
Expand Down Expand Up @@ -172,6 +189,13 @@ func (e *ShowExec) fetchShowStatsTopN() error {
if err := e.appendTableForStatsTopN(db.Name.O, tbl.Name.O, "", h.GetTableStats(tbl)); err != nil {
return err
}
if pi != nil {
for _, def := range pi.Definitions {
if err := e.appendTableForStatsTopN(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID)); err != nil {
return err
}
}
}
} else {
for _, def := range pi.Definitions {
if err := e.appendTableForStatsTopN(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID)); err != nil {
Expand Down Expand Up @@ -291,6 +315,11 @@ func (e *ShowExec) fetchShowStatsHealthy() {
pi := tbl.GetPartitionInfo()
if pi == nil || e.ctx.GetSessionVars().UseDynamicPartitionPrune() {
e.appendTableForStatsHealthy(db.Name.O, tbl.Name.O, "", h.GetTableStats(tbl))
if pi != nil {
for _, def := range pi.Definitions {
e.appendTableForStatsHealthy(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID))
}
}
} else {
for _, def := range pi.Definitions {
e.appendTableForStatsHealthy(db.Name.O, tbl.Name.O, def.Name.O, h.GetPartitionStats(tbl, def.ID))
Expand Down
1 change: 0 additions & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,6 @@ func (h *AnalyzeTableID) GetStatisticsID() int64 {
}

// IsPartitionTable indicates whether the table is partition table.
// for new partition implementation is TRUE but FALSE for old partition implementation
func (h *AnalyzeTableID) IsPartitionTable() bool {
return h.PartitionID != -1
}
Expand Down
116 changes: 116 additions & 0 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,122 @@ func (h *Handle) UpdateSessionVar() error {
return err
}

// GlobalStats is used to store the statistics contained in the global-level stats
// which is generated by the merge of partition-level stats.
// It will both store the column stats and index stats.
// In the column statistics, the variable `num` is equal to the number of columns in the partition table.
// In the index statistics, the variable `num` is always equal to one.
type GlobalStats struct {
Num int
Count int64
Hg []*statistics.Histogram
Cms []*statistics.CMSketch
TopN []*statistics.TopN
}

// MergePartitionStats2GlobalStats merge the partition-level stats to global-level stats based on the tableID.
func (h *Handle) MergePartitionStats2GlobalStats(is infoschema.InfoSchema, physicalID int64, isIndex int, idxID int64) (globalStats *GlobalStats, err error) {
// get the partition table IDs
h.mu.Lock()
globalTable, ok := h.getTableByPhysicalID(is, physicalID)
h.mu.Unlock()
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", physicalID)
return
}
globalTableInfo := globalTable.Meta()
partitionNum := globalTableInfo.Partition.Num
partitionIDs := make([]int64, 0, partitionNum)
for i := uint64(0); i < partitionNum; i++ {
partitionIDs = append(partitionIDs, globalTableInfo.Partition.Definitions[i].ID)
}

// initialized the globalStats
globalStats = new(GlobalStats)
if isIndex == 0 {
globalStats.Num = len(globalTableInfo.Columns)
} else {
globalStats.Num = 1
}
globalStats.Count = 0
globalStats.Hg = make([]*statistics.Histogram, globalStats.Num)
globalStats.Cms = make([]*statistics.CMSketch, globalStats.Num)
globalStats.TopN = make([]*statistics.TopN, globalStats.Num)

// The first dimension of slice is means the number of column or index stats in the globalStats.
// The second dimension of slice is means the number of partition tables.
// Because all topN and histograms need to be collected before they can be merged.
// So we should store all of the partition-level stats first, and merge them together.
allHg := make([][]*statistics.Histogram, globalStats.Num)
allCms := make([][]*statistics.CMSketch, globalStats.Num)
allTopN := make([][]*statistics.TopN, globalStats.Num)
for i := 0; i < globalStats.Num; i++ {
allHg[i] = make([]*statistics.Histogram, 0, partitionNum)
allCms[i] = make([]*statistics.CMSketch, 0, partitionNum)
allTopN[i] = make([]*statistics.TopN, 0, partitionNum)
}

for _, partitionID := range partitionIDs {
h.mu.Lock()
partitionTable, ok := h.getTableByPhysicalID(is, partitionID)
h.mu.Unlock()
if !ok {
err = errors.Errorf("unknown physical ID %d in stats meta table, maybe it has been dropped", partitionID)
return
}
tableInfo := partitionTable.Meta()
var partitionStats *statistics.Table
partitionStats, err = h.TableStatsFromStorage(tableInfo, partitionID, false, 0)
if err != nil {
return
}
if partitionStats == nil {
err = errors.Errorf("[stats] error occurred when read partition-level stats of the table with tableID %d and partitionID %d", physicalID, partitionID)
return
}
globalStats.Count += partitionStats.Count
for i := 0; i < globalStats.Num; i++ {
ID := tableInfo.Columns[i].ID
if isIndex != 0 {
// If the statistics is the index stats, we should use the index ID to replace the column ID.
ID = idxID
}
hg, cms, topN := partitionStats.GetStatsInfo(ID, isIndex == 1)
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
allTopN[i] = append(allTopN[i], topN)
}
}

// After collect all of the statistics from the partition-level stats,
// we should merge them together.
for i := 0; i < globalStats.Num; i++ {
// Merge CMSketch
globalStats.Cms[i] = allCms[i][0].Copy()
for j := uint64(1); j < partitionNum; j++ {
err = globalStats.Cms[i].MergeCMSketch(allCms[i][j])
if err != nil {
return
}
}

// Merge topN. We need to merge TopN before merging the histogram.
// Because after merging TopN, some numbers will be left.
// These left numbers should be inserted into the histogram.
err = errors.Errorf("TODO: The merge function of the topN structure has not been implemented yet")
if err != nil {
return
}

// Merge histogram
err = errors.Errorf("TODO: The merge function of the histogram structure has not been implemented yet")
if err != nil {
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NDV should be merged here. You can add it when you merge NDV later.

}
return
}

func (h *Handle) getTableByPhysicalID(is infoschema.InfoSchema, physicalID int64) (table.Table, bool) {
if is.SchemaMetaVersion() != h.mu.schemaVersion {
h.mu.schemaVersion = is.SchemaMetaVersion()
Expand Down
56 changes: 56 additions & 0 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,62 @@ func (s *testStatsSuite) TestCorrelation(c *C) {
c.Assert(result.Rows()[0][9], Equals, "0")
}

func (s *testStatsSuite) TestBuildGlobalLevelStats(c *C) {
defer cleanEnv(c, s.store, s.do)
testKit := testkit.NewTestKit(c, s.store)
testKit.MustExec("use test")
testKit.MustExec("drop table if exists t, t1;")
testKit.MustExec("set @@tidb_partition_prune_mode = 'static-only';")
testKit.MustExec("create table t(a int, b int, c int) PARTITION BY HASH(a) PARTITIONS 3;")
testKit.MustExec("create table t1(a int);")
testKit.MustExec("insert into t values(1,1,1),(3,12,3),(4,20,4),(2,7,2),(5,21,5);")
testKit.MustExec("insert into t1 values(1),(3),(4),(2),(5);")
testKit.MustExec("create index idx_t_ab on t(a, b);")
testKit.MustExec("create index idx_t_b on t(b);")
testKit.MustExec("analyze table t, t1;")
result := testKit.MustQuery("show stats_meta where table_name = 't';").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
c.Assert(result.Rows()[1][5], Equals, "2")
c.Assert(result.Rows()[2][5], Equals, "2")
result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort()
c.Assert(len(result.Rows()), Equals, 15)

result = testKit.MustQuery("show stats_meta where table_name = 't1';").Sort()
c.Assert(len(result.Rows()), Equals, 1)
c.Assert(result.Rows()[0][5], Equals, "5")
result = testKit.MustQuery("show stats_histograms where table_name = 't1';").Sort()
c.Assert(len(result.Rows()), Equals, 1)

// Test the 'dynamic-only' mode
testKit.MustExec("set @@tidb_partition_prune_mode = 'dynamic-only';")
err := testKit.ExecToErr("analyze table t, t1;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
c.Assert(result.Rows()[1][5], Equals, "2")
c.Assert(result.Rows()[2][5], Equals, "2")
result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort()
c.Assert(len(result.Rows()), Equals, 15)

result = testKit.MustQuery("show stats_meta where table_name = 't1';").Sort()
c.Assert(len(result.Rows()), Equals, 1)
c.Assert(result.Rows()[0][5], Equals, "5")
result = testKit.MustQuery("show stats_histograms where table_name = 't1';").Sort()
c.Assert(len(result.Rows()), Equals, 1)

err = testKit.ExecToErr("analyze table t index idx_t_ab, idx_t_b;")
c.Assert(err.Error(), Equals, "TODO: The merge function of the topN structure has not been implemented yet")
result = testKit.MustQuery("show stats_meta where table_name = 't'").Sort()
c.Assert(len(result.Rows()), Equals, 3)
c.Assert(result.Rows()[0][5], Equals, "1")
c.Assert(result.Rows()[1][5], Equals, "2")
c.Assert(result.Rows()[2][5], Equals, "2")
result = testKit.MustQuery("show stats_histograms where table_name = 't';").Sort()
c.Assert(len(result.Rows()), Equals, 15)
}

func (s *testStatsSuite) TestExtendedStatsDefaultSwitch(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
Expand Down
10 changes: 10 additions & 0 deletions statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,16 @@ func (t *Table) ColumnByName(colName string) *Column {
return nil
}

// GetStatsInfo returns their statistics according to the ID of the column or index, including histogram, CMSketch and TopN.
func (t *Table) GetStatsInfo(ID int64, isIndex bool) (*Histogram, *CMSketch, *TopN) {
if isIndex {
idxStatsInfo := t.Indices[ID]
return idxStatsInfo.Histogram.Copy(), idxStatsInfo.CMSketch.Copy(), idxStatsInfo.TopN.Copy()
}
colStatsInfo := t.Columns[ID]
return colStatsInfo.Histogram.Copy(), colStatsInfo.CMSketch.Copy(), colStatsInfo.TopN.Copy()
}

type tableColumnID struct {
TableID int64
ColumnID int64
Expand Down