Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats: fix data race when updating stats cache (#13647) #13687

Merged
merged 2 commits into from
Nov 22, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions statistics/handle/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"

"github.com/cznic/mathutil"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand All @@ -31,7 +32,7 @@ import (
"go.uber.org/zap"
)

func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, tables StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
physicalID := row.GetInt64(1)
table, ok := h.getTableByPhysicalID(is, physicalID)
Expand All @@ -53,11 +54,11 @@ func (h *Handle) initStatsMeta4Chunk(is infoschema.InfoSchema, tables StatsCache
Version: row.GetUint64(0),
Name: getFullTableName(is, tableInfo),
}
tables[physicalID] = tbl
cache.tables[physicalID] = tbl
}
}

func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (StatsCache, error) {
func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (statsCache, error) {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY version, table_id, modify_count, count from mysql.stats_meta"
Expand All @@ -66,27 +67,27 @@ func (h *Handle) initStatsMeta(is infoschema.InfoSchema) (StatsCache, error) {
defer terror.Call(rc[0].Close)
}
if err != nil {
return nil, errors.Trace(err)
return statsCache{}, errors.Trace(err)
}
tables := StatsCache{}
tables := statsCache{tables: make(map[int64]*statistics.Table)}
req := rc[0].NewChunk()
iter := chunk.NewIterator4Chunk(req)
for {
err := rc[0].Next(context.TODO(), req)
if err != nil {
return nil, errors.Trace(err)
return statsCache{}, errors.Trace(err)
}
if req.NumRows() == 0 {
break
}
h.initStatsMeta4Chunk(is, tables, iter)
h.initStatsMeta4Chunk(is, &tables, iter)
}
return tables, nil
}

func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables StatsCache, iter *chunk.Iterator4Chunk) {
func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, cache *statsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
table, ok := tables[row.GetInt64(0)]
table, ok := cache.tables[row.GetInt64(0)]
if !ok {
continue
}
Expand Down Expand Up @@ -137,7 +138,7 @@ func (h *Handle) initStatsHistograms4Chunk(is infoschema.InfoSchema, tables Stat
}
}

func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache) error {
func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, cache *statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, distinct_count, version, null_count, cm_sketch, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms"
Expand All @@ -158,15 +159,15 @@ func (h *Handle) initStatsHistograms(is infoschema.InfoSchema, tables StatsCache
if req.NumRows() == 0 {
break
}
h.initStatsHistograms4Chunk(is, tables, iter)
h.initStatsHistograms4Chunk(is, cache, iter)
}
return nil
}

