Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: refactor some code of binding cache #58532

Merged
merged 1 commit into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/bindinfo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@ go_library(
"//pkg/util/chunk",
"//pkg/util/hack",
"//pkg/util/hint",
"//pkg/util/intest",
"//pkg/util/parser",
"//pkg/util/sqlexec",
"//pkg/util/stringutil",
"@com_github_dgraph_io_ristretto//:ristretto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pkg_errors//:errors",
"@org_golang_x_sync//singleflight",
"@org_uber_go_zap//:zap",
],
)
Expand Down
103 changes: 4 additions & 99 deletions pkg/bindinfo/binding_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,16 @@
package bindinfo

import (
"errors"
"sync"
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto"
"github.com/pingcap/tidb/pkg/bindinfo/internal/logutil"
"github.com/pingcap/tidb/pkg/bindinfo/norm"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/stringutil"
"go.uber.org/zap"
)

// GetBindingReturnNil is only for test
var GetBindingReturnNil = stringutil.StringerStr("GetBindingReturnNil")

// GetBindingReturnNilBool is only for test
var GetBindingReturnNilBool atomic.Bool

// GetBindingReturnNilAlways is only for test
var GetBindingReturnNilAlways = stringutil.StringerStr("getBindingReturnNilAlways")

// LoadBindingNothing is only for test
var LoadBindingNothing = stringutil.StringerStr("LoadBindingNothing")

