Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Apr 15, 2022
1 parent 7a8522f commit e977efa
Show file tree
Hide file tree
Showing 9 changed files with 338 additions and 109 deletions.
8 changes: 8 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
// the async rollback operation rollbacked the lock just acquired
if err != nil {
tsErr := UpdateForUpdateTS(a.Ctx, 0)
if tsErr == nil {
tsErr = sessiontxn.AdviseNoConflictUpdateTS(a.Ctx, a.Ctx.GetSessionVars().TxnCtx.GetForUpdateTS())
}

if tsErr != nil {
logutil.Logger(ctx).Warn("UpdateForUpdateTS failed", zap.Error(tsErr))
}
Expand All @@ -805,6 +809,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
return nil, err
}

if err = sessiontxn.AdviseNoConflictUpdateTS(a.Ctx, a.Ctx.GetSessionVars().TxnCtx.GetForUpdateTS()); err != nil {
return nil, err
}

if err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(); err != nil {
return nil, err
}
Expand Down
53 changes: 6 additions & 47 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/readcommitted"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -592,61 +591,21 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
}
}
if e.staleTxnStartTS > 0 {
if err := e.ctx.NewStaleTxnWithStartTS(ctx, e.staleTxnStartTS); err != nil {
return err
}
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
vars := e.ctx.GetSessionVars()
if err := vars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil {
return errors.Trace(err)
}
vars.SetInTxn(true)
return nil
}
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
// If the last un-committed/un-rollback transaction is a time-bounded read-only transaction, we should
// always create a new transaction.
txnCtx := e.ctx.GetSessionVars().TxnCtx
if txnCtx.History != nil || txnCtx.IsStaleness {
err := e.ctx.NewTxn(ctx)
if err != nil {
return err
}
}
// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
// Call ctx.Txn(true) to active pending txn.
txnMode := s.Mode
if txnMode == "" {
txnMode = e.ctx.GetSessionVars().TxnMode
}
if txnMode == ast.Pessimistic {
e.ctx.GetSessionVars().TxnCtx.IsPessimistic = true
}
txn, err := e.ctx.Txn(true)
if err != nil {
return err
}
if e.ctx.GetSessionVars().TxnCtx.IsPessimistic {
txn.SetOption(kv.Pessimistic, true)
}
if s.CausalConsistencyOnly {
txn.SetOption(kv.GuaranteeLinearizability, false)
}

var txnCtxProvider sessiontxn.TxnContextProvider
if e.ctx.GetSessionVars().IsPessimisticReadConsistency() {
txnCtxProvider = readcommitted.NewRCTxnContextProvider(e.ctx)
} else {
txnCtxProvider = &sessiontxn.SimpleTxnContextProvider{Sctx: e.ctx}
}

return sessiontxn.GetTxnManager(e.ctx).SetContextProvider(txnCtxProvider)
return sessiontxn.CreateEnterNewTxnRequest(e.ctx).
ExplictBegin().
StaleRead(e.staleTxnStartTS).
TxnMode(s.Mode).
CausalConsistencyOnly(s.CausalConsistencyOnly).
EnterNewSessionTxn(ctx)
}

