Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: (DRAFT) Refactor stale read code #32325

Closed
wants to merge 15 commits into from
1 change: 1 addition & 0 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ func VerifyTxnScope(txnScope string, physicalTableID int64, is infoschema.InfoSc
return false
}
return true

lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
}

func indexRangesToKVWithoutSplit(sc *stmtctx.StatementContext, tids []int64, idxID int64, ranges []*ranger.Range, memTracker *memory.Tracker, interruptSignal *atomic.Value) ([]kv.KeyRange, error) {
Expand Down
39 changes: 16 additions & 23 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -195,15 +196,6 @@ type TelemetryInfo struct {
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
GoCtx context.Context
// SnapshotTS stores the timestamp for stale read.
// It is not equivalent to the session variable's snapshot ts; it is only used to build the executor.
SnapshotTS uint64
// IsStaleness means whether this statement use stale read.
IsStaleness bool
// ReplicaReadScope indicates the scope used by the store selector for the requests it visits.
ReplicaReadScope string
// InfoSchema stores a reference to the schema information.
InfoSchema infoschema.InfoSchema
// Plan stores a reference to the final physical plan.
Plan plannercore.Plan
// Text represents the origin query text.
Expand Down Expand Up @@ -257,7 +249,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
b := newExecutorBuilder(a.Ctx, a.Ti)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
Expand Down Expand Up @@ -303,7 +295,8 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
// It returns the current information schema version that 'a' is using.
func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
ret := &plannercore.PreprocessorReturn{}
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
if err := plannercore.
Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}

Expand All @@ -316,20 +309,18 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
})

a.InfoSchema = sessiontxn.GetTxnManager(a.Ctx).GetTxnInfoSchema()
a.SnapshotTS = ret.LastSnapshotTS
a.IsStaleness = ret.IsStaleness
a.ReplicaReadScope = ret.ReadReplicaScope
if a.Ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && a.ReplicaReadScope == kv.GlobalReplicaScope {
txnManager := sessiontxn.GetTxnManager(a.Ctx)
is := sessiontxn.GetTxnManager(a.Ctx).GetTxnInfoSchema()
if a.Ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && txnManager.GetReadReplicaScope() == kv.GlobalReplicaScope {
logutil.BgLogger().Warn(fmt.Sprintf("tidb can't read closest replicas due to it haven't %s label", placement.DCLabelKey))
}
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, is)
if err != nil {
return 0, err
}
a.OutputNames = names
a.Plan = p
return a.InfoSchema.SchemaMetaVersion(), nil
return is.SchemaMetaVersion(), nil
}

func (a *ExecStmt) setPlanLabelForTopSQL(ctx context.Context) context.Context {
Expand Down Expand Up @@ -366,8 +357,12 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}()