// digestBiMap represents a bidirectional map between noDBDigest and sqlDigest, used to support cross-db binding.
// One noDBDigest can map to multiple sqlDigests, but one sqlDigest can only map to one noDBDigest.
type digestBiMap interface {
Expand Down Expand Up @@ -168,12 +148,9 @@ type BindingCache interface {
type bindingCache struct {
digestBiMap digestBiMap // mapping between noDBDigest and sqlDigest, used to support cross-db binding.
cache *ristretto.Cache // the underlying cache to store the bindings.

// loadBindingFromStorageFunc is used to load binding from storage if cache miss.
loadBindingFromStorageFunc func(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error)
}

func newBindCache(bindingLoad func(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error)) BindingCache {
func newBindCache() BindingCache {
cache, _ := ristretto.NewCache(&ristretto.Config{
NumCounters: 1e6,
MaxCost: variable.MemQuotaBindingCache.Load(),
Expand All @@ -185,100 +162,28 @@ func newBindCache(bindingLoad func(sctx sessionctx.Context, sqlDigest string) ([
IgnoreInternalCost: true,
})
c := bindingCache{
cache: cache,
digestBiMap: newDigestBiMap(),
loadBindingFromStorageFunc: bindingLoad,
cache: cache,
digestBiMap: newDigestBiMap(),
}
return &c
}

func (c *bindingCache) shouldMetric() bool {
return c.loadBindingFromStorageFunc != nil // only metric for GlobalBindingCache, whose loadBindingFromStorageFunc is not nil.
}

func (c *bindingCache) MatchingBinding(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool) {
matchedBinding, isMatched, missingSQLDigest := c.getFromMemory(sctx, noDBDigest, tableNames)
if len(missingSQLDigest) == 0 {
if c.shouldMetric() && isMatched {
metrics.BindingCacheHitCounter.Inc()
}
return
}
if c.shouldMetric() {
metrics.BindingCacheMissCounter.Inc()
}
if c.loadBindingFromStorageFunc == nil {
return
}
c.loadFromStore(sctx, missingSQLDigest) // loadFromStore's SetBinding has a Mutex inside, so it's safe to call it without lock
matchedBinding, isMatched, _ = c.getFromMemory(sctx, noDBDigest, tableNames)
return
}

func (c *bindingCache) getFromMemory(sctx sessionctx.Context, noDBDigest string, tableNames []*ast.TableName) (matchedBinding *Binding, isMatched bool, missingSQLDigest []string) {
if c.Size() == 0 {
return
}
possibleBindings := make([]*Binding, 0, 2)
for _, sqlDigest := range c.digestBiMap.NoDBDigest2SQLDigest(noDBDigest) {
binding := c.GetBinding(sqlDigest)
if intest.InTest {
if sctx.Value(GetBindingReturnNil) != nil {
if GetBindingReturnNilBool.CompareAndSwap(false, true) {
binding = nil
}
}
if sctx.Value(GetBindingReturnNilAlways) != nil {
binding = nil
}
}
if binding != nil {
possibleBindings = append(possibleBindings, binding)
} else {
missingSQLDigest = append(missingSQLDigest, sqlDigest)
}
}
if len(missingSQLDigest) != 0 {
return
// TODO: handle cache miss safely.
}
matchedBinding, isMatched = crossDBMatchBindings(sctx, tableNames, possibleBindings)
return
}

func (c *bindingCache) loadFromStore(sctx sessionctx.Context, missingSQLDigest []string) {
if intest.InTest && sctx.Value(LoadBindingNothing) != nil {
return
}
defer func(start time.Time) {
sctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("loading binding from storage takes " + time.Since(start).String()))
}(time.Now())

for _, sqlDigest := range missingSQLDigest {
start := time.Now()
bindings, err := c.loadBindingFromStorageFunc(sctx, sqlDigest)
if err != nil {
logutil.BindLogger().Warn("failed to load binding from storage",
zap.String("sqlDigest", sqlDigest),
zap.Error(err),
zap.Duration("duration", time.Since(start)),
)
continue
}
// put binding into the cache
oldBinding := c.GetBinding(sqlDigest)
cachedBinding := pickCachedBinding(oldBinding, bindings...)
if cachedBinding != nil {
err = c.SetBinding(sqlDigest, cachedBinding)
if err != nil {
// When the memory capacity of bing_cache is not enough,
// there will be some memory-related errors in multiple places.
// Only needs to be handled once.
logutil.BindLogger().Warn("update binding cache error", zap.Error(err))
}
}
}
}

// GetBinding gets the Bindings from the cache.
// The return value is not read-only, but it shouldn't be changed in the caller functions.
// The function is thread-safe.
Expand Down
4 changes: 2 additions & 2 deletions pkg/bindinfo/binding_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func bindingNoDBDigest(t *testing.T, b *Binding) string {
}

func TestCrossDBBindingCache(t *testing.T) {
fbc := newBindCache(nil).(*bindingCache)
fbc := newBindCache().(*bindingCache)
b1 := &Binding{BindSQL: "SELECT * FROM db1.t1", SQLDigest: "b1"}
fDigest1 := bindingNoDBDigest(t, b1)
b2 := &Binding{BindSQL: "SELECT * FROM db2.t1", SQLDigest: "b2"}
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestBindCache(t *testing.T) {
variable.MemQuotaBindingCache.Store(v)
}(variable.MemQuotaBindingCache.Load())
variable.MemQuotaBindingCache.Store(int64(kvSize*3) - 1)
bindCache := newBindCache(nil)
bindCache := newBindCache()
defer bindCache.Close()

err := bindCache.SetBinding("digest1", binding)
Expand Down
89 changes: 8 additions & 81 deletions pkg/bindinfo/global_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
utilparser "github.com/pingcap/tidb/pkg/util/parser"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)

// GlobalBindingHandle is used to handle all global sql bind operations.
Expand All @@ -67,9 +66,6 @@ type GlobalBindingHandle interface {

// Methods for load and clear global sql bindings.

// Reset is to reset the BindHandle and clean old info.
Reset()

// LoadFromStorageToCache loads global bindings from storage to the memory cache.
LoadFromStorageToCache(fullLoad bool) (err error)

Expand All @@ -78,17 +74,14 @@ type GlobalBindingHandle interface {

// Methods for memory control.

// SetBindingCacheCapacity reset the capacity for the bindingCache.
SetBindingCacheCapacity(capacity int64)

// GetMemUsage returns the memory usage for the bind cache.
GetMemUsage() (memUsage int64)

// GetMemCapacity returns the memory capacity for the bind cache.
GetMemCapacity() (memCapacity int64)

// Close closes the binding cache.
CloseCache()
// Close closes the binding handler.
Close()

variable.Statistics
}
Expand All @@ -102,9 +95,6 @@ type globalBindingHandle struct {
// lastTaskTime records the last update time for the global sql bind cache.
// This value is used to avoid reload duplicated bindings from storage.
lastUpdateTime atomic.Value

// syncBindingSingleflight is used to synchronize the execution of `LoadFromStorageToCache` method.
syncBindingSingleflight singleflight.Group
}

// Lease influences the duration of loading bind info and handling invalid bind.
Expand Down Expand Up @@ -132,16 +122,11 @@ const (

// NewGlobalBindingHandle creates a new GlobalBindingHandle.
func NewGlobalBindingHandle(sPool util.SessionPool) GlobalBindingHandle {
handle := &globalBindingHandle{sPool: sPool}
handle.Reset()
return handle
}

// Reset is to reset the BindHandle and clean old info.
func (h *globalBindingHandle) Reset() {
h := &globalBindingHandle{sPool: sPool}
h.lastUpdateTime.Store(types.ZeroTimestamp)
h.bindingCache = newBindCache(h.LoadBindingsFromStorage)
h.bindingCache = newBindCache()
variable.RegisterStatistics(h)
return h
}

func (h *globalBindingHandle) getLastUpdateTime() types.Time {
Expand Down Expand Up @@ -410,12 +395,6 @@ func (h *globalBindingHandle) GetAllGlobalBindings() (bindings []*Binding) {
return h.bindingCache.GetAllBindings()
}

// SetBindingCacheCapacity reset the capacity for the bindingCache.
// It will not affect already cached Bindings.
func (h *globalBindingHandle) SetBindingCacheCapacity(capacity int64) {
h.bindingCache.SetMemCapacity(capacity)
}

// GetMemUsage returns the memory usage for the bind cache.
func (h *globalBindingHandle) GetMemUsage() (memUsage int64) {
return h.bindingCache.GetMemUsage()
Expand Down Expand Up @@ -609,62 +588,10 @@ func (h *globalBindingHandle) Stats(_ *variable.SessionVars) (map[string]any, er
return m, nil
}

// Close closes the binding cache.
func (h *globalBindingHandle) CloseCache() {
// Close closes the binding handler.
func (h *globalBindingHandle) Close() {
h.bindingCache.Close()
}

// LoadBindingsFromStorageToCache loads global bindings from storage to the memory cache.
func (h *globalBindingHandle) LoadBindingsFromStorage(sctx sessionctx.Context, sqlDigest string) ([]*Binding, error) {
if sqlDigest == "" {
return nil, nil
}
timeout := time.Duration(sctx.GetSessionVars().LoadBindingTimeout) * time.Millisecond
resultChan := h.syncBindingSingleflight.DoChan(sqlDigest, func() (any, error) {
return h.loadBindingsFromStorageInternal(sqlDigest)
})
select {
case result := <-resultChan:
if result.Err != nil {
return nil, result.Err
}
bindings := result.Val
if bindings == nil {
return nil, nil
}
return bindings.([]*Binding), nil
case <-time.After(timeout):
return nil, errors.New("load bindings from storage timeout")
}
}

func (h *globalBindingHandle) loadBindingsFromStorageInternal(sqlDigest string) (any, error) {
failpoint.Inject("load_bindings_from_storage_internal_timeout", func() {
time.Sleep(time.Second)
})
var bindings []*Binding
selectStmt := fmt.Sprintf("SELECT original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source, sql_digest, plan_digest FROM mysql.bind_info where sql_digest = '%s'", sqlDigest)
err := h.callWithSCtx(false, func(sctx sessionctx.Context) error {
rows, _, err := execRows(sctx, selectStmt)
if err != nil {
return err
}
bindings = make([]*Binding, 0, len(rows))
for _, row := range rows {
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
continue
}
_, binding, err := newBinding(sctx, row)
if err != nil {
logutil.BindLogger().Warn("failed to generate bind record from data row", zap.Error(err))
continue
}
bindings = append(bindings, binding)
}
return nil
})
return bindings, err
h.sPool.Close()
}

// exec is a helper function to execute sql and return RecordSet.
Expand Down
2 changes: 1 addition & 1 deletion pkg/bindinfo/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 19,
shard_count = 18,
deps = [
"//pkg/bindinfo",
"//pkg/bindinfo/internal",
Expand Down
Loading