Skip to content

Commit

Permalink
planner: ignore close-stmt to make more queries can hit plan cache (#…
Browse files Browse the repository at this point in the history
…31687)

* planner: ignore close-stmt to make more queries can hit plan cache (#31653)
  • Loading branch information
qw4990 authored Jan 14, 2022
1 parent 765219b commit 3633323
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 32 deletions.
9 changes: 6 additions & 3 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

preparedObj := &plannercore.CachedPrepareStmt{
PreparedStmtText: prepared.Stmt.Text(),
PreparedAst: prepared,
VisitInfos: destBuilder.GetVisitInfo(),
NormalizedSQL: normalizedSQL,
Expand Down Expand Up @@ -314,9 +315,11 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {
prepared := preparedObj.PreparedAst
delete(vars.PreparedStmtNameToID, e.Name)
if plannercore.PreparedPlanCacheEnabled() {
e.ctx.PreparedPlanCache().Delete(plannercore.NewPSTMTPlanCacheKey(
vars, id, prepared.SchemaVersion,
))
key, err := plannercore.NewPSTMTPlanCacheKey(vars, preparedObj.PreparedStmtText, prepared.SchemaVersion, )
if err != nil {
return err
}
e.ctx.PreparedPlanCache().Delete(key)
}
vars.RemovePreparedStmt(id)
return nil
Expand Down
19 changes: 12 additions & 7 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -68,7 +69,7 @@ func PreparedPlanCacheEnabled() bool {
type pstmtPlanCacheKey struct {
database string
connID uint64
pstmtID uint32
preparedStmtText string
schemaVersion int64
sqlMode mysql.SQLMode
timezoneOffset int
Expand All @@ -90,7 +91,7 @@ func (key *pstmtPlanCacheKey) Hash() []byte {
}
key.hash = append(key.hash, dbBytes...)
key.hash = codec.EncodeInt(key.hash, int64(key.connID))
key.hash = codec.EncodeInt(key.hash, int64(key.pstmtID))
key.hash = append(key.hash, hack.Slice(key.preparedStmtText)...)
key.hash = codec.EncodeInt(key.hash, key.schemaVersion)
key.hash = codec.EncodeInt(key.hash, int64(key.sqlMode))
key.hash = codec.EncodeInt(key.hash, int64(key.timezoneOffset))
Expand All @@ -110,12 +111,12 @@ func (key *pstmtPlanCacheKey) Hash() []byte {

// SetPstmtIDSchemaVersion implements PstmtCacheKeyMutator interface to change pstmtID and schemaVersion of cacheKey.
// so we can reuse Key instead of new every time.
func SetPstmtIDSchemaVersion(key kvcache.Key, pstmtID uint32, schemaVersion int64, isolationReadEngines map[kv.StoreType]struct{}) {
func SetPstmtIDSchemaVersion(key kvcache.Key, preparedStmtText string, schemaVersion int64, isolationReadEngines map[kv.StoreType]struct{}) {
psStmtKey, isPsStmtKey := key.(*pstmtPlanCacheKey)
if !isPsStmtKey {
return
}
psStmtKey.pstmtID = pstmtID
psStmtKey.preparedStmtText = preparedStmtText
psStmtKey.schemaVersion = schemaVersion
psStmtKey.isolationReadEngines = make(map[kv.StoreType]struct{})
for k, v := range isolationReadEngines {
Expand All @@ -125,15 +126,18 @@ func SetPstmtIDSchemaVersion(key kvcache.Key, pstmtID uint32, schemaVersion int6
}

// NewPSTMTPlanCacheKey creates a new pstmtPlanCacheKey object.
func NewPSTMTPlanCacheKey(sessionVars *variable.SessionVars, pstmtID uint32, schemaVersion int64) kvcache.Key {
func NewPSTMTPlanCacheKey(sessionVars *variable.SessionVars, preparedStmtText string, schemaVersion int64) (kvcache.Key, error) {
if preparedStmtText == "" {
return nil, errors.New("empty prepared statement text")
}
timezoneOffset := 0
if sessionVars.TimeZone != nil {
_, timezoneOffset = time.Now().In(sessionVars.TimeZone).Zone()
}
key := &pstmtPlanCacheKey{
database: sessionVars.CurrentDB,
connID: sessionVars.ConnectionID,
pstmtID: pstmtID,
preparedStmtText: preparedStmtText,
schemaVersion: schemaVersion,
sqlMode: sessionVars.SQLMode,
timezoneOffset: timezoneOffset,
Expand All @@ -143,7 +147,7 @@ func NewPSTMTPlanCacheKey(sessionVars *variable.SessionVars, pstmtID uint32, sch
for k, v := range sessionVars.IsolationReadEngines {
key.isolationReadEngines[k] = v
}
return key
return key, nil
}

// FieldSlice is the slice of the types.FieldType
Expand Down Expand Up @@ -199,6 +203,7 @@ func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*mod

// CachedPrepareStmt store prepared ast from PrepareExec and other related fields
type CachedPrepareStmt struct {
PreparedStmtText string
PreparedAst *ast.Prepared
VisitInfos []visitInfo
ColumnInfos interface{}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ func TestCacheKey(t *testing.T) {
ctx.GetSessionVars().SQLMode = mysql.ModeNone
ctx.GetSessionVars().TimeZone = time.UTC
ctx.GetSessionVars().ConnectionID = 0
key := NewPSTMTPlanCacheKey(ctx.GetSessionVars(), 1, 1)
require.Equal(t, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x74, 0x69, 0x64, 0x62, 0x74, 0x69, 0x6b, 0x76, 0x74, 0x69, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, key.Hash())
key, _ := NewPSTMTPlanCacheKey(ctx.GetSessionVars(), "select 1", 1)
require.Equal(t, []byte{0x74, 0x65, 0x73, 0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x73, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x20, 0x31, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x74, 0x69, 0x64, 0x62, 0x74, 0x69, 0x6b, 0x76, 0x74, 0x69, 0x66, 0x6c, 0x61, 0x73, 0x68, 0x7f, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, key.Hash())
}
19 changes: 13 additions & 6 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,11 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context,
stmtCtx.UseCache = prepared.UseCache
var cacheKey kvcache.Key
if prepared.UseCache {
cacheKey = NewPSTMTPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion)
var err error
cacheKey, err = NewPSTMTPlanCacheKey(sctx.GetSessionVars(), preparedStmt.PreparedStmtText, prepared.SchemaVersion)
if err != nil {
return err
}
}
tps := make([]*types.FieldType, len(e.UsingVars))
for i, param := range e.UsingVars {
Expand Down Expand Up @@ -485,7 +489,10 @@ REBUILD:
// rebuild key to exclude kv.TiFlash when stmt is not read only
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
cacheKey = NewPSTMTPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion)
cacheKey, err = NewPSTMTPlanCacheKey(sctx.GetSessionVars(), preparedStmt.PreparedStmtText, prepared.SchemaVersion)
if err != nil {
return err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
}
cached := NewPSTMTPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, tps)
Expand Down Expand Up @@ -622,7 +629,7 @@ func (e *Execute) rebuildRange(p Plan) error {
}
if x.HandleParam != nil {
var iv int64
iv, err = x.HandleParam.Datum.ToInt64(sc)
iv, err = sctx.GetSessionVars().PreparedParams[x.HandleParam.Order].ToInt64(sc)
if err != nil {
return err
}
Expand All @@ -631,7 +638,7 @@ func (e *Execute) rebuildRange(p Plan) error {
}
for i, param := range x.IndexValueParams {
if param != nil {
x.IndexValues[i] = param.Datum
x.IndexValues[i] = sctx.GetSessionVars().PreparedParams[param.Order]
}
}
return nil
Expand Down Expand Up @@ -669,7 +676,7 @@ func (e *Execute) rebuildRange(p Plan) error {
for i, param := range x.HandleParams {
if param != nil {
var iv int64
iv, err = param.Datum.ToInt64(sc)
iv, err = sctx.GetSessionVars().PreparedParams[param.Order].ToInt64(sc)
if err != nil {
return err
}
Expand All @@ -682,7 +689,7 @@ func (e *Execute) rebuildRange(p Plan) error {
}
for j, param := range params {
if param != nil {
x.IndexValues[i][j] = param.Datum
x.IndexValues[i][j] = sctx.GetSessionVars().PreparedParams[param.Order]
}
}
}
Expand Down
71 changes: 71 additions & 0 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,77 @@ type testPrepareSuite struct {
type testPrepareSerialSuite struct {
}

func (s *testPrepareSerialSuite) TestPrepareCacheKey(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec(`use test`)
tk.MustExec(`drop table if exists t`)
tk.MustExec(`create table t (a int)`)
tk.MustExec(`prepare st1 from 'select * from t'`)
tk.MustQuery(`execute st1`).Check(testkit.Rows())
tk.MustQuery(`execute st1`).Check(testkit.Rows())
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))

// st2 can use the plan of st1 directly since they have the same SQL
tk.MustExec(`prepare st2 from 'select * from t'`)
tk.MustQuery(`execute st2`).Check(testkit.Rows())
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
}

func (s *testPrepareSerialSuite) TestPreparePointGetWithDML(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec(`use test`)
tk.MustExec(`drop table if exists t`)
tk.MustExec(`create table t(a int, unique key(a))`)
tk.MustExec(`insert into t values(1), (2)`)

// txn1 left a cached plan
tk.MustExec(`begin`)
tk.MustExec(`prepare stmt from 'update t set a = ? where a = ?'`)
tk.MustExec(`set @a=1`)
tk.MustExec(`execute stmt using @a, @a`)
tk.MustExec(`commit`)

// txn2 can reuse the cached plan generated by txn1 directly
tk.MustExec(`begin`)
tk.MustExec(`prepare stmt from 'update t set a = ? where a = ?'`)
tk.MustExec(`set @a=2`)
tk.MustExec(`execute stmt using @a, @a`) // can reuse the cached plan directly
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
tk.MustExec(`rollback`)
}

func (s *testPrepareSerialSuite) TestPrepareCache(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
Expand Down
24 changes: 12 additions & 12 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"go.uber.org/zap"
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/charset"
Expand Down Expand Up @@ -154,20 +153,21 @@ func (ts *TiDBStatement) Reset() {
// Close implements PreparedStatement Close method.
func (ts *TiDBStatement) Close() error {
// TODO close at tidb level
// TODO: introduce a new variable to control whether ignore close-stmt
if ts.ctx.GetSessionVars().TxnCtx != nil && ts.ctx.GetSessionVars().TxnCtx.CouldRetry {
err := ts.ctx.DropPreparedStmt(ts.id)
if err != nil {
return err
}
//err := ts.ctx.DropPreparedStmt(ts.id)
//if err != nil {
// return err
//}
} else {
if core.PreparedPlanCacheEnabled() {
preparedPointer := ts.ctx.GetSessionVars().PreparedStmts[ts.id]
preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt)
if !ok {
return errors.Errorf("invalid CachedPrepareStmt type")
}
ts.ctx.PreparedPlanCache().Delete(core.NewPSTMTPlanCacheKey(
ts.ctx.GetSessionVars(), ts.id, preparedObj.PreparedAst.SchemaVersion))
//preparedPointer := ts.ctx.GetSessionVars().PreparedStmts[ts.id]
//preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt)
//if !ok {
// return errors.Errorf("invalid CachedPrepareStmt type")
//}
//ts.ctx.PreparedPlanCache().Delete(core.NewPSTMTPlanCacheKey(
// ts.ctx.GetSessionVars(), preparedObj.PreparedStmtText, preparedObj.PreparedAst.SchemaVersion))
}
if ts.ctx.GetSessionVars().ConnectionID != 0 {
logutil.BgLogger().Warn("check prepare TiDBStatement.Close", zap.Uint64("connID", ts.ctx.GetSessionVars().ConnectionID))
Expand Down
6 changes: 4 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,21 +304,23 @@ func (s *session) cleanRetryInfo() {

planCacheEnabled := plannercore.PreparedPlanCacheEnabled()
var cacheKey kvcache.Key
var preparedStmtText string
var preparedAst *ast.Prepared
if planCacheEnabled {
firstStmtID := retryInfo.DroppedPreparedStmtIDs[0]
if preparedPointer, ok := s.sessionVars.PreparedStmts[firstStmtID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
preparedStmtText = preparedObj.PreparedStmtText
if ok {
preparedAst = preparedObj.PreparedAst
cacheKey = plannercore.NewPSTMTPlanCacheKey(s.sessionVars, firstStmtID, preparedAst.SchemaVersion)
cacheKey, _ = plannercore.NewPSTMTPlanCacheKey(s.sessionVars, preparedStmtText, preparedAst.SchemaVersion)
}
}
}
for i, stmtID := range retryInfo.DroppedPreparedStmtIDs {
if planCacheEnabled {
if i > 0 && preparedAst != nil {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtID, preparedAst.SchemaVersion, s.sessionVars.IsolationReadEngines)
plannercore.SetPstmtIDSchemaVersion(cacheKey, preparedStmtText, preparedAst.SchemaVersion, s.sessionVars.IsolationReadEngines)
}
s.PreparedPlanCache().Delete(cacheKey)
}
Expand Down

0 comments on commit 3633323

Please sign in to comment.