Skip to content

Commit

Permalink
statistics: delete extended stats cache item in current tidb synchron…
Browse files Browse the repository at this point in the history
…ously (#23088)
  • Loading branch information
eurekaka authored Mar 12, 2021
1 parent f0ccc57 commit 6d258c7
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 37 deletions.
128 changes: 95 additions & 33 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,18 @@ const (
// statsCache caches the tables in memory for Handle.
type statsCache struct {
tables map[int64]*statistics.Table
// version is the latest version of cache.
version uint64
memUsage int64
// version is the latest version of cache. It is bumped when new records of `mysql.stats_meta` are loaded into cache.
version uint64
// minorVersion is to differentiate the cache when the version is unchanged while the cache contents are
// modified indeed. This can happen when we load extra column histograms into cache, or when we modify the cache with
// statistics feedbacks, etc. We cannot bump the version then because no new changes of `mysql.stats_meta` are loaded,
// while the override of statsCache is in a copy-on-write way, to make sure the statsCache is unchanged by others during the
// the interval of 'copy' and 'write', every 'write' should bump / check this minorVersion if the version keeps
// unchanged.
// This bump / check logic is encapsulated in `statsCache.update` and `updateStatsCache`, callers don't need to care
// about this minorVersion actually.
minorVersion uint64
memUsage int64
}

// Handle can update stats info periodically.
Expand Down Expand Up @@ -515,20 +524,27 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statist
return tbl
}

func (h *Handle) updateStatsCache(newCache statsCache) {
// updateStatsCache overrides the global statsCache with a new one, it may fail
// if the global statsCache has been modified by others already.
// Callers should add retry loop if necessary.
func (h *Handle) updateStatsCache(newCache statsCache) (updated bool) {
h.statsCache.Lock()
oldCache := h.statsCache.Load().(statsCache)
if oldCache.version <= newCache.version {
if oldCache.version < newCache.version || (oldCache.version == newCache.version && oldCache.minorVersion < newCache.minorVersion) {
h.statsCache.memTracker.Consume(newCache.memUsage - oldCache.memUsage)
h.statsCache.Store(newCache)
updated = true
}
h.statsCache.Unlock()
return
}

func (sc statsCache) copy() statsCache {
newCache := statsCache{tables: make(map[int64]*statistics.Table, len(sc.tables)),
version: sc.version,
memUsage: sc.memUsage}
version: sc.version,
minorVersion: sc.minorVersion,
memUsage: sc.memUsage,
}
for k, v := range sc.tables {
newCache.tables[k] = v
}
Expand All @@ -549,7 +565,12 @@ func (sc statsCache) initMemoryUsage() {
// update updates the statistics table cache using copy on write.
func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache {
newCache := sc.copy()
newCache.version = newVersion
if newVersion == newCache.version {
newCache.minorVersion += uint64(1)
} else {
newCache.version = newVersion
newCache.minorVersion = uint64(0)
}
for _, tbl := range tables {
id := tbl.PhysicalID
if ptbl, ok := newCache.tables[id]; ok {
Expand Down Expand Up @@ -583,12 +604,11 @@ func (h *Handle) LoadNeededHistograms() (err error) {
}()

for _, col := range cols {
statsCache := h.statsCache.Load().(statsCache)
tbl, ok := statsCache.tables[col.TableID]
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.tables[col.TableID]
if !ok {
continue
}
tbl = tbl.Copy()
c, ok := tbl.Columns[col.ColumnID]
if !ok || c.Len() > 0 {
statistics.HistogramNeededColumns.Delete(col)
Expand All @@ -613,7 +633,7 @@ func (h *Handle) LoadNeededHistograms() (err error) {
if len(rows) == 0 {
logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", col.TableID), zap.Int64("hist_id", col.ColumnID))
}
tbl.Columns[c.ID] = &statistics.Column{
colHist := &statistics.Column{
PhysicalID: col.TableID,
Histogram: *hg,
Info: c.Info,
Expand All @@ -624,9 +644,19 @@ func (h *Handle) LoadNeededHistograms() (err error) {
IsHandle: c.IsHandle,
StatsVer: rows[0].GetInt64(0),
}
tbl.Columns[c.ID].Count = int64(tbl.Columns[c.ID].TotalRowCount())
h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version))
statistics.HistogramNeededColumns.Delete(col)
colHist.Count = int64(colHist.TotalRowCount())
// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
oldCache = h.statsCache.Load().(statsCache)
tbl, ok = oldCache.tables[col.TableID]
if !ok {
continue
}
tbl = tbl.Copy()
tbl.Columns[c.ID] = colHist
if h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version)) {
statistics.HistogramNeededColumns.Delete(col)
}
}
return nil
}
Expand Down Expand Up @@ -1225,7 +1255,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t
// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta.
func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) {
ctx := context.Background()
rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID)
rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, StatsStatusInited, StatsStatusAnalyzed)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1235,6 +1265,9 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
}
return errors.New(fmt.Sprintf("extended statistics '%s' for the specified table does not exist", statsName))
}
if len(rows) > 1 {
logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID))
}

