*: Move stale read context to TxnManager (#33812)
close #31954, close #33014, close #33832
lcwangchao authored Apr 25, 2022
1 parent 9fc73ac commit d23a814
Showing 22 changed files with 351 additions and 102 deletions.
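
At a glance, the refactor replaces the SnapshotTS/IsStaleness fields that used to be threaded through ExecStmt and newExecutorBuilder with calls into the session's TxnManager. The sketch below is illustrative only: it reuses identifiers that appear in this diff (sessionctx.Context, sessiontxn.GetTxnManager, GetReadTS, GetForUpdateTS), but readTSForStmt itself is a hypothetical helper, not code from the commit.

package example

import (
    "github.com/pingcap/tidb/sessionctx"
    "github.com/pingcap/tidb/sessiontxn"
)

// readTSForStmt is a hypothetical helper (not part of this commit) showing the
// call pattern the refactor moves toward: ask the per-session TxnManager for
// the timestamp instead of carrying SnapshotTS/IsStaleness on ExecStmt.
func readTSForStmt(sctx sessionctx.Context, forUpdate bool) (uint64, error) {
    txnManager := sessiontxn.GetTxnManager(sctx)
    if forUpdate {
        // DML and SELECT ... FOR UPDATE statements read at the for-update TS.
        return txnManager.GetForUpdateTS()
    }
    // Plain reads, including stale reads, go through GetReadTS; for a stale
    // read the context provider behind the TxnManager already holds the stale
    // timestamp, so the same call covers both cases.
    return txnManager.GetReadTS()
}
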
37 changes: 20 additions & 17 deletions executor/adapter.go
@@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
@@ -195,11 +196,6 @@ type TelemetryInfo struct {
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
GoCtx context.Context
// SnapshotTS stores the timestamp for stale read.
// It is not equivalent to session variables's snapshot ts, it only use to build the executor.
SnapshotTS uint64
// IsStaleness means whether this statement use stale read.
IsStaleness bool
// ReplicaReadScope indicates the scope the store selector scope the request visited
ReplicaReadScope string
// InfoSchema stores a reference to the schema information.
@@ -234,6 +230,14 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
staleread.AssertStmtStaleness(a.Ctx, false)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
startTs := uint64(math.MaxUint64)
@@ -257,7 +261,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.ReplicaReadScope)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
@@ -266,11 +270,6 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
return nil, err
@@ -303,7 +302,7 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
// It returns the current information schema version that 'a' is using.
func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
ret := &plannercore.PreprocessorReturn{}
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.InitTxnContextProvider, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}

@@ -314,11 +313,10 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
}
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
staleread.AssertStmtStaleness(a.Ctx, ret.IsStaleness)
})

a.InfoSchema = sessiontxn.GetTxnManager(a.Ctx).GetTxnInfoSchema()
a.SnapshotTS = ret.LastSnapshotTS
a.IsStaleness = ret.IsStaleness
a.ReplicaReadScope = ret.ReadReplicaScope
if a.Ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && a.ReplicaReadScope == kv.GlobalReplicaScope {
logutil.BgLogger().Warn(fmt.Sprintf("tidb can't read closest replicas due to it haven't %s label", placement.DCLabelKey))
@@ -369,8 +367,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}()

failpoint.Inject("assertStaleTSO", func(val failpoint.Value) {
if n, ok := val.(int); ok && a.IsStaleness {
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if n, ok := val.(int); ok && staleread.IsStmtStaleness(a.Ctx) {
txnManager := sessiontxn.GetTxnManager(a.Ctx)
ts, err := txnManager.GetReadTS()
if err != nil {
panic(err)
}
startTS := oracle.ExtractPhysical(ts) / 1000
if n != int(startTS) {
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
@@ -863,7 +866,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.ReplicaReadScope)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
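
In the assertStaleTSO failpoint above, the expected physical timestamp is now checked against the TxnManager's read TS rather than a.SnapshotTS. A rough sketch of how a test might arm that failpoint follows; the failpoint path is an assumption derived from the executor package location, not something shown in this diff.

package example

import (
    "fmt"

    "github.com/pingcap/failpoint"
    "github.com/tikv/client-go/v2/oracle"
)

// enableStaleTSOAssertion arms the assertStaleTSO failpoint so that executing
// a stale-read statement panics unless the statement's read TS has the given
// physical timestamp. The failpoint path below is assumed, not taken from the
// commit.
func enableStaleTSOAssertion(expectedTS uint64) error {
    physical := oracle.ExtractPhysical(expectedTS) / 1000
    return failpoint.Enable(
        "github.com/pingcap/tidb/executor/assertStaleTSO",
        fmt.Sprintf("return(%d)", physical),
    )
}
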
10 changes: 5 additions & 5 deletions executor/benchmark_test.go
@@ -292,7 +292,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
plan.SetSchema(schema)
plan.Init(ctx, nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
@@ -344,7 +344,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
plan = sg
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
return b.build(plan)
}

@@ -577,7 +577,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
exec := b.build(plan)
return exec
}
@@ -1317,7 +1317,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
keyOff2IdxOff[i] = i
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, oracle.GlobalTxnScope).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
@@ -1391,7 +1391,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, oracle.GlobalTxnScope).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
63 changes: 46 additions & 17 deletions executor/builder.go
@@ -47,6 +47,8 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
@@ -117,16 +119,27 @@ type CTEStorages struct {
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *executorBuilder {
return &executorBuilder{
func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *executorBuilder {
b := &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
snapshotTSCached: isStaleness,
snapshotTS: snapshotTS,
isStaleness: isStaleness,
isStaleness: staleread.IsStmtStaleness(ctx),
readReplicaScope: replicaReadScope,
}

txnManager := sessiontxn.GetTxnManager(ctx)
if provider, ok := txnManager.GetContextProvider().(*sessiontxn.SimpleTxnContextProvider); ok {
provider.GetReadTSFunc = b.getReadTS
provider.GetForUpdateTSFunc = func() (uint64, error) {
if b.forUpdateTS != 0 {
return b.forUpdateTS, nil
}
return b.getReadTS()
}
}

return b
}

// MockPhysicalPlan is used to return a specified executor in when build.
Expand All @@ -143,9 +156,9 @@ type MockExecutorBuilder struct {
}

// NewMockExecutorBuilderForTest is ONLY used in test.
func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder {
func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *MockExecutorBuilder {
return &MockExecutorBuilder{
executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)}
executorBuilder: newExecutorBuilder(ctx, is, ti, replicaReadScope)}
}

// Build builds an executor tree according to `p`.
@@ -733,10 +746,6 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
failpoint.Inject("assertStaleReadValuesSameWithExecuteAndBuilder", func() {
// This fail point is used to assert the behavior after refactoring is exactly the same with the previous implement.
// Some variables in `plannercore.Execute` is deprecated and only be used for asserting now.
if b.snapshotTS != v.SnapshotTS {
panic(fmt.Sprintf("%d != %d", b.snapshotTS, v.SnapshotTS))
}

if b.isStaleness != v.IsStaleness {
panic(fmt.Sprintf("%v != %v", b.isStaleness, v.IsStaleness))
}
@@ -746,21 +755,36 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
}

if v.SnapshotTS != 0 {
is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(v.SnapshotTS)
if err != nil {
panic(err)
}

if b.is.SchemaMetaVersion() != is.SchemaMetaVersion() {
panic(fmt.Sprintf("%d != %d", b.is.SchemaMetaVersion(), is.SchemaMetaVersion()))
}

ts, err := sessiontxn.GetTxnManager(b.ctx).GetReadTS()
if err != nil {
panic(err)
}

if v.SnapshotTS != ts {
panic(fmt.Sprintf("%d != %d", ts, v.SnapshotTS))
}
}
})

failpoint.Inject("assertExecutePrepareStatementStalenessOption", func(val failpoint.Value) {
vs := strings.Split(val.(string), "_")
assertTS, assertTxnScope := vs[0], vs[1]
if strconv.FormatUint(b.snapshotTS, 10) != assertTS ||
staleread.AssertStmtStaleness(b.ctx, true)
ts, err := sessiontxn.GetTxnManager(b.ctx).GetReadTS()
if err != nil {
panic(err)
}

if strconv.FormatUint(ts, 10) != assertTS ||
assertTxnScope != b.readReplicaScope {
panic("execute prepare statement have wrong staleness option")
}
@@ -1541,11 +1565,11 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
return b.dataReaderTS, nil
}

if (b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt) && b.forUpdateTS != 0 {
return b.forUpdateTS, nil
txnManager := sessiontxn.GetTxnManager(b.ctx)
if b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt {
return txnManager.GetForUpdateTS()
}

return b.getReadTS()
return txnManager.GetReadTS()
}

// getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level
Expand All @@ -1557,6 +1581,11 @@ func (b *executorBuilder) getReadTS() (uint64, error) {
// logics. However for `IndexLookUpMergeJoin` and `IndexLookUpHashJoin`, it requires caching the
// snapshotTS and may even use it after the txn being destroyed. In this case, mark
// `snapshotTSCached` to skip `refreshForUpdateTSForRC`.
failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() {
// after refactoring stale read will use its own context provider
staleread.AssertStmtStaleness(b.ctx, false)
})

if b.snapshotTSCached {
return b.snapshotTS, nil
}
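
The newExecutorBuilder hunk above installs GetReadTSFunc/GetForUpdateTSFunc hooks on SimpleTxnContextProvider, so that during the transition TxnManager.GetReadTS() and GetForUpdateTS() still delegate to the builder's own timestamp logic while getSnapshotTS already routes through the manager. A stripped-down sketch of that delegation pattern follows; every type in it is an illustrative stand-in, not the TiDB one.

package example

// provider mimics SimpleTxnContextProvider's pluggable timestamp hooks.
type provider struct {
    getReadTSFunc      func() (uint64, error)
    getForUpdateTSFunc func() (uint64, error)
}

// manager mimics the TxnManager: it simply forwards to whichever hooks have
// been installed on its provider.
type manager struct{ p *provider }

func (m *manager) GetReadTS() (uint64, error)      { return m.p.getReadTSFunc() }
func (m *manager) GetForUpdateTS() (uint64, error) { return m.p.getForUpdateTSFunc() }

// builder mimics the executor builder, which still owns the timestamp logic
// during the transition.
type builder struct {
    forUpdateTS uint64
    readTS      func() (uint64, error)
}

// wire mirrors what newExecutorBuilder does: the builder's read-TS logic backs
// both hooks, and a non-zero forUpdateTS wins on the for-update path.
func wire(m *manager, b *builder) {
    m.p.getReadTSFunc = b.readTS
    m.p.getForUpdateTSFunc = func() (uint64, error) {
        if b.forUpdateTS != 0 {
            return b.forUpdateTS, nil
        }
        return b.readTS()
    }
}

Once stale read gains its own context provider, as the assertNotStaleReadForExecutorGetReadTS failpoint hints, these temporary hooks can presumably be dropped.
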
14 changes: 6 additions & 8 deletions executor/compiler.go
@@ -16,7 +16,6 @@ package executor

import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/failpoint"
@@ -28,6 +27,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
)

var (
@@ -71,6 +71,10 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(c.Ctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(c.Ctx, ret.InfoSchema)
if ret.LastSnapshotTS != 0 {
staleread.AssertStmtStaleness(c.Ctx, true)
sessiontxn.AssertTxnManagerReadTS(c.Ctx, ret.LastSnapshotTS)
}
})

is := sessiontxn.GetTxnManager(c.Ctx).GetTxnInfoSchema()
@@ -80,11 +84,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}

failpoint.Inject("assertStmtCtxIsStaleness", func(val failpoint.Value) {
expected := val.(bool)
got := c.Ctx.GetSessionVars().StmtCtx.IsStaleness
if got != expected {
panic(fmt.Sprintf("stmtctx isStaleness wrong, expected:%v, got:%v", expected, got))
}
staleread.AssertStmtStaleness(c.Ctx, val.(bool))
})

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
@@ -94,8 +94,6 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}
return &ExecStmt{
GoCtx: ctx,
SnapshotTS: ret.LastSnapshotTS,
IsStaleness: ret.IsStaleness,
ReplicaReadScope: ret.ReadReplicaScope,
InfoSchema: is,
Plan: finalPlan,
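
compiler.go and several other files in this commit rely on assertion helpers such as staleread.AssertStmtStaleness and sessiontxn.AssertTxnManagerReadTS, whose implementations are not among the loaded hunks. As a guess at the intent only, an equivalent check might look like the sketch below; the body is hypothetical, and the real helpers in sessiontxn and sessiontxn/staleread may differ.

package example

import (
    "fmt"

    "github.com/pingcap/tidb/sessionctx"
    "github.com/pingcap/tidb/sessiontxn/staleread"
)

// assertStmtStaleness approximates what staleread.AssertStmtStaleness appears
// to do in this commit: panic when the statement's staleness flag does not
// match the expectation. Illustrative only, not the real implementation.
func assertStmtStaleness(sctx sessionctx.Context, expected bool) {
    if got := staleread.IsStmtStaleness(sctx); got != expected {
        panic(fmt.Sprintf("statement staleness assertion failed, expect %v, got %v", expected, got))
    }
}
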
2 changes: 1 addition & 1 deletion executor/coprocessor.go
@@ -170,7 +170,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
}
plan = core.InjectExtraProjection(plan)
// Build executor.
b := newExecutorBuilder(h.sctx, is, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(h.sctx, is, nil, oracle.GlobalTxnScope)
return b.build(plan), nil
}

3 changes: 2 additions & 1 deletion executor/executor.go
@@ -67,6 +67,7 @@ import (
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
tikvutil "github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
@@ -1263,7 +1264,7 @@ func init() {
ctx = opentracing.ContextWithSpan(ctx, span1)
}

e := &executorBuilder{is: is, ctx: sctx}
e := newExecutorBuilder(sctx, is, nil, oracle.GlobalTxnScope)
exec := e.build(p)
if e.err != nil {
return nil, e.err
2 changes: 1 addition & 1 deletion executor/executor_required_rows_test.go
@@ -846,7 +846,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, i
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(nil, j.LeftJoinKeys[i], j.RightJoinKeys[i]))
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
return b.build(j)
}

2 changes: 1 addition & 1 deletion executor/executor_test.go
@@ -3467,7 +3467,7 @@ func TestUnreasonablyClose(t *testing.T) {
&plannercore.PhysicalShuffle{},
&plannercore.PhysicalUnionAll{},
}
executorBuilder := executor.NewMockExecutorBuilderForTest(tk.Session(), is, nil, math.MaxUint64, false, "global")
executorBuilder := executor.NewMockExecutorBuilderForTest(tk.Session(), is, nil, oracle.GlobalTxnScope)

opsNeedsCoveredMask := uint64(1<<len(opsNeedsCovered) - 1)
opsAlreadyCoveredMask := uint64(0)