Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Apr 16, 2022
1 parent e977efa commit 93f5e67
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 121 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
return nil, err
}

if err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(); err != nil {
if err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(ctx); err != nil {
return nil, err
}

Expand Down
12 changes: 6 additions & 6 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,12 +600,12 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
}
}

return sessiontxn.CreateEnterNewTxnRequest(e.ctx).
ExplictBegin().
StaleRead(e.staleTxnStartTS).
TxnMode(s.Mode).
CausalConsistencyOnly(s.CausalConsistencyOnly).
EnterNewSessionTxn(ctx)
return sessiontxn.GetTxnManager(e.ctx).EnterNewTxn(ctx, &sessiontxn.NewTxnRequest{
ExplictStart: true,
StaleReadTS: e.staleTxnStartTS,
TxnMode: s.Mode,
CausalConsistencyOnly: s.CausalConsistencyOnly,
})
}

func (e *SimpleExec) executeRevokeRole(ctx context.Context, s *ast.RevokeRoleStmt) error {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ func (p *preprocessor) updateStateFromStaleReadProcessor() error {
if p.flag&initTxnContextProvider != 0 {
p.ctx.GetSessionVars().StmtCtx.IsStaleness = true
if !p.ctx.GetSessionVars().InTxn() {
err := sessiontxn.GetTxnManager(p.ctx).SetContextProvider(staleread.NewStalenessTxnContextProvider(
err := sessiontxn.GetTxnManager(p.ctx).ReplaceContextProvider(staleread.NewStalenessTxnContextProvider(
p.ctx,
p.LastSnapshotTS,
p.InfoSchema,
Expand Down
23 changes: 14 additions & 9 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1844,7 +1844,14 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
s.txn.onStmtStart(digest.String())
defer s.txn.onStmtEnd()

if err := sessiontxn.GetTxnManager(s).OnStmtStart(ctx); err != nil {
var err error
if s.sessionVars.RetryInfo.Retrying {
err = sessiontxn.GetTxnManager(s).OnStmtStart(ctx)
} else {
err = sessiontxn.GetTxnManager(s).OnStmtRetry(ctx)
}

if err != nil {
return nil, err
}

Expand Down Expand Up @@ -2320,7 +2327,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
snapshotTS = staleReadProcessor.GetStalenessReadTS()
is = staleReadProcessor.GetStalenessInfoSchema()
replicaReadScope = config.GetTxnScopeFromConfig()
if err = txnManager.SetContextProvider(staleread.NewStalenessTxnContextProvider(s, snapshotTS, is)); err != nil {
if err = txnManager.ReplaceContextProvider(staleread.NewStalenessTxnContextProvider(s, snapshotTS, is)); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -3114,20 +3121,18 @@ func (s *session) PrepareTxnCtx(ctx context.Context) error {
ShardStep: int(s.sessionVars.ShardAllocateStep),
TxnScope: s.GetSessionVars().CheckAndGetTxnScope(),
}
txnMode := ast.Optimistic
if !s.sessionVars.IsAutocommit() || s.sessionVars.RetryInfo.Retrying ||
config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() {
if s.sessionVars.TxnMode == ast.Pessimistic {
s.sessionVars.TxnCtx.IsPessimistic = true
txnMode = ast.Pessimistic
}
}

txnRequest := sessiontxn.CreateEnterNewTxnRequest(s)
if s.sessionVars.TxnCtx.IsPessimistic {
txnRequest.TxnMode(ast.Pessimistic)
} else {
txnRequest.TxnMode(ast.Optimistic)
}
return txnRequest.EnterNewSessionTxn(ctx)
return sessiontxn.GetTxnManager(s).EnterNewTxn(ctx, &sessiontxn.NewTxnRequest{
TxnMode: txnMode,
})
}

// PrepareTSFuture uses to try to get ts future.
Expand Down
68 changes: 65 additions & 3 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import (

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/readcommitted"
"github.com/pingcap/tidb/sessiontxn/staleread"
)

func init() {
Expand Down Expand Up @@ -60,29 +63,76 @@ func (m *txnManager) GetReadTS() (uint64, error) {
if m.ctxProvider == nil {
return 0, errors.New("context provider not set")
}
defer m.handleAutoCommit()
return m.ctxProvider.GetReadTS()
}

func (m *txnManager) GetForUpdateTS() (uint64, error) {
if m.ctxProvider == nil {
return 0, errors.New("context provider not set")
}
defer m.handleAutoCommit()
return m.ctxProvider.GetForUpdateTS()
}

func (m *txnManager) GetContextProvider() sessiontxn.TxnContextProvider {
return m.ctxProvider
}

func (m *txnManager) SetContextProvider(provider sessiontxn.TxnContextProvider) error {
func (m *txnManager) ReplaceContextProvider(provider sessiontxn.TxnContextProvider) error {
m.ctxProvider = provider
return nil
}

func (m *txnManager) EnterNewTxn(ctx context.Context, r *sessiontxn.NewTxnRequest) error {
sessVars := m.sctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
if r.ExplictStart && txnCtx.History == nil && !txnCtx.IsStaleness && r.StaleReadTS == 0 {
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
// If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should
// always create a new transaction.
return nil
}

txnMode := r.TxnMode
if txnMode == "" {
txnMode = sessVars.TxnMode
}

switch {
case r.StaleReadTS > 0:
// stale read should be the first place to check, `is` is nil because it will be fetched when `ActiveTxn`
m.ctxProvider = staleread.NewStalenessTxnContextProvider(m.sctx, r.StaleReadTS, nil)
case txnMode == ast.Pessimistic && sessVars.IsIsolation(ast.ReadCommitted):
// when txn mode is pessimistic and isolation level is 'READ-COMMITTED', use RC
m.ctxProvider = readcommitted.NewRCTxnContextProvider(m.sctx, r.CausalConsistencyOnly)
default:
// otherwise, use `SimpleTxnContextProvider` for compatibility
m.ctxProvider = &sessiontxn.SimpleTxnContextProvider{Sctx: m.sctx, CausalConsistencyOnly: r.CausalConsistencyOnly}
}

if r.ExplictStart {
if _, err := m.ctxProvider.ActiveTxn(ctx); err != nil {
return err
}
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
sessVars.SetInTxn(true)
} else {
sessVars.SetInTxn(false)
}

return nil
}

func (m *txnManager) ActiveTxn(ctx context.Context) (kv.Transaction, error) {
if m.ctxProvider == nil {
return nil, errors.New("context provider not set")
}

defer m.handleAutoCommit()
return m.ctxProvider.ActiveTxn(ctx)
}

Expand All @@ -93,11 +143,11 @@ func (m *txnManager) OnStmtStart(ctx context.Context) error {
return m.ctxProvider.OnStmtStart(ctx)
}

func (m *txnManager) OnStmtRetry() error {
func (m *txnManager) OnStmtRetry(ctx context.Context) error {
if m.ctxProvider == nil {
return errors.New("context provider not set")
}
return m.ctxProvider.OnStmtRetry()
return m.ctxProvider.OnStmtRetry(ctx)
}

func (m *txnManager) Advise(opt sessiontxn.AdviceOption, val ...interface{}) error {
Expand All @@ -107,3 +157,15 @@ func (m *txnManager) Advise(opt sessiontxn.AdviceOption, val ...interface{}) err

return m.ctxProvider.Advise(opt, val)
}

func (m *txnManager) handleAutoCommit() {
sessVars := m.sctx.GetSessionVars()
if sessVars.IsAutocommit() {
return
}

tx, _ := m.sctx.Txn(false)
if tx.Valid() {
sessVars.SetInTxn(true)
}
}
20 changes: 15 additions & 5 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type TxnContextProvider interface {
// OnStmtStart is the hook that should be called when a new statement started
OnStmtStart(ctx context.Context) error
// OnStmtRetry is the hook that should be called when a statement is retrying
OnStmtRetry() error
OnStmtRetry(ctx context.Context) error
// Advise is used to provide some extra information to make a better performance.
// For example, we can give `AdviceWarmUpNow` to advice provider prefetch tso.
// Give or not give an advice should not affect the correctness.
Expand Down Expand Up @@ -125,7 +125,7 @@ func (p *SimpleTxnContextProvider) OnStmtStart(ctx context.Context) error {
}

// OnStmtRetry is the hook that should be called when a statement is retrying
func (p *SimpleTxnContextProvider) OnStmtRetry() error {
func (p *SimpleTxnContextProvider) OnStmtRetry(_ context.Context) error {
return nil
}

Expand All @@ -143,6 +143,14 @@ func UsingNonSimpleProvider(sctx sessionctx.Context) bool {
return ok
}

type NewTxnRequest struct {
ExplictStart bool
TxnMode string
CausalConsistencyOnly bool
StaleReadTS uint64
StaleReadInfoSchema infoschema.InfoSchema
}

// TxnManager is an interface providing txn context management in session
type TxnManager interface {
// GetTxnInfoSchema returns the information schema used by txn
Expand All @@ -156,13 +164,15 @@ type TxnManager interface {
ActiveTxn(ctx context.Context) (kv.Transaction, error)
// GetContextProvider returns the current TxnContextProvider
GetContextProvider() TxnContextProvider
// SetContextProvider sets the context provider
SetContextProvider(provider TxnContextProvider) error
// ReplaceContextProvider replaces the context provider
ReplaceContextProvider(provider TxnContextProvider) error
// EnterNewTxn enters a new txn
EnterNewTxn(ctx context.Context, request *NewTxnRequest) error

// OnStmtStart is the hook that should be called when a new statement started
OnStmtStart(ctx context.Context) error
// OnStmtRetry is the hook that should be called when a statement is retrying
OnStmtRetry() error
OnStmtRetry(ctx context.Context) error
// Advise is used to provide some extra information to make a better performance.
// For example, we can give `AdviceWarmUpNow` to advice provider prefetch tso.
// Give or not give an advice should not affect the correctness.
Expand Down
7 changes: 4 additions & 3 deletions sessiontxn/readcommitted/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func (s *rcStmtContext) getTS() (ts uint64, err error) {
return ts, nil
}

func (s *rcStmtContext) retry() error {
func (s *rcStmtContext) retry(ctx context.Context) error {
s.ctx = ctx
if s.ts > 0 && s.advisedRetryTS > s.ts {
s.tsFuture = sessiontxn.ConstantTSFuture(s.advisedRetryTS)
} else {
Expand Down Expand Up @@ -207,8 +208,8 @@ func (p *txnContextProvider) OnStmtStart(ctx context.Context) error {
return nil
}

func (p *txnContextProvider) OnStmtRetry() error {
return p.stmt.retry()
func (p *txnContextProvider) OnStmtRetry(ctx context.Context) error {
return p.stmt.retry(ctx)
}

func (p *txnContextProvider) warmUp() error {
Expand Down
2 changes: 1 addition & 1 deletion sessiontxn/staleread/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (p *StalenessTxnContextProvider) OnStmtStart(_ context.Context) error {
}

// OnStmtRetry is the hook that should be called when a statement is retrying
func (p *StalenessTxnContextProvider) OnStmtRetry() error {
func (p *StalenessTxnContextProvider) OnStmtRetry(_ context.Context) error {
return nil
}

Expand Down
92 changes: 0 additions & 92 deletions sessiontxn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ import (
"context"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn/readcommitted"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/tikv/client-go/v2/oracle"
)

Expand Down Expand Up @@ -54,91 +50,3 @@ func GetTxnOracleFuture(ctx context.Context, sctx sessionctx.Context, scope stri
return oracleStore.GetTimestampAsync(ctx, option)
}
}

type EnterNewSessionTxnRequest struct {
sctx sessionctx.Context
explictBegin bool
txnMode string
causalConsistencyOnly bool
staleReadTS uint64
staleReadInfoSchema infoschema.InfoSchema
}

func CreateEnterNewTxnRequest(sctx sessionctx.Context) *EnterNewSessionTxnRequest {
return &EnterNewSessionTxnRequest{
sctx: sctx,
}
}

func (r *EnterNewSessionTxnRequest) ExplictBegin() *EnterNewSessionTxnRequest {
r.explictBegin = true
return r
}

func (r *EnterNewSessionTxnRequest) StaleRead(ts uint64) *EnterNewSessionTxnRequest {
r.staleReadTS = ts
return nil
}

func (r *EnterNewSessionTxnRequest) StaleReadWithInfoSchema(ts uint64, is infoschema.InfoSchema) *EnterNewSessionTxnRequest {
r.staleReadTS = ts
r.staleReadInfoSchema = is
return nil
}

func (r *EnterNewSessionTxnRequest) TxnMode(mode string) *EnterNewSessionTxnRequest {
r.txnMode = mode
return r
}

func (r *EnterNewSessionTxnRequest) CausalConsistencyOnly(val bool) *EnterNewSessionTxnRequest {
r.causalConsistencyOnly = val
return r
}

func (r *EnterNewSessionTxnRequest) EnterNewSessionTxn(ctx context.Context) error {
sessVars := r.sctx.GetSessionVars()
txnCtx := sessVars.TxnCtx
if r.explictBegin && txnCtx.History == nil && !txnCtx.IsStaleness && r.staleReadTS == 0 {
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
// If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should
// always create a new transaction.
return nil
}

txnMode := r.txnMode
if txnMode == "" {
txnMode = sessVars.TxnMode
}

var provider TxnContextProvider
switch {
case r.staleReadTS > 0:
// stale read should be the first place to check, `is` is nil because it will be fetched when `ActiveTxn`
provider = staleread.NewStalenessTxnContextProvider(r.sctx, r.staleReadTS, nil)
case txnMode == ast.Pessimistic && sessVars.IsIsolation(ast.ReadCommitted):
// when txn mode is pessimistic and isolation level is 'READ-COMMITTED', use RC
provider = readcommitted.NewRCTxnContextProvider(r.sctx, r.causalConsistencyOnly)
default:
// otherwise, use `SimpleTxnContextProvider` for compatibility
provider = &SimpleTxnContextProvider{Sctx: r.sctx, CausalConsistencyOnly: r.causalConsistencyOnly}
}

txnManager := GetTxnManager(r.sctx)
if err := txnManager.SetContextProvider(provider); err == nil {
return err
}

if r.explictBegin {
if _, err := txnManager.ActiveTxn(ctx); err != nil {
return err
}
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
sessVars.SetInTxn(true)
}

return nil
}

0 comments on commit 93f5e67

Please sign in to comment.