Skip to content

Commit

Permalink
multi-query prefetch support pessimistic transaction (#18439)
Browse files Browse the repository at this point in the history
Co-authored-by: ti-srebot <[email protected]>
  • Loading branch information
coocood and ti-srebot authored Jul 10, 2020
1 parent a74b258 commit 8a66104
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 24 deletions.
16 changes: 9 additions & 7 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -152,7 +153,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
keys := make([]kv.Key, 0, len(e.idxVals))
for _, idxVals := range e.idxVals {
physID := getPhysID(e.tblInfo, kv.IntHandle(idxVals[e.partPos].GetInt64()))
idxKey, err1 := encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, idxVals, physID)
idxKey, err1 := EncodeUniqueIndexKey(e.ctx, e.tblInfo, e.idxInfo, idxVals, physID)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
Expand Down Expand Up @@ -244,7 +245,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
lockKeys = append(lockKeys, idxKey)
}
}
err = e.lockKeys(ctx, lockKeys)
err = LockKeys(ctx, e.ctx, e.waitTime, lockKeys...)
if err != nil {
return err
}
Expand Down Expand Up @@ -277,7 +278,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}
// Lock exists keys only for Read Committed Isolation.
if e.lock && rc {
err = e.lockKeys(ctx, existKeys)
err = LockKeys(ctx, e.ctx, e.waitTime, existKeys...)
if err != nil {
return err
}
Expand All @@ -286,14 +287,15 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
return nil
}

func (e *BatchPointGetExec) lockKeys(ctx context.Context, keys []kv.Key) error {
txnCtx := e.ctx.GetSessionVars().TxnCtx
lctx := newLockCtx(e.ctx.GetSessionVars(), e.waitTime)
// LockKeys locks the keys for pessimistic transaction.
func LockKeys(ctx context.Context, seCtx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := seCtx.GetSessionVars().TxnCtx
lctx := newLockCtx(seCtx.GetSessionVars(), lockWaitTime)
if txnCtx.IsPessimistic {
lctx.ReturnValues = true
lctx.Values = make(map[string]kv.ReturnedValue, len(keys))
}
err := doLockKeys(ctx, e.ctx, lctx, keys...)
err := doLockKeys(ctx, seCtx, lctx, keys...)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
} else {
e.idxKey, err = encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, e.idxVals, tblID)
e.idxKey, err = EncodeUniqueIndexKey(e.ctx, e.tblInfo, e.idxInfo, e.idxVals, tblID)
if err != nil && !kv.ErrNotExist.Equal(err) {
return err
}
Expand Down Expand Up @@ -318,8 +319,9 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
return e.snapshot.Get(ctx, key)
}

