Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: add generics lfu #51352

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions pkg/statistics/handle/cache/internal/lfu/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,15 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "lfu",
srcs = [
"key_set.go",
"key_set_shard.go",
"lfu_cache.go",
],
srcs = ["lfu_cache.go"],
importpath = "github.com/pingcap/tidb/pkg/statistics/handle/cache/internal/lfu",
visibility = ["//pkg/statistics/handle/cache:__subpackages__"],
deps = [
"//pkg/statistics",
"//pkg/statistics/handle/cache/internal",
"//pkg/statistics/handle/cache/internal/metrics",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/memory",
"//pkg/util/lfu",
"@com_github_dgraph_io_ristretto//:ristretto",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//rand",
"@org_uber_go_zap//:zap",
],
)

Expand Down
206 changes: 34 additions & 172 deletions pkg/statistics/handle/cache/internal/lfu/lfu_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,223 +15,96 @@
package lfu

import (
"sync"
"sync/atomic"

"github.com/dgraph-io/ristretto"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/cache/internal"
"github.com/pingcap/tidb/pkg/statistics/handle/cache/internal/metrics"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"go.uber.org/zap"
"golang.org/x/exp/rand"
"github.com/pingcap/tidb/pkg/util/lfu"
)

// LFU is a LFU based on the ristretto.Cache
type LFU struct {
cache *ristretto.Cache
// This is a secondary cache layer used to store all tables,
// including those that have been evicted from the primary cache.
resultKeySet *keySetShard
cost atomic.Int64
closed atomic.Bool
closeOnce sync.Once
cache *lfu.LFU[int64, *statistics.Table]
}

// NewLFU creates a new LFU cache.
func NewLFU(totalMemCost int64) (*LFU, error) {
cost, err := adjustMemCost(totalMemCost)
if err != nil {
return nil, err
}
if intest.InTest && totalMemCost == 0 {
// In test, we set the cost to 5MB to avoid using too many memory in the LFU's CM sketch.
cost = 5000000
}
metrics.CapacityGauge.Set(float64(cost))
result := &LFU{}
bufferItems := int64(64)

cache, err := ristretto.NewCache(
&ristretto.Config{
NumCounters: max(min(cost/128, 1_000_000), 10), // assume the cost per table stats is 128
MaxCost: cost,
BufferItems: bufferItems,
OnEvict: result.onEvict,
OnExit: result.onExit,
OnReject: result.onReject,
IgnoreInternalCost: intest.InTest,
Metrics: intest.InTest,
},
)
cache, err := lfu.NewLFU[int64, *statistics.Table](totalMemCost, DropEvicted, metrics.CapacityGauge)
if err != nil {
return nil, err
}
result.cache = cache
result.resultKeySet = newKeySetShard()
return result, err
}

// adjustMemCost adjusts the memory cost according to the total memory cost.
// When the total memory cost is 0, the memory cost is set to half of the total memory.
func adjustMemCost(totalMemCost int64) (result int64, err error) {
if totalMemCost == 0 {
memTotal, err := memory.MemTotal()
if err != nil {
return 0, err
}
return int64(memTotal / 2), nil
}
return totalMemCost, nil
cache.RegisterMissCounter(metrics.MissCounter)
cache.RegisterHitCounter(metrics.HitCounter)
cache.RegisterUpdateCounter(metrics.UpdateCounter)
cache.RegisterDelCounter(metrics.DelCounter)
cache.RegisterEvictCounter(metrics.EvictCounter)
cache.RegisterRejectCounter(metrics.RejectCounter)
cache.RegisterCostGauge(metrics.CostGauge)
return &LFU{
cache: cache,
}, nil
}

// Get implements statsCacheInner
func (s *LFU) Get(tid int64) (*statistics.Table, bool) {
result, ok := s.cache.Get(tid)
if !ok {
return s.resultKeySet.Get(tid)
}
return result.(*statistics.Table), ok
return s.cache.Get(tid)
}

// Put implements statsCacheInner
func (s *LFU) Put(tblID int64, tbl *statistics.Table) bool {
cost := tbl.MemoryUsage().TotalTrackingMemUsage()
s.resultKeySet.AddKeyValue(tblID, tbl)
s.addCost(cost)
return s.cache.Set(tblID, tbl, cost)
return s.cache.Put(tblID, tbl)
}

// Del implements statsCacheInner
func (s *LFU) Del(tblID int64) {
s.cache.Del(tblID)
s.resultKeySet.Remove(tblID)
}

// Cost implements statsCacheInner
func (s *LFU) Cost() int64 {
return s.cost.Load()
return s.cache.Cost()
}

// Values implements statsCacheInner
func (s *LFU) Values() []*statistics.Table {
result := make([]*statistics.Table, 0, 512)
for _, k := range s.resultKeySet.Keys() {
if value, ok := s.resultKeySet.Get(k); ok {
result = append(result, value)
}
}
return result
return s.Values()
}

// DropEvicted drop stats for table column/index
func DropEvicted(item statistics.TableCacheItem) {
if !item.IsStatsInitialized() ||
item.GetEvictedStatus() == statistics.AllEvicted {
return
}
item.DropUnnecessaryData()
}

func (s *LFU) onReject(item *ristretto.Item) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Warn("panic in onReject", zap.Any("error", r), zap.Stack("stack"))
}
}()
s.dropMemory(item)
metrics.RejectCounter.Inc()
}

