Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: delete extended stats cache item in current tidb synchronously #23088

Merged
merged 4 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 95 additions & 33 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,18 @@ const (
// statsCache caches the tables in memory for Handle.
type statsCache struct {
tables map[int64]*statistics.Table
// version is the latest version of cache.
version uint64
memUsage int64
// version is the latest version of cache. It is bumped when new records of `mysql.stats_meta` are loaded into cache.
version uint64
// minorVersion is to differentiate the cache when the version is unchanged while the cache contents are
// modified indeed. This can happen when we load extra column histograms into cache, or when we modify the cache with
// statistics feedbacks, etc. We cannot bump the version then because no new changes of `mysql.stats_meta` are loaded,
// while the override of statsCache is in a copy-on-write way, to make sure the statsCache is unchanged by others during the
// the interval of 'copy' and 'write', every 'write' should bump / check this minorVersion if the version keeps
// unchanged.
// This bump / check logic is encapsulated in `statsCache.update` and `updateStatsCache`, callers don't need to care
// about this minorVersion actually.
minorVersion uint64
memUsage int64
}

// Handle can update stats info periodically.
Expand Down Expand Up @@ -516,20 +525,27 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statist
return tbl
}

func (h *Handle) updateStatsCache(newCache statsCache) {
// updateStatsCache overrides the global statsCache with a new one, it may fail
// if the global statsCache has been modified by others already.
// Callers should add retry loop if necessary.
func (h *Handle) updateStatsCache(newCache statsCache) (updated bool) {
eurekaka marked this conversation as resolved.
Show resolved Hide resolved
h.statsCache.Lock()
oldCache := h.statsCache.Load().(statsCache)
if oldCache.version <= newCache.version {
if oldCache.version < newCache.version || (oldCache.version == newCache.version && oldCache.minorVersion < newCache.minorVersion) {
h.statsCache.memTracker.Consume(newCache.memUsage - oldCache.memUsage)
h.statsCache.Store(newCache)
updated = true
}
h.statsCache.Unlock()
return
}

func (sc statsCache) copy() statsCache {
newCache := statsCache{tables: make(map[int64]*statistics.Table, len(sc.tables)),
version: sc.version,
memUsage: sc.memUsage}
version: sc.version,
minorVersion: sc.minorVersion,
memUsage: sc.memUsage,
}
for k, v := range sc.tables {
newCache.tables[k] = v
}
Expand All @@ -550,7 +566,12 @@ func (sc statsCache) initMemoryUsage() {
// update updates the statistics table cache using copy on write.
func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache {
newCache := sc.copy()
newCache.version = newVersion
if newVersion == newCache.version {
newCache.minorVersion += uint64(1)
} else {
newCache.version = newVersion
newCache.minorVersion = uint64(0)
}
for _, tbl := range tables {
id := tbl.PhysicalID
if ptbl, ok := newCache.tables[id]; ok {
Expand Down Expand Up @@ -584,12 +605,11 @@ func (h *Handle) LoadNeededHistograms() (err error) {
}()

for _, col := range cols {
statsCache := h.statsCache.Load().(statsCache)
tbl, ok := statsCache.tables[col.TableID]
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.tables[col.TableID]
if !ok {
continue
}
tbl = tbl.Copy()
c, ok := tbl.Columns[col.ColumnID]
if !ok || c.Len() > 0 {
statistics.HistogramNeededColumns.Delete(col)
Expand All @@ -614,7 +634,7 @@ func (h *Handle) LoadNeededHistograms() (err error) {
if len(rows) == 0 {
logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", col.TableID), zap.Int64("hist_id", col.ColumnID))
}
tbl.Columns[c.ID] = &statistics.Column{
colHist := &statistics.Column{
PhysicalID: col.TableID,
Histogram: *hg,
Info: c.Info,
Expand All @@ -625,9 +645,19 @@ func (h *Handle) LoadNeededHistograms() (err error) {
IsHandle: c.IsHandle,
StatsVer: rows[0].GetInt64(0),
}
tbl.Columns[c.ID].Count = int64(tbl.Columns[c.ID].TotalRowCount())
h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version))
statistics.HistogramNeededColumns.Delete(col)
colHist.Count = int64(colHist.TotalRowCount())
// Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions
// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
oldCache = h.statsCache.Load().(statsCache)
tbl, ok = oldCache.tables[col.TableID]
if !ok {
continue
}
tbl = tbl.Copy()
tbl.Columns[c.ID] = colHist
if h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version)) {
statistics.HistogramNeededColumns.Delete(col)
}
}
return nil
}
Expand Down Expand Up @@ -1226,7 +1256,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t
// MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta.
func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) {
ctx := context.Background()
rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID)
rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? and status in (%?, %?)", statsName, tableID, StatsStatusInited, StatsStatusAnalyzed)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1236,6 +1266,9 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
}
return errors.New(fmt.Sprintf("extended statistics '%s' for the specified table does not exist", statsName))
}
if len(rows) > 1 {
logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID))
}

