Skip to content

Commit

Permalink
statistics, executor: hotfix for out-of-range estimation and count/modifyCount update of global stats (#39032)
Browse files Browse the repository at this point in the history
  • Loading branch information
time-and-fate authored Nov 9, 2022
1 parent 8473f27 commit 0bc0037
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 47 deletions.
13 changes: 12 additions & 1 deletion executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,18 @@ func (e *AnalyzeExec) Next(ctx context.Context, req *chunk.Chunk) error {
for i := 0; i < globalStats.Num; i++ {
hg, cms, topN, fms := globalStats.Hg[i], globalStats.Cms[i], globalStats.TopN[i], globalStats.Fms[i]
// fms for global stats doesn't need to dump to kv.
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID, globalStats.Count, info.isIndex, hg, cms, topN, fms, info.statsVersion, 1, false, true)
err = statsHandle.SaveStatsToStorage(globalStatsID.tableID,
globalStats.Count,
globalStats.ModifyCount,
info.isIndex,
hg,
cms,
topN,
fms,
info.statsVersion,
1,
false,
true)
if err != nil {
logutil.Logger(ctx).Error("save global-level stats to storage failed", zap.Error(err))
}
Expand Down
4 changes: 2 additions & 2 deletions statistics/handle/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
for i := 0; i < newColGlobalStats.Num; i++ {
hg, cms, topN, fms := newColGlobalStats.Hg[i], newColGlobalStats.Cms[i], newColGlobalStats.TopN[i], newColGlobalStats.Fms[i]
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, 0, hg, cms, topN, fms, 2, 1, false, false)
err = h.SaveStatsToStorage(tableID, newColGlobalStats.Count, newColGlobalStats.ModifyCount, 0, hg, cms, topN, fms, 2, 1, false, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (h *Handle) updateGlobalStats(tblInfo *model.TableInfo) error {
for i := 0; i < newIndexGlobalStats.Num; i++ {
hg, cms, topN, fms := newIndexGlobalStats.Hg[i], newIndexGlobalStats.Cms[i], newIndexGlobalStats.TopN[i], newIndexGlobalStats.Fms[i]
// fms for global stats doesn't need to dump to kv.
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, 1, hg, cms, topN, fms, 2, 1, false, false)
err = h.SaveStatsToStorage(tableID, newIndexGlobalStats.Count, newColGlobalStats.ModifyCount, 1, hg, cms, topN, fms, 2, 1, false, false)
if err != nil {
return err
}
Expand Down
8 changes: 6 additions & 2 deletions statistics/handle/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,18 @@ func (h *Handle) loadStatsFromJSON(tableInfo *model.TableInfo, physicalID int64,

for _, col := range tbl.Columns {
// loadStatsFromJSON doesn't support partition table now.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, &col.Histogram, col.CMSketch, col.TopN, col.FMSketch, int(col.StatsVer), 1, false, false)
// The table level Count and Modify_count would be overridden by the SaveMetaToStorage below, so we don't need
// to care about them here.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, 0, &col.Histogram, col.CMSketch, col.TopN, col.FMSketch, int(col.StatsVer), 1, false, false)
if err != nil {
return errors.Trace(err)
}
}
for _, idx := range tbl.Indices {
// loadStatsFromJSON doesn't support partition table now.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 1, &idx.Histogram, idx.CMSketch, idx.TopN, nil, int(idx.StatsVer), 1, false, false)
// The table level Count and Modify_count would be overridden by the SaveMetaToStorage below, so we don't need
// to care about them here.
err = h.SaveStatsToStorage(tbl.PhysicalID, tbl.Count, 0, 1, &idx.Histogram, idx.CMSketch, idx.TopN, nil, int(idx.StatsVer), 1, false, false)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func TestDumpCMSketchWithTopN(t *testing.T) {
cms, _, _, _ := statistics.NewCMSketchAndTopN(5, 2048, fakeData, 20, 100)

stat := h.GetTableStats(tableInfo)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, nil, statistics.Version2, 1, false, false)
err = h.SaveStatsToStorage(tableInfo.ID, 1, 0, 0, &stat.Columns[tableInfo.Columns[0].ID].Histogram, cms, nil, nil, statistics.Version2, 1, false, false)
require.NoError(t, err)
require.Nil(t, h.Update(is))

Expand Down
24 changes: 14 additions & 10 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,13 @@ func (h *Handle) UpdateSessionVar() error {
// In the column statistics, the variable `num` is equal to the number of columns in the partition table.
// In the index statistics, the variable `num` is always equal to one.
type GlobalStats struct {
Num int
Count int64
Hg []*statistics.Histogram
Cms []*statistics.CMSketch
TopN []*statistics.TopN
Fms []*statistics.FMSketch
Num int
Count int64
ModifyCount int64
Hg []*statistics.Histogram
Cms []*statistics.CMSketch
TopN []*statistics.TopN
Fms []*statistics.FMSketch
}

// MergePartitionStats2GlobalStatsByTableID merge the partition-level stats to global-level stats based on the tableID.
Expand Down Expand Up @@ -472,7 +473,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context, opts map
return
}
for i := 0; i < globalStats.Num; i++ {
count, hg, cms, topN, fms := partitionStats.GetStatsInfo(histIDs[i], isIndex == 1)
_, hg, cms, topN, fms := partitionStats.GetStatsInfo(histIDs[i], isIndex == 1)
// partition stats is not empty but column stats(hist, topn) is missing
if partitionStats.Count > 0 && (hg == nil || hg.TotalRowCount() <= 0) && (topN == nil || topN.TotalCount() <= 0) {
var errMsg string
Expand All @@ -486,7 +487,8 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context, opts map
}
if i == 0 {
// In a partition, we will only update globalStats.Count once
globalStats.Count += count
globalStats.Count += partitionStats.Count
globalStats.ModifyCount += partitionStats.ModifyCount
}
allHg[i] = append(allHg[i], hg)
allCms[i] = append(allCms[i], cms)
Expand Down Expand Up @@ -1197,8 +1199,10 @@ func (h *Handle) SaveTableStatsToStorage(results *statistics.AnalyzeResults, nee
}

// SaveStatsToStorage saves the stats to storage.
// If count is negative, neither count nor modify count is used or written to the table. Otherwise, the corresponding
// fields in the stats_meta table will be updated.
// TODO: refactor to reduce the number of parameters
func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64, needDumpFMS bool, updateAnalyzeTime bool) (err error) {
func (h *Handle) SaveStatsToStorage(tableID int64, count, modifyCount int64, isIndex int, hg *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int, isAnalyzed int64, needDumpFMS bool, updateAnalyzeTime bool) (err error) {
statsVer := uint64(0)
defer func() {
if err == nil && statsVer != 0 {
Expand All @@ -1224,7 +1228,7 @@ func (h *Handle) SaveStatsToStorage(tableID int64, count int64, isIndex int, hg
version := txn.StartTS()
// If the count is less than 0, then we do not want to update the modify count and count.
if count >= 0 {
_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count) values (%?, %?, %?)", version, tableID, count)
_, err = exec.ExecuteInternal(ctx, "replace into mysql.stats_meta (version, table_id, count, modify_count) values (%?, %?, %?, %?)", version, tableID, count, modifyCount)
} else {
_, err = exec.ExecuteInternal(ctx, "update mysql.stats_meta set version = %? where table_id = %?", version, tableID)
}
Expand Down
10 changes: 6 additions & 4 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,7 @@ partition by range (a) (
partition p0 values less than (10),
partition p1 values less than (20)
)`)
require.NoError(t, dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh()))
tk.MustExec("insert into t values (1), (5), (null), (11), (15)")
require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll))

Expand Down Expand Up @@ -1670,14 +1671,15 @@ partition by range (a) (
require.NoError(t, err)
tableInfo := tbl.Meta()
globalStats := h.GetTableStats(tableInfo)
// global.count = p0.count(3) + p1.count(2) + p2.count(2)
// We did not analyze partition p1, so the value here has not changed
require.Equal(t, int64(7), globalStats.Count)
// global.count = p0.count(3) + p1.count(4) + p2.count(2)
// modify count is 2 because we didn't analyze p1 after the second insert
require.Equal(t, int64(9), globalStats.Count)
require.Equal(t, int64(2), globalStats.ModifyCount)

tk.MustExec("analyze table t partition p1;")
globalStats = h.GetTableStats(tableInfo)
// global.count = p0.count(3) + p1.count(4) + p2.count(2)
// The value of p1.Count is correct now.
// The value of modify count is 0 now.
require.Equal(t, int64(9), globalStats.Count)
require.Equal(t, int64(0), globalStats.ModifyCount)

Expand Down
6 changes: 3 additions & 3 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,7 @@ func (h *Handle) deleteOutdatedFeedback(tableID, histID, isIndex int64) error {
func (h *Handle) dumpStatsUpdateToKV(tableID, isIndex int64, q *statistics.QueryFeedback, hist *statistics.Histogram, cms *statistics.CMSketch, topN *statistics.TopN, fms *statistics.FMSketch, statsVersion int64) error {
hist = statistics.UpdateHistogram(hist, q, int(statsVersion))
// feedback for partition is not ready.
err := h.SaveStatsToStorage(tableID, -1, int(isIndex), hist, cms, topN, fms, int(statsVersion), 0, false, false)
err := h.SaveStatsToStorage(tableID, -1, 0, int(isIndex), hist, cms, topN, fms, int(statsVersion), 0, false, false)
metrics.UpdateStatsCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
return errors.Trace(err)
}
Expand Down Expand Up @@ -1402,10 +1402,10 @@ func (h *Handle) RecalculateExpectCount(q *statistics.QueryFeedback) error {
expected := 0.0
if isIndex {
idx := t.Indices[id]
expected, err = idx.GetRowCount(sctx, nil, ranges, t.Count)
expected, err = idx.GetRowCount(sctx, nil, ranges, t.Count, t.ModifyCount)
} else {
c := t.Columns[id]
expected, err = c.GetColumnRowCount(sctx, ranges, t.Count, true)
expected, err = c.GetColumnRowCount(sctx, ranges, t.Count, t.ModifyCount, true)
}
q.Expected = int64(expected)
return err
Expand Down
24 changes: 8 additions & 16 deletions statistics/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ func (hg *Histogram) outOfRange(val types.Datum) bool {

// outOfRangeRowCount estimate the row count of part of [lDatum, rDatum] which is out of range of the histogram.
// Here we assume the density of data is decreasing from the lower/upper bound of the histogram toward outside.
// The maximum row count it can get is the increaseCount. It reaches the maximum when out-of-range width reaches histogram range width.
// The maximum row count it can get is the modifyCount. It reaches the maximum when out-of-range width reaches histogram range width.
// As it shows below. To calculate the out-of-range row count, we need to calculate the percentage of the shaded area.
// Note that we assume histL-boundL == histR-histL == boundR-histR here.
//
Expand All @@ -892,7 +892,7 @@ func (hg *Histogram) outOfRange(val types.Datum) bool {
// boundL │ │histL histR boundR
// │ │
// lDatum rDatum
func (hg *Histogram) outOfRangeRowCount(lDatum, rDatum *types.Datum, increaseCount int64) float64 {
func (hg *Histogram) outOfRangeRowCount(lDatum, rDatum *types.Datum, modifyCount int64) float64 {
if hg.Len() == 0 {
return 0
}
Expand Down Expand Up @@ -976,8 +976,8 @@ func (hg *Histogram) outOfRangeRowCount(lDatum, rDatum *types.Datum, increaseCou
totalPercent = 1
}
rowCount := totalPercent * hg.notNullCount()
if rowCount > float64(increaseCount) {
return float64(increaseCount)
if rowCount > float64(modifyCount) {
return float64(modifyCount)
}
return rowCount
}
Expand Down Expand Up @@ -1202,7 +1202,7 @@ func (c *Column) equalRowCount(sctx sessionctx.Context, val types.Datum, encoded
}

// GetColumnRowCount estimates the row count by a slice of Range.
func (c *Column) GetColumnRowCount(sctx sessionctx.Context, ranges []*ranger.Range, realtimeRowCount int64, pkIsHandle bool) (float64, error) {
func (c *Column) GetColumnRowCount(sctx sessionctx.Context, ranges []*ranger.Range, realtimeRowCount, modifyCount int64, pkIsHandle bool) (float64, error) {
sc := sctx.GetSessionVars().StmtCtx
var rowCount float64
for _, rg := range ranges {
Expand Down Expand Up @@ -1299,11 +1299,7 @@ func (c *Column) GetColumnRowCount(sctx sessionctx.Context, ranges []*ranger.Ran

// handling the out-of-range part
if (c.outOfRange(lowVal) && !lowVal.IsNull()) || c.outOfRange(highVal) {
increaseCount := realtimeRowCount - int64(c.TotalRowCount())
if increaseCount < 0 {
increaseCount = 0
}
cnt += c.Histogram.outOfRangeRowCount(&lowVal, &highVal, increaseCount)
cnt += c.Histogram.outOfRangeRowCount(&lowVal, &highVal, modifyCount)
}

rowCount += cnt
Expand Down Expand Up @@ -1426,7 +1422,7 @@ func (idx *Index) QueryBytes(d []byte) uint64 {

// GetRowCount returns the row count of the given ranges.
// It uses the modifyCount to adjust the influence of modifications on the table.
func (idx *Index) GetRowCount(sctx sessionctx.Context, coll *HistColl, indexRanges []*ranger.Range, realtimeRowCount int64) (float64, error) {
func (idx *Index) GetRowCount(sctx sessionctx.Context, coll *HistColl, indexRanges []*ranger.Range, realtimeRowCount, modifyCount int64) (float64, error) {
sc := sctx.GetSessionVars().StmtCtx
totalCount := float64(0)
isSingleCol := len(idx.Info.Columns) == 1
Expand Down Expand Up @@ -1518,11 +1514,7 @@ func (idx *Index) GetRowCount(sctx sessionctx.Context, coll *HistColl, indexRang

// handling the out-of-range part
if (idx.outOfRange(l) && !(isSingleCol && lowIsNull)) || idx.outOfRange(r) {
increaseCount := realtimeRowCount - int64(idx.TotalRowCount())
if increaseCount < 0 {
increaseCount = 0
}
totalCount += idx.Histogram.outOfRangeRowCount(&l, &r, increaseCount)
totalCount += idx.Histogram.outOfRangeRowCount(&l, &r, modifyCount)
}
}
totalCount = mathutil.Clamp(totalCount, 0, float64(realtimeRowCount))
Expand Down
8 changes: 5 additions & 3 deletions statistics/selectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestOutOfRangeEstimation(t *testing.T) {
statsTbl := h.GetTableStats(table.Meta())
sctx := mock.NewContext()
col := statsTbl.Columns[table.Meta().Columns[0].ID]
count, err := col.GetColumnRowCount(sctx, getRange(900, 900), statsTbl.Count, false)
count, err := col.GetColumnRowCount(sctx, getRange(900, 900), statsTbl.Count, statsTbl.ModifyCount, false)
require.NoError(t, err)
// Because ANALYZE collects data by random sampling, the result is not an exact value,
// so we use a range here.
Expand All @@ -147,8 +147,9 @@ func TestOutOfRangeEstimation(t *testing.T) {
statsSuiteData := statistics.GetStatsSuiteData()
statsSuiteData.GetTestCases(t, &input, &output)
increasedTblRowCount := int64(float64(statsTbl.Count) * 1.5)
modifyCount := int64(float64(statsTbl.Count) * 0.5)
for i, ran := range input {
count, err = col.GetColumnRowCount(sctx, getRange(ran.Start, ran.End), increasedTblRowCount, false)
count, err = col.GetColumnRowCount(sctx, getRange(ran.Start, ran.End), increasedTblRowCount, modifyCount, false)
require.NoError(t, err)
testdata.OnRecord(func() {
output[i].Start = ran.Start
Expand Down Expand Up @@ -551,6 +552,7 @@ func TestSelectivity(t *testing.T) {
require.Truef(t, math.Abs(ratio-tt.selectivity) < eps, "for %s, needed: %v, got: %v", tt.exprs, tt.selectivity, ratio)

histColl.Count *= 10
histColl.ModifyCount = histColl.Count * 9
ratio, _, err = histColl.Selectivity(sctx, sel.Conditions, nil)
require.NoErrorf(t, err, "for %s", tt.exprs)
require.Truef(t, math.Abs(ratio-tt.selectivityAfterIncrease) < eps, "for %s, needed: %v, got: %v", tt.exprs, tt.selectivityAfterIncrease, ratio)
Expand Down Expand Up @@ -762,7 +764,7 @@ func TestSmallRangeEstimation(t *testing.T) {
statsSuiteData := statistics.GetStatsSuiteData()
statsSuiteData.GetTestCases(t, &input, &output)
for i, ran := range input {
count, err := col.GetColumnRowCount(sctx, getRange(ran.Start, ran.End), statsTbl.Count, false)
count, err := col.GetColumnRowCount(sctx, getRange(ran.Start, ran.End), statsTbl.Count, statsTbl.ModifyCount, false)
require.NoError(t, err)
testdata.OnRecord(func() {
output[i].Start = ran.Start
Expand Down
10 changes: 5 additions & 5 deletions statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (coll *HistColl) GetRowCountByIntColumnRanges(sctx sessionctx.Context, colI
}
return result, nil
}
result, err := c.GetColumnRowCount(sctx, intRanges, coll.Count, true)
result, err := c.GetColumnRowCount(sctx, intRanges, coll.Count, coll.ModifyCount, true)
if sc.EnableOptimizerCETrace {
CETraceRange(sctx, coll.PhysicalID, []string{c.Info.Name.O}, intRanges, "Column Stats", uint64(result))
}
Expand All @@ -484,7 +484,7 @@ func (coll *HistColl) GetRowCountByColumnRanges(sctx sessionctx.Context, colID i
}
return result, err
}
result, err := c.GetColumnRowCount(sctx, colRanges, coll.Count, false)
result, err := c.GetColumnRowCount(sctx, colRanges, coll.Count, coll.ModifyCount, false)
if sc.EnableOptimizerCETrace {
CETraceRange(sctx, coll.PhysicalID, []string{c.Info.Name.O}, colRanges, "Column Stats", uint64(result))
}
Expand Down Expand Up @@ -517,7 +517,7 @@ func (coll *HistColl) GetRowCountByIndexRanges(sctx sessionctx.Context, idxID in
if idx.CMSketch != nil && idx.StatsVer == Version1 {
result, err = coll.getIndexRowCount(sctx, idxID, indexRanges)
} else {
result, err = idx.GetRowCount(sctx, coll, indexRanges, coll.Count)
result, err = idx.GetRowCount(sctx, coll, indexRanges, coll.Count, coll.ModifyCount)
}
if sc.EnableOptimizerCETrace {
CETraceRange(sctx, coll.PhysicalID, colNames, indexRanges, "Index Stats", uint64(result))
Expand Down Expand Up @@ -708,7 +708,7 @@ func (coll *HistColl) crossValidationSelectivity(sctx sessionctx.Context, idx *I
Collators: []collate.Collator{idxPointRange.Collators[i]},
}

rowCount, err := col.GetColumnRowCount(sctx, []*ranger.Range{&rang}, coll.Count, col.IsHandle)
rowCount, err := col.GetColumnRowCount(sctx, []*ranger.Range{&rang}, coll.Count, coll.ModifyCount, col.IsHandle)
if err != nil {
return 0, 0, err
}
Expand Down Expand Up @@ -780,7 +780,7 @@ func (coll *HistColl) getIndexRowCount(sctx sessionctx.Context, idxID int64, ind
// on single-column index, use previous way as well, because CMSketch does not contain null
// values in this case.
if rangePosition == 0 || isSingleColIdxNullRange(idx, ran) {
count, err := idx.GetRowCount(sctx, nil, []*ranger.Range{ran}, coll.Count)
count, err := idx.GetRowCount(sctx, nil, []*ranger.Range{ran}, coll.Count, coll.ModifyCount)
if err != nil {
return 0, errors.Trace(err)
}
Expand Down

0 comments on commit 0bc0037

Please sign in to comment.