func (e *SimpleExec) executeRevokeRole(ctx context.Context, s *ast.RevokeRoleStmt) error {
Expand Down
3 changes: 2 additions & 1 deletion planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -1630,8 +1630,9 @@ func (p *preprocessor) updateStateFromStaleReadProcessor() error {
p.ctx.GetSessionVars().StmtCtx.IsStaleness = true
if !p.ctx.GetSessionVars().InTxn() {
err := sessiontxn.GetTxnManager(p.ctx).SetContextProvider(staleread.NewStalenessTxnContextProvider(
p.InfoSchema,
p.ctx,
p.LastSnapshotTS,
p.InfoSchema,
))

if err != nil {
Expand Down
22 changes: 7 additions & 15 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/readcommitted"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -2321,7 +2320,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
snapshotTS = staleReadProcessor.GetStalenessReadTS()
is = staleReadProcessor.GetStalenessInfoSchema()
replicaReadScope = config.GetTxnScopeFromConfig()
if err = txnManager.SetContextProvider(staleread.NewStalenessTxnContextProvider(is, snapshotTS)); err != nil {
if err = txnManager.SetContextProvider(staleread.NewStalenessTxnContextProvider(s, snapshotTS, is)); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -2477,9 +2476,6 @@ func (s *session) NewTxn(ctx context.Context) error {
TxnScope: s.sessionVars.CheckAndGetTxnScope(),
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
if s.sessionVars.IsPessimisticReadConsistency() {

}
return nil
}

Expand Down Expand Up @@ -2527,7 +2523,7 @@ func (s *session) NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) er
TxnScope: txnScope,
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
return sessiontxn.GetTxnManager(s).SetContextProvider(staleread.NewStalenessTxnContextProvider(is, txn.StartTS()))
return nil
}

func (s *session) SetValue(key fmt.Stringer, value interface{}) {
Expand Down Expand Up @@ -3125,17 +3121,13 @@ func (s *session) PrepareTxnCtx(ctx context.Context) error {
}
}

var txnCtxProvider sessiontxn.TxnContextProvider
if !s.sessionVars.IsPessimisticReadConsistency() {
txnCtxProvider = readcommitted.NewRCTxnContextProvider(s)
txnRequest := sessiontxn.CreateEnterNewTxnRequest(s)
if s.sessionVars.TxnCtx.IsPessimistic {
txnRequest.TxnMode(ast.Pessimistic)
} else {
txnCtxProvider = &sessiontxn.SimpleTxnContextProvider{
Sctx: s,
InfoSchema: is.(infoschema.InfoSchema),
}
txnRequest.TxnMode(ast.Optimistic)
}

return sessiontxn.GetTxnManager(s).SetContextProvider(txnCtxProvider)
return txnRequest.EnterNewSessionTxn(ctx)
}

// PrepareTSFuture uses to try to get ts future.
Expand Down
8 changes: 8 additions & 0 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
)
Expand Down Expand Up @@ -78,6 +79,13 @@ func (m *txnManager) SetContextProvider(provider sessiontxn.TxnContextProvider)
return nil
}

func (m *txnManager) ActiveTxn(ctx context.Context) (kv.Transaction, error) {
if m.ctxProvider == nil {
return nil, errors.New("context provider not set")
}
return m.ctxProvider.ActiveTxn(ctx)
}

func (m *txnManager) OnStmtStart(ctx context.Context) error {
if m.ctxProvider == nil {
return errors.New("context provider not set")
Expand Down
67 changes: 62 additions & 5 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
)

type AdviceOption int64

const (
AdviceWarmUpNow AdviceOption = iota
AdviceNoConflictForUpdateTS
)

// TxnContextProvider provides txn context
Expand All @@ -37,6 +40,8 @@ type TxnContextProvider interface {
// GetForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
GetForUpdateTS() (uint64, error)

// ActiveTxn actives the txn
ActiveTxn(ctx context.Context) (kv.Transaction, error)
// OnStmtStart is the hook that should be called when a new statement started
OnStmtStart(ctx context.Context) error
// OnStmtRetry is the hook that should be called when a statement is retrying
Expand All @@ -51,11 +56,13 @@ type TxnContextProvider interface {
// It is only used in refactor stage
// TODO: remove it after refactor finished
type SimpleTxnContextProvider struct {
ctx context.Context
Sctx sessionctx.Context
InfoSchema infoschema.InfoSchema
GetReadTSFunc func() (uint64, error)
GetForUpdateTSFunc func() (uint64, error)
ctx context.Context
Sctx sessionctx.Context
CausalConsistencyOnly bool
stmtStarted bool
InfoSchema infoschema.InfoSchema
GetReadTSFunc func() (uint64, error)
GetForUpdateTSFunc func() (uint64, error)
}

// GetTxnInfoSchema returns the information schema used by txn
Expand All @@ -79,10 +86,41 @@ func (p *SimpleTxnContextProvider) GetForUpdateTS() (uint64, error) {
return p.GetForUpdateTSFunc()
}

// ActiveTxn actives the txn
func (p *SimpleTxnContextProvider) ActiveTxn(ctx context.Context) (kv.Transaction, error) {
if !p.stmtStarted {
if err := p.Sctx.NewTxn(ctx); err != nil {
return nil, err
}
} else {
p.Sctx.PrepareTSFuture(ctx)
}

txnMode, _ := ctx.Value("txnMode").(string)
if txnMode == "" {
txnMode = p.Sctx.GetSessionVars().TxnMode
}
if txnMode == ast.Pessimistic {
p.Sctx.GetSessionVars().TxnCtx.IsPessimistic = true
}
txn, err := p.Sctx.Txn(true)
if err != nil {
return nil, err
}
if p.Sctx.GetSessionVars().TxnCtx.IsPessimistic {
txn.SetOption(kv.Pessimistic, true)
}
if p.CausalConsistencyOnly {
txn.SetOption(kv.GuaranteeLinearizability, false)
}
return txn, nil
}

// OnStmtStart is the hook that should be called when a new statement started
func (p *SimpleTxnContextProvider) OnStmtStart(ctx context.Context) error {
p.ctx = ctx
p.InfoSchema = p.Sctx.GetInfoSchema().(infoschema.InfoSchema)
p.stmtStarted = true
return nil
}

Expand Down Expand Up @@ -114,6 +152,8 @@ type TxnManager interface {
// GetForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
GetForUpdateTS() (uint64, error)

// ActiveTxn actives the txn
ActiveTxn(ctx context.Context) (kv.Transaction, error)
// GetContextProvider returns the current TxnContextProvider
GetContextProvider() TxnContextProvider
// SetContextProvider sets the context provider
Expand All @@ -133,5 +173,22 @@ func AdviseTxnWarmUp(sctx sessionctx.Context) error {
return GetTxnManager(sctx).Advise(AdviceWarmUpNow)
}

func AdviseNoConflictUpdateTS(sctx sessionctx.Context, ts uint64) error {
return GetTxnManager(sctx).Advise(AdviceNoConflictForUpdateTS, ts)
}

func WithValUint64(val []interface{}, fn func(uint64) error) error {
if len(val) == 0 {
return errors.New("%v need a uint64 arg")
}

if ts, ok := val[0].(uint64); ok {
return fn(ts)
}

return errors.New("%v need a uint64 arg")

}

// GetTxnManager returns the TxnManager object from session context
var GetTxnManager func(sctx sessionctx.Context) TxnManager
Loading

0 comments on commit e977efa

Please sign in to comment.