Skip to content

Commit

Permalink
*: refactor rc isolation
Browse files Browse the repository at this point in the history
  • Loading branch information
lcwangchao committed Apr 15, 2022
1 parent b1bf822 commit 7a8522f
Show file tree
Hide file tree
Showing 16 changed files with 399 additions and 141 deletions.
6 changes: 5 additions & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,6 @@ func UpdateForUpdateTS(seCtx sessionctx.Context, newForUpdateTS uint64) error {
}
seCtx.GetSessionVars().TxnCtx.SetForUpdateTS(newForUpdateTS)
txn.SetOption(kv.SnapshotTS, seCtx.GetSessionVars().TxnCtx.GetForUpdateTS())
seCtx.GetSessionVars().TxnCtx.LastRcReadTs = newForUpdateTS
return nil
}

Expand Down Expand Up @@ -805,6 +804,11 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
if err != nil {
return nil, err
}

if err = sessiontxn.GetTxnManager(a.Ctx).OnStmtRetry(); err != nil {
return nil, err
}

e, err := a.buildExecutor()
if err != nil {
return nil, err
Expand Down
47 changes: 4 additions & 43 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,12 @@ import (
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/cteutil"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)

var (
Expand Down Expand Up @@ -1590,12 +1588,6 @@ func (b *executorBuilder) getReadTS() (uint64, error) {
return b.snapshotTS, nil
}

if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
if err := b.refreshForUpdateTSForRC(); err != nil {
return 0, err
}
}

if b.snapshotTS != 0 {
b.snapshotTSCached = true
// Return the cached value.
Expand Down Expand Up @@ -2191,6 +2183,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
// PointGet executor will get conflict error if the ForUpdateTS is older than the latest commitTS,
// so we don't need to update now for better latency.
func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.PhysicalPlan) error {
if sessiontxn.UsingNonSimpleProvider(b.ctx) {
return nil
}

txnCtx := b.ctx.GetSessionVars().TxnCtx
if !txnCtx.IsPessimistic {
return nil
Expand All @@ -2210,44 +2206,9 @@ func (b *executorBuilder) updateForUpdateTSIfNeeded(selectPlan plannercore.Physi
}
return nil
}
// The Repeatable Read transaction use Read Committed level to read data for writing (insert, update, delete, select for update),
// We should always update/refresh the for-update-ts no matter the isolation level is RR or RC.
if b.ctx.GetSessionVars().IsPessimisticReadConsistency() {
return b.refreshForUpdateTSForRC()
}
return UpdateForUpdateTS(b.ctx, 0)
}

// refreshForUpdateTSForRC is used to refresh the for-update-ts for reading data at read consistency level in pessimistic transaction.
// It could use the cached tso from the statement future to avoid get tso many times.
func (b *executorBuilder) refreshForUpdateTSForRC() error {
defer func() {
b.snapshotTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS()
}()
// The first time read-consistency read is executed and `RcReadCheckTS` is enabled, try to use
// the last valid ts as the for update read ts.
if b.ctx.GetSessionVars().StmtCtx.RCCheckTS {
rcReadTS := b.ctx.GetSessionVars().TxnCtx.LastRcReadTs
if rcReadTS == 0 {
rcReadTS = b.ctx.GetSessionVars().TxnCtx.StartTS
}
return UpdateForUpdateTS(b.ctx, rcReadTS)
}
future := b.ctx.GetSessionVars().TxnCtx.GetStmtFutureForRC()
if future == nil {
return nil
}
newForUpdateTS, waitErr := future.Wait()
if waitErr != nil {
logutil.BgLogger().Warn("wait tso failed",
zap.Uint64("startTS", b.ctx.GetSessionVars().TxnCtx.StartTS),
zap.Error(waitErr))
}
b.ctx.GetSessionVars().TxnCtx.SetStmtFutureForRC(nil)
// If newForUpdateTS is 0, it will force to get a new for-update-ts from PD.
return UpdateForUpdateTS(b.ctx, newForUpdateTS)
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask {
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O}
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
Expand Down
7 changes: 6 additions & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}
})

