diff --git a/statistics/handle/cache/internal/inner.go b/statistics/handle/cache/internal/inner.go index a4009f9ce3fca..eb934d5be4f40 100644 --- a/statistics/handle/cache/internal/inner.go +++ b/statistics/handle/cache/internal/inner.go @@ -39,10 +39,6 @@ type StatsCacheInner interface { Copy() StatsCacheInner // SetCapacity sets the capacity of the cache SetCapacity(int64) - - // Front returns the front element's owner tableID, only used for test - // TODO: this method is mainly for test, remove it in the future. - Front() int64 // Close stops the cache Close() } diff --git a/statistics/handle/cache/internal/lfu/BUILD.bazel b/statistics/handle/cache/internal/lfu/BUILD.bazel index 7897826d96e2b..5473d15f83612 100644 --- a/statistics/handle/cache/internal/lfu/BUILD.bazel +++ b/statistics/handle/cache/internal/lfu/BUILD.bazel @@ -28,8 +28,9 @@ go_test( embed = [":lfu"], flaky = True, race = "on", - shard_count = 6, + shard_count = 7, deps = [ + "//statistics", "//statistics/handle/cache/internal/testutil", "@com_github_stretchr_testify//require", ], diff --git a/statistics/handle/cache/internal/lfu/key_set.go b/statistics/handle/cache/internal/lfu/key_set.go index b4dcd1e3df703..7b335d8986627 100644 --- a/statistics/handle/cache/internal/lfu/key_set.go +++ b/statistics/handle/cache/internal/lfu/key_set.go @@ -53,19 +53,10 @@ func (ks *keySet) Len() int { return result } -func (ks *keySet) AddKeyValue(key int64, value *statistics.Table) (cost int64) { +func (ks *keySet) AddKeyValue(key int64, value *statistics.Table) { ks.mu.Lock() - if v, ok := ks.set[key]; ok && v != nil { - cost = v.MemoryUsage().TotalTrackingMemUsage() - } ks.set[key] = value ks.mu.Unlock() - if value != nil { - cost = value.MemoryUsage().TotalTrackingMemUsage() - cost - } else { - cost = -cost - } - return cost } func (ks *keySet) Get(key int64) (*statistics.Table, bool) { diff --git a/statistics/handle/cache/internal/lfu/lfu_cache.go b/statistics/handle/cache/internal/lfu/lfu_cache.go index fc058100df7cb..6ec45100c62e5 100644 --- a/statistics/handle/cache/internal/lfu/lfu_cache.go +++ b/statistics/handle/cache/internal/lfu/lfu_cache.go @@ -78,11 +78,15 @@ func (s *LFU) Get(tid int64, _ bool) (*statistics.Table, bool) { // Put implements statsCacheInner func (s *LFU) Put(tblID int64, tbl *statistics.Table) bool { - ok := s.cache.Set(tblID, tbl, tbl.MemoryUsage().TotalTrackingMemUsage()) - if ok { // NOTE: `s.cache` and `s.resultKeySet` may be inconsistent since the update operation is not atomic, but it's acceptable for our scenario - s.resultKeySet.AddKeyValue(tblID, tbl) - s.cost.Add(tbl.MemoryUsage().TotalTrackingMemUsage()) - } + cost := tbl.MemoryUsage().TotalTrackingMemUsage() + // Here we need to insert resultKeySet first and then write to LFU, + // in order to prevent data race. If the LFU cost is already full, + // a rejection may occur, triggering the onEvict event. + // Both inserting into resultKeySet and evicting will modify the memory cost, + // so we need to stagger these two actions. + s.resultKeySet.AddKeyValue(tblID, tbl) + s.cost.Add(cost) + ok := s.cache.Set(tblID, tbl, cost) metrics.CostGauge.Set(float64(s.cost.Load())) return ok } @@ -102,8 +106,8 @@ func (s *LFU) Cost() int64 { func (s *LFU) Values() []*statistics.Table { result := make([]*statistics.Table, 0, 512) for _, k := range s.resultKeySet.Keys() { - if value, ok := s.cache.Get(k); ok { - result = append(result, value.(*statistics.Table)) + if value, ok := s.resultKeySet.Get(k); ok { + result = append(result, value) } } return result @@ -117,8 +121,9 @@ func DropEvicted(item statistics.TableCacheItem) { item.DropUnnecessaryData() } -func (*LFU) onReject(*ristretto.Item) { +func (s *LFU) onReject(item *ristretto.Item) { metrics.RejectCounter.Add(1.0) + s.onEvict(item) } func (s *LFU) onEvict(item *ristretto.Item) { @@ -159,11 +164,6 @@ func (s *LFU) Len() int { return s.resultKeySet.Len() } -// Front implements statsCacheInner -func (*LFU) Front() int64 { - return 0 -} - // Copy implements statsCacheInner func (s *LFU) Copy() internal.StatsCacheInner { return s diff --git a/statistics/handle/cache/internal/lfu/lfu_cache_test.go b/statistics/handle/cache/internal/lfu/lfu_cache_test.go index d441af3ce13ad..4e19eb6c86dda 100644 --- a/statistics/handle/cache/internal/lfu/lfu_cache_test.go +++ b/statistics/handle/cache/internal/lfu/lfu_cache_test.go @@ -17,7 +17,9 @@ package lfu import ( "sync" "testing" + "time" + "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/statistics/handle/cache/internal/testutil" "github.com/stretchr/testify/require" ) @@ -46,11 +48,14 @@ func TestLFUPutGetDel(t *testing.T) { } func TestLFUFreshMemUsage(t *testing.T) { - lfu, err := NewLFU(1000) + lfu, err := NewLFU(10000) require.NoError(t, err) t1 := testutil.NewMockStatisticsTable(1, 1, true, false, false) + require.Equal(t, mockCMSMemoryUsage+mockCMSMemoryUsage, t1.MemoryUsage().TotalMemUsage) t2 := testutil.NewMockStatisticsTable(2, 2, true, false, false) + require.Equal(t, 2*mockCMSMemoryUsage+2*mockCMSMemoryUsage, t2.MemoryUsage().TotalMemUsage) t3 := testutil.NewMockStatisticsTable(3, 3, true, false, false) + require.Equal(t, 3*mockCMSMemoryUsage+3*mockCMSMemoryUsage, t3.MemoryUsage().TotalMemUsage) lfu.Put(int64(1), t1) lfu.Put(int64(2), t2) lfu.Put(int64(3), t3) @@ -59,7 +64,7 @@ func TestLFUFreshMemUsage(t *testing.T) { t4 := testutil.NewMockStatisticsTable(2, 1, true, false, false) lfu.Put(int64(1), t4) lfu.wait() - require.Equal(t, lfu.Cost(), 6*mockCMSMemoryUsage+7*mockCMSMemoryUsage) + require.Equal(t, lfu.Cost(), 7*mockCMSMemoryUsage+6*mockCMSMemoryUsage) t5 := testutil.NewMockStatisticsTable(2, 2, true, false, false) lfu.Put(int64(1), t5) lfu.wait() @@ -164,3 +169,31 @@ func TestLFUCachePutGetWithManyConcurrency2(t *testing.T) { require.Equal(t, uint64(lfu.Cost()), lfu.metrics().CostAdded()-lfu.metrics().CostEvicted()) require.Equal(t, 1000, len(lfu.Values())) } + +func TestLFUReject(t *testing.T) { + capacity := int64(100000000000) + lfu, err := NewLFU(capacity) + require.NoError(t, err) + t1 := testutil.NewMockStatisticsTable(2, 1, true, false, false) + require.Equal(t, 2*mockCMSMemoryUsage+mockCMSMemoryUsage, t1.MemoryUsage().TotalTrackingMemUsage()) + lfu.Put(1, t1) + lfu.wait() + require.Equal(t, lfu.Cost(), 2*mockCMSMemoryUsage+mockCMSMemoryUsage) + + lfu.SetCapacity(2*mockCMSMemoryUsage + mockCMSMemoryUsage - 1) + + t2 := testutil.NewMockStatisticsTable(2, 1, true, false, false) + require.True(t, lfu.Put(2, t2)) + lfu.wait() + time.Sleep(3 * time.Second) + require.Equal(t, int64(12), lfu.Cost()) + require.Len(t, lfu.Values(), 2) + v, ok := lfu.Get(2, false) + require.True(t, ok) + for _, c := range v.Columns { + require.Equal(t, c.GetEvictedStatus(), statistics.AllEvicted) + } + for _, i := range v.Indices { + require.Equal(t, i.GetEvictedStatus(), statistics.AllEvicted) + } +} diff --git a/statistics/handle/cache/internal/mapcache/map_cache.go b/statistics/handle/cache/internal/mapcache/map_cache.go index 2a22e2764a5d4..8e9f9a2532120 100644 --- a/statistics/handle/cache/internal/mapcache/map_cache.go +++ b/statistics/handle/cache/internal/mapcache/map_cache.go @@ -129,10 +129,5 @@ func (m *MapCache) Copy() internal.StatsCacheInner { // SetCapacity implements StatsCacheInner func (*MapCache) SetCapacity(int64) {} -// Front implements StatsCacheInner -func (*MapCache) Front() int64 { - return 0 -} - // Close implements StatsCacheInner func (*MapCache) Close() {} diff --git a/statistics/handle/cache/statscacheinner.go b/statistics/handle/cache/statscacheinner.go index a9688e1a4fec9..3d60ff6ec01a0 100644 --- a/statistics/handle/cache/statscacheinner.go +++ b/statistics/handle/cache/statscacheinner.go @@ -170,11 +170,6 @@ func (sc *StatsCache) Version() uint64 { return sc.maxTblStatsVer.Load() } -// Front returns the front element's owner tableID, only used for test. -func (sc *StatsCache) Front() int64 { - return sc.c.Front() -} - // CopyAndUpdate copies a new cache and updates the new statistics table cache. It is only used in the COW mode. func (sc *StatsCache) CopyAndUpdate(tables []*statistics.Table, deletedIDs []int64, opts ...TableStatsOpt) *StatsCache { option := &TableStatsOption{}