Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: multi-query prefetch support pessimistic transaction #18439

Merged
merged 3 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -152,7 +153,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
keys := make([]kv.Key, 0, len(e.idxVals))
for _, idxVals := range e.idxVals {
physID := getPhysID(e.tblInfo, kv.IntHandle(idxVals[e.partPos].GetInt64()))
idxKey, err1 := encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, idxVals, physID)
idxKey, err1 := EncodeUniqueIndexKey(e.ctx, e.tblInfo, e.idxInfo, idxVals, physID)
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
return err1
}
Expand Down Expand Up @@ -244,7 +245,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
lockKeys = append(lockKeys, idxKey)
}
}
err = e.lockKeys(ctx, lockKeys)
err = LockKeys(ctx, e.ctx, e.waitTime, lockKeys...)
if err != nil {
return err
}
Expand Down Expand Up @@ -277,7 +278,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
}
// Lock exists keys only for Read Committed Isolation.
if e.lock && rc {
err = e.lockKeys(ctx, existKeys)
err = LockKeys(ctx, e.ctx, e.waitTime, existKeys...)
if err != nil {
return err
}
Expand All @@ -286,14 +287,15 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error {
return nil
}

func (e *BatchPointGetExec) lockKeys(ctx context.Context, keys []kv.Key) error {
txnCtx := e.ctx.GetSessionVars().TxnCtx
lctx := newLockCtx(e.ctx.GetSessionVars(), e.waitTime)
// LockKeys locks the keys for pessimistic transaction.
func LockKeys(ctx context.Context, seCtx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
txnCtx := seCtx.GetSessionVars().TxnCtx
lctx := newLockCtx(seCtx.GetSessionVars(), lockWaitTime)
if txnCtx.IsPessimistic {
lctx.ReturnValues = true
lctx.Values = make(map[string]kv.ReturnedValue, len(keys))
}
err := doLockKeys(ctx, e.ctx, lctx, keys...)
err := doLockKeys(ctx, seCtx, lctx, keys...)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -170,7 +171,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
} else {
e.idxKey, err = encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, e.idxVals, tblID)
e.idxKey, err = EncodeUniqueIndexKey(e.ctx, e.tblInfo, e.idxInfo, e.idxVals, tblID)
if err != nil && !kv.ErrNotExist.Equal(err) {
return err
}
Expand Down Expand Up @@ -318,8 +319,9 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error)
return e.snapshot.Get(ctx, key)
}

