diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index 79d516d5c9f6c..048ea4fc8e318 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -55,9 +55,18 @@ const ( // statsCache caches the tables in memory for Handle. type statsCache struct { tables map[int64]*statistics.Table - // version is the latest version of cache. - version uint64 - memUsage int64 + // version is the latest version of cache. It is bumped when new records of `mysql.stats_meta` are loaded into cache. + version uint64 + // minorVersion is to differentiate the cache when the version is unchanged while the cache contents are + // modified indeed. This can happen when we load extra column histograms into cache, or when we modify the cache with + // statistics feedbacks, etc. We cannot bump the version then because no new changes of `mysql.stats_meta` are loaded, + // while the override of statsCache is in a copy-on-write way, to make sure the statsCache is unchanged by others during the + // interval of 'copy' and 'write', every 'write' should bump / check this minorVersion if the version keeps + // unchanged. + // This bump / check logic is encapsulated in `statsCache.update` and `updateStatsCache`, callers don't need to care + // about this minorVersion actually. + minorVersion uint64 + memUsage int64 } // Handle can update stats info periodically. @@ -515,20 +524,27 @@ func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statist return tbl } -func (h *Handle) updateStatsCache(newCache statsCache) { +// updateStatsCache overrides the global statsCache with a new one, it may fail +// if the global statsCache has been modified by others already. +// Callers should add retry loop if necessary. 
+func (h *Handle) updateStatsCache(newCache statsCache) (updated bool) { h.statsCache.Lock() oldCache := h.statsCache.Load().(statsCache) - if oldCache.version <= newCache.version { + if oldCache.version < newCache.version || (oldCache.version == newCache.version && oldCache.minorVersion < newCache.minorVersion) { h.statsCache.memTracker.Consume(newCache.memUsage - oldCache.memUsage) h.statsCache.Store(newCache) + updated = true } h.statsCache.Unlock() + return } func (sc statsCache) copy() statsCache { newCache := statsCache{tables: make(map[int64]*statistics.Table, len(sc.tables)), - version: sc.version, - memUsage: sc.memUsage} + version: sc.version, + minorVersion: sc.minorVersion, + memUsage: sc.memUsage, + } for k, v := range sc.tables { newCache.tables[k] = v } @@ -549,7 +565,12 @@ func (sc statsCache) initMemoryUsage() { // update updates the statistics table cache using copy on write. func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache { newCache := sc.copy() - newCache.version = newVersion + if newVersion == newCache.version { + newCache.minorVersion += uint64(1) + } else { + newCache.version = newVersion + newCache.minorVersion = uint64(0) + } for _, tbl := range tables { id := tbl.PhysicalID if ptbl, ok := newCache.tables[id]; ok { @@ -583,12 +604,11 @@ func (h *Handle) LoadNeededHistograms() (err error) { }() for _, col := range cols { - statsCache := h.statsCache.Load().(statsCache) - tbl, ok := statsCache.tables[col.TableID] + oldCache := h.statsCache.Load().(statsCache) + tbl, ok := oldCache.tables[col.TableID] if !ok { continue } - tbl = tbl.Copy() c, ok := tbl.Columns[col.ColumnID] if !ok || c.Len() > 0 { statistics.HistogramNeededColumns.Delete(col) @@ -613,7 +633,7 @@ func (h *Handle) LoadNeededHistograms() (err error) { if len(rows) == 0 { logutil.BgLogger().Error("fail to get stats version for this histogram", zap.Int64("table_id", col.TableID), zap.Int64("hist_id", col.ColumnID)) } - 
tbl.Columns[c.ID] = &statistics.Column{ + colHist := &statistics.Column{ PhysicalID: col.TableID, Histogram: *hg, Info: c.Info, @@ -624,9 +644,19 @@ func (h *Handle) LoadNeededHistograms() (err error) { IsHandle: c.IsHandle, StatsVer: rows[0].GetInt64(0), } - tbl.Columns[c.ID].Count = int64(tbl.Columns[c.ID].TotalRowCount()) - h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) - statistics.HistogramNeededColumns.Delete(col) + colHist.Count = int64(colHist.TotalRowCount()) + // Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions + // like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already. + oldCache = h.statsCache.Load().(statsCache) + tbl, ok = oldCache.tables[col.TableID] + if !ok { + continue + } + tbl = tbl.Copy() + tbl.Columns[c.ID] = colHist + if h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version)) { + statistics.HistogramNeededColumns.Delete(col) + } } return nil } @@ -1225,7 +1255,7 @@ func (h *Handle) InsertExtendedStats(statsName string, colIDs []int64, tp int, t // MarkExtendedStatsDeleted update the status of mysql.stats_extended to be `deleted` and the version of mysql.stats_meta. func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExists bool) (err error) { ctx := context.Background() - rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %?", statsName, tableID) + rows, _, err := h.execRestrictedSQL(ctx, "SELECT name FROM mysql.stats_extended WHERE name = %? and table_id = %? 
and status in (%?, %?)", statsName, tableID, StatsStatusInited, StatsStatusAnalyzed) if err != nil { return errors.Trace(err) } @@ -1235,6 +1265,9 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi } return errors.New(fmt.Sprintf("extended statistics '%s' for the specified table does not exist", statsName)) } + if len(rows) > 1 { + logutil.BgLogger().Warn("unexpected duplicate extended stats records found", zap.String("name", statsName), zap.Int64("table_id", tableID)) + } h.mu.Lock() defer h.mu.Unlock() @@ -1244,7 +1277,11 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi return errors.Trace(err) } defer func() { - err = finishTransaction(ctx, exec, err) + err1 := finishTransaction(ctx, exec, err) + if err == nil && err1 == nil { + h.removeExtendedStatsItem(tableID, statsName) + } + err = err1 }() txn, err := h.mu.ctx.Txn(true) if err != nil { @@ -1260,28 +1297,53 @@ func (h *Handle) MarkExtendedStatsDeleted(statsName string, tableID int64, ifExi return nil } +const updateStatsCacheRetryCnt = 5 + +func (h *Handle) removeExtendedStatsItem(tableID int64, statsName string) { + for retry := updateStatsCacheRetryCnt; retry > 0; retry-- { + oldCache := h.statsCache.Load().(statsCache) + tbl, ok := oldCache.tables[tableID] + if !ok || tbl.ExtendedStats == nil || len(tbl.ExtendedStats.Stats) == 0 { + return + } + newTbl := tbl.Copy() + delete(newTbl.ExtendedStats.Stats, statsName) + if h.updateStatsCache(oldCache.update([]*statistics.Table{newTbl}, nil, oldCache.version)) { + return + } + if retry == 1 { + logutil.BgLogger().Info("remove extended stats cache failed", zap.String("stats_name", statsName), zap.Int64("table_id", tableID)) + } else { + logutil.BgLogger().Info("remove extended stats cache failed, retrying", zap.String("stats_name", statsName), zap.Int64("table_id", tableID)) + } + } +} + // ReloadExtendedStatistics drops the cache for extended statistics and reload data from 
mysql.stats_extended. func (h *Handle) ReloadExtendedStatistics() error { - reader, err := h.getStatsReader(0) - if err != nil { - return err - } - oldCache := h.statsCache.Load().(statsCache) - tables := make([]*statistics.Table, 0, len(oldCache.tables)) - for physicalID, tbl := range oldCache.tables { - t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true) + for retry := updateStatsCacheRetryCnt; retry > 0; retry-- { + reader, err := h.getStatsReader(0) if err != nil { return err } - tables = append(tables, t) - } - err = h.releaseStatsReader(reader) - if err != nil { - return err + oldCache := h.statsCache.Load().(statsCache) + tables := make([]*statistics.Table, 0, len(oldCache.tables)) + for physicalID, tbl := range oldCache.tables { + t, err := h.extendedStatsFromStorage(reader, tbl.Copy(), physicalID, true) + if err != nil { + return err + } + tables = append(tables, t) + } + err = h.releaseStatsReader(reader) + if err != nil { + return err + } + if h.updateStatsCache(oldCache.update(tables, nil, oldCache.version)) { + return nil + } } - // Note that this update may fail when the statsCache.version has been modified by others. - h.updateStatsCache(oldCache.update(tables, nil, oldCache.version)) - return nil + return errors.New(fmt.Sprintf("update stats cache failed for %d attempts", updateStatsCacheRetryCnt)) } // BuildExtendedStats build extended stats for column groups if needed based on the column samples. 
diff --git a/statistics/handle/handle_test.go b/statistics/handle/handle_test.go index 775b99d3b4b23..47fdc81edc931 100644 --- a/statistics/handle/handle_test.go +++ b/statistics/handle/handle_test.go @@ -1694,6 +1694,38 @@ func (s *testStatsSuite) TestCorrelationStatsCompute(c *C) { c.Assert(foundS1 && foundS2, IsTrue) } +func (s *testStatsSuite) TestSyncStatsExtendedRemoval(c *C) { + defer cleanEnv(c, s.store, s.do) + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + do := s.do + is := do.InfoSchema() + tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t")) + c.Assert(err, IsNil) + tableInfo := tbl.Meta() + statsTbl := do.StatsHandle().GetTableStats(tableInfo) + c.Assert(statsTbl, NotNil) + c.Assert(statsTbl.ExtendedStats, NotNil) + c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 1) + item := statsTbl.ExtendedStats.Stats["s1"] + c.Assert(item, NotNil) + result := tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + c.Assert(len(result.Rows()), Equals, 1) + + tk.MustExec("alter table t drop stats_extended s1") + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + c.Assert(statsTbl, NotNil) + c.Assert(statsTbl.ExtendedStats, NotNil) + c.Assert(len(statsTbl.ExtendedStats.Stats), Equals, 0) + result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + c.Assert(len(result.Rows()), Equals, 0) +} + func (s *testStatsSuite) TestStaticPartitionPruneMode(c *C) { defer cleanEnv(c, s.store, s.do) tk := testkit.NewTestKit(c, s.store) diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 4fb8da677c892..8a2266888f44d 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ 
-627,8 +627,12 @@ OUTER: newCol.Flag = statistics.ResetAnalyzeFlag(newCol.Flag) newTblStats.Columns[fb.Hist.ID] = &newCol } - oldCache := h.statsCache.Load().(statsCache) - h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version)) + for retry := updateStatsCacheRetryCnt; retry > 0; retry-- { + oldCache := h.statsCache.Load().(statsCache) + if h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version)) { + break + } + } } } } @@ -660,8 +664,12 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) { delete(h.mu.rateMap, id) } h.mu.Unlock() - oldCache := h.statsCache.Load().(statsCache) - h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version)) + for retry := updateStatsCacheRetryCnt; retry > 0; retry-- { + oldCache := h.statsCache.Load().(statsCache) + if h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version)) { + break + } + } } // HandleUpdateStats update the stats using feedback.