Commit 911cf53

bindinfo: add warning message when the memory usage of the cache exceeds its capacity (#32866)

ref #32466
Reminiscent authored Mar 15, 2022
1 parent 403dcfd commit 911cf53
Showing 9 changed files with 119 additions and 51 deletions.
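For orientation before the per-file hunks: the heart of the change is bindCache.set, which now rejects a key-value pair larger than the whole quota, otherwise evicts the oldest entries until the new pair fits, and returns a non-nil error whenever not every binding could be kept. The sketch below is a minimal, self-contained illustration of that policy only; sketchCache, errQuotaExceeded and the byte sizes are made-up stand-ins, while the real cache uses kvcache.SimpleLRUCache and a memory.Tracker as shown in the diff.

package main

import (
	"errors"
	"fmt"
)

// sketchCache is a simplified stand-in for bindinfo's bindCache:
// it tracks only entry sizes, not real BindRecords.
type sketchCache struct {
	capacity int64
	used     int64
	order    []string         // insertion order, oldest first
	sizes    map[string]int64 // per-key memory usage
}

var errQuotaExceeded = errors.New("memory usage of all bindings exceeds the binding cache quota")

// set mirrors the shape of the new bindCache.set: it returns (ok, err).
func (c *sketchCache) set(key string, size int64) (bool, error) {
	if size > c.capacity {
		// A single entry larger than the quota is rejected outright.
		return false, errQuotaExceeded
	}
	var err error
	for c.used+size > c.capacity {
		// Evict the oldest entry and remember that evictions happened.
		err = errQuotaExceeded
		oldest := c.order[0]
		c.order = c.order[1:]
		c.used -= c.sizes[oldest]
		delete(c.sizes, oldest)
	}
	c.order = append(c.order, key)
	c.sizes[key] = size
	c.used += size
	return true, err
}

func main() {
	c := &sketchCache{capacity: 200, sizes: map[string]int64{}}
	fmt.Println(c.set("k0", 100))  // true <nil>
	fmt.Println(c.set("k1", 100))  // true <nil>
	fmt.Println(c.set("k2", 100))  // true + quota error: k0 evicted
	fmt.Println(c.set("big", 300)) // false + quota error: larger than the whole quota
}

With a 200-byte quota and 100-byte entries this matches the expectations in TestBindCache below: the third insert still succeeds but evicts the oldest key and carries a warning, while a 300-byte entry is refused outright.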
40 changes: 27 additions & 13 deletions bindinfo/bind_cache.go
@@ -15,6 +15,7 @@
package bindinfo

import (
"errors"
"sync"

"github.com/cznic/mathutil"
@@ -54,7 +55,7 @@ func newBindCache() *bindCache {
cache := kvcache.NewSimpleLRUCache(mathutil.MaxUint, 0, 0)
c := bindCache{
cache: cache,
memCapacity: variable.MemQuotaBindCache.Load(),
memCapacity: variable.MemQuotaBindingCache.Load(),
memTracker: memory.NewTracker(memory.LabelForBindCache, -1),
}
return &c
@@ -91,26 +92,30 @@ func (c *bindCache) getCopiedVal(key bindCacheKey) []*BindRecord {

// set inserts an item to the cache. It's not thread-safe.
// Only other functions of the bindCache can use this function.
func (c *bindCache) set(key bindCacheKey, value []*BindRecord) bool {
// The set operation returns an error when the memory usage of the binding cache exceeds its capacity.
func (c *bindCache) set(key bindCacheKey, value []*BindRecord) (ok bool, err error) {
mem := calcBindCacheKVMem(key, value)
if mem > c.memCapacity { // ignore this kv pair if its size is too large
return false
err = errors.New("The memory usage of all available bindings exceeds the cache's mem quota. As a result, all available bindings cannot be held on the cache. Please increase the value of the system variable 'tidb_mem_quota_binding_cache' and execute 'admin reload bindings' to ensure that all bindings exist in the cache and can be used normally")
return
}
bindRecords := c.get(key)
if bindRecords != nil {
// Remove the original key-value pair.
mem -= calcBindCacheKVMem(key, bindRecords)
}
for mem+c.memTracker.BytesConsumed() > c.memCapacity {
err = errors.New("The memory usage of all available bindings exceeds the cache's mem quota. As a result, all available bindings cannot be held on the cache. Please increase the value of the system variable 'tidb_mem_quota_binding_cache' and execute 'admin reload bindings' to ensure that all bindings exist in the cache and can be used normally")
evictedKey, evictedValue, evicted := c.cache.RemoveOldest()
if !evicted {
return false
return
}
c.memTracker.Consume(-calcBindCacheKVMem(evictedKey.(bindCacheKey), evictedValue.([]*BindRecord)))
}
c.memTracker.Consume(mem)
c.cache.Put(key, value)
return true
ok = true
return
}

// delete removes an item from the cache. It's not thread-safe.
@@ -121,8 +126,9 @@ func (c *bindCache) delete(key bindCacheKey) bool {
mem := calcBindCacheKVMem(key, bindRecords)
c.cache.Delete(key)
c.memTracker.Consume(-mem)
return true
}
return true
return false
}

// GetBindRecord gets the BindRecord from the cache.
@@ -156,7 +162,7 @@ func (c *bindCache) GetAllBindRecords() []*BindRecord {

// SetBindRecord sets the BindRecord to the cache.
// The function is thread-safe.
func (c *bindCache) SetBindRecord(hash string, meta *BindRecord) {
func (c *bindCache) SetBindRecord(hash string, meta *BindRecord) (err error) {
c.lock.Lock()
defer c.lock.Unlock()
cacheKey := bindCacheKey(hash)
@@ -166,7 +172,8 @@ func (c *bindCache) SetBindRecord(hash string, meta *BindRecord) {
metas[i] = meta
}
}
c.set(cacheKey, []*BindRecord{meta})
_, err = c.set(cacheKey, []*BindRecord{meta})
return
}

// RemoveBindRecord removes the BindRecord which has the same originSQL as the specified BindRecord.
@@ -191,7 +198,9 @@ func (c *bindCache) RemoveBindRecord(hash string, meta *BindRecord) {
}
}
}
c.set(bindCacheKey(hash), metas)
// This function never increases the memory usage of the cache,
// so we don't need to handle the return value here.
_, _ = c.set(bindCacheKey(hash), metas)
}

// SetMemCapacity sets the memory capacity for the cache.
@@ -221,17 +230,22 @@ func (c *bindCache) GetMemCapacity() int64 {

// Copy copies a new bindCache from the original cache.
// The function is thread-safe.
func (c *bindCache) Copy() *bindCache {
func (c *bindCache) Copy() (newCache *bindCache, err error) {
c.lock.Lock()
defer c.lock.Unlock()
newCache := newBindCache()
newCache = newBindCache()
if c.memTracker.BytesConsumed() > newCache.GetMemCapacity() {
err = errors.New("The memory usage of all available bindings exceeds the cache's mem quota. As a result, all available bindings cannot be held on the cache. Please increase the value of the system variable 'tidb_mem_quota_binding_cache' and execute 'admin reload bindings' to ensure that all bindings exist in the cache and can be used normally")
}
keys := c.cache.Keys()
for _, key := range keys {
cacheKey := key.(bindCacheKey)
v := c.get(cacheKey)
bindRecords := make([]*BindRecord, len(v))
copy(bindRecords, v)
newCache.set(cacheKey, bindRecords)
// The memory usage of the cache has already been checked at the beginning of this function,
// so we don't need to handle the return value here.
_, _ = newCache.set(cacheKey, bindRecords)
}
return newCache
return newCache, err
}
26 changes: 21 additions & 5 deletions bindinfo/bind_cache_test.go
@@ -25,39 +25,55 @@ import (
)

func TestBindCache(t *testing.T) {
variable.MemQuotaBindCache.Store(100)
variable.MemQuotaBindingCache.Store(200)
bindCache := newBindCache()

value := make([][]*BindRecord, 3)
key := make([]bindCacheKey, 3)
var bigKey string
for i := 0; i < 3; i++ {
cacheKey := strings.Repeat(strconv.Itoa(i), 50)
key[i] = bindCacheKey(hack.Slice(cacheKey))
record := &BindRecord{OriginalSQL: cacheKey, Db: ""}
value[i] = []*BindRecord{record}
bigKey += cacheKey

require.Equal(t, int64(100), calcBindCacheKVMem(key[i], value[i]))
}

ok := bindCache.set(key[0], value[0])
ok, err := bindCache.set(key[0], value[0])
require.True(t, ok)
require.Nil(t, err)
result := bindCache.get(key[0])
require.NotNil(t, result)

ok = bindCache.set(key[1], value[1])
ok, err = bindCache.set(key[1], value[1])
require.True(t, ok)
require.Nil(t, err)
result = bindCache.get(key[1])
require.NotNil(t, result)

ok = bindCache.set(key[2], value[2])
ok, err = bindCache.set(key[2], value[2])
require.True(t, ok)
require.NotNil(t, err)
result = bindCache.get(key[2])
require.NotNil(t, result)

// Both key[0] and key[1] are not in the cache
// key[0] is not in the cache
result = bindCache.get(key[0])
require.Nil(t, result)

// key[1] is still in the cache
result = bindCache.get(key[1])
require.NotNil(t, result)

bigBindCacheKey := bindCacheKey(hack.Slice(bigKey))
bigRecord := &BindRecord{OriginalSQL: bigKey, Db: ""}
bigBindCacheValue := []*BindRecord{bigRecord}
require.Equal(t, int64(300), calcBindCacheKVMem(bigBindCacheKey, bigBindCacheValue))
ok, err = bindCache.set(bigBindCacheKey, bigBindCacheValue)
require.False(t, ok)
require.NotNil(t, err)
result = bindCache.get(bigBindCacheKey)
require.Nil(t, result)
}
43 changes: 36 additions & 7 deletions bindinfo/handle.go
@@ -143,14 +143,18 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
return err
}

newCache := h.bindInfo.Value.Load().(*bindCache).Copy()
newCache, memExceededErr := h.bindInfo.Value.Load().(*bindCache).Copy()
defer func() {
h.bindInfo.lastUpdateTime = lastUpdateTime
h.bindInfo.Value.Store(newCache)
h.bindInfo.Unlock()
}()

for _, row := range rows {
// If the memory usage of the binding cache exceeds its capacity, stop handling the remaining rows.
if memExceededErr != nil {
break
}
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
continue
@@ -171,12 +175,21 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
oldRecord := newCache.GetBindRecord(hash, meta.OriginalSQL, meta.Db)
newRecord := merge(oldRecord, meta).removeDeletedBindings()
if len(newRecord.Bindings) > 0 {
newCache.SetBindRecord(hash, newRecord)
err = newCache.SetBindRecord(hash, newRecord)
if err != nil {
memExceededErr = err
}
} else {
newCache.RemoveBindRecord(hash, newRecord)
}
updateMetrics(metrics.ScopeGlobal, oldRecord, newCache.GetBindRecord(hash, meta.OriginalSQL, meta.Db), true)
}
if memExceededErr != nil {
// When the memory capacity of the bind cache is not enough,
// memory-related errors may occur in multiple places;
// they only need to be handled (logged) once.
logutil.BgLogger().Warn("[sql-bind] ", zap.Error(memExceededErr))
}
return nil
}

@@ -580,27 +593,43 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
// setBindRecord sets the BindRecord to the cache; if there already exists a BindRecord,
// it will be overridden.
func (h *BindHandle) setBindRecord(hash string, meta *BindRecord) {
newCache := h.bindInfo.Value.Load().(*bindCache).Copy()
newCache, err0 := h.bindInfo.Value.Load().(*bindCache).Copy()
if err0 != nil {
logutil.BgLogger().Warn("[sql-bind] ", zap.Error(err0))
}
oldRecord := newCache.GetBindRecord(hash, meta.OriginalSQL, meta.Db)
newCache.SetBindRecord(hash, meta)
err1 := newCache.SetBindRecord(hash, meta)
if err1 != nil && err0 == nil {
logutil.BgLogger().Warn("[sql-bind] ", zap.Error(err1))
}
h.bindInfo.Value.Store(newCache)
updateMetrics(metrics.ScopeGlobal, oldRecord, meta, false)
}

// appendBindRecord adds the BindRecord to the cache; all the stale BindRecords are
// removed from the cache after this operation.
func (h *BindHandle) appendBindRecord(hash string, meta *BindRecord) {
newCache := h.bindInfo.Value.Load().(*bindCache).Copy()
newCache, err0 := h.bindInfo.Value.Load().(*bindCache).Copy()
if err0 != nil {
logutil.BgLogger().Warn("[sql-bind] ", zap.Error(err0))
}
oldRecord := newCache.GetBindRecord(hash, meta.OriginalSQL, meta.Db)
newRecord := merge(oldRecord, meta)
newCache.SetBindRecord(hash, newRecord)
err1 := newCache.SetBindRecord(hash, newRecord)
if err1 != nil && err0 == nil {
// Only need to handle the error once.
logutil.BgLogger().Warn("[sql-bind] ", zap.Error(err1))
}
h.bindInfo.Value.Store(newCache)
updateMetrics(metrics.ScopeGlobal, oldRecord, newRecord, false)
}

// removeBindRecord removes the BindRecord from the cache.
func (h *BindHandle) removeBindRecord(hash string, meta *BindRecord) {
newCache := h.bindInfo.Value.Load().(*bindCache).Copy()
newCache, err := h.bindInfo.Value.Load().(*bindCache).Copy()
if err != nil {
logutil.BgLogger().Warn("[sql-bind] ", zap.Error(err))
}
oldRecord := newCache.GetBindRecord(hash, meta.OriginalSQL, meta.Db)
newCache.RemoveBindRecord(hash, meta)
h.bindInfo.Value.Store(newCache)
13 changes: 11 additions & 2 deletions bindinfo/session_handle.go
@@ -23,6 +23,8 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// SessionHandle is used to handle all session sql bind operations.
@@ -42,7 +44,10 @@ func NewSessionBindHandle(parser *parser.Parser) *SessionHandle {
// removed from the cache after this operation.
func (h *SessionHandle) appendBindRecord(hash string, meta *BindRecord) {
oldRecord := h.ch.GetBindRecord(hash, meta.OriginalSQL, meta.Db)
h.ch.SetBindRecord(hash, meta)
err := h.ch.SetBindRecord(hash, meta)
if err != nil {
logutil.BgLogger().Warn("[sql-bind] ", zap.Error(err))
}
updateMetrics(metrics.ScopeSession, oldRecord, meta, false)
}

@@ -80,7 +85,11 @@ func (h *SessionHandle) DropBindRecord(originalSQL, db string, binding *Binding)
} else {
newRecord = record
}
h.ch.SetBindRecord(hash, newRecord)
err := h.ch.SetBindRecord(hash, newRecord)
if err != nil {
// Should never reach here; just return the error for safety
return err
}
updateMetrics(metrics.ScopeSession, oldRecord, newRecord, false)
return nil
}
4 changes: 2 additions & 2 deletions domain/sysvar_cache.go
@@ -164,8 +164,8 @@ func (do *Domain) rebuildSysVarCache(ctx sessionctx.Context) error {
func (do *Domain) checkEnableServerGlobalVar(name, sVal string) {
var err error
switch name {
case variable.TiDBMemQuotaBindCache:
variable.MemQuotaBindCache.Store(variable.TidbOptInt64(sVal, variable.DefTiDBMemQuotaBindCache))
case variable.TiDBMemQuotaBindingCache:
variable.MemQuotaBindingCache.Store(variable.TidbOptInt64(sVal, variable.DefTiDBMemQuotaBindingCache))
case variable.TiDBTSOClientBatchMaxWaitTime:
var val float64
val, err = strconv.ParseFloat(sVal, 64)
18 changes: 9 additions & 9 deletions executor/set_test.go
@@ -396,15 +396,15 @@ func TestSetVar(t *testing.T) {
tk.MustQuery(`select @@tidb_mem_quota_apply_cache`).Check(testkit.Rows("123"))

// test for tidb_mem_quota_binding_cache
defVal = fmt.Sprintf("%v", variable.DefTiDBMemQuotaBindCache)
tk.MustQuery(`select @@tidb_mem_quota_bind_cache`).Check(testkit.Rows(defVal))
tk.MustExec(`set global tidb_mem_quota_bind_cache = 1`)
tk.MustQuery(`select @@global.tidb_mem_quota_bind_cache`).Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_mem_quota_bind_cache = 0`)
tk.MustQuery(`select @@global.tidb_mem_quota_bind_cache`).Check(testkit.Rows("0"))
tk.MustExec(`set global tidb_mem_quota_bind_cache = 123`)
tk.MustQuery(`select @@global.tidb_mem_quota_bind_cache`).Check(testkit.Rows("123"))
tk.MustQuery(`select @@global.tidb_mem_quota_bind_cache`).Check(testkit.Rows("123"))
defVal = fmt.Sprintf("%v", variable.DefTiDBMemQuotaBindingCache)
tk.MustQuery(`select @@tidb_mem_quota_binding_cache`).Check(testkit.Rows(defVal))
tk.MustExec(`set global tidb_mem_quota_binding_cache = 1`)
tk.MustQuery(`select @@global.tidb_mem_quota_binding_cache`).Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_mem_quota_binding_cache = 0`)
tk.MustQuery(`select @@global.tidb_mem_quota_binding_cache`).Check(testkit.Rows("0"))
tk.MustExec(`set global tidb_mem_quota_binding_cache = 123`)
tk.MustQuery(`select @@global.tidb_mem_quota_binding_cache`).Check(testkit.Rows("123"))
tk.MustQuery(`select @@global.tidb_mem_quota_binding_cache`).Check(testkit.Rows("123"))

// test for tidb_enable_parallel_apply
tk.MustQuery(`select @@tidb_enable_parallel_apply`).Check(testkit.Rows("0"))
12 changes: 6 additions & 6 deletions executor/show_test.go
@@ -1747,8 +1747,8 @@ func TestShowBindingCache(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int, b int)")
tk.MustExec(`set global tidb_mem_quota_bind_cache = 1`)
tk.MustQuery("select @@global.tidb_mem_quota_bind_cache").Check(testkit.Rows("1"))
tk.MustExec(`set global tidb_mem_quota_binding_cache = 1`)
tk.MustQuery("select @@global.tidb_mem_quota_binding_cache").Check(testkit.Rows("1"))
tk.MustExec("admin reload bindings;")
res := tk.MustQuery("show global bindings")
require.Equal(t, 0, len(res.Rows()))
Expand All @@ -1757,8 +1757,8 @@ func TestShowBindingCache(t *testing.T) {
res = tk.MustQuery("show global bindings")
require.Equal(t, 0, len(res.Rows()))

tk.MustExec(`set global tidb_mem_quota_bind_cache = default`)
tk.MustQuery("select @@global.tidb_mem_quota_bind_cache").Check(testkit.Rows("67108864"))
tk.MustExec(`set global tidb_mem_quota_binding_cache = default`)
tk.MustQuery("select @@global.tidb_mem_quota_binding_cache").Check(testkit.Rows("67108864"))
tk.MustExec("admin reload bindings")
res = tk.MustQuery("show global bindings")
require.Equal(t, 1, len(res.Rows()))
@@ -1791,8 +1791,8 @@ func TestShowBindingCacheStatus(t *testing.T) {
tk.MustQuery("show binding_cache status").Check(testkit.Rows(
"1 1 159 Bytes 64 MB"))

tk.MustExec(`set global tidb_mem_quota_bind_cache = 250`)
tk.MustQuery(`select @@global.tidb_mem_quota_bind_cache`).Check(testkit.Rows("250"))
tk.MustExec(`set global tidb_mem_quota_binding_cache = 250`)
tk.MustQuery(`select @@global.tidb_mem_quota_binding_cache`).Check(testkit.Rows("250"))
tk.MustExec("admin reload bindings;")
tk.MustExec("create global binding for select * from t where a > 1 using select * from t where a > 1")
result = tk.MustQuery("show global bindings")
6 changes: 3 additions & 3 deletions sessionctx/variable/sysvar.go
@@ -605,10 +605,10 @@ var defaultSysVars = []*SysVar{
s.MemQuotaApplyCache = TidbOptInt64(val, DefTiDBMemQuotaApplyCache)
return nil
}},
{Scope: ScopeGlobal, Name: TiDBMemQuotaBindCache, Value: strconv.FormatInt(DefTiDBMemQuotaBindCache, 10), Type: TypeUnsigned, MaxValue: math.MaxInt32, GetGlobal: func(sv *SessionVars) (string, error) {
return strconv.FormatInt(MemQuotaBindCache.Load(), 10), nil
{Scope: ScopeGlobal, Name: TiDBMemQuotaBindingCache, Value: strconv.FormatInt(DefTiDBMemQuotaBindingCache, 10), Type: TypeUnsigned, MaxValue: math.MaxInt32, GetGlobal: func(sv *SessionVars) (string, error) {
return strconv.FormatInt(MemQuotaBindingCache.Load(), 10), nil
}, SetGlobal: func(s *SessionVars, val string) error {
MemQuotaBindCache.Store(TidbOptInt64(val, DefTiDBMemQuotaBindCache))
MemQuotaBindingCache.Store(TidbOptInt64(val, DefTiDBMemQuotaBindingCache))
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBBackoffLockFast, Value: strconv.Itoa(tikvstore.DefBackoffLockFast), Type: TypeUnsigned, MinValue: 1, MaxValue: math.MaxInt32, SetSession: func(s *SessionVars, val string) error {
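A closing note on the renamed variable: as the sysvar.go hunk above shows, tidb_mem_quota_binding_cache is global-only and backed by an atomic (variable.MemQuotaBindingCache) whose SetGlobal/GetGlobal hooks store and load it, and newBindCache snapshots the value when a cache is constructed; that appears to be why the tests pair SET GLOBAL with admin reload bindings. Below is a minimal sketch of that snapshot-on-rebuild pattern with invented names (memQuotaBindingCache, newCache); TiDB's real sysvar plumbing and atomic wrapper differ.

package main

import (
	"fmt"
	"sync/atomic"
)

// Stand-in for variable.MemQuotaBindingCache: a process-wide atomic quota.
var memQuotaBindingCache atomic.Int64

type cache struct{ memCapacity int64 }

// newCache snapshots the quota at construction time, mirroring newBindCache.
func newCache() *cache {
	return &cache{memCapacity: memQuotaBindingCache.Load()}
}

func main() {
	memQuotaBindingCache.Store(64 << 20) // the 64 MB default (67108864 bytes)
	fmt.Println(newCache().memCapacity)  // 67108864

	memQuotaBindingCache.Store(250)     // e.g. SET GLOBAL tidb_mem_quota_binding_cache = 250
	fmt.Println(newCache().memCapacity) // 250, visible once a new cache is built
}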