func (s *LFU) onEvict(item *ristretto.Item) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Warn("panic in onEvict", zap.Any("error", r), zap.Stack("stack"))
}
}()
s.dropMemory(item)
metrics.EvictCounter.Inc()
}

func (s *LFU) dropMemory(item *ristretto.Item) {
if item.Value == nil {
// Sometimes the same key may be passed to the "onEvict/onExit"
// function twice, and in the second invocation, the value is empty,
// so it should not be processed.
return
}
if s.closed.Load() {
return
}
// We do not need to calculate the cost during onEvict,
// because the onexit function is also called when the evict event occurs.
// TODO(hawkingrei): not copy the useless part.
table := item.Value.(*statistics.Table).Copy()
for _, column := range table.Columns {
DropEvicted(column)
func DropEvicted(table any) {
t := table.(*statistics.Table)
for _, column := range t.Columns {
dropEvicted(column)
}
for _, indix := range table.Indices {
DropEvicted(indix)
for _, indix := range t.Indices {
dropEvicted(indix)
}
s.resultKeySet.AddKeyValue(int64(item.Key), table)
after := table.MemoryUsage().TotalTrackingMemUsage()
// why add before again? because the cost will be subtracted in onExit.
// in fact, it is after - before
s.addCost(after)
s.triggerEvict()
}

func (s *LFU) triggerEvict() {
// When the memory usage of the cache exceeds the maximum value, Many item need to evict. But
// ristretto'c cache execute the evict operation when to write the cache. for we can evict as soon as possible,
// we will write some fake item to the cache. fake item have a negative key, and the value is nil.
if s.Cost() > s.cache.MaxCost() {
//nolint: gosec
s.cache.Set(-rand.Int(), nil, 0)
}
}

func (s *LFU) onExit(val any) {
defer func() {
if r := recover(); r != nil {
logutil.BgLogger().Warn("panic in onExit", zap.Any("error", r), zap.Stack("stack"))
}
}()
if val == nil {
// Sometimes the same key may be passed to the "onEvict/onExit" function twice,
// and in the second invocation, the value is empty, so it should not be processed.
return
}
if s.closed.Load() {
// dropEvicted drop stats for table column/index
func dropEvicted(item statistics.TableCacheItem) {
if !item.IsStatsInitialized() ||
item.GetEvictedStatus() == statistics.AllEvicted {
return
}
// Subtract the memory usage of the table from the total memory usage.
s.addCost(-val.(*statistics.Table).MemoryUsage().TotalTrackingMemUsage())
item.DropUnnecessaryData()
}

// Len implements statsCacheInner
func (s *LFU) Len() int {
return s.resultKeySet.Len()
return s.cache.Len()
}

// Copy implements statsCacheInner
func (s *LFU) Copy() internal.StatsCacheInner {
return s
cache := s.cache.Copy()
return &LFU{cache: cache}
}

// SetCapacity implements statsCacheInner
func (s *LFU) SetCapacity(maxCost int64) {
cost, err := adjustMemCost(maxCost)
if err != nil {
logutil.BgLogger().Warn("adjustMemCost failed", zap.Error(err))
return
}
s.cache.UpdateMaxCost(cost)
s.triggerEvict()
metrics.CapacityGauge.Set(float64(cost))
metrics.CostGauge.Set(float64(s.Cost()))
s.cache.SetCapacity(maxCost)
}

// wait blocks until all buffered writes have been applied. This ensures a call to Set()
Expand All @@ -241,26 +114,15 @@ func (s *LFU) wait() {
}

func (s *LFU) metrics() *ristretto.Metrics {
return s.cache.Metrics
return s.cache.Metrics()
}

// Close implements statsCacheInner
func (s *LFU) Close() {
s.closeOnce.Do(func() {
s.closed.Store(true)
s.Clear()
s.cache.Close()
s.cache.Wait()
})
s.cache.Close()
}

// Clear implements statsCacheInner
func (s *LFU) Clear() {
s.cache.Clear()
s.resultKeySet.Clear()
}

func (s *LFU) addCost(v int64) {
newv := s.cost.Add(v)
metrics.CostGauge.Set(float64(newv))
}
13 changes: 13 additions & 0 deletions pkg/statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ type Table struct {
TblInfoUpdateTS uint64
}

// DeepCopy implements the interface of LFU' key.
func (t *Table) DeepCopy() any {
return t.Copy()
}

// ExtendedStatsItem is the cached item of a mysql.stats_extended record.
type ExtendedStatsItem struct {
StringVals string
Expand Down Expand Up @@ -277,6 +282,14 @@ func (t *Table) MemoryUsage() *TableMemoryUsage {
return tMemUsage
}

// TotalTrackingMemUsage return Total Tracking Mem Usage
func (t *Table) TotalTrackingMemUsage() int64 {
if t == nil {
return 0
}
return t.MemoryUsage().TotalTrackingMemUsage()
}

// Copy copies the current table.
func (t *Table) Copy() *Table {
newHistColl := HistColl{
Expand Down
24 changes: 24 additions & 0 deletions pkg/util/lfu/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "lfu",
srcs = [
"key.go",
"key_set.go",
"key_set_shard.go",
"lfu_cache.go",
],
importpath = "github.com/pingcap/tidb/pkg/util/lfu",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/memory",
"@com_github_cespare_xxhash_v2//:xxhash",
"@com_github_dgraph_io_ristretto//:ristretto",
"@com_github_prometheus_client_golang//prometheus",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//rand",
"@org_uber_go_zap//:zap",
],
)
Loading