func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) {
sc := e.ctx.GetSessionVars().StmtCtx
// EncodeUniqueIndexKey encodes a unique index key.
func EncodeUniqueIndexKey(ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) {
sc := ctx.GetSessionVars().StmtCtx
for i := range idxVals {
colInfo := tblInfo.Columns[idxInfo.Columns[i].Offset]
// table.CastValue will append 0x0 if the string value's length is smaller than the BINARY column's length.
Expand All @@ -330,7 +332,7 @@ func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.In
str, err = idxVals[i].ToString()
idxVals[i].SetString(str, colInfo.FieldType.Collate)
} else {
idxVals[i], err = table.CastValue(e.ctx, idxVals[i], colInfo, true, false)
idxVals[i], err = table.CastValue(ctx, idxVals[i], colInfo, true, false)
if types.ErrOverflow.Equal(err) {
return nil, kv.ErrNotExist
}
Expand Down
40 changes: 27 additions & 13 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/arena"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -1297,7 +1296,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {
var pointPlans []plannercore.Plan
if len(stmts) > 1 {
// Only pre-build point plans for multi-statement query
pointPlans, err = cc.prefetchPointPlanKeys(stmts)
pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts)
if err != nil {
return err
}
Expand All @@ -1323,7 +1322,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {

// prefetchPointPlanKeys extracts the point keys in multi-statement query,
// use BatchGet to get the keys, so the values will be cached in the snapshot cache, save RPC call cost.
func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore.Plan, error) {
// For pessimistic transaction, the keys will be batch locked.
func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode) ([]plannercore.Plan, error) {
txn, err := cc.ctx.Txn(false)
if err != nil {
return nil, err
Expand All @@ -1335,10 +1335,14 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
}
vars := cc.ctx.GetSessionVars()
if vars.TxnCtx.IsPessimistic {
// Pessimistic transaction do not benefit from the prefetch because the keys will be returned by Lock,
// we can get the keys from the lock cache.
// TODO: In the future we can support pre lock point keys for pessimistic transactions.
return nil, nil
if vars.IsReadConsistencyTxn() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if it is a multi-query, we could prefetch data for RC transaction.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do it in the future.

// TODO: to support READ-COMMITTED, we need to avoid getting new TS for each statement in the query.
return nil, nil
}
if vars.TxnCtx.GetForUpdateTS() != vars.TxnCtx.StartTS {
// Do not handle the case that ForUpdateTS is changed for simplicity.
return nil, nil
}
}
pointPlans := make([]plannercore.Plan, len(stmts))
var idxKeys []kv.Key
Expand Down Expand Up @@ -1366,11 +1370,11 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
}
if pp.IndexInfo != nil {
executor.ResetUpdateStmtCtx(sc, updateStmt, vars)
encoded, err1 := codec.EncodeKey(sc, nil, pp.IndexValues...)
idxKey, err1 := executor.EncodeUniqueIndexKey(cc.ctx, pp.TblInfo, pp.IndexInfo, pp.IndexValues, pp.TblInfo.ID)
if err1 != nil {
return nil, err1
}
idxKeys = append(idxKeys, tablecodec.EncodeIndexSeekKey(pp.TblInfo.ID, pp.IndexInfo.ID, encoded))
idxKeys = append(idxKeys, idxKey)
} else {
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(pp.TblInfo.ID, pp.Handle))
}
Expand All @@ -1381,7 +1385,7 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
return pointPlans, nil
}
snapshot := txn.GetSnapshot()
idxVals, err1 := snapshot.BatchGet(context.Background(), idxKeys)
idxVals, err1 := snapshot.BatchGet(ctx, idxKeys)
if err1 != nil {
return nil, err1
}
Expand All @@ -1393,9 +1397,19 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore
tblID := tablecodec.DecodeTableID(hack.Slice(idxKey))
rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tblID, h))
}
_, err = snapshot.BatchGet(context.Background(), rowKeys)
if err != nil {
return nil, err
if vars.TxnCtx.IsPessimistic {
allKeys := append(rowKeys, idxKeys...)
err = executor.LockKeys(ctx, cc.ctx, vars.LockWaitTimeout, allKeys...)
if err != nil {
// suppress the lock error, we are not going to handle it here for simplicity.
err = nil
logutil.BgLogger().Warn("lock keys error on prefetch", zap.Error(err))
}
} else {
_, err = snapshot.BatchGet(ctx, rowKeys)
if err != nil {
return nil, err
}
}
return pointPlans, nil
}
Expand Down
12 changes: 12 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,4 +629,16 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
c.Assert(tikv.SnapCacheHitCount(snap), Equals, 4)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 2", "2 2 4", "3 3 4"))

tk.MustExec("begin pessimistic")
tk.MustExec("update prefetch set c = c + 1 where a = 2 and b = 2")
c.Assert(tk.Se.GetSessionVars().TxnCtx.PessimisticCacheHit, Equals, 1)
err = cc.handleQuery(ctx, query)
c.Assert(err, IsNil)
txn, err = tk.Se.Txn(false)
c.Assert(err, IsNil)
c.Assert(txn.Valid(), IsTrue)
c.Assert(tk.Se.GetSessionVars().TxnCtx.PessimisticCacheHit, Equals, 5)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5"))
}
4 changes: 4 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ type TransactionContext struct {
// pessimisticLockCache is the cache for pessimistic locked keys,
// The value never changes during the transaction.
pessimisticLockCache map[string][]byte
PessimisticCacheHit int

// CreateTime For metrics.
CreateTime time.Time
Expand Down Expand Up @@ -236,6 +237,9 @@ func (tc *TransactionContext) GetKeyInPessimisticLockCache(key kv.Key) (val []by
return nil, false
}
val, ok = tc.pessimisticLockCache[string(key)]
if ok {
tc.PessimisticCacheHit++
}
return
}

Expand Down