From e977efaa600309557188be2e8b85c53da5e8598c Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Fri, 15 Apr 2022 18:28:26 +0800 Subject: [PATCH] update --- executor/adapter.go | 8 ++ executor/simple.go | 53 ++------- planner/core/preprocess.go | 3 +- session/session.go | 22 ++-- session/txnmanager.go | 8 ++ sessiontxn/interface.go | 67 +++++++++++- sessiontxn/readcommitted/provider.go | 155 ++++++++++++++++++++------- sessiontxn/staleread/provider.go | 33 +++++- sessiontxn/txn.go | 98 +++++++++++++++++ 9 files changed, 338 insertions(+), 109 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 1fa91d66be3f0..935d397f0ff4f 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -789,6 +789,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E // the async rollback operation rollbacked the lock just acquired if err != nil { tsErr := UpdateForUpdateTS(a.Ctx, 0) + if tsErr == nil { + tsErr = sessiontxn.AdviseNoConflictUpdateTS(a.Ctx, a.Ctx.GetSessionVars().TxnCtx.GetForUpdateTS()) + } + if tsErr != nil { logutil.Logger(ctx).Warn("UpdateForUpdateTS failed", zap.Error(tsErr)) } @@ -805,6 +809,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E return nil, err } + if err = sessiontxn.AdviseNoConflictUpdateTS(a.Ctx, a.Ctx.GetSessionVars().TxnCtx.GetForUpdateTS()); err != nil { + return nil, err + } + if err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(); err != nil { return nil, err } diff --git a/executor/simple.go b/executor/simple.go index 645aec903343a..5614eda6b502f 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -43,7 +43,6 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/sessiontxn/readcommitted" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" @@ -592,9 +591,6 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { } } if e.staleTxnStartTS > 0 { - if err := e.ctx.NewStaleTxnWithStartTS(ctx, e.staleTxnStartTS); err != nil { - return err - } // With START TRANSACTION, autocommit remains disabled until you end // the transaction with COMMIT or ROLLBACK. The autocommit mode then // reverts to its previous state. @@ -602,51 +598,14 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error { if err := vars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil { return errors.Trace(err) } - vars.SetInTxn(true) - return nil - } - // If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the - // need to call NewTxn, which commits the existing transaction and begins a new one. - // If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should - // always create a new transaction. - txnCtx := e.ctx.GetSessionVars().TxnCtx - if txnCtx.History != nil || txnCtx.IsStaleness { - err := e.ctx.NewTxn(ctx) - if err != nil { - return err - } - } - // With START TRANSACTION, autocommit remains disabled until you end - // the transaction with COMMIT or ROLLBACK. The autocommit mode then - // reverts to its previous state. - e.ctx.GetSessionVars().SetInTxn(true) - // Call ctx.Txn(true) to active pending txn. - txnMode := s.Mode - if txnMode == "" { - txnMode = e.ctx.GetSessionVars().TxnMode - } - if txnMode == ast.Pessimistic { - e.ctx.GetSessionVars().TxnCtx.IsPessimistic = true - } - txn, err := e.ctx.Txn(true) - if err != nil { - return err - } - if e.ctx.GetSessionVars().TxnCtx.IsPessimistic { - txn.SetOption(kv.Pessimistic, true) - } - if s.CausalConsistencyOnly { - txn.SetOption(kv.GuaranteeLinearizability, false) - } - - var txnCtxProvider sessiontxn.TxnContextProvider - if e.ctx.GetSessionVars().IsPessimisticReadConsistency() { - txnCtxProvider = readcommitted.NewRCTxnContextProvider(e.ctx) - } else { - txnCtxProvider = &sessiontxn.SimpleTxnContextProvider{Sctx: e.ctx} } - return sessiontxn.GetTxnManager(e.ctx).SetContextProvider(txnCtxProvider) + return sessiontxn.CreateEnterNewTxnRequest(e.ctx). + ExplictBegin(). + StaleRead(e.staleTxnStartTS). + TxnMode(s.Mode). + CausalConsistencyOnly(s.CausalConsistencyOnly). + EnterNewSessionTxn(ctx) } func (e *SimpleExec) executeRevokeRole(ctx context.Context, s *ast.RevokeRoleStmt) error { diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 3991d75074b6e..88a395417a729 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -1630,8 +1630,9 @@ func (p *preprocessor) updateStateFromStaleReadProcessor() error { p.ctx.GetSessionVars().StmtCtx.IsStaleness = true if !p.ctx.GetSessionVars().InTxn() { err := sessiontxn.GetTxnManager(p.ctx).SetContextProvider(staleread.NewStalenessTxnContextProvider( - p.InfoSchema, + p.ctx, p.LastSnapshotTS, + p.InfoSchema, )) if err != nil { diff --git a/session/session.go b/session/session.go index 4ac44e43804d3..d0c2614daf93a 100644 --- a/session/session.go +++ b/session/session.go @@ -48,7 +48,6 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/sessiontxn/readcommitted" "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/helper" @@ -2321,7 +2320,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ snapshotTS = staleReadProcessor.GetStalenessReadTS() is = staleReadProcessor.GetStalenessInfoSchema() replicaReadScope = config.GetTxnScopeFromConfig() - if err = txnManager.SetContextProvider(staleread.NewStalenessTxnContextProvider(is, snapshotTS)); err != nil { + if err = txnManager.SetContextProvider(staleread.NewStalenessTxnContextProvider(s, snapshotTS, is)); err != nil { return nil, err } } @@ -2477,9 +2476,6 @@ func (s *session) NewTxn(ctx context.Context) error { TxnScope: s.sessionVars.CheckAndGetTxnScope(), } s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor()) - if s.sessionVars.IsPessimisticReadConsistency() { - - } return nil } @@ -2527,7 +2523,7 @@ func (s *session) NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) er TxnScope: txnScope, } s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor()) - return sessiontxn.GetTxnManager(s).SetContextProvider(staleread.NewStalenessTxnContextProvider(is, txn.StartTS())) + return nil } func (s *session) SetValue(key fmt.Stringer, value interface{}) { @@ -3125,17 +3121,13 @@ func (s *session) PrepareTxnCtx(ctx context.Context) error { } } - var txnCtxProvider sessiontxn.TxnContextProvider - if !s.sessionVars.IsPessimisticReadConsistency() { - txnCtxProvider = readcommitted.NewRCTxnContextProvider(s) + txnRequest := sessiontxn.CreateEnterNewTxnRequest(s) + if s.sessionVars.TxnCtx.IsPessimistic { + txnRequest.TxnMode(ast.Pessimistic) } else { - txnCtxProvider = &sessiontxn.SimpleTxnContextProvider{ - Sctx: s, - InfoSchema: is.(infoschema.InfoSchema), - } + txnRequest.TxnMode(ast.Optimistic) } - - return sessiontxn.GetTxnManager(s).SetContextProvider(txnCtxProvider) + return txnRequest.EnterNewSessionTxn(ctx) } // PrepareTSFuture uses to try to get ts future. diff --git a/session/txnmanager.go b/session/txnmanager.go index 86d19c8a43acf..93c19d7ab1254 100644 --- a/session/txnmanager.go +++ b/session/txnmanager.go @@ -19,6 +19,7 @@ import ( "errors" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessiontxn" ) @@ -78,6 +79,13 @@ func (m *txnManager) SetContextProvider(provider sessiontxn.TxnContextProvider) return nil } +func (m *txnManager) ActiveTxn(ctx context.Context) (kv.Transaction, error) { + if m.ctxProvider == nil { + return nil, errors.New("context provider not set") + } + return m.ctxProvider.ActiveTxn(ctx) +} + func (m *txnManager) OnStmtStart(ctx context.Context) error { if m.ctxProvider == nil { return errors.New("context provider not set") diff --git a/sessiontxn/interface.go b/sessiontxn/interface.go index eb730ed68b71f..a8c6cda601834 100644 --- a/sessiontxn/interface.go +++ b/sessiontxn/interface.go @@ -19,6 +19,8 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/sessionctx" ) @@ -26,6 +28,7 @@ type AdviceOption int64 const ( AdviceWarmUpNow AdviceOption = iota + AdviceNoConflictForUpdateTS ) // TxnContextProvider provides txn context @@ -37,6 +40,8 @@ type TxnContextProvider interface { // GetForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update GetForUpdateTS() (uint64, error) + // ActiveTxn actives the txn + ActiveTxn(ctx context.Context) (kv.Transaction, error) // OnStmtStart is the hook that should be called when a new statement started OnStmtStart(ctx context.Context) error // OnStmtRetry is the hook that should be called when a statement is retrying @@ -51,11 +56,13 @@ type TxnContextProvider interface { // It is only used in refactor stage // TODO: remove it after refactor finished type SimpleTxnContextProvider struct { - ctx context.Context - Sctx sessionctx.Context - InfoSchema infoschema.InfoSchema - GetReadTSFunc func() (uint64, error) - GetForUpdateTSFunc func() (uint64, error) + ctx context.Context + Sctx sessionctx.Context + CausalConsistencyOnly bool + stmtStarted bool + InfoSchema infoschema.InfoSchema + GetReadTSFunc func() (uint64, error) + GetForUpdateTSFunc func() (uint64, error) } // GetTxnInfoSchema returns the information schema used by txn @@ -79,10 +86,41 @@ func (p *SimpleTxnContextProvider) GetForUpdateTS() (uint64, error) { return p.GetForUpdateTSFunc() } +// ActiveTxn actives the txn +func (p *SimpleTxnContextProvider) ActiveTxn(ctx context.Context) (kv.Transaction, error) { + if !p.stmtStarted { + if err := p.Sctx.NewTxn(ctx); err != nil { + return nil, err + } + } else { + p.Sctx.PrepareTSFuture(ctx) + } + + txnMode, _ := ctx.Value("txnMode").(string) + if txnMode == "" { + txnMode = p.Sctx.GetSessionVars().TxnMode + } + if txnMode == ast.Pessimistic { + p.Sctx.GetSessionVars().TxnCtx.IsPessimistic = true + } + txn, err := p.Sctx.Txn(true) + if err != nil { + return nil, err + } + if p.Sctx.GetSessionVars().TxnCtx.IsPessimistic { + txn.SetOption(kv.Pessimistic, true) + } + if p.CausalConsistencyOnly { + txn.SetOption(kv.GuaranteeLinearizability, false) + } + return txn, nil +} + // OnStmtStart is the hook that should be called when a new statement started func (p *SimpleTxnContextProvider) OnStmtStart(ctx context.Context) error { p.ctx = ctx p.InfoSchema = p.Sctx.GetInfoSchema().(infoschema.InfoSchema) + p.stmtStarted = true return nil } @@ -114,6 +152,8 @@ type TxnManager interface { // GetForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update GetForUpdateTS() (uint64, error) + // ActiveTxn actives the txn + ActiveTxn(ctx context.Context) (kv.Transaction, error) // GetContextProvider returns the current TxnContextProvider GetContextProvider() TxnContextProvider // SetContextProvider sets the context provider @@ -133,5 +173,22 @@ func AdviseTxnWarmUp(sctx sessionctx.Context) error { return GetTxnManager(sctx).Advise(AdviceWarmUpNow) } +func AdviseNoConflictUpdateTS(sctx sessionctx.Context, ts uint64) error { + return GetTxnManager(sctx).Advise(AdviceNoConflictForUpdateTS, ts) +} + +func WithValUint64(val []interface{}, fn func(uint64) error) error { + if len(val) == 0 { + return errors.New("%v need a uint64 arg") + } + + if ts, ok := val[0].(uint64); ok { + return fn(ts) + } + + return errors.New("%v need a uint64 arg") + +} + // GetTxnManager returns the TxnManager object from session context var GetTxnManager func(sctx sessionctx.Context) TxnManager diff --git a/sessiontxn/readcommitted/provider.go b/sessiontxn/readcommitted/provider.go index 6f0894fac49a5..4fcb35d93860d 100644 --- a/sessiontxn/readcommitted/provider.go +++ b/sessiontxn/readcommitted/provider.go @@ -17,6 +17,7 @@ package readcommitted import ( "context" + "github.com/pingcap/errors" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx" @@ -26,12 +27,37 @@ import ( ) type rcStmtContext struct { - sctx sessionctx.Context - ctx context.Context - prevStmt *rcStmtContext + sctx sessionctx.Context + ctx context.Context + prevStmt *rcStmtContext + // useTxnStartTS means whether to use the transaction's start ts as the read ts useTxnStartTS bool + // When RCCheckTS optimization is enabled, `usePrevStmtTS` can be true and try reuse the previous statement's ts first. + // This may cause an error from the storage layer. At that, the statement will retry and get the ts from oracle again. + usePrevStmtTS bool ts uint64 tsFuture oracle.Future + // advisedRetryTS is the advised ts when retry + advisedRetryTS uint64 +} + +func newRcStmtContext(ctx context.Context, sctx sessionctx.Context, prevStmt *rcStmtContext) (*rcStmtContext, error) { + useTxnStartTS := false + if prevStmt == nil { + txn, err := sctx.Txn(false) + if err != nil { + return nil, err + } + useTxnStartTS = !txn.Valid() + } + + return &rcStmtContext{ + ctx: ctx, + sctx: sctx, + prevStmt: prevStmt, + useTxnStartTS: useTxnStartTS, + usePrevStmtTS: prevStmt != nil && prevStmt.ts > 0 && sctx.GetSessionVars().StmtCtx.RCCheckTS, + }, nil } func (s *rcStmtContext) getTS() (ts uint64, err error) { @@ -45,13 +71,7 @@ func (s *rcStmtContext) getTS() (ts uint64, err error) { return 0, err } - if s.useTxnStartTS { - ts = txn.StartTS() - } else { - ts, err = s.tsFuture.Wait() - } - - if err != nil { + if ts, err = s.tsFuture.Wait(); err != nil { return 0, err } @@ -62,15 +82,15 @@ func (s *rcStmtContext) getTS() (ts uint64, err error) { } func (s *rcStmtContext) retry() error { - forUpdateTS := s.sctx.GetSessionVars().TxnCtx.GetForUpdateTS() - if s.ts > 0 && forUpdateTS > s.ts { - s.tsFuture = sessiontxn.ConstantTSFuture(forUpdateTS) + if s.ts > 0 && s.advisedRetryTS > s.ts { + s.tsFuture = sessiontxn.ConstantTSFuture(s.advisedRetryTS) } else { - s.tsFuture = s.getOracleFuture() + s.tsFuture = s.getTxnOracleFuture() } s.ts = 0 s.useTxnStartTS = false + s.advisedRetryTS = 0 // trigger tso fetch immediately _, err := s.getTS() @@ -78,33 +98,57 @@ func (s *rcStmtContext) retry() error { } func (s *rcStmtContext) prepareTS() { - s.sctx.PrepareTSFuture(s.ctx) - if s.useTxnStartTS || s.tsFuture != nil { + if s.tsFuture != nil { return } - sessVars := s.sctx.GetSessionVars() - if !sessVars.StmtCtx.RCCheckTS && s.prevStmt != nil && s.prevStmt.ts > 0 { + s.sctx.PrepareTSFuture(s.ctx) + if s.useTxnStartTS { + s.tsFuture = s.getTxnStartTSFuture() + } else if s.usePrevStmtTS { s.tsFuture = sessiontxn.ConstantTSFuture(s.prevStmt.ts) } else { - s.tsFuture = s.getOracleFuture() + s.tsFuture = s.getTxnOracleFuture() } } -func (s *rcStmtContext) getOracleFuture() oracle.Future { +func (s *rcStmtContext) getTxnStartTSFuture() sessiontxn.FuncTSFuture { + return func() (uint64, error) { + txn, err := s.sctx.Txn(false) + if err != nil { + return 0, err + } + + if !txn.Valid() { + return 0, errors.New("txn is invalid") + } + + return txn.StartTS(), nil + } +} + +func (s *rcStmtContext) getTxnOracleFuture() oracle.Future { return sessiontxn.GetTxnOracleFuture(s.ctx, s.sctx, s.sctx.GetSessionVars().CheckAndGetTxnScope()) + } type txnContextProvider struct { - sctx sessionctx.Context - is infoschema.InfoSchema - stmt *rcStmtContext + sctx sessionctx.Context + is infoschema.InfoSchema + stmt *rcStmtContext + causalConsistencyOnly bool + + // tidbSnapshotVarTS is the set by @@tidb_snapshot + tidbSnapshotVarTS uint64 + // tidbSnapshotVarInfoSchema is the timestamp according to tidbSnapshotVarTS + tidbSnapshotVarInfoSchema infoschema.InfoSchema } // NewRCTxnContextProvider creates a new txnContextProvider -func NewRCTxnContextProvider(sctx sessionctx.Context) sessiontxn.TxnContextProvider { +func NewRCTxnContextProvider(sctx sessionctx.Context, causalConsistencyOnly bool) sessiontxn.TxnContextProvider { return &txnContextProvider{ - sctx: sctx, + sctx: sctx, + causalConsistencyOnly: causalConsistencyOnly, is: temptable.AttachLocalTemporaryTableInfoSchema( sctx, sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema), @@ -113,15 +157,35 @@ func NewRCTxnContextProvider(sctx sessionctx.Context) sessiontxn.TxnContextProvi } func (p *txnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema { - if snapshotIS := p.sctx.GetSessionVars().SnapshotInfoschema; snapshotIS != nil { - return snapshotIS.(infoschema.InfoSchema) + if p.tidbSnapshotVarTS != 0 { + return p.tidbSnapshotVarInfoSchema } return p.is } +func (p *txnContextProvider) ActiveTxn(ctx context.Context) (kv.Transaction, error) { + if p.stmt == nil { + if err := p.sctx.NewTxn(ctx); err != nil { + return nil, err + } + } else { + p.sctx.PrepareTSFuture(ctx) + } + + txn, err := p.sctx.Txn(true) + if err != nil { + return nil, err + } + + if p.causalConsistencyOnly { + txn.SetOption(kv.GuaranteeLinearizability, false) + } + return txn, nil +} + func (p *txnContextProvider) GetReadTS() (uint64, error) { - if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { - return snapshotTS, nil + if p.tidbSnapshotVarTS != 0 { + return p.tidbSnapshotVarTS, nil } return p.stmt.getTS() } @@ -131,12 +195,15 @@ func (p *txnContextProvider) GetForUpdateTS() (uint64, error) { } func (p *txnContextProvider) OnStmtStart(ctx context.Context) error { - p.stmt = &rcStmtContext{ - ctx: ctx, - sctx: p.sctx, - prevStmt: p.stmt, - useTxnStartTS: p.stmt == nil, + if snapshotTS := p.sctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { + p.tidbSnapshotVarTS = snapshotTS + p.tidbSnapshotVarInfoSchema = p.sctx.GetSessionVars().SnapshotInfoschema.(infoschema.InfoSchema) + } + stmt, err := newRcStmtContext(ctx, p.sctx, p.stmt) + if err != nil { + return err } + p.stmt = stmt return nil } @@ -144,10 +211,26 @@ func (p *txnContextProvider) OnStmtRetry() error { return p.stmt.retry() } -func (p *txnContextProvider) Advise(opt sessiontxn.AdviceOption, _ ...interface{}) error { +func (p *txnContextProvider) warmUp() error { + if p.tidbSnapshotVarTS != 0 { + p.stmt.prepareTS() + } + return nil +} + +func (p *txnContextProvider) adviseRetryTS(ts uint64) error { + if p.stmt == nil { + p.stmt.advisedRetryTS = ts + } + return nil +} + +func (p *txnContextProvider) Advise(opt sessiontxn.AdviceOption, val ...interface{}) error { switch opt { case sessiontxn.AdviceWarmUpNow: - p.stmt.prepareTS() + return p.warmUp() + case sessiontxn.AdviceNoConflictForUpdateTS: + return sessiontxn.WithValUint64(val, p.adviseRetryTS) } return nil } diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go index 4912e812615a8..1eb9562212c87 100644 --- a/sessiontxn/staleread/provider.go +++ b/sessiontxn/staleread/provider.go @@ -18,21 +18,27 @@ import ( "context" "github.com/pingcap/errors" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/table/temptable" ) // StalenessTxnContextProvider implements sessiontxn.TxnContextProvider type StalenessTxnContextProvider struct { - is infoschema.InfoSchema - ts uint64 + sctx sessionctx.Context + is infoschema.InfoSchema + ts uint64 } // NewStalenessTxnContextProvider creates a new StalenessTxnContextProvider -func NewStalenessTxnContextProvider(is infoschema.InfoSchema, ts uint64) *StalenessTxnContextProvider { +func NewStalenessTxnContextProvider(sctx sessionctx.Context, ts uint64, is infoschema.InfoSchema) *StalenessTxnContextProvider { return &StalenessTxnContextProvider{ - is: is, - ts: ts, + sctx: sctx, + is: is, + ts: ts, } } @@ -51,8 +57,25 @@ func (p *StalenessTxnContextProvider) GetForUpdateTS() (uint64, error) { return 0, errors.New("GetForUpdateTS not supported for stalenessTxnProvider") } +// ActiveTxn actives stale read txn +func (p *StalenessTxnContextProvider) ActiveTxn(ctx context.Context) (kv.Transaction, error) { + if err := p.sctx.NewStaleTxnWithStartTS(ctx, p.ts); err != nil { + return nil, err + } + p.is = p.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema) + return p.sctx.Txn(true) +} + // OnStmtStart is the hook that should be called when a new statement started func (p *StalenessTxnContextProvider) OnStmtStart(_ context.Context) error { + if p.is == nil { + is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(p.ts) + if err != nil { + return err + } + + p.is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, is) + } return nil } diff --git a/sessiontxn/txn.go b/sessiontxn/txn.go index 289782e118262..62065298dc2a7 100644 --- a/sessiontxn/txn.go +++ b/sessiontxn/txn.go @@ -18,7 +18,11 @@ import ( "context" "github.com/opentracing/opentracing-go" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn/readcommitted" + "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/tikv/client-go/v2/oracle" ) @@ -28,6 +32,12 @@ func (c ConstantTSFuture) Wait() (uint64, error) { return uint64(c), nil } +type FuncTSFuture func() (uint64, error) + +func (f FuncTSFuture) Wait() (uint64, error) { + return f() +} + func GetTxnOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("session.getTxnFuture", opentracing.ChildOf(span.Context())) @@ -44,3 +54,91 @@ func GetTxnOracleFuture(ctx context.Context, sctx sessionctx.Context, scope stri return oracleStore.GetTimestampAsync(ctx, option) } } + +type EnterNewSessionTxnRequest struct { + sctx sessionctx.Context + explictBegin bool + txnMode string + causalConsistencyOnly bool + staleReadTS uint64 + staleReadInfoSchema infoschema.InfoSchema +} + +func CreateEnterNewTxnRequest(sctx sessionctx.Context) *EnterNewSessionTxnRequest { + return &EnterNewSessionTxnRequest{ + sctx: sctx, + } +} + +func (r *EnterNewSessionTxnRequest) ExplictBegin() *EnterNewSessionTxnRequest { + r.explictBegin = true + return r +} + +func (r *EnterNewSessionTxnRequest) StaleRead(ts uint64) *EnterNewSessionTxnRequest { + r.staleReadTS = ts + return nil +} + +func (r *EnterNewSessionTxnRequest) StaleReadWithInfoSchema(ts uint64, is infoschema.InfoSchema) *EnterNewSessionTxnRequest { + r.staleReadTS = ts + r.staleReadInfoSchema = is + return nil +} + +func (r *EnterNewSessionTxnRequest) TxnMode(mode string) *EnterNewSessionTxnRequest { + r.txnMode = mode + return r +} + +func (r *EnterNewSessionTxnRequest) CausalConsistencyOnly(val bool) *EnterNewSessionTxnRequest { + r.causalConsistencyOnly = val + return r +} + +func (r *EnterNewSessionTxnRequest) EnterNewSessionTxn(ctx context.Context) error { + sessVars := r.sctx.GetSessionVars() + txnCtx := sessVars.TxnCtx + if r.explictBegin && txnCtx.History == nil && !txnCtx.IsStaleness && r.staleReadTS == 0 { + // If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the + // need to call NewTxn, which commits the existing transaction and begins a new one. + // If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should + // always create a new transaction. + return nil + } + + txnMode := r.txnMode + if txnMode == "" { + txnMode = sessVars.TxnMode + } + + var provider TxnContextProvider + switch { + case r.staleReadTS > 0: + // stale read should be the first place to check, `is` is nil because it will be fetched when `ActiveTxn` + provider = staleread.NewStalenessTxnContextProvider(r.sctx, r.staleReadTS, nil) + case txnMode == ast.Pessimistic && sessVars.IsIsolation(ast.ReadCommitted): + // when txn mode is pessimistic and isolation level is 'READ-COMMITTED', use RC + provider = readcommitted.NewRCTxnContextProvider(r.sctx, r.causalConsistencyOnly) + default: + // otherwise, use `SimpleTxnContextProvider` for compatibility + provider = &SimpleTxnContextProvider{Sctx: r.sctx, CausalConsistencyOnly: r.causalConsistencyOnly} + } + + txnManager := GetTxnManager(r.sctx) + if err := txnManager.SetContextProvider(provider); err == nil { + return err + } + + if r.explictBegin { + if _, err := txnManager.ActiveTxn(ctx); err != nil { + return err + } + // With START TRANSACTION, autocommit remains disabled until you end + // the transaction with COMMIT or ROLLBACK. The autocommit mode then + // reverts to its previous state. + sessVars.SetInTxn(true) + } + + return nil +}