is := sessiontxn.GetTxnManager(c.Ctx).GetTxnInfoSchema()
txnManager := sessiontxn.GetTxnManager(c.Ctx)
if err = txnManager.OnStmtStart(ctx); err != nil {
return nil, err
}

is := txnManager.GetTxnInfoSchema()
finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, is)
if err != nil {
return nil, err
Expand Down
12 changes: 11 additions & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import (
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/readcommitted"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
Expand Down Expand Up @@ -636,7 +638,15 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
if s.CausalConsistencyOnly {
txn.SetOption(kv.GuaranteeLinearizability, false)
}
return nil

var txnCtxProvider sessiontxn.TxnContextProvider
if e.ctx.GetSessionVars().IsPessimisticReadConsistency() {
txnCtxProvider = readcommitted.NewRCTxnContextProvider(e.ctx)
} else {
txnCtxProvider = &sessiontxn.SimpleTxnContextProvider{Sctx: e.ctx}
}

return sessiontxn.GetTxnManager(e.ctx).SetContextProvider(txnCtxProvider)
}

func (e *SimpleExec) executeRevokeRole(ctx context.Context, s *ast.RevokeRoleStmt) error {
Expand Down
16 changes: 2 additions & 14 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -1681,21 +1681,9 @@ func (p *preprocessor) initTxnContextProviderIfNecessary(node ast.Node) {
return
}

txnManager := sessiontxn.GetTxnManager(p.ctx)
currentProvider := txnManager.GetContextProvider()

// If currentProvider is nil or not a `SimpleTxnContextProvider`, it means
if currentProvider == nil {
return
}

if _, ok := currentProvider.(*sessiontxn.SimpleTxnContextProvider); !ok {
return
if provider, ok := sessiontxn.GetTxnManager(p.ctx).GetContextProvider().(*sessiontxn.SimpleTxnContextProvider); ok {
provider.InfoSchema = p.ensureInfoSchema()
}

p.err = sessiontxn.GetTxnManager(p.ctx).SetContextProvider(&sessiontxn.SimpleTxnContextProvider{
InfoSchema: p.ensureInfoSchema(),
})
}

func (p *preprocessor) hasAutoConvertWarning(colDef *ast.ColumnDef) bool {
Expand Down
8 changes: 4 additions & 4 deletions planner/funcdep/extract_fd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/require"

"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/parser"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/hint"
"github.com/stretchr/testify/require"
)

func testGetIS(t *testing.T, ctx sessionctx.Context) infoschema.InfoSchema {
Expand Down Expand Up @@ -220,7 +220,7 @@ func TestFDSet_ExtractFD(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.AdviseTxnWarmUp(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down Expand Up @@ -317,7 +317,7 @@ func TestFDSet_ExtractFDForApply(t *testing.T) {
tk.Session().GetSessionVars().PlanColumnID = 0
err = plannercore.Preprocess(tk.Session(), stmt, plannercore.WithPreprocessorReturn(&plannercore.PreprocessorReturn{InfoSchema: is}))
require.NoError(t, err, comment)
tk.Session().PrepareTSFuture(ctx)
require.NoError(t, sessiontxn.AdviseTxnWarmUp(tk.Session()))
builder, _ := plannercore.NewPlanBuilder().Init(tk.Session(), is, &hint.BlockHintProcessor{})
// extract FD to every OP
p, err := builder.Build(ctx, stmt)
Expand Down
10 changes: 8 additions & 2 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/table/temptable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hint"
Expand Down Expand Up @@ -143,12 +144,17 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in
}
if fp != nil {
if !useMaxTS(sctx, fp) {
sctx.PrepareTSFuture(ctx)
if err := sessiontxn.AdviseTxnWarmUp(sctx); err != nil {
return nil, nil, err
}
}
return fp, fp.OutputNames(), nil
}
}
sctx.PrepareTSFuture(ctx)

if err := sessiontxn.AdviseTxnWarmUp(sctx); err != nil {
return nil, nil, err
}

useBinding := sessVars.UsePlanBaselines
stmtNode, ok := node.(ast.StmtNode)
Expand Down
Loading

0 comments on commit 7a8522f

Please sign in to comment.