diff --git a/statistics/handle/bootstrap.go b/statistics/handle/bootstrap.go index e4e83963017ac..b1b04fb828e9f 100644 --- a/statistics/handle/bootstrap.go +++ b/statistics/handle/bootstrap.go @@ -17,6 +17,7 @@ import ( "context" "fmt" + "github.com/cznic/mathutil" "github.com/pingcap/errors" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -31,7 +32,7 @@ import ( "go.uber.org/zap" ) -func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, tables StatsCache, iter *chunk.Iterator4Chunk) { +func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) { for row := iter.Begin(); row != iter.End(); row = iter.Next() { physicalID := row.GetInt64(1) table, ok := h.getTableByPhysicalID(is, physicalID) @@ -53,11 +54,11 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, tables StatsCache Version: row.GetUint64(0), Name: getFullTableName(is, tableInfo), } - tables[physicalID] = tbl + cache.tables[physicalID] = tbl } } -func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (StatsCache, error) { +func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) { h.mu.Lock() defer h.mu.Unlock() sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta" @@ -66,27 +67,27 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (StatsCache, error) { defer terror.Call(rc[0].Close) } if err != nil { - return nil, errors.Trace(err) + return statsCache{}, errors.Trace(err) } - tables := StatsCache{} + tables := statsCache{tables: make(map[int64]*statistics.Table)} req := rc[0].NewChunk() iter := chunk.NewIterator4Chunk(req) for { err := rc[0].Next(context.TODO(), req) if err != nil { - return nil, errors.Trace(err) + return statsCache{}, errors.Trace(err) } if req.NumRows() == 0 { break } - h.initStatsMeta4Chunk(is, tables, iter) + h.initStatsMeta4Chunk(is, &tables, iter) } return tables, nil } -func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables StatsCache, iter *chunk.Iterator4Chunk) { +func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) { for row := iter.Begin(); row != iter.End(); row = iter.Next() { - table, ok := tables[row.GetInt64(0)] + table, ok := cache.tables[row.GetInt64(0)] if !ok { continue } @@ -137,7 +138,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat } } -func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache) error { +func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error { h.mu.Lock() defer h.mu.Unlock() sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms" @@ -158,15 +159,15 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache if req.NumRows() == 0 { break } - h.initStatsHistograms4Chunk(is, tables, iter) + h.initStatsHistograms4Chunk(is, cache, iter) } return nil } -func initStatsBuckets4Chunk(ctx sessionctx.Context, tables StatsCache, iter *chunk.Iterator4Chunk) { +func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chunk.Iterator4Chunk) { for row := iter.Begin(); row != iter.End(); row = iter.Next() { tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2) - table, ok := tables[tableID] + table, ok := cache.tables[tableID] if !ok { continue } @@ -209,7 +210,7 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, tables StatsCache, iter *chu } } -func (h *Handle) initStatsBuckets(tables StatsCache) error { +func (h *Handle) initStatsBuckets(cache *statsCache) error { h.mu.Lock() defer h.mu.Unlock() sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id" @@ -230,12 +231,11 @@ func (h *Handle) initStatsBuckets(tables StatsCache) error { if req.NumRows() == 0 { break } - initStatsBuckets4Chunk(h.mu.ctx, tables, iter) + initStatsBuckets4Chunk(h.mu.ctx, cache, iter) } - for _, table := range tables { - if h.mu.lastVersion < table.Version { - h.mu.lastVersion = table.Version - } + lastVersion := uint64(0) + for _, table := range cache.tables { + lastVersion = mathutil.MaxUint64(lastVersion, table.Version) for _, idx := range table.Indices { for i := 1; i < idx.Len(); i++ { idx.Buckets[i].Count += idx.Buckets[i-1].Count @@ -249,24 +249,25 @@ func (h *Handle) initStatsBuckets(tables StatsCache) error { col.PreCalculateScalar() } } + cache.version = lastVersion return nil } // InitStats will init the stats cache using full load strategy. func (h *Handle) InitStats(is infoschema.InfoSchema) error { - tables, err := h.initStatsMeta(is) + cache, err := h.initStatsMeta(is) if err != nil { return errors.Trace(err) } - err = h.initStatsHistograms(is, tables) + err = h.initStatsHistograms(is, &cache) if err != nil { return errors.Trace(err) } - err = h.initStatsBuckets(tables) + err = h.initStatsBuckets(&cache) if err != nil { return errors.Trace(err) } - h.StatsCache.Store(tables) + h.updateStatsCache(cache) return nil } diff --git a/statistics/handle/dump_test.go b/statistics/handle/dump_test.go index 934518aa44a1c..2d4dcc52ff637 100644 --- a/statistics/handle/dump_test.go +++ b/statistics/handle/dump_test.go @@ -15,6 +15,7 @@ package handle_test import ( "fmt" + "sync" . "github.com/pingcap/check" "github.com/pingcap/parser/model" @@ -48,7 +49,15 @@ func (s *testStatsSuite) TestConversion(c *C) { tbl := h.GetTableStats(tableInfo.Meta()) assertTableEqual(c, loadTbl, tbl) + cleanEnv(c, s.store, s.do) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + c.Assert(h.Update(is), IsNil) + wg.Done() + }() err = h.LoadStatsFromJSON(is, jsonTbl) + wg.Wait() c.Assert(err, IsNil) loadTblInStorage := h.GetTableStats(tableInfo.Meta()) assertTableEqual(c, loadTblInStorage, tbl) diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go index bdf16a79405e3..e2b65287490ec 100644 --- a/statistics/handle/handle.go +++ b/statistics/handle/handle.go @@ -39,16 +39,18 @@ import ( "go.uber.org/zap" ) -// StatsCache caches the tables in memory for Handle. -type StatsCache map[int64]*statistics.Table +// statsCache caches the tables in memory for Handle. +type statsCache struct { + tables map[int64]*statistics.Table + // version is the latest version of cache. + version uint64 +} // Handle can update stats info periodically. type Handle struct { mu struct { sync.Mutex ctx sessionctx.Context - // lastVersion is the latest update version before last lease. - lastVersion uint64 // rateMap contains the error rate delta from feedback. rateMap errorRateDeltaMap // pid2tid is the map from partition ID to table ID. @@ -57,9 +59,15 @@ type Handle struct { schemaVersion int64 } + // It can be read by multiply readers at the same time without acquire lock, but it can be + // written only after acquire the lock. + statsCache struct { + sync.Mutex + atomic.Value + } + restrictedExec sqlexec.RestrictedSQLExecutor - StatsCache atomic.Value // ddlEventCh is a channel to notify a ddl operation has happened. // It is sent only by owner or the drop stats executor, and read by stats handle. ddlEventCh chan *util.Event @@ -73,11 +81,10 @@ type Handle struct { lease atomic2.Duration } -// Clear the StatsCache, only for test. +// Clear the statsCache, only for test. func (h *Handle) Clear() { h.mu.Lock() - h.StatsCache.Store(StatsCache{}) - h.mu.lastVersion = 0 + h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) for len(h.ddlEventCh) > 0 { <-h.ddlEventCh } @@ -109,7 +116,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle { } handle.mu.ctx = ctx handle.mu.rateMap = make(errorRateDeltaMap) - handle.StatsCache.Store(StatsCache{}) + handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)}) return handle } @@ -138,14 +145,15 @@ func DurationToTS(d time.Duration) uint64 { // Update reads stats meta from store and updates the stats map. func (h *Handle) Update(is infoschema.InfoSchema) error { - lastVersion := h.LastUpdateVersion() + oldCache := h.statsCache.Load().(statsCache) + lastVersion := oldCache.version // We need this because for two tables, the smaller version may write later than the one with larger version. // Consider the case that there are two tables A and B, their version and commit time is (A0, A1) and (B0, B1), // and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read // the table stats of A0 if we read stats that greater than lastVersion which is B0. // We can read the stats if the diff between commit time and version is less than three lease. offset := DurationToTS(3 * h.Lease()) - if lastVersion >= offset { + if oldCache.version >= offset { lastVersion = lastVersion - offset } else { lastVersion = 0 @@ -189,10 +197,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error { tbl.Name = getFullTableName(is, tableInfo) tables = append(tables, tbl) } - h.mu.Lock() - h.mu.lastVersion = lastVersion - h.UpdateTableStats(tables, deletedTableIDs) - h.mu.Unlock() + h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion)) return nil } @@ -231,43 +236,54 @@ func (h *Handle) GetTableStats(tblInfo *model.TableInfo) *statistics.Table { // GetPartitionStats retrieves the partition stats from cache. func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table { - tbl, ok := h.StatsCache.Load().(StatsCache)[pid] + statsCache := h.statsCache.Load().(statsCache) + tbl, ok := statsCache.tables[pid] if !ok { tbl = statistics.PseudoTable(tblInfo) tbl.PhysicalID = pid - h.UpdateTableStats([]*statistics.Table{tbl}, nil) + h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) return tbl } return tbl } -func (h *Handle) copyFromOldCache() StatsCache { - newCache := StatsCache{} - oldCache := h.StatsCache.Load().(StatsCache) - for k, v := range oldCache { - newCache[k] = v +func (h *Handle) updateStatsCache(newCache statsCache) { + h.statsCache.Lock() + oldCache := h.statsCache.Load().(statsCache) + if oldCache.version <= newCache.version { + h.statsCache.Store(newCache) + } + h.statsCache.Unlock() +} + +func (sc statsCache) copy() statsCache { + newCache := statsCache{tables: make(map[int64]*statistics.Table, len(sc.tables)), version: sc.version} + for k, v := range sc.tables { + newCache.tables[k] = v } return newCache } -// UpdateTableStats updates the statistics table cache using copy on write. -func (h *Handle) UpdateTableStats(tables []*statistics.Table, deletedIDs []int64) { - newCache := h.copyFromOldCache() +// update updates the statistics table cache using copy on write. +func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache { + newCache := sc.copy() + newCache.version = newVersion for _, tbl := range tables { id := tbl.PhysicalID - newCache[id] = tbl + newCache.tables[id] = tbl } for _, id := range deletedIDs { - delete(newCache, id) + delete(newCache.tables, id) } - h.StatsCache.Store(newCache) + return newCache } // LoadNeededHistograms will load histograms for those needed columns. func (h *Handle) LoadNeededHistograms() error { cols := statistics.HistogramNeededColumns.AllCols() for _, col := range cols { - tbl, ok := h.StatsCache.Load().(StatsCache)[col.TableID] + statsCache := h.statsCache.Load().(statsCache) + tbl, ok := statsCache.tables[col.TableID] if !ok { continue } @@ -293,7 +309,7 @@ func (h *Handle) LoadNeededHistograms() error { Count: int64(hg.TotalRowCount()), IsHandle: c.IsHandle, } - h.UpdateTableStats([]*statistics.Table{tbl}, nil) + h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version)) statistics.HistogramNeededColumns.Delete(col) } return nil @@ -301,16 +317,13 @@ func (h *Handle) LoadNeededHistograms() error { // LastUpdateVersion gets the last update version. func (h *Handle) LastUpdateVersion() uint64 { - h.mu.Lock() - defer h.mu.Unlock() - return h.mu.lastVersion + return h.statsCache.Load().(statsCache).version } // SetLastUpdateVersion sets the last update version. func (h *Handle) SetLastUpdateVersion(version uint64) { - h.mu.Lock() - defer h.mu.Unlock() - h.mu.lastVersion = version + statsCache := h.statsCache.Load().(statsCache) + h.updateStatsCache(statsCache.update(nil, nil, version)) } // FlushStats flushes the cached stats update into store. @@ -479,7 +492,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table, // tableStatsFromStorage loads table stats info from storage. func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Table, err error) { - table, ok := h.StatsCache.Load().(StatsCache)[physicalID] + table, ok := h.statsCache.Load().(statsCache).tables[physicalID] // If table stats is pseudo, we also need to copy it, since we will use the column stats when // the average error rate of it is small. if !ok || historyStatsExec != nil { diff --git a/statistics/handle/update.go b/statistics/handle/update.go index 8f29ff52cfa68..500e5d827ee51 100644 --- a/statistics/handle/update.go +++ b/statistics/handle/update.go @@ -230,7 +230,7 @@ func needDumpStatsDelta(h *Handle, id int64, item variable.TableDelta, currentTi if item.InitTime.IsZero() { item.InitTime = currentTime } - tbl, ok := h.StatsCache.Load().(StatsCache)[id] + tbl, ok := h.statsCache.Load().(statsCache).tables[id] if !ok { // No need to dump if the stats is invalid. return false @@ -373,7 +373,7 @@ func (h *Handle) DumpStatsFeedbackToKV() error { if fb.Tp == statistics.PkType { err = h.DumpFeedbackToKV(fb) } else { - t, ok := h.StatsCache.Load().(StatsCache)[fb.PhysicalID] + t, ok := h.statsCache.Load().(statsCache).tables[fb.PhysicalID] if ok { err = h.DumpFeedbackForIndex(fb, t) } @@ -452,7 +452,8 @@ func (h *Handle) UpdateStatsByLocalFeedback(is infoschema.InfoSchema) { newCol.Flag = statistics.ResetAnalyzeFlag(newCol.Flag) newTblStats.Columns[fb.Hist.ID] = &newCol } - h.UpdateTableStats([]*statistics.Table{newTblStats}, nil) + oldCache := h.statsCache.Load().(statsCache) + h.updateStatsCache(oldCache.update([]*statistics.Table{newTblStats}, nil, oldCache.version)) } } @@ -483,7 +484,8 @@ func (h *Handle) UpdateErrorRate(is infoschema.InfoSchema) { delete(h.mu.rateMap, id) } h.mu.Unlock() - h.UpdateTableStats(tbls, nil) + oldCache := h.statsCache.Load().(statsCache) + h.updateStatsCache(oldCache.update(tbls, nil, oldCache.version)) } // HandleUpdateStats update the stats using feedback. @@ -871,7 +873,7 @@ func logForIndex(prefix string, t *statistics.Table, idx *statistics.Index, rang } func (h *Handle) logDetailedInfo(q *statistics.QueryFeedback) { - t, ok := h.StatsCache.Load().(StatsCache)[q.PhysicalID] + t, ok := h.statsCache.Load().(statsCache).tables[q.PhysicalID] if !ok { return } @@ -912,7 +914,7 @@ func logForPK(prefix string, c *statistics.Column, ranges []*ranger.Range, actua // RecalculateExpectCount recalculates the expect row count if the origin row count is estimated by pseudo. func (h *Handle) RecalculateExpectCount(q *statistics.QueryFeedback) error { - t, ok := h.StatsCache.Load().(StatsCache)[q.PhysicalID] + t, ok := h.statsCache.Load().(statsCache).tables[q.PhysicalID] if !ok { return nil }