failpoint.Inject("assertStaleTSO", func(val failpoint.Value) {
if n, ok := val.(int); ok && a.IsStaleness {
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if n, ok := val.(int); ok && staleread.IsTxnStaleness(a.Ctx) {
ts, err := sessiontxn.GetTxnManager(a.Ctx).GetReadTS()
if err != nil {
panic(err)
}
startTS := oracle.ExtractPhysical(ts) / 1000
if n != int(startTS) {
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
Expand Down Expand Up @@ -846,7 +841,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
b := newExecutorBuilder(ctx, a.Ti)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down Expand Up @@ -964,8 +959,6 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}
// Reset DurationParse due to the next statement may not need to be parsed (not a text protocol query).
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finish
sessVars.StmtCtx.IsStaleness = false
}

// CloseRecordSet will finish the execution of current statement and do some record work
Expand Down
23 changes: 17 additions & 6 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import (
"testing"
"time"

"github.com/pingcap/tidb/kv"

"github.com/pingcap/tidb/sessiontxn"

Comment on lines +30 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please format this part.

"github.com/pingcap/log"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/expression"
Expand All @@ -46,7 +50,6 @@ import (
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/stringutil"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap/zapcore"
)

Expand Down Expand Up @@ -282,6 +285,14 @@ func defaultAggTestCase(exec string) *aggTestCase {
ctx := mock.NewContext()
ctx.GetSessionVars().InitChunkSize = variable.DefInitChunkSize
ctx.GetSessionVars().MaxChunkSize = variable.DefMaxChunkSize
err := sessiontxn.GetTxnManager(ctx).SetContextProvider(&sessiontxn.SimpleTxnContextProvider{
Sctx: ctx,
InfoSchema: nil,
ReadReplicaScope: kv.GlobalReplicaScope,
})
if err != nil {
panic(err)
}
return &aggTestCase{exec, ast.AggFuncSum, 1000, false, 10000000, 4, true, ctx}
}

Expand All @@ -293,7 +304,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
plan.SetSchema(schema)
plan.Init(ctx, nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
Expand Down Expand Up @@ -345,7 +356,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
plan = sg
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil)
return b.build(plan)
}

Expand Down Expand Up @@ -578,7 +589,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil)
exec := b.build(plan)
return exec
}
Expand Down Expand Up @@ -1318,7 +1329,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
keyOff2IdxOff[i] = i
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
readerBuilder, err := newExecutorBuilder(tc.ctx, nil).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1392,7 +1403,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
readerBuilder, err := newExecutorBuilder(tc.ctx, nil).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
Expand Down
53 changes: 32 additions & 21 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -116,15 +118,21 @@ type CTEStorages struct {
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *executorBuilder {
return &executorBuilder{
func newExecutorBuilder(ctx sessionctx.Context, ti *TelemetryInfo) *executorBuilder {
txnManager := sessiontxn.GetTxnManager(ctx)
b := &executorBuilder{
ctx: ctx,
is: is,
is: txnManager.GetTxnInfoSchema(),
Ti: ti,
snapshotTS: snapshotTS,
isStaleness: isStaleness,
readReplicaScope: replicaReadScope,
isStaleness: staleread.IsTxnStaleness(ctx),
readReplicaScope: txnManager.GetReadReplicaScope(),
}

if provider, ok := txnManager.GetContextProvider().(*sessiontxn.SimpleTxnContextProvider); ok {
provider.GetReadTSFunc = b.getReadTS
}

return b
}

// MockPhysicalPlan is used to return a specified executor in when build.
Expand All @@ -141,9 +149,9 @@ type MockExecutorBuilder struct {
}

// NewMockExecutorBuilderForTest is ONLY used in test.
func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder {
func NewMockExecutorBuilderForTest(ctx sessionctx.Context, ti *TelemetryInfo) *MockExecutorBuilder {
return &MockExecutorBuilder{
executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)}
executorBuilder: newExecutorBuilder(ctx, ti)}
}

// Build builds an executor tree according to `p`.
Expand Down Expand Up @@ -732,12 +740,6 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
b.snapshotTS = v.SnapshotTS
b.isStaleness = v.IsStaleness
b.readReplicaScope = v.ReadReplicaScope
if b.snapshotTS != 0 {
b.is, b.err = domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
}
e := &ExecuteExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
Expand All @@ -751,7 +753,17 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
failpoint.Inject("assertExecutePrepareStatementStalenessOption", func(val failpoint.Value) {
vs := strings.Split(val.(string), "_")
assertTS, assertTxnScope := vs[0], vs[1]
if strconv.FormatUint(b.snapshotTS, 10) != assertTS ||

if !staleread.IsTxnStaleness(b.ctx) {
panic("is not staleness")
}

staleReadTS, err := sessiontxn.GetTxnManager(b.ctx).GetReadTS()
if err != nil {
panic(err)
}

if strconv.FormatUint(staleReadTS, 10) != assertTS ||
assertTxnScope != b.readReplicaScope {
panic("execute prepare statement have wrong staleness option")
}
Expand Down Expand Up @@ -800,11 +812,10 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
e := &SimpleExec{
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
staleTxnStartTS: v.StaleTxnStartTS,
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
}
return e
}
Expand Down Expand Up @@ -1500,7 +1511,7 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
return b.forUpdateTS, nil
}

return b.getReadTS()
return sessiontxn.GetTxnManager(b.ctx).GetReadTS()
}

// getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level
Expand Down
26 changes: 12 additions & 14 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"github.com/pingcap/tidb/sessiontxn/staleread"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -62,7 +64,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
stmtNode,
plannercore.WithPreprocessorReturn(ret),
plannercore.WithExecuteInfoSchemaUpdate(pe),
plannercore.InitTxnContextProvider,
plannercore.InitTxnContext,
)
if err != nil {
return nil, err
Expand All @@ -81,7 +83,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm

failpoint.Inject("assertStmtCtxIsStaleness", func(val failpoint.Value) {
expected := val.(bool)
got := c.Ctx.GetSessionVars().StmtCtx.IsStaleness
got := staleread.IsTxnStaleness(c.Ctx)
if got != expected {
panic(fmt.Sprintf("stmtctx isStaleness wrong, expected:%v, got:%v", expected, got))
}
Expand All @@ -93,18 +95,14 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
lowerPriority = needLowerPriority(finalPlan)
}
return &ExecStmt{
GoCtx: ctx,
SnapshotTS: ret.LastSnapshotTS,
IsStaleness: ret.IsStaleness,
ReplicaReadScope: ret.ReadReplicaScope,
InfoSchema: is,
Plan: finalPlan,
LowerPriority: lowerPriority,
Text: stmtNode.Text(),
StmtNode: stmtNode,
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
GoCtx: ctx,
Plan: finalPlan,
LowerPriority: lowerPriority,
Text: stmtNode.Text(),
StmtNode: stmtNode,
Ctx: c.Ctx,
OutputNames: names,
Ti: &TelemetryInfo{},
}, nil
}

Expand Down
3 changes: 1 addition & 2 deletions executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
)

// CoprocessorDAGHandler uses to handle cop dag request.
Expand Down Expand Up @@ -170,7 +169,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
}
plan = core.InjectExtraProjection(plan)
// Build executor.
b := newExecutorBuilder(h.sctx, is, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(h.sctx, nil)
return b.build(plan), nil
}

Expand Down
9 changes: 4 additions & 5 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,13 @@ func (e *DDLExec) createSessionTemporaryTable(s *ast.CreateTableStmt) error {
}

func (e *DDLExec) executeCreateView(s *ast.CreateViewStmt) error {
ret := &core.PreprocessorReturn{}
err := core.Preprocess(e.ctx, s.Select, core.WithPreprocessorReturn(ret))
err := core.Preprocess(e.ctx, s.Select, core.ValidateCreateView(func() error {
return ErrViewInvalid.GenWithStackByArgs(s.ViewName.Schema, s.ViewName.Name)
}))

if err != nil {
return errors.Trace(err)
}
if ret.IsStaleness {
return ErrViewInvalid.GenWithStackByArgs(s.ViewName.Schema.L, s.ViewName.Name.L)
}

return domain.GetDomain(e.ctx).DDL().CreateView(e.ctx, s)
}
Expand Down
Loading