From 950b594def13e168d31a6b08ee6a4143246486f5 Mon Sep 17 00:00:00 2001 From: Evan Zhou Date: Wed, 8 Jul 2020 17:02:07 +0800 Subject: [PATCH] multi-query prefetch support pessimistic transaction --- executor/batch_point_get.go | 16 ++++++++------ executor/point_get.go | 10 +++++---- server/conn.go | 40 +++++++++++++++++++++++----------- server/conn_test.go | 12 ++++++++++ sessionctx/variable/session.go | 4 ++++ 5 files changed, 58 insertions(+), 24 deletions(-) diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 07e8c6e6edb2e..fbae52044c1a8 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" @@ -152,7 +153,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { keys := make([]kv.Key, 0, len(e.idxVals)) for _, idxVals := range e.idxVals { physID := getPhysID(e.tblInfo, kv.IntHandle(idxVals[e.partPos].GetInt64())) - idxKey, err1 := encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, idxVals, physID) + idxKey, err1 := EncodeUniqueIndexKey(e.ctx, e.tblInfo, e.idxInfo, idxVals, physID) if err1 != nil && !kv.ErrNotExist.Equal(err1) { return err1 } @@ -244,7 +245,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { lockKeys = append(lockKeys, idxKey) } } - err = e.lockKeys(ctx, lockKeys) + err = LockKeys(ctx, e.ctx, e.waitTime, lockKeys...) if err != nil { return err } @@ -277,7 +278,7 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { } // Lock exists keys only for Read Committed Isolation. if e.lock && rc { - err = e.lockKeys(ctx, existKeys) + err = LockKeys(ctx, e.ctx, e.waitTime, existKeys...) if err != nil { return err } @@ -286,14 +287,15 @@ func (e *BatchPointGetExec) initialize(ctx context.Context) error { return nil } -func (e *BatchPointGetExec) lockKeys(ctx context.Context, keys []kv.Key) error { - txnCtx := e.ctx.GetSessionVars().TxnCtx - lctx := newLockCtx(e.ctx.GetSessionVars(), e.waitTime) +// LockKeys locks the keys for pessimistic transaction. +func LockKeys(ctx context.Context, seCtx sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error { + txnCtx := seCtx.GetSessionVars().TxnCtx + lctx := newLockCtx(seCtx.GetSessionVars(), lockWaitTime) if txnCtx.IsPessimistic { lctx.ReturnValues = true lctx.Values = make(map[string]kv.ReturnedValue, len(keys)) } - err := doLockKeys(ctx, e.ctx, lctx, keys...) + err := doLockKeys(ctx, seCtx, lctx, keys...) if err != nil { return err } diff --git a/executor/point_get.go b/executor/point_get.go index 54595c615a2d6..a4e12c1b2fd16 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/tablecodec" @@ -170,7 +171,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { return err } } else { - e.idxKey, err = encodeIndexKey(e.base(), e.tblInfo, e.idxInfo, e.idxVals, tblID) + e.idxKey, err = EncodeUniqueIndexKey(e.ctx, e.tblInfo, e.idxInfo, e.idxVals, tblID) if err != nil && !kv.ErrNotExist.Equal(err) { return err } @@ -318,8 +319,9 @@ func (e *PointGetExecutor) get(ctx context.Context, key kv.Key) ([]byte, error) return e.snapshot.Get(ctx, key) } -func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) { - sc := e.ctx.GetSessionVars().StmtCtx +// EncodeUniqueIndexKey encodes a unique index key. +func EncodeUniqueIndexKey(ctx sessionctx.Context, tblInfo *model.TableInfo, idxInfo *model.IndexInfo, idxVals []types.Datum, tID int64) (_ []byte, err error) { + sc := ctx.GetSessionVars().StmtCtx for i := range idxVals { colInfo := tblInfo.Columns[idxInfo.Columns[i].Offset] // table.CastValue will append 0x0 if the string value's length is smaller than the BINARY column's length. @@ -330,7 +332,7 @@ func encodeIndexKey(e *baseExecutor, tblInfo *model.TableInfo, idxInfo *model.In str, err = idxVals[i].ToString() idxVals[i].SetString(str, colInfo.FieldType.Collate) } else { - idxVals[i], err = table.CastValue(e.ctx, idxVals[i], colInfo, true, false) + idxVals[i], err = table.CastValue(ctx, idxVals[i], colInfo, true, false) if types.ErrOverflow.Equal(err) { return nil, kv.ErrNotExist } diff --git a/server/conn.go b/server/conn.go index 3fc1af0a6bb17..c8e69d7bfed91 100644 --- a/server/conn.go +++ b/server/conn.go @@ -72,7 +72,6 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/arena" "github.com/pingcap/tidb/util/chunk" - "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" @@ -1292,7 +1291,7 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { var pointPlans []plannercore.Plan if len(stmts) > 1 { // Only pre-build point plans for multi-statement query - pointPlans, err = cc.prefetchPointPlanKeys(stmts) + pointPlans, err = cc.prefetchPointPlanKeys(ctx, stmts) if err != nil { return err } @@ -1318,7 +1317,8 @@ func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) { // prefetchPointPlanKeys extracts the point keys in multi-statement query, // use BatchGet to get the keys, so the values will be cached in the snapshot cache, save RPC call cost. -func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore.Plan, error) { +// For pessimistic transaction, the keys will be batch locked. +func (cc *clientConn) prefetchPointPlanKeys(ctx context.Context, stmts []ast.StmtNode) ([]plannercore.Plan, error) { txn, err := cc.ctx.Txn(false) if err != nil { return nil, err @@ -1330,10 +1330,14 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore } vars := cc.ctx.GetSessionVars() if vars.TxnCtx.IsPessimistic { - // Pessimistic transaction do not benefit from the prefetch because the keys will be returned by Lock, - // we can get the keys from the lock cache. - // TODO: In the future we can support pre lock point keys for pessimistic transactions. - return nil, nil + if vars.IsReadConsistencyTxn() { + // TODO: to support READ-COMMITTED, we need to avoid getting new TS for each statement in the query. + return nil, nil + } + if vars.TxnCtx.GetForUpdateTS() != vars.TxnCtx.StartTS { + // Do not handle the case that ForUpdateTS is changed for simplicity. + return nil, nil + } } pointPlans := make([]plannercore.Plan, len(stmts)) var idxKeys []kv.Key @@ -1361,11 +1365,11 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore } if pp.IndexInfo != nil { executor.ResetUpdateStmtCtx(sc, updateStmt, vars) - encoded, err1 := codec.EncodeKey(sc, nil, pp.IndexValues...) + idxKey, err1 := executor.EncodeUniqueIndexKey(cc.ctx, pp.TblInfo, pp.IndexInfo, pp.IndexValues, pp.TblInfo.ID) if err1 != nil { return nil, err1 } - idxKeys = append(idxKeys, tablecodec.EncodeIndexSeekKey(pp.TblInfo.ID, pp.IndexInfo.ID, encoded)) + idxKeys = append(idxKeys, idxKey) } else { rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(pp.TblInfo.ID, pp.Handle)) } @@ -1376,7 +1380,7 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore return pointPlans, nil } snapshot := txn.GetSnapshot() - idxVals, err1 := snapshot.BatchGet(context.Background(), idxKeys) + idxVals, err1 := snapshot.BatchGet(ctx, idxKeys) if err1 != nil { return nil, err1 } @@ -1388,9 +1392,19 @@ func (cc *clientConn) prefetchPointPlanKeys(stmts []ast.StmtNode) ([]plannercore tblID := tablecodec.DecodeTableID(hack.Slice(idxKey)) rowKeys = append(rowKeys, tablecodec.EncodeRowKeyWithHandle(tblID, h)) } - _, err = snapshot.BatchGet(context.Background(), rowKeys) - if err != nil { - return nil, err + if vars.TxnCtx.IsPessimistic { + allKeys := append(rowKeys, idxKeys...) + err = executor.LockKeys(ctx, cc.ctx, vars.LockWaitTimeout, allKeys...) + if err != nil { + // suppress the lock error, we are not going to handle it here for simplicity. + err = nil + logutil.BgLogger().Warn("lock keys error on prefetch", zap.Error(err)) + } + } else { + _, err = snapshot.BatchGet(ctx, rowKeys) + if err != nil { + return nil, err + } } return pointPlans, nil } diff --git a/server/conn_test.go b/server/conn_test.go index 7beb523c85103..719a3491371e1 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -629,4 +629,16 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) { c.Assert(tikv.SnapCacheHitCount(snap), Equals, 4) tk.MustExec("commit") tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 2", "2 2 4", "3 3 4")) + + tk.MustExec("begin pessimistic") + tk.MustExec("update prefetch set c = c + 1 where a = 2 and b = 2") + c.Assert(tk.Se.GetSessionVars().TxnCtx.PessimisticCacheHit, Equals, 1) + err = cc.handleQuery(ctx, query) + c.Assert(err, IsNil) + txn, err = tk.Se.Txn(false) + c.Assert(err, IsNil) + c.Assert(txn.Valid(), IsTrue) + c.Assert(tk.Se.GetSessionVars().TxnCtx.PessimisticCacheHit, Equals, 5) + tk.MustExec("commit") + tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 3", "2 2 6", "3 3 5")) } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index c4bedecff0c11..89eeffa8d1810 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -158,6 +158,7 @@ type TransactionContext struct { // pessimisticLockCache is the cache for pessimistic locked keys, // The value never changes during the transaction. pessimisticLockCache map[string][]byte + PessimisticCacheHit int // CreateTime For metrics. CreateTime time.Time @@ -236,6 +237,9 @@ func (tc *TransactionContext) GetKeyInPessimisticLockCache(key kv.Key) (val []by return nil, false } val, ok = tc.pessimisticLockCache[string(key)] + if ok { + tc.PessimisticCacheHit++ + } return }