h.mu.Lock()
defer h.mu.Unlock()
Expand All @@ -1244,7 +1277,11 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err1 := finishTransaction(ctx, exec, err)
if err == nil && err1 == nil {
h.removeExtendedStatsItem(tableID, statsName)
}
err = err1
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
Expand All @@ -1260,28 +1297,53 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
return nil
}

const updateStatsCacheRetryCnt = 5

func (h *Handle) removeExtendedStatsItem(tableID int64, statsName string) {
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.tables[tableID]
if !ok || tbl.ExtendedStats == nil || len(tbl.ExtendedStats.Stats) == 0 {
return
}
newTbl := tbl.Copy()
delete(newTbl.ExtendedStats.Stats, statsName)
if h.updateStatsCache(oldCache.update([]*statistics.Table{newTbl}, nil, oldCache.version)) {
return
}
if retry == 1 {
logutil.BgLogger().Info("remove extended stats cache failed", zap.String("stats_name", statsName), zap.Int64("table_id", tableID))
} else {
logutil.BgLogger().Info("remove extended stats cache failed, retrying", zap.String("stats_name", statsName), zap.Int64("table_id", tableID))
}
}
}

// ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended.
func (h *Handle) ReloadExtendedStatistics() error {
reader, err := h.getStatsReader(0)
if err != nil {
return err
}
oldCache := h.statsCache.Load().(statsCache)
tables := make([]*statistics.Table, 0, len(oldCache.tables))
for physicalID, tbl := range oldCache.tables {
t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true)
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
reader, err := h.getStatsReader(0)
if err != nil {
return err
}
tables = append(tables, t)
}
err = h.releaseStatsReader(reader)
if err != nil {
return err
oldCache := h.statsCache.Load().(statsCache)
tables := make([]*statistics.Table, 0, len(oldCache.tables))
for physicalID, tbl := range oldCache.tables {
t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true)
if err != nil {
return err
}
tables = append(tables, t)
}
err = h.releaseStatsReader(reader)
if err != nil {
return err
}
if h.updateStatsCache(oldCache.update(tables, nil, oldCache.version)) {
return nil
}
}
// Note that this update may fail when the statsCache.version has been modified by others.
h.updateStatsCache(oldCache.update(tables, nil, oldCache.version))
return nil
return errors.New(fmt.Sprintf("update stats cache failed for %d attempts", updateStatsCacheRetryCnt))
}

// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
Expand Down
32 changes: 32 additions & 0 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1694,6 +1694,38 @@ func (s *testStatsSuite) TestCorrelationStatsCompute(c *C) {
c.Assert(foundS1 && foundS2, IsTrue)
}

func (s *testStatsSuite) TestSyncStatsExtendedRemoval(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set session tidb_enable_extended_stats = on")
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1,1),(2,2),(3,3)")
tk.MustExec("alter table t add stats_extended s1 correlation(a,b)")
tk.MustExec("analyze table t")
do := s.do
is := do.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := tbl.Meta()
statsTbl := do.StatsHandle().GetTableStats(tableInfo)
c.Assert(statsTbl, NotNil)
c.Assert(statsTbl.ExtendedStats, NotNil)
c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 1)
item := statsTbl.ExtendedStats.Stats["s1"]
c.Assert(item, NotNil)
result := tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'")
c.Assert(len(result.Rows()), Equals, 1)

tk.MustExec("alter table t drop stats_extended s1")
statsTbl = do.StatsHandle().GetTableStats(tableInfo)
c.Assert(statsTbl, NotNil)
c.Assert(statsTbl.ExtendedStats, NotNil)
c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 0)
result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'")
c.Assert(len(result.Rows()), Equals, 0)
}

func (s *testStatsSuite) TestStaticPartitionPruneMode(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
Expand Down
16 changes: 12 additions & 4 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,12 @@ OUTER:
newCol.Flag = statistics.ResetAnalyzeFlag(newCol.Flag)
newTblStats.Columns[fb.Hist.ID] = &newCol
}
oldCache := h.statsCache.Load().(statsCache)
h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version))
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load().(statsCache)
if h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version)) {
break
}
}
}
}
}
Expand Down Expand Up @@ -660,8 +664,12 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) {
delete(h.mu.rateMap, id)
}
h.mu.Unlock()
oldCache := h.statsCache.Load().(statsCache)
h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version))
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load().(statsCache)
if h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version)) {
break
}
}
}

// HandleUpdateStats update the stats using feedback.
Expand Down

0 comments on commit 6d258c7

Please sign in to comment.