Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: support to store FMSketch and add FMSketch to column stats #22841

Merged
merged 24 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f047988
add FMSketch to mysql.stats_histograms
Reminiscent Feb 20, 2021
0b29330
statistics: support store FMSketch and add FMSketch to histogram
Reminiscent Feb 20, 2021
98058de
FIX UT
Reminiscent Feb 20, 2021
36b7f83
Create Stats_FM_Sketch Table
Reminiscent Feb 20, 2021
1159002
Merge branch 'master' of https://github.com/pingcap/tidb into mergeNDV
Reminiscent Feb 20, 2021
4adb68e
fix UT
Reminiscent Feb 20, 2021
cc046bc
add some tests
Reminiscent Feb 20, 2021
f3ee62e
FIX ut
Reminiscent Feb 20, 2021
7d7af50
move fmSketch from histogram to column.stats
Reminiscent Feb 22, 2021
801f213
Merge branch 'master' of https://github.com/pingcap/tidb into mergeNDV
Reminiscent Feb 22, 2021
80b69f7
remove unused code and comments
Reminiscent Feb 22, 2021
65a5ba9
fix ut
Reminiscent Feb 22, 2021
d6e648c
fix ut
Reminiscent Feb 22, 2021
771930d
fix ut
Reminiscent Feb 22, 2021
a99bdb7
fix ut
Reminiscent Feb 22, 2021
22da63c
fix ut
Reminiscent Feb 22, 2021
f13369b
fix ut
Reminiscent Feb 22, 2021
fa65821
finish the FMSketch.MemoryUsage function
Reminiscent Feb 22, 2021
c7eb5ba
Merge branch 'master' of https://github.com/pingcap/tidb into mergeNDV
Reminiscent Feb 22, 2021
3deabd9
add some comments for FMSketch.MemoryUsage function
Reminiscent Feb 22, 2021
2e5a873
fix typo
Reminiscent Feb 22, 2021
94c6045
Merge branch 'master' into mergeNDV
ti-srebot Feb 22, 2021
7a30a15
Merge branch 'master' into mergeNDV
ti-chi-bot Feb 22, 2021
f922857
Merge branch 'master' into mergeNDV
Reminiscent Feb 23, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 49 additions & 32 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
globalStatsMap[globalStatsID] = globalStatsInfo{result.IsIndex, hg.ID, result.StatsVer}
}
}
err1 := statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, result.Cms[i], result.TopNs[i], result.StatsVer, 1)
err1 := statsHandle.SaveStatsToStorage(statisticsID, result.Count, result.IsIndex, hg, result.Cms[i], result.TopNs[i], result.Fms[i], result.StatsVer, 1)
if err1 != nil {
err = err1
logutil.Logger(ctx).Error("save stats to storage failed", zap.Error(err))
Expand Down Expand Up @@ -174,8 +174,8 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i]
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, info.statsVersion, 1)
hg, cms, topN, fms := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i], globalStats.Fms[i]
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, fms, info.statsVersion, 1)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
Expand Down Expand Up @@ -292,6 +292,7 @@ func analyzeIndexPushdown(idxExec *AnalyzeIndexExec) analyzeResult {
Hist: []*statistics.Histogram{hist},
Cms: []*statistics.CMSketch{cms},
TopNs: []*statistics.TopN{topN},
Fms: []*statistics.FMSketch{nil},
IsIndex: 1,
job: idxExec.job,
StatsVer: statsVer,
Expand Down Expand Up @@ -465,7 +466,7 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) []analyzeResult {
} else {
ranges = ranger.FullIntRange(false)
}
hists, cms, topNs, extStats, err := colExec.buildStats(ranges, true)
hists, cms, topNs, fms, extStats, err := colExec.buildStats(ranges, true)
if err != nil {
return []analyzeResult{{Err: err, job: colExec.job}}
}
Expand All @@ -476,6 +477,7 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) []analyzeResult {
Hist: hists[:1],
Cms: cms[:1],
TopNs: topNs[:1],
Fms: fms[:1],
ExtStats: nil,
job: nil,
StatsVer: statistics.Version1,
Expand All @@ -486,6 +488,7 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) []analyzeResult {
Hist: hists[1:],
Cms: cms[1:],
TopNs: topNs[1:],
Fms: fms[1:],
ExtStats: extStats,
job: colExec.job,
StatsVer: colExec.analyzeVer,
Expand All @@ -498,6 +501,7 @@ func analyzeColumnsPushdown(colExec *AnalyzeColumnsExec) []analyzeResult {
Hist: hists,
Cms: cms,
TopNs: topNs,
Fms: fms,
ExtStats: extStats,
job: colExec.job,
StatsVer: colExec.analyzeVer,
Expand Down Expand Up @@ -568,9 +572,9 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
return result, nil
}

func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats bool) (hists []*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, extStats *statistics.ExtendedStatsColl, err error) {
func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats bool) (hists []*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, fms []*statistics.FMSketch, extStats *statistics.ExtendedStatsColl, err error) {
if err = e.open(ranges); err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
defer func() {
if err1 := e.resultHandler.Close(); err1 != nil {
Expand All @@ -593,15 +597,15 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
for {
data, err1 := e.resultHandler.nextRaw(context.TODO())
if err1 != nil {
return nil, nil, nil, nil, err1
return nil, nil, nil, nil, nil, err1
}
if data == nil {
break
}
resp := &tipb.AnalyzeColumnsResp{}
err = resp.Unmarshal(data)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
sc := e.ctx.GetSessionVars().StmtCtx
rowCount := int64(0)
Expand All @@ -610,7 +614,7 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
rowCount = int64(respHist.TotalRowCount())
pkHist, err = statistics.MergeHistograms(sc, pkHist, respHist, int(e.opts[ast.AnalyzeOptNumBuckets]), statistics.Version1)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
}
for i, rc := range resp.Collectors {
Expand All @@ -626,26 +630,27 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
pkHist.ID = pkInfo.ID
err = pkHist.DecodeTo(pkInfo.RetType, timeZone)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
hists = append(hists, pkHist)
cms = append(cms, nil)
topNs = append(topNs, nil)
fms = append(fms, nil)
}
for i, col := range e.colsInfo {
if e.analyzeVer < 2 {
// In analyze version 2, we don't collect TopN this way. We will collect TopN from samples in `BuildColumnHistAndTopN()` below.
err := collectors[i].ExtractTopN(uint32(e.opts[ast.AnalyzeOptNumTopN]), e.ctx.GetSessionVars().StmtCtx, &col.FieldType, timeZone)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
topNs = append(topNs, collectors[i].TopN)
}
for j, s := range collectors[i].Samples {
collectors[i].Samples[j].Ordinal = j
collectors[i].Samples[j].Value, err = tablecodec.DecodeColumnValue(s.Value.GetBytes(), &col.FieldType, timeZone)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
// When collation is enabled, we store the Key representation of the sampling data. So we set it to kind `Bytes` here
// to avoid to convert it to its Key representation once more.
Expand All @@ -663,20 +668,21 @@ func (e *AnalyzeColumnsExec) buildStats(ranges []*ranger.Range, needExtStats boo
topNs = append(topNs, topn)
}
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
hists = append(hists, hg)
collectors[i].CMSketch.CalcDefaultValForAnalyze(uint64(hg.NDV))
cms = append(cms, collectors[i].CMSketch)
fms = append(fms, collectors[i].FMSketch)
}
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
extStats, err = statsHandle.BuildExtendedStats(e.tableID.GetStatisticsID(), e.colsInfo, collectors)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
}
return hists, cms, topNs, extStats, nil
return hists, cms, topNs, fms, extStats, nil
}

func hasPkHist(handleCols core.HandleCols) bool {
Expand All @@ -697,7 +703,7 @@ var (
)

func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
hists, cms, topNs, err := exec.buildStats()
hists, cms, topNs, fms, err := exec.buildStats()
if err != nil {
return []analyzeResult{{Err: err, job: exec.job}}
}
Expand All @@ -710,6 +716,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Hist: []*statistics.Histogram{hists[i]},
Cms: []*statistics.CMSketch{cms[i]},
TopNs: []*statistics.TopN{topNs[i]},
Fms: []*statistics.FMSketch{nil},
IsIndex: 1,
Count: hists[i].NullCount,
job: exec.job,
Expand All @@ -730,6 +737,7 @@ func analyzeFastExec(exec *AnalyzeFastExec) []analyzeResult {
Hist: hists[:pkColCount+len(exec.colsInfo)],
Cms: cms[:pkColCount+len(exec.colsInfo)],
TopNs: topNs[:pkColCount+len(exec.colsInfo)],
Fms: fms[:pkColCount+len(exec.colsInfo)],
Count: hist.NullCount,
job: exec.job,
StatsVer: statistics.Version1,
Expand Down Expand Up @@ -1107,25 +1115,31 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
}
}

func (e *AnalyzeFastExec) buildColumnStats(ID int64, collector *statistics.SampleCollector, tp *types.FieldType, rowCount int64) (*statistics.Histogram, *statistics.CMSketch, *statistics.TopN, error) {
func (e *AnalyzeFastExec) buildColumnStats(ID int64, collector *statistics.SampleCollector, tp *types.FieldType, rowCount int64) (*statistics.Histogram, *statistics.CMSketch, *statistics.TopN, *statistics.FMSketch, error) {
sc := e.ctx.GetSessionVars().StmtCtx
data := make([][]byte, 0, len(collector.Samples))
fmSketch := statistics.NewFMSketch(maxSketchSize)
for i, sample := range collector.Samples {
sample.Ordinal = i
if sample.Value.IsNull() {
collector.NullCount++
continue
}
bytes, err := tablecodec.EncodeValue(e.ctx.GetSessionVars().StmtCtx, nil, sample.Value)
err := fmSketch.InsertValue(sc, sample.Value)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
bytes, err := tablecodec.EncodeValue(sc, nil, sample.Value)
if err != nil {
return nil, nil, nil, nil, err
}
data = append(data, bytes)
}
// Build CMSketch.
cmSketch, topN, ndv, scaleRatio := statistics.NewCMSketchAndTopN(int32(e.opts[ast.AnalyzeOptCMSketchDepth]), int32(e.opts[ast.AnalyzeOptCMSketchWidth]), data, uint32(e.opts[ast.AnalyzeOptNumTopN]), uint64(rowCount))
// Build Histogram.
hist, err := statistics.BuildColumnHist(e.ctx, int64(e.opts[ast.AnalyzeOptNumBuckets]), ID, collector, tp, rowCount, int64(ndv), collector.NullCount*int64(scaleRatio))
return hist, cmSketch, topN, err
return hist, cmSketch, topN, fmSketch, err
}

func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *statistics.SampleCollector, rowCount int64) (*statistics.Histogram, *statistics.CMSketch, *statistics.TopN, error) {
Expand Down Expand Up @@ -1164,7 +1178,7 @@ func (e *AnalyzeFastExec) buildIndexStats(idxInfo *model.IndexInfo, collector *s
return hist, cmSketch, topN, err
}

func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, []*statistics.TopN, error) {
func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMSketch, []*statistics.TopN, []*statistics.FMSketch, error) {
errs := make([]error, e.concurrency)
pkColCount := pkColsCount(e.handleCols)
// collect column samples and primary key samples and index samples.
Expand All @@ -1185,14 +1199,14 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS
e.wg.Wait()
for _, err := range errs {
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
}

scanKeysSize, err := e.handleScanTasks(bo)
fastAnalyzeHistogramScanKeys.Observe(float64(scanKeysSize))
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

stats := domain.GetDomain(e.ctx).StatsHandle()
Expand All @@ -1202,7 +1216,7 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS
rowCount = t.Count
}
}
hists, cms, topNs := make([]*statistics.Histogram, length), make([]*statistics.CMSketch, length), make([]*statistics.TopN, length)
hists, cms, topNs, fms := make([]*statistics.Histogram, length), make([]*statistics.CMSketch, length), make([]*statistics.TopN, length), make([]*statistics.FMSketch, length)
for i := 0; i < length; i++ {
// Build collector properties.
collector := e.collectors[i]
Expand All @@ -1217,20 +1231,20 @@ func (e *AnalyzeFastExec) runTasks() ([]*statistics.Histogram, []*statistics.CMS
}
if i < pkColCount {
pkCol := e.handleCols.GetCol(i)
hists[i], cms[i], topNs[i], err = e.buildColumnStats(pkCol.ID, e.collectors[i], pkCol.RetType, rowCount)
hists[i], cms[i], topNs[i], fms[i], err = e.buildColumnStats(pkCol.ID, e.collectors[i], pkCol.RetType, rowCount)
} else if i < pkColCount+len(e.colsInfo) {
hists[i], cms[i], topNs[i], err = e.buildColumnStats(e.colsInfo[i-pkColCount].ID, e.collectors[i], &e.colsInfo[i-pkColCount].FieldType, rowCount)
hists[i], cms[i], topNs[i], fms[i], err = e.buildColumnStats(e.colsInfo[i-pkColCount].ID, e.collectors[i], &e.colsInfo[i-pkColCount].FieldType, rowCount)
} else {
hists[i], cms[i], topNs[i], err = e.buildIndexStats(e.idxsInfo[i-pkColCount-len(e.colsInfo)], e.collectors[i], rowCount)
}
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}
}
return hists, cms, topNs, nil
return hists, cms, topNs, fms, nil
}