func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
// EncodeUniqueIndexKey encodes a unique index key.
func EncodeUniqueIndexKey(ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) {
sc := ctx.GetSessionVars().StmtCtx
for i := range idxVals {
colInfo := tblInfo.Columns[idxInfo.Columns[i].Offset]
// table.CastValue will append 0x0 if the string value's length is smaller than the BINARY column's length.
Expand All @@ -330,7 +332,7 @@ func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.In
str, err = idxVals[i].ToString()
idxVals[i].SetString(str, colInfo.FieldType.Collate)
} else {
idxVals[i], err = table.CastValue(e.ctx, idxVals[i], colInfo, true, false)
idxVals[i], err = table.CastValue(ctx, idxVals[i], colInfo, true, false)
if types.ErrOverflow.Equal(err) {
return nil, kv.ErrNotExist
}
Expand Down
40 changes: 27 additions & 13 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -1297,7 +1296,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
var pointPlans []plannercore.Plan
if len(stmts) > 1 {
// Only pre-build point plans for multi-statement query
pointPlans, err = cc.prefetchPointPlanKeys(stmts)
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts)
if err != nil {
return err
}
Expand All @@ -1323,7 +1322,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {

// prefetchPointPlanKeys extracts the point keys in multi-statement query,
// use BatchGet to get the keys, so the values will be cached in the snapshot cache, save RPC call cost.
func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore.Plan, error) {
// For pessimistic transaction, the keys will be batch locked.
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode) ([]plannercore.Plan, error) {
txn, err := cc.ctx.Txn(false)
if err != nil {
return nil, err
Expand All @@ -1335,10 +1335,14 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
}
vars := cc.ctx.GetSessionVars()
if vars.TxnCtx.IsPessimistic {
// Pessimistic transaction do not benefit from the prefetch because the keys will be returned by Lock,
// we can get the keys from the lock cache.
// TODO: In the future we can support pre lock point keys for pessimistic transactions.
return nil, nil
if vars.IsReadConsistencyTxn() {
// TODO: to support READ-COMMITTED, we need to avoid getting new TS for each statement in the query.
return nil, nil
}
if vars.TxnCtx.GetForUpdateTS() != vars.TxnCtx.StartTS {
// Do not handle the case that ForUpdateTS is changed for simplicity.
return nil, nil
}
}
pointPlans := make([]plannercore.Plan, len(stmts))
var idxKeys []kv.Key
Expand Down Expand Up @@ -1366,11 +1370,11 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
}
if pp.IndexInfo != nil {
executor.ResetUpdateStmtCtx(sc, updateStmt, vars)
encoded, err1 := codec.EncodeKey(sc, nil, pp.IndexValues...)
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.ctx, pp.TblInfo, pp.IndexInfo, pp.IndexValues, pp.TblInfo.ID)
if err1 != nil {
return nil, err1
}
idxKeys = append(idxKeys, tablecodec.EncodeIndexSeekKey(pp.TblInfo.ID, pp.IndexInfo.ID, encoded))
idxKeys = append(idxKeys, idxKey)
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(pp.TblInfo.ID, pp.Handle))
}
Expand All @@ -1381,7 +1385,7 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
return pointPlans, nil
}
snapshot := txn.GetSnapshot()
idxVals, err1 := snapshot.BatchGet(context.Background(), idxKeys)
idxVals, err1 := snapshot.BatchGet(ctx, idxKeys)
if err1 != nil {
return nil, err1
}
Expand All @@ -1393,9 +1397,19 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
tblID := tablecodec.DecodeTableID(hack.Slice(idxKey))
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tblID, h))
}
_, err = snapshot.BatchGet(context.Background(), rowKeys)
if err != nil {
return nil, err
if vars.TxnCtx.IsPessimistic {
allKeys := append(rowKeys, idxKeys...)
err = executor.LockKeys(ctx, cc.ctx, vars.LockWaitTimeout, allKeys...)
if err != nil {
// suppress the lock error, we are not going to handle it here for simplicity.
err = nil
logutil.BgLogger().Warn("lock keys error on prefetch", zap.Error(err))
}
} else {
_, err = snapshot.BatchGet(ctx, rowKeys)
if err != nil {
return nil, err
}
}
return pointPlans, nil
}
Expand Down
12 changes: 12 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,16 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
c.Assert(tikv.SnapCacheHitCount(snap), Equals, 4)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 2", "2 2 4", "3 3 4"))

tk.MustExec("begin pessimistic")
tk.MustExec("update prefetch set c = c + 1 where a = 2 and b = 2")
c.Assert(tk.Se.GetSessionVars().TxnCtx.PessimisticCacheHit, Equals, 1)
err = cc.handleQuery(ctx, query)
c.Assert(err, IsNil)
txn, err = tk.Se.Txn(false)
c.Assert(err, IsNil)
c.Assert(txn.Valid(), IsTrue)
c.Assert(tk.Se.GetSessionVars().TxnCtx.PessimisticCacheHit, Equals, 5)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type TransactionContext struct {
// pessimisticLockCache is the cache for pessimistic locked keys,
// The value never changes during the transaction.
pessimisticLockCache map[string][]byte
PessimisticCacheHit int

// CreateTime For metrics.
CreateTime time.Time
Expand Down Expand Up @@ -236,6 +237,9 @@ func (tc *TransactionContext) GetKeyInPessimisticLockCache(key kv.Key) (val []by
return nil, false
}
val, ok = tc.pessimisticLockCache[string(key)]
if ok {
tc.PessimisticCacheHit++
}
return
}

Expand Down

0 comments on commit 8a66104

Please sign in to comment.