func initStatsBuckets4Chunk(ctx sessionctx.Context, tables StatsCache, iter *chunk.Iterator4Chunk) {
func initStatsBuckets4Chunk(ctx sessionctx.Context, cache *statsCache, iter *chunk.Iterator4Chunk) {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
tableID, isIndex, histID := row.GetInt64(0), row.GetInt64(1), row.GetInt64(2)
table, ok := tables[tableID]
table, ok := cache.tables[tableID]
if !ok {
continue
}
Expand Down Expand Up @@ -209,7 +210,7 @@ func initStatsBuckets4Chunk(ctx sessionctx.Context, tables StatsCache, iter *chu
}
}

func (h *Handle) initStatsBuckets(tables StatsCache) error {
func (h *Handle) initStatsBuckets(cache *statsCache) error {
h.mu.Lock()
defer h.mu.Unlock()
sql := "select HIGH_PRIORITY table_id, is_index, hist_id, count, repeats, lower_bound, upper_bound from mysql.stats_buckets order by table_id, is_index, hist_id, bucket_id"
Expand All @@ -230,12 +231,11 @@ func (h *Handle) initStatsBuckets(tables StatsCache) error {
if req.NumRows() == 0 {
break
}
initStatsBuckets4Chunk(h.mu.ctx, tables, iter)
initStatsBuckets4Chunk(h.mu.ctx, cache, iter)
}
for _, table := range tables {
if h.mu.lastVersion < table.Version {
h.mu.lastVersion = table.Version
}
lastVersion := uint64(0)
for _, table := range cache.tables {
lastVersion = mathutil.MaxUint64(lastVersion, table.Version)
for _, idx := range table.Indices {
for i := 1; i < idx.Len(); i++ {
idx.Buckets[i].Count += idx.Buckets[i-1].Count
Expand All @@ -249,24 +249,25 @@ func (h *Handle) initStatsBuckets(tables StatsCache) error {
col.PreCalculateScalar()
}
}
cache.version = lastVersion
return nil
}

// InitStats will init the stats cache using full load strategy.
func (h *Handle) InitStats(is infoschema.InfoSchema) error {
tables, err := h.initStatsMeta(is)
cache, err := h.initStatsMeta(is)
if err != nil {
return errors.Trace(err)
}
err = h.initStatsHistograms(is, tables)
err = h.initStatsHistograms(is, &cache)
if err != nil {
return errors.Trace(err)
}
err = h.initStatsBuckets(tables)
err = h.initStatsBuckets(&cache)
if err != nil {
return errors.Trace(err)
}
h.StatsCache.Store(tables)
h.updateStatsCache(cache)
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions statistics/handle/dump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package handle_test

import (
"fmt"
"sync"

. "github.com/pingcap/check"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -48,7 +49,15 @@ func (s *testStatsSuite) TestConversion(c *C) {
tbl := h.GetTableStats(tableInfo.Meta())
assertTableEqual(c, loadTbl, tbl)

cleanEnv(c, s.store, s.do)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
c.Assert(h.Update(is), IsNil)
wg.Done()
}()
err = h.LoadStatsFromJSON(is, jsonTbl)
wg.Wait()
c.Assert(err, IsNil)
loadTblInStorage := h.GetTableStats(tableInfo.Meta())
assertTableEqual(c, loadTblInStorage, tbl)
Expand Down
87 changes: 50 additions & 37 deletions statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ import (
"go.uber.org/zap"
)

// StatsCache caches the tables in memory for Handle.
type StatsCache map[int64]*statistics.Table
// statsCache caches table statistics in memory for Handle. The whole value is
// stored in an atomic.Value and replaced wholesale on update (copy-on-write),
// so readers never observe a partially updated map.
type statsCache struct {
	// tables maps a physical table ID to its cached statistics.
	tables map[int64]*statistics.Table
	// version is the latest version of cache.
	version uint64
}

// Handle can update stats info periodically.
type Handle struct {
mu struct {
sync.Mutex
ctx sessionctx.Context
// lastVersion is the latest update version before last lease.
lastVersion uint64
// rateMap contains the error rate delta from feedback.
rateMap errorRateDeltaMap
// pid2tid is the map from partition ID to table ID.
Expand All @@ -57,9 +59,15 @@ type Handle struct {
schemaVersion int64
}

	// It can be read by multiple readers at the same time without acquiring the lock, but it can be
	// written only after acquiring the lock.
statsCache struct {
sync.Mutex
atomic.Value
}

restrictedExec sqlexec.RestrictedSQLExecutor

StatsCache atomic.Value
// ddlEventCh is a channel to notify a ddl operation has happened.
// It is sent only by owner or the drop stats executor, and read by stats handle.
ddlEventCh chan *util.Event
Expand All @@ -73,11 +81,10 @@ type Handle struct {
lease atomic2.Duration
}

// Clear the StatsCache, only for test.
// Clear the statsCache, only for test.
func (h *Handle) Clear() {
h.mu.Lock()
h.StatsCache.Store(StatsCache{})
h.mu.lastVersion = 0
h.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
for len(h.ddlEventCh) > 0 {
<-h.ddlEventCh
}
Expand Down Expand Up @@ -109,7 +116,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration) *Handle {
}
handle.mu.ctx = ctx
handle.mu.rateMap = make(errorRateDeltaMap)
handle.StatsCache.Store(StatsCache{})
handle.statsCache.Store(statsCache{tables: make(map[int64]*statistics.Table)})
return handle
}

Expand Down Expand Up @@ -138,14 +145,15 @@ func DurationToTS(d time.Duration) uint64 {

// Update reads stats meta from store and updates the stats map.
func (h *Handle) Update(is infoschema.InfoSchema) error {
lastVersion := h.LastUpdateVersion()
oldCache := h.statsCache.Load().(statsCache)
lastVersion := oldCache.version
// We need this because for two tables, the smaller version may write later than the one with larger version.
// Consider the case that there are two tables A and B, their version and commit time is (A0, A1) and (B0, B1),
// and A0 < B0 < B1 < A1. We will first read the stats of B, and update the lastVersion to B0, but we cannot read
// the table stats of A0 if we read stats that greater than lastVersion which is B0.
// We can read the stats if the diff between commit time and version is less than three lease.
offset := DurationToTS(3 * h.Lease())
if lastVersion >= offset {
if oldCache.version >= offset {
lastVersion = lastVersion - offset
} else {
lastVersion = 0
Expand Down Expand Up @@ -189,10 +197,7 @@ func (h *Handle) Update(is infoschema.InfoSchema) error {
tbl.Name = getFullTableName(is, tableInfo)
tables = append(tables, tbl)
}
h.mu.Lock()
h.mu.lastVersion = lastVersion
h.UpdateTableStats(tables, deletedTableIDs)
h.mu.Unlock()
h.updateStatsCache(oldCache.update(tables, deletedTableIDs, lastVersion))
return nil
}

Expand Down Expand Up @@ -231,43 +236,54 @@ func (h *Handle) GetTableStats(tblInfo *model.TableInfo) *statistics.Table {

// GetPartitionStats retrieves the partition stats from cache.
func (h *Handle) GetPartitionStats(tblInfo *model.TableInfo, pid int64) *statistics.Table {
tbl, ok := h.StatsCache.Load().(StatsCache)[pid]
statsCache := h.statsCache.Load().(statsCache)
tbl, ok := statsCache.tables[pid]
if !ok {
tbl = statistics.PseudoTable(tblInfo)
tbl.PhysicalID = pid
h.UpdateTableStats([]*statistics.Table{tbl}, nil)
h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version))
return tbl
}
return tbl
}

func (h *Handle) copyFromOldCache() StatsCache {
newCache := StatsCache{}
oldCache := h.StatsCache.Load().(StatsCache)
for k, v := range oldCache {
newCache[k] = v
// updateStatsCache atomically replaces the stats cache with newCache, but only
// if newCache is at least as new as the currently stored cache. Writers must
// serialize through statsCache's mutex; readers load the atomic.Value without
// locking.
func (h *Handle) updateStatsCache(newCache statsCache) {
	h.statsCache.Lock()
	// Deferring the unlock guarantees the mutex is released even if Load or
	// Store panics, instead of leaving every future writer blocked forever.
	defer h.statsCache.Unlock()
	oldCache := h.statsCache.Load().(statsCache)
	// Reject stale writes: a concurrent updater may already have stored a
	// cache with a higher version.
	if oldCache.version <= newCache.version {
		h.statsCache.Store(newCache)
	}
}

// copy returns a shallow clone of the stats cache: the tables map is
// duplicated so the clone can be mutated independently, while the
// *statistics.Table values themselves remain shared with the original
// (copy-on-write).
func (sc statsCache) copy() statsCache {
	cloned := statsCache{
		tables:  make(map[int64]*statistics.Table, len(sc.tables)),
		version: sc.version,
	}
	for id, tbl := range sc.tables {
		cloned.tables[id] = tbl
	}
	return cloned
}

// UpdateTableStats updates the statistics table cache using copy on write.
func (h *Handle) UpdateTableStats(tables []*statistics.Table, deletedIDs []int64) {
newCache := h.copyFromOldCache()
// update updates the statistics table cache using copy on write.
func (sc statsCache) update(tables []*statistics.Table, deletedIDs []int64, newVersion uint64) statsCache {
newCache := sc.copy()
newCache.version = newVersion
for _, tbl := range tables {
id := tbl.PhysicalID
newCache[id] = tbl
newCache.tables[id] = tbl
}
for _, id := range deletedIDs {
delete(newCache, id)
delete(newCache.tables, id)
}
h.StatsCache.Store(newCache)
return newCache
}

// LoadNeededHistograms will load histograms for those needed columns.
func (h *Handle) LoadNeededHistograms() error {
cols := statistics.HistogramNeededColumns.AllCols()
for _, col := range cols {
tbl, ok := h.StatsCache.Load().(StatsCache)[col.TableID]
statsCache := h.statsCache.Load().(statsCache)
tbl, ok := statsCache.tables[col.TableID]
if !ok {
continue
}
Expand All @@ -293,24 +309,21 @@ func (h *Handle) LoadNeededHistograms() error {
Count: int64(hg.TotalRowCount()),
IsHandle: c.IsHandle,
}
h.UpdateTableStats([]*statistics.Table{tbl}, nil)
h.updateStatsCache(statsCache.update([]*statistics.Table{tbl}, nil, statsCache.version))
statistics.HistogramNeededColumns.Delete(col)
}
return nil
}

// LastUpdateVersion gets the last update version.
func (h *Handle) LastUpdateVersion() uint64 {
h.mu.Lock()
defer h.mu.Unlock()
return h.mu.lastVersion
return h.statsCache.Load().(statsCache).version
}

// SetLastUpdateVersion sets the last update version.
func (h *Handle) SetLastUpdateVersion(version uint64) {
h.mu.Lock()
defer h.mu.Unlock()
h.mu.lastVersion = version
statsCache := h.statsCache.Load().(statsCache)
h.updateStatsCache(statsCache.update(nil, nil, version))
}

// FlushStats flushes the cached stats update into store.
Expand Down Expand Up @@ -479,7 +492,7 @@ func (h *Handle) columnStatsFromStorage(row chunk.Row, table *statistics.Table,

// tableStatsFromStorage loads table stats info from storage.
func (h *Handle) tableStatsFromStorage(tableInfo *model.TableInfo, physicalID int64, loadAll bool, historyStatsExec sqlexec.RestrictedSQLExecutor) (_ *statistics.Table, err error) {
table, ok := h.StatsCache.Load().(StatsCache)[physicalID]
table, ok := h.statsCache.Load().(statsCache).tables[physicalID]
// If table stats is pseudo, we also need to copy it, since we will use the column stats when
// the average error rate of it is small.
if !ok || historyStatsExec != nil {
Expand Down
Loading