func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, err error) {
func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*statistics.CMSketch, topNs []*statistics.TopN, fms []*statistics.FMSketch, err error) {
// To set rand seed, it's for unit test.
// To ensure that random sequences are different in non-test environments, RandSeed must be set time.Now().
if RandSeed == 1 {
Expand All @@ -1241,7 +1255,7 @@ func (e *AnalyzeFastExec) buildStats() (hists []*statistics.Histogram, cms []*st

err = e.buildSampTask()
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, nil, err
}

return e.runTasks()
Expand Down Expand Up @@ -1273,7 +1287,7 @@ func (e *AnalyzeTestFastExec) TestFastSample() error {
e.job = &statistics.AnalyzeJob{}
e.tblInfo = e.TblInfo
e.opts = e.Opts
_, _, _, err := e.buildStats()
_, _, _, _, err := e.buildStats()
e.Collectors = e.collectors
return err
}
Expand Down Expand Up @@ -1320,6 +1334,7 @@ func analyzeIndexIncremental(idxExec *analyzeIndexIncrementalExec) analyzeResult
Hist: []*statistics.Histogram{hist},
Cms: []*statistics.CMSketch{cms},
TopNs: []*statistics.TopN{topN},
Fms: []*statistics.FMSketch{nil},
IsIndex: 1,
job: idxExec.job,
StatsVer: statsVer,
Expand All @@ -1346,7 +1361,7 @@ func analyzePKIncremental(colExec *analyzePKIncrementalExec) analyzeResult {
}
startPos := *colExec.oldHist.GetUpper(colExec.oldHist.Len() - 1)
ran := ranger.Range{LowVal: []types.Datum{startPos}, LowExclude: true, HighVal: []types.Datum{maxVal}}
hists, _, _, _, err := colExec.buildStats([]*ranger.Range{&ran}, false)
hists, _, _, _, _, err := colExec.buildStats([]*ranger.Range{&ran}, false)
if err != nil {
return analyzeResult{Err: err, job: colExec.job}
}
Expand All @@ -1360,6 +1375,7 @@ func analyzePKIncremental(colExec *analyzePKIncrementalExec) analyzeResult {
Hist: []*statistics.Histogram{hist},
Cms: []*statistics.CMSketch{nil},
TopNs: []*statistics.TopN{nil},
Fms: []*statistics.FMSketch{nil},
job: colExec.job,
StatsVer: statistics.Version1,
}
Expand All @@ -1375,6 +1391,7 @@ type analyzeResult struct {
Hist []*statistics.Histogram
Cms []*statistics.CMSketch
TopNs []*statistics.TopN
Fms []*statistics.FMSketch
ExtStats *statistics.ExtendedStatsColl
Count int64
IsIndex int
Expand Down
2 changes: 1 addition & 1 deletion executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,7 @@ func (s *testInfoschemaClusterTableSuite) TestTableStorageStats(c *C) {
tk.MustQuery("select TABLE_SCHEMA, sum(TABLE_SIZE) from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'test' group by TABLE_SCHEMA;").Check(testkit.Rows(
"test 2",
))
c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 22)
c.Assert(len(tk.MustQuery("select TABLE_NAME from information_schema.TABLE_STORAGE_STATS where TABLE_SCHEMA = 'mysql';").Rows()), Equals, 23)
}

func (s *testInfoschemaTableSuite) TestSequences(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (ds *DataSource) getColumnNDV(colID int64) (ndv float64) {
hist, ok := ds.statisticTable.Columns[colID]
if ok && hist.Count > 0 {
factor := float64(ds.statisticTable.Count) / float64(hist.Count)
ndv = float64(hist.NDV) * factor
ndv = float64(hist.Histogram.NDV) * factor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you tell me why you added `Histogram` here? Both forms look fine to me; I just want to know whether there are any further considerations behind this change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because FMSketch also has an NDV function, the unqualified `hist.NDV` would be ambiguous, so the selector is written explicitly as `hist.Histogram.NDV`.

} else {
ndv = float64(ds.statisticTable.Count) * distinctFactor
}
Expand Down
Loading