h.mu.Lock()
defer h.mu.Unlock()
Expand All @@ -1245,7 +1278,11 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
return errors.Trace(err)
}
defer func() {
err = finishTransaction(ctx, exec, err)
err1 := finishTransaction(ctx, exec, err)
if err == nil && err1 == nil {
h.removeExtendedStatsItem(tableID, statsName)
}
err = err1
}()
txn, err := h.mu.ctx.Txn(true)
if err != nil {
Expand All @@ -1261,28 +1298,53 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi
return nil
}

const updateStatsCacheRetryCnt = 5

func (h *Handle) removeExtendedStatsItem(tableID int64, statsName string) {
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load().(statsCache)
tbl, ok := oldCache.tables[tableID]
if !ok || tbl.ExtendedStats == nil || len(tbl.ExtendedStats.Stats) == 0 {
return
}
newTbl := tbl.Copy()
delete(newTbl.ExtendedStats.Stats, statsName)
if h.updateStatsCache(oldCache.update([]*statistics.Table{newTbl}, nil, oldCache.version)) {
return
}
if retry == 1 {
logutil.BgLogger().Info("remove extended stats cache failed", zap.String("stats_name", statsName), zap.Int64("table_id", tableID))
} else {
logutil.BgLogger().Info("remove extended stats cache failed, retrying", zap.String("stats_name", statsName), zap.Int64("table_id", tableID))
}
}
}

// ReloadExtendedStatistics drops the cache for extended statistics and reload data from mysql.stats_extended.
func (h *Handle) ReloadExtendedStatistics() error {
reader, err := h.getStatsReader(0)
if err != nil {
return err
}
oldCache := h.statsCache.Load().(statsCache)
tables := make([]*statistics.Table, 0, len(oldCache.tables))
for physicalID, tbl := range oldCache.tables {
t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true)
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
reader, err := h.getStatsReader(0)
if err != nil {
return err
}
tables = append(tables, t)
}
err = h.releaseStatsReader(reader)
if err != nil {
return err
oldCache := h.statsCache.Load().(statsCache)
tables := make([]*statistics.Table, 0, len(oldCache.tables))
for physicalID, tbl := range oldCache.tables {
t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true)
if err != nil {
return err
}
tables = append(tables, t)
}
err = h.releaseStatsReader(reader)
if err != nil {
return err
}
if h.updateStatsCache(oldCache.update(tables, nil, oldCache.version)) {
return nil
}
}
// Note that this update may fail when the statsCache.version has been modified by others.
h.updateStatsCache(oldCache.update(tables, nil, oldCache.version))
return nil
return errors.New(fmt.Sprintf("update stats cache failed for %d attempts", updateStatsCacheRetryCnt))
}

// BuildExtendedStats build extended stats for column groups if needed based on the column samples.
Expand Down
32 changes: 32 additions & 0 deletions statistics/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1444,6 +1444,38 @@ func (s *testStatsSuite) TestCorrelationStatsCompute(c *C) {
c.Assert(foundS1 && foundS2, IsTrue)
}

func (s *testStatsSuite) TestSyncStatsExtendedRemoval(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set session tidb_enable_extended_stats = on")
tk.MustExec("use test")
tk.MustExec("create table t(a int, b int)")
tk.MustExec("insert into t values(1,1),(2,2),(3,3)")
tk.MustExec("alter table t add stats_extended s1 correlation(a,b)")
tk.MustExec("analyze table t")
do := s.do
is := do.InfoSchema()
tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
tableInfo := tbl.Meta()
statsTbl := do.StatsHandle().GetTableStats(tableInfo)
c.Assert(statsTbl, NotNil)
c.Assert(statsTbl.ExtendedStats, NotNil)
c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 1)
item := statsTbl.ExtendedStats.Stats["s1"]
c.Assert(item, NotNil)
result := tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'")
c.Assert(len(result.Rows()), Equals, 1)

tk.MustExec("alter table t drop stats_extended s1")
statsTbl = do.StatsHandle().GetTableStats(tableInfo)
c.Assert(statsTbl, NotNil)
c.Assert(statsTbl.ExtendedStats, NotNil)
c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 0)
result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'")
c.Assert(len(result.Rows()), Equals, 0)
}

func (s *testStatsSuite) TestStaticPartitionPruneMode(c *C) {
defer cleanEnv(c, s.store, s.do)
tk := testkit.NewTestKit(c, s.store)
Expand Down
16 changes: 12 additions & 4 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,8 +605,12 @@ OUTER:
newCol.Flag = statistics.ResetAnalyzeFlag(newCol.Flag)
newTblStats.Columns[fb.Hist.ID] = &newCol
}
oldCache := h.statsCache.Load().(statsCache)
h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version))
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load().(statsCache)
if h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version)) {
break
}
}
}
}
}
Expand Down Expand Up @@ -638,8 +642,12 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) {
delete(h.mu.rateMap, id)
}
h.mu.Unlock()
oldCache := h.statsCache.Load().(statsCache)
h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version))
for retry := updateStatsCacheRetryCnt; retry > 0; retry-- {
oldCache := h.statsCache.Load().(statsCache)
if h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version)) {
break
}
}
}

// HandleUpdateStats update the stats using feedback.
Expand Down