diff --git a/executor/adapter.go b/executor/adapter.go
index b4c4888565c0e..4bee8dbf507b0 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -47,6 +47,7 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/sessiontxn"
+	"github.com/pingcap/tidb/sessiontxn/staleread"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 	"github.com/pingcap/tidb/util/execdetails"
@@ -195,11 +196,6 @@ type TelemetryInfo struct {
 type ExecStmt struct {
 	// GoCtx stores parent go context.Context for a stmt.
 	GoCtx context.Context
-	// SnapshotTS stores the timestamp for stale read.
-	// It is not equivalent to session variables's snapshot ts, it only use to build the executor.
-	SnapshotTS uint64
-	// IsStaleness means whether this statement use stale read.
-	IsStaleness bool
 	// ReplicaReadScope indicates the scope the store selector scope the request visited
 	ReplicaReadScope string
 	// InfoSchema stores a reference to the schema information.
@@ -234,6 +230,14 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
 		defer span1.Finish()
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}
+
+	failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
+		sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
+		// stale read should not reach here
+		staleread.AssertStmtStaleness(a.Ctx, false)
+		sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
+	})
+
 	ctx = a.setPlanLabelForTopSQL(ctx)
 	a.observeStmtBeginForTopSQL()
 	startTs := uint64(math.MaxUint64)
@@ -257,7 +261,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
 		}
 	}
 	if a.PsStmt.Executor == nil {
-		b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
+		b := newExecutorBuilder(a.Ctx, is, a.Ti, a.ReplicaReadScope)
 		newExecutor := b.build(a.Plan)
 		if b.err != nil {
 			return nil, b.err
@@ -266,11 +270,6 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
 	}
 	pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

-	failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
-		sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
-		sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
-	})
-
 	if err = pointExecutor.Open(ctx); err != nil {
 		terror.Call(pointExecutor.Close)
 		return nil, err
@@ -303,7 +302,7 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
 // It returns the current information schema version that 'a' is using.
 func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
 	ret := &plannercore.PreprocessorReturn{}
-	if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
+	if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.InitTxnContextProvider, plannercore.WithPreprocessorReturn(ret)); err != nil {
 		return 0, err
 	}
@@ -314,11 +313,10 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
 	}
 		sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true)
 		sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
+		staleread.AssertStmtStaleness(a.Ctx, ret.IsStaleness)
 	})

 	a.InfoSchema = sessiontxn.GetTxnManager(a.Ctx).GetTxnInfoSchema()
-	a.SnapshotTS = ret.LastSnapshotTS
-	a.IsStaleness = ret.IsStaleness
 	a.ReplicaReadScope = ret.ReadReplicaScope
 	if a.Ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && a.ReplicaReadScope == kv.GlobalReplicaScope {
 		logutil.BgLogger().Warn(fmt.Sprintf("tidb can't read closest replicas due to it haven't %s label", placement.DCLabelKey))
@@ -369,8 +367,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
 	}()

 	failpoint.Inject("assertStaleTSO", func(val failpoint.Value) {
-		if n, ok := val.(int); ok && a.IsStaleness {
-			startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
+		if n, ok := val.(int); ok && staleread.IsStmtStaleness(a.Ctx) {
+			txnManager := sessiontxn.GetTxnManager(a.Ctx)
+			ts, err := txnManager.GetReadTS()
+			if err != nil {
+				panic(err)
+			}
+			startTS := oracle.ExtractPhysical(ts) / 1000
 			if n != int(startTS) {
 				panic(fmt.Sprintf("different tso %d != %d", n, startTS))
 			}
@@ -863,7 +866,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
 		ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
 	}

-	b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
+	b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.ReplicaReadScope)
 	e := b.build(a.Plan)
 	if b.err != nil {
 		return nil, errors.Trace(b.err)
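Note: with SnapshotTS and IsStaleness removed from ExecStmt, both values are derived on demand from the statement context and the session's TxnManager instead of being copied around at compile time. A minimal sketch of the new lookup path — the helper itself is hypothetical, but every identifier it calls appears in this diff:

    package example // illustrative only

    import (
    	"github.com/pingcap/tidb/sessionctx"
    	"github.com/pingcap/tidb/sessiontxn"
    	"github.com/pingcap/tidb/sessiontxn/staleread"
    )

    // staleReadTS reports whether the current statement is a stale read and,
    // if so, returns the timestamp that used to live in ExecStmt.SnapshotTS.
    func staleReadTS(sctx sessionctx.Context) (uint64, bool, error) {
    	if !staleread.IsStmtStaleness(sctx) {
    		return 0, false, nil
    	}
    	ts, err := sessiontxn.GetTxnManager(sctx).GetReadTS()
    	if err != nil {
    		return 0, true, err
    	}
    	return ts, true, nil
    }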
diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go
index a36ad9eba5f39..68d9e1ed5667d 100644
--- a/executor/benchmark_test.go
+++ b/executor/benchmark_test.go
@@ -292,7 +292,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
 	plan.SetSchema(schema)
 	plan.Init(ctx, nil, 0)
 	plan.SetChildren(nil)
-	b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
+	b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
 	exec := b.build(plan)
 	hashAgg := exec.(*HashAggExec)
 	hashAgg.children[0] = src
@@ -344,7 +344,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
 		plan = sg
 	}

-	b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
+	b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
 	return b.build(plan)
 }
@@ -577,7 +577,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
 		plan = win
 	}

-	b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
+	b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
 	exec := b.build(plan)
 	return exec
 }
@@ -1317,7 +1317,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
 		keyOff2IdxOff[i] = i
 	}

-	readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
+	readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, oracle.GlobalTxnScope).
 		newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
 	if err != nil {
 		return nil, err
@@ -1391,7 +1391,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
 		outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
 	}

-	readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
+	readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, oracle.GlobalTxnScope).
 		newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
 	if err != nil {
 		return nil, err
diff --git a/executor/builder.go b/executor/builder.go
index d1e8733c9d224..9c475368709eb 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -47,6 +47,8 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/sessiontxn"
+	"github.com/pingcap/tidb/sessiontxn/staleread"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/store/helper"
 	"github.com/pingcap/tidb/table"
@@ -117,16 +119,27 @@ type CTEStorages struct {
 	IterInTbl cteutil.Storage
 }

-func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *executorBuilder {
-	return &executorBuilder{
+func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *executorBuilder {
+	b := &executorBuilder{
 		ctx:              ctx,
 		is:               is,
 		Ti:               ti,
-		snapshotTSCached: isStaleness,
-		snapshotTS:       snapshotTS,
-		isStaleness:      isStaleness,
+		isStaleness:      staleread.IsStmtStaleness(ctx),
 		readReplicaScope: replicaReadScope,
 	}
+
+	txnManager := sessiontxn.GetTxnManager(ctx)
+	if provider, ok := txnManager.GetContextProvider().(*sessiontxn.SimpleTxnContextProvider); ok {
+		provider.GetReadTSFunc = b.getReadTS
+		provider.GetForUpdateTSFunc = func() (uint64, error) {
+			if b.forUpdateTS != 0 {
+				return b.forUpdateTS, nil
+			}
+			return b.getReadTS()
+		}
+	}
+
+	return b
 }

 // MockPhysicalPlan is used to return a specified executor in when build.
@@ -143,9 +156,9 @@ type MockExecutorBuilder struct {
 }

 // NewMockExecutorBuilderForTest is ONLY used in test.
-func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder {
+func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *MockExecutorBuilder {
 	return &MockExecutorBuilder{
-		executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)}
+		executorBuilder: newExecutorBuilder(ctx, is, ti, replicaReadScope)}
 }

 // Build builds an executor tree according to `p`.
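Note: during the transition, SimpleTxnContextProvider owns no timestamp logic of its own; the executor builder injects it, as the constructor above shows. The for-update closure reads b.forUpdateTS at call time rather than at wiring time, so a for-update ts acquired later by a pessimistic statement is still observed. A sketch of the same pattern in isolation (the wire helper and tsSource type are hypothetical stand-ins):

    package example // illustrative only

    import "github.com/pingcap/tidb/sessiontxn"

    type tsSource struct{ forUpdateTS uint64 } // stands in for executorBuilder

    func (s *tsSource) getReadTS() (uint64, error) { return 1, nil }

    // wire injects the source's logic into the transitional provider, so code
    // that only knows the TxnManager still reaches the builder's ts rules.
    func wire(p *sessiontxn.SimpleTxnContextProvider, s *tsSource) {
    	p.GetReadTSFunc = s.getReadTS
    	p.GetForUpdateTSFunc = func() (uint64, error) {
    		if s.forUpdateTS != 0 { // read at call time, like b.forUpdateTS above
    			return s.forUpdateTS, nil
    		}
    		return s.getReadTS()
    	}
    }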
@@ -733,10 +746,6 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
 	failpoint.Inject("assertStaleReadValuesSameWithExecuteAndBuilder", func() {
 		// This fail point is used to assert the behavior after refactoring is exactly the same with the previous implement.
 		// Some variables in `plannercore.Execute` is deprecated and only be used for asserting now.
-		if b.snapshotTS != v.SnapshotTS {
-			panic(fmt.Sprintf("%d != %d", b.snapshotTS, v.SnapshotTS))
-		}
-
 		if b.isStaleness != v.IsStaleness {
 			panic(fmt.Sprintf("%v != %v", b.isStaleness, v.IsStaleness))
 		}
@@ -746,7 +755,7 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
 		}

 		if v.SnapshotTS != 0 {
-			is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
+			is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(v.SnapshotTS)
 			if err != nil {
 				panic(err)
 			}
@@ -754,13 +763,28 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
 			if b.is.SchemaMetaVersion() != is.SchemaMetaVersion() {
 				panic(fmt.Sprintf("%d != %d", b.is.SchemaMetaVersion(), is.SchemaMetaVersion()))
 			}
+
+			ts, err := sessiontxn.GetTxnManager(b.ctx).GetReadTS()
+			if err != nil {
+				panic(err)
+			}
+
+			if v.SnapshotTS != ts {
+				panic(fmt.Sprintf("%d != %d", ts, v.SnapshotTS))
+			}
 		}
 	})

 	failpoint.Inject("assertExecutePrepareStatementStalenessOption", func(val failpoint.Value) {
 		vs := strings.Split(val.(string), "_")
 		assertTS, assertTxnScope := vs[0], vs[1]
-		if strconv.FormatUint(b.snapshotTS, 10) != assertTS ||
+		staleread.AssertStmtStaleness(b.ctx, true)
+		ts, err := sessiontxn.GetTxnManager(b.ctx).GetReadTS()
+		if err != nil {
+			panic(err)
+		}
+
+		if strconv.FormatUint(ts, 10) != assertTS ||
 			assertTxnScope != b.readReplicaScope {
 			panic("execute prepare statement have wrong staleness option")
 		}
@@ -1541,11 +1565,11 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
 		return b.dataReaderTS, nil
 	}

-	if (b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt) && b.forUpdateTS != 0 {
-		return b.forUpdateTS, nil
+	txnManager := sessiontxn.GetTxnManager(b.ctx)
+	if b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt {
+		return txnManager.GetForUpdateTS()
 	}
-
-	return b.getReadTS()
+	return txnManager.GetReadTS()
 }

 // getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level
@@ -1557,6 +1581,11 @@ func (b *executorBuilder) getReadTS() (uint64, error) {
 	// logics. However for `IndexLookUpMergeJoin` and `IndexLookUpHashJoin`, it requires caching the
 	// snapshotTS and may even use it after the txn being destroyed. In this case, mark
 	// `snapshotTSCached` to skip `refreshForUpdateTSForRC`.
+	failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() {
+		// after refactoring, stale read will use its own context provider
+		staleread.AssertStmtStaleness(b.ctx, false)
+	})
+
 	if b.snapshotTSCached {
 		return b.snapshotTS, nil
 	}
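Note: getSnapshotTS no longer short-circuits on b.forUpdateTS itself; the choice of which timestamp applies moves behind the TxnManager. The rule it now encodes, as a standalone sketch (isWriteOrLock is a hypothetical stand-in for the builder's inInsertStmt/inUpdateStmt/inDeleteStmt/inSelectLockStmt flags):

    package example // illustrative only

    import "github.com/pingcap/tidb/sessiontxn"

    // snapshotTSForStmt mirrors the new getSnapshotTS: DML and
    // SELECT ... FOR UPDATE use the for-update ts, plain reads use the read ts.
    func snapshotTSForStmt(m sessiontxn.TxnManager, isWriteOrLock bool) (uint64, error) {
    	if isWriteOrLock {
    		return m.GetForUpdateTS()
    	}
    	return m.GetReadTS()
    }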
+ failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() { + // after refactoring stale read will use its own context provider + staleread.AssertStmtStaleness(b.ctx, false) + }) + if b.snapshotTSCached { return b.snapshotTS, nil } diff --git a/executor/compiler.go b/executor/compiler.go index 26a7dde9d6ba7..af678b58bffa3 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -16,7 +16,6 @@ package executor import ( "context" - "fmt" "github.com/opentracing/opentracing-go" "github.com/pingcap/failpoint" @@ -28,6 +27,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/sessiontxn/staleread" ) var ( @@ -71,6 +71,10 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm failpoint.Inject("assertTxnManagerInCompile", func() { sessiontxn.RecordAssert(c.Ctx, "assertTxnManagerInCompile", true) sessiontxn.AssertTxnManagerInfoSchema(c.Ctx, ret.InfoSchema) + if ret.LastSnapshotTS != 0 { + staleread.AssertStmtStaleness(c.Ctx, true) + sessiontxn.AssertTxnManagerReadTS(c.Ctx, ret.LastSnapshotTS) + } }) is := sessiontxn.GetTxnManager(c.Ctx).GetTxnInfoSchema() @@ -80,11 +84,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm } failpoint.Inject("assertStmtCtxIsStaleness", func(val failpoint.Value) { - expected := val.(bool) - got := c.Ctx.GetSessionVars().StmtCtx.IsStaleness - if got != expected { - panic(fmt.Sprintf("stmtctx isStaleness wrong, expected:%v, got:%v", expected, got)) - } + staleread.AssertStmtStaleness(c.Ctx, val.(bool)) }) CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL) @@ -94,8 +94,6 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm } return &ExecStmt{ GoCtx: ctx, - SnapshotTS: ret.LastSnapshotTS, - IsStaleness: ret.IsStaleness, ReplicaReadScope: ret.ReadReplicaScope, InfoSchema: is, Plan: finalPlan, diff --git a/executor/coprocessor.go b/executor/coprocessor.go index 8970629c9f4b8..5df6528d72301 100644 --- a/executor/coprocessor.go +++ b/executor/coprocessor.go @@ -170,7 +170,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec } plan = core.InjectExtraProjection(plan) // Build executor. 
diff --git a/executor/executor.go b/executor/executor.go
index 9f8de49a4b844..6361420b19614 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -67,6 +67,7 @@ import (
 	topsqlstate "github.com/pingcap/tidb/util/topsql/state"
 	tikverr "github.com/tikv/client-go/v2/error"
 	tikvstore "github.com/tikv/client-go/v2/kv"
+	"github.com/tikv/client-go/v2/oracle"
 	tikvutil "github.com/tikv/client-go/v2/util"
 	atomicutil "go.uber.org/atomic"
 	"go.uber.org/zap"
@@ -1263,7 +1264,7 @@ func init() {
 			ctx = opentracing.ContextWithSpan(ctx, span1)
 		}

-		e := &executorBuilder{is: is, ctx: sctx}
+		e := newExecutorBuilder(sctx, is, nil, oracle.GlobalTxnScope)
 		exec := e.build(p)
 		if e.err != nil {
 			return nil, e.err
diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go
index 8330a0f691876..cf73696411343 100644
--- a/executor/executor_required_rows_test.go
+++ b/executor/executor_required_rows_test.go
@@ -846,7 +846,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, i
 		j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(nil, j.LeftJoinKeys[i], j.RightJoinKeys[i]))
 	}

-	b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
+	b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
 	return b.build(j)
 }
diff --git a/executor/executor_test.go b/executor/executor_test.go
index 5c15ced7da3ed..57714dd792fbc 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -3467,7 +3467,7 @@ func TestUnreasonablyClose(t *testing.T) {
 		&plannercore.PhysicalShuffle{},
 		&plannercore.PhysicalUnionAll{},
 	}
-	executorBuilder := executor.NewMockExecutorBuilderForTest(tk.Session(), is, nil, math.MaxUint64, false, "global")
+	executorBuilder := executor.NewMockExecutorBuilderForTest(tk.Session(), is, nil, oracle.GlobalTxnScope)
 	opsNeedsCoveredMask := uint64(1<<len(opsNeedsCovered) - 1)
diff --git a/session/session.go b/session/session.go
--- a/session/session.go
+++ b/session/session.go
+	staleness := snapshotTS > 0
+	if staleness {
+		txnCtxProvider = staleread.NewStalenessTxnContextProvider(is, snapshotTS)
+	} else {
+		txnCtxProvider = &sessiontxn.SimpleTxnContextProvider{
+			InfoSchema: is,
+		}
+	}

 	txnManager := sessiontxn.GetTxnManager(s)
@@ -2313,7 +2320,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
 	}
 	executor.CountStmtNode(preparedStmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL)
-	ok, err = s.IsCachedExecOk(ctx, preparedStmt, snapshotTS != 0)
+	ok, err = s.IsCachedExecOk(ctx, preparedStmt, staleness)
 	if err != nil {
 		return nil, err
 	}
@@ -2321,7 +2328,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
 	defer s.txn.onStmtEnd()
 	if ok {
-		return s.cachedPlanExec(ctx, txnManager.GetTxnInfoSchema(), snapshotTS, stmtID, preparedStmt, replicaReadScope, args)
+		return s.cachedPlanExec(ctx, txnManager.GetTxnInfoSchema(), stmtID, preparedStmt, replicaReadScope, args)
 	}
 	return s.preparedStmtExec(ctx, txnManager.GetTxnInfoSchema(), snapshotTS, stmtID, preparedStmt, replicaReadScope, args)
 }
@@ -2500,7 +2507,7 @@ func (s *session) NewStaleTxnWithStartTS(ctx context.Context, startTS uint64) er
 		TxnScope: txnScope,
 	}
 	s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
-	return nil
+	return sessiontxn.GetTxnManager(s).SetContextProvider(staleread.NewStalenessTxnContextProvider(is, txn.StartTS()))
 }

 func (s *session) SetValue(key fmt.Stringer, value interface{}) {
@@ -3112,7 +3119,7 @@ func (s *session) PrepareTSFuture(ctx context.Context) {
 		return
 	}
 	if !s.txn.validOrPending() {
-		if s.GetSessionVars().StmtCtx.IsStaleness {
+		if staleread.IsStmtStaleness(s) {
 			// Do nothing when StmtCtx.IsStaleness is true
 			// we don't need to request tso for stale read
 			return
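Note: the session now picks a context provider per statement: a non-zero snapshot ts means stale read and gets the dedicated StalenessTxnContextProvider, while everything else keeps the transitional SimpleTxnContextProvider. The same selection, extracted into a hypothetical helper:

    package example // illustrative only

    import (
    	"github.com/pingcap/tidb/infoschema"
    	"github.com/pingcap/tidb/sessiontxn"
    	"github.com/pingcap/tidb/sessiontxn/staleread"
    )

    // providerFor mirrors the branch in ExecutePreparedStmt above.
    func providerFor(is infoschema.InfoSchema, snapshotTS uint64) sessiontxn.TxnContextProvider {
    	if snapshotTS > 0 {
    		return staleread.NewStalenessTxnContextProvider(is, snapshotTS)
    	}
    	return &sessiontxn.SimpleTxnContextProvider{InfoSchema: is}
    }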
diff --git a/session/txnmanager.go b/session/txnmanager.go
index 0e47a8cf7406a..942302aa17a15 100644
--- a/session/txnmanager.go
+++ b/session/txnmanager.go
@@ -15,6 +15,8 @@
 package session

 import (
+	"errors"
+
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessiontxn"
@@ -52,6 +54,20 @@ func (m *txnManager) GetTxnInfoSchema() infoschema.InfoSchema {
 	return m.ctxProvider.GetTxnInfoSchema()
 }

+func (m *txnManager) GetReadTS() (uint64, error) {
+	if m.ctxProvider == nil {
+		return 0, errors.New("context provider not set")
+	}
+	return m.ctxProvider.GetReadTS()
+}
+
+func (m *txnManager) GetForUpdateTS() (uint64, error) {
+	if m.ctxProvider == nil {
+		return 0, errors.New("context provider not set")
+	}
+	return m.ctxProvider.GetForUpdateTS()
+}
+
 func (m *txnManager) GetContextProvider() sessiontxn.TxnContextProvider {
 	return m.ctxProvider
 }
diff --git a/sessionctx/context.go b/sessionctx/context.go
index f7394a5e85e7e..dbc55c2635a66 100644
--- a/sessionctx/context.go
+++ b/sessionctx/context.go
@@ -71,7 +71,9 @@ type Context interface {
 	// ClearValue clears the value associated with this context for key.
 	ClearValue(key fmt.Stringer)

-	// Deprecated: Use TxnManager.GetTxnInfoSchema to get the current schema in session
+	// Deprecated: the semantics of session.GetInfoSchema() is ambiguous.
+	// If you want the infoschema of the current transaction in the SQL layer, use sessiontxn.GetTxnManager(ctx).GetTxnInfoSchema().
+	// If you want the latest infoschema, use domain.GetDomain(ctx).InfoSchema().
 	GetInfoSchema() InfoschemaMetaVersion

 	GetSessionVars() *variable.SessionVars
diff --git a/sessiontxn/failpoint.go b/sessiontxn/failpoint.go
index 1d0a832de1083..64144bc933e83 100644
--- a/sessiontxn/failpoint.go
+++ b/sessiontxn/failpoint.go
@@ -72,3 +72,15 @@ func AssertTxnManagerInfoSchema(sctx sessionctx.Context, is interface{}) {
 	assertVersion(is)
 	assertVersion(sctx.Value(AssertTxnInfoSchemaKey))
 }
+
+// AssertTxnManagerReadTS is used only for test
+func AssertTxnManagerReadTS(sctx sessionctx.Context, expected uint64) {
+	actual, err := GetTxnManager(sctx).GetReadTS()
+	if err != nil {
+		panic(err)
+	}
+
+	if actual != expected {
+		panic(fmt.Sprintf("Txn read ts not match, expect:%d, got:%d", expected, actual))
+	}
+}
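Note: the txnManager accessors fail loudly instead of returning a zero timestamp when no provider is installed, and the new AssertTxnManagerReadTS helper builds on that. How a caller is expected to consume them — a hypothetical helper, with the panic style mirroring the test-only assert functions above:

    package example // illustrative only

    import (
    	"github.com/pingcap/tidb/sessionctx"
    	"github.com/pingcap/tidb/sessiontxn"
    )

    // mustReadTS returns the provider's read ts or panics, e.g. in tests.
    // A zero ts can never silently leak out: a missing provider is an error.
    func mustReadTS(sctx sessionctx.Context) uint64 {
    	ts, err := sessiontxn.GetTxnManager(sctx).GetReadTS()
    	if err != nil {
    		panic(err)
    	}
    	return ts
    }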
diff --git a/sessiontxn/interface.go b/sessiontxn/interface.go
index fc7357ad10d55..9f22cd2958516 100644
--- a/sessiontxn/interface.go
+++ b/sessiontxn/interface.go
@@ -15,6 +15,7 @@
 package sessiontxn

 import (
+	"github.com/pingcap/errors"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/sessionctx"
 )
@@ -25,13 +26,19 @@ type TxnContextProvider interface {
 	Initialize(sctx sessionctx.Context) error
 	// GetTxnInfoSchema returns the information schema used by txn
 	GetTxnInfoSchema() infoschema.InfoSchema
+	// GetReadTS returns the read timestamp used by select statement (not for select ... for update)
+	GetReadTS() (uint64, error)
+	// GetForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
+	GetForUpdateTS() (uint64, error)
 }

 // SimpleTxnContextProvider implements TxnContextProvider
 // It is only used in refactor stage
 // TODO: remove it after refactor finished
 type SimpleTxnContextProvider struct {
-	InfoSchema infoschema.InfoSchema
+	InfoSchema         infoschema.InfoSchema
+	GetReadTSFunc      func() (uint64, error)
+	GetForUpdateTSFunc func() (uint64, error)
 }

 // Initialize the provider with session context
@@ -44,10 +51,30 @@ func (p *SimpleTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema {
 	return p.InfoSchema
 }

+// GetReadTS returns the read timestamp used by select statement (not for select ... for update)
+func (p *SimpleTxnContextProvider) GetReadTS() (uint64, error) {
+	if p.GetReadTSFunc == nil {
+		return 0, errors.New("GetReadTSFunc not set")
+	}
+	return p.GetReadTSFunc()
+}
+
+// GetForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
+func (p *SimpleTxnContextProvider) GetForUpdateTS() (uint64, error) {
+	if p.GetForUpdateTSFunc == nil {
+		return 0, errors.New("GetForUpdateTSFunc not set")
+	}
+	return p.GetForUpdateTSFunc()
+}
+
 // TxnManager is an interface providing txn context management in session
 type TxnManager interface {
 	// GetTxnInfoSchema returns the information schema used by txn
 	GetTxnInfoSchema() infoschema.InfoSchema
+	// GetReadTS returns the read timestamp used by select statement (not for select ... for update)
+	GetReadTS() (uint64, error)
+	// GetForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
+	GetForUpdateTS() (uint64, error)

 	// GetContextProvider returns the current TxnContextProvider
 	GetContextProvider() TxnContextProvider
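Note: SimpleTxnContextProvider is deliberately inert until its function fields are injected, which is what makes it safe as a refactoring stop-gap. A self-contained sketch of both states (the demo function and its values are made up):

    package example // illustrative only

    import (
    	"fmt"

    	"github.com/pingcap/tidb/sessiontxn"
    )

    func demoSimpleProvider() {
    	p := &sessiontxn.SimpleTxnContextProvider{}
    	if _, err := p.GetReadTS(); err != nil {
    		fmt.Println("unwired:", err) // errors until GetReadTSFunc is set
    	}
    	p.GetReadTSFunc = func() (uint64, error) { return 424242, nil }
    	ts, _ := p.GetReadTS()
    	fmt.Println("wired:", ts) // 424242
    }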
diff --git a/sessiontxn/staleread/failpoint.go b/sessiontxn/staleread/failpoint.go
new file mode 100644
index 0000000000000..1e2d184453f7f
--- /dev/null
+++ b/sessiontxn/staleread/failpoint.go
@@ -0,0 +1,37 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package staleread
+
+import (
+	"fmt"
+
+	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessiontxn"
+)
+
+// AssertStmtStaleness is used only for test
+func AssertStmtStaleness(sctx sessionctx.Context, expected bool) {
+	actual := IsStmtStaleness(sctx)
+	if actual != expected {
+		panic(fmt.Sprintf("stmtctx isStaleness wrong, expected:%v, got:%v", expected, actual))
+	}
+
+	if expected {
+		provider := sessiontxn.GetTxnManager(sctx).GetContextProvider()
+		if _, ok := provider.(*StalenessTxnContextProvider); !ok {
+			panic(fmt.Sprintf("stale read should be StalenessTxnContextProvider but current provider is: %T", provider))
+		}
+	}
+}
diff --git a/sessiontxn/staleread/provider.go b/sessiontxn/staleread/provider.go
new file mode 100644
index 0000000000000..f0495833bc93c
--- /dev/null
+++ b/sessiontxn/staleread/provider.go
@@ -0,0 +1,55 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package staleread
+
+import (
+	"github.com/pingcap/errors"
+	"github.com/pingcap/tidb/infoschema"
+	"github.com/pingcap/tidb/sessionctx"
+)
+
+// StalenessTxnContextProvider implements sessiontxn.TxnContextProvider
+type StalenessTxnContextProvider struct {
+	is infoschema.InfoSchema
+	ts uint64
+}
+
+// NewStalenessTxnContextProvider creates a new StalenessTxnContextProvider
+func NewStalenessTxnContextProvider(is infoschema.InfoSchema, ts uint64) *StalenessTxnContextProvider {
+	return &StalenessTxnContextProvider{
+		is: is,
+		ts: ts,
+	}
+}
+
+// Initialize the provider with session context
+func (p *StalenessTxnContextProvider) Initialize(_ sessionctx.Context) error {
+	return nil
+}
+
+// GetTxnInfoSchema returns the information schema used by txn
+func (p *StalenessTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema {
+	return p.is
+}
+
+// GetReadTS returns the read timestamp
+func (p *StalenessTxnContextProvider) GetReadTS() (uint64, error) {
+	return p.ts, nil
+}
+
+// GetForUpdateTS will return an error because stale read does not support it
+func (p *StalenessTxnContextProvider) GetForUpdateTS() (uint64, error) {
+	return 0, errors.New("GetForUpdateTS not supported for stalenessTxnProvider")
+}
diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go
index 896b378c0ebd2..2241446ab3110 100644
--- a/sessiontxn/staleread/util.go
+++ b/sessiontxn/staleread/util.go
@@ -60,3 +60,8 @@ func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Du
 	minTsVal := expression.GetMinSafeTime(sctx)
 	return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil
 }
+
+// IsStmtStaleness indicates whether the current statement is a stale read or not
+func IsStmtStaleness(sctx sessionctx.Context) bool {
+	return sctx.GetSessionVars().StmtCtx.IsStaleness
+}
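Note: unlike the transitional provider, StalenessTxnContextProvider is a pure value object: the read ts is fixed at construction and GetForUpdateTS always errors, which matches stale read being read-only. A usage sketch (the demo function is hypothetical; its arguments are assumed to come from the caller):

    package example // illustrative only

    import (
    	"github.com/pingcap/tidb/infoschema"
    	"github.com/pingcap/tidb/sessiontxn/staleread"
    )

    func demoStalenessProvider(is infoschema.InfoSchema, ts uint64) {
    	p := staleread.NewStalenessTxnContextProvider(is, ts)
    	readTS, _ := p.GetReadTS()   // always ts, never an error
    	_, err := p.GetForUpdateTS() // always an error: stale reads cannot write
    	_, _ = readTS, err
    }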
failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerAfterPessimisticLockErrorRetry")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec")) @@ -400,6 +402,7 @@ func TestTxnContextForHistoricalRead(t *testing.T) { tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%s', '') ON DUPLICATE KEY UPDATE variable_value = '%s', comment=''`, safePoint, safePoint)) is1 := do.InfoSchema() + tk.MustExec("do sleep(0.1)") tk.MustExec("set @a=now(6)") // change schema tk.MustExec("alter table t2 add column(c1 int)") @@ -454,6 +457,7 @@ func TestTxnContextForStaleRead(t *testing.T) { tk.MustExec(fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%s', '') ON DUPLICATE KEY UPDATE variable_value = '%s', comment=''`, safePoint, safePoint)) is1 := do.InfoSchema() + tk.MustExec("do sleep(0.1)") tk.MustExec("set @a=now(6)") time.Sleep(time.Millisecond * 1200) @@ -588,6 +592,7 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) { se := tk.Session() is1 := do.InfoSchema() + tk.MustExec("do sleep(0.1)") tk.MustExec("set @a=now(6)") tk.MustExec("prepare s1 from 'select * from t1 where id=1'") tk.MustExec("prepare s2 from 'select * from t1 as of timestamp @a where id=1 '")