From 21abdcb46e989a6d72d9edb2d00e5dfd866c61bc Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 1 Mar 2022 16:59:21 +0800 Subject: [PATCH 01/14] *: Provide staleReadProcessor to process stale read --- planner/core/planbuilder.go | 31 ----- planner/core/preprocess.go | 136 ++++---------------- sessiontxn/staleread/errors.go | 24 ++++ sessiontxn/staleread/processor.go | 202 ++++++++++++++++++++++++++++++ sessiontxn/staleread/util.go | 56 +++++++++ 5 files changed, 309 insertions(+), 140 deletions(-) create mode 100644 sessiontxn/staleread/errors.go create mode 100644 sessiontxn/staleread/processor.go create mode 100644 sessiontxn/staleread/util.go diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index cf8cb52959f78..84b3370a2691f 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -60,7 +60,6 @@ import ( "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/sqlexec" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/cznic/mathutil" @@ -3097,36 +3096,6 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, return p, nil } -// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS. -func calculateTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) { - tsVal, err := evalAstExpr(sctx, asOfClause.TsExpr) - if err != nil { - return 0, err - } - toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) - // We need at least the millionsecond here, so set fsp to 3. - toTypeTimestamp.Decimal = 3 - tsTimestamp, err := tsVal.ConvertTo(sctx.GetSessionVars().StmtCtx, toTypeTimestamp) - if err != nil { - return 0, err - } - tsTime, err := tsTimestamp.GetMysqlTime().GoTime(sctx.GetSessionVars().Location()) - if err != nil { - return 0, err - } - return oracle.GoTimeToTS(tsTime), nil -} - -func calculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { - nowVal, err := expression.GetStmtTimestamp(sctx) - if err != nil { - return 0, err - } - tsVal := nowVal.Add(readStaleness) - minTsVal := expression.GetMinSafeTime(sctx) - return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil -} - func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) ([]visitInfo, error) { // To use REVOKE, you must have the GRANT OPTION privilege, // and you must have the privileges that you are granting. diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index f766c8887e093..37fbb8b47a706 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -15,7 +15,6 @@ package core import ( - "context" "fmt" "math" "strings" @@ -38,6 +37,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/types" @@ -112,7 +112,12 @@ func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode { // Preprocess resolves table names of the node, and checks some statements' validation. // preprocessReturn used to extract the infoschema for the tableName and the timestamp from the asof clause. func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...PreprocessOpt) error { - v := preprocessor{ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0), withName: make(map[string]interface{})} + v := preprocessor{ + ctx: ctx, + tableAliasInJoin: make([]map[string]interface{}, 0), + withName: make(map[string]interface{}), + staleReadProcessor: staleread.NewStaleReadProcessor(ctx), + } for _, optFn := range preprocessOpt { optFn(&v) } @@ -183,6 +188,8 @@ type preprocessor struct { tableAliasInJoin []map[string]interface{} withName map[string]interface{} + staleReadProcessor staleread.Processor + // values that may be returned *PreprocessorReturn *PreprocessExecuteISUpdate @@ -1442,7 +1449,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) { return } - p.handleAsOfAndReadTS(tn.AsOf) + p.handleAsOfAndReadTS(tn) if p.err != nil { return } @@ -1604,7 +1611,7 @@ func (p *preprocessor) checkFuncCastExpr(node *ast.FuncCastExpr) { } // handleAsOfAndReadTS tries to handle as of closure, or possibly read_ts. -func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { +func (p *preprocessor) handleAsOfAndReadTS(tn *ast.TableName) { if p.stmtTp != TypeSelect { return } @@ -1616,117 +1623,28 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { p.ctx.GetSessionVars().StmtCtx.IsStaleness = true } }() - // When statement is during the Txn, we check whether there exists AsOfClause. If exists, we will return error, - // otherwise we should directly set the return param from TxnCtx. - p.ReadReplicaScope = kv.GlobalReplicaScope - if p.ctx.GetSessionVars().InTxn() { - if node != nil { - p.err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.") - return - } - txnCtx := p.ctx.GetSessionVars().TxnCtx - p.ReadReplicaScope = txnCtx.TxnScope - // It means we meet following case: - // 1. start transaction read only as of timestamp ts - // 2. select statement - if txnCtx.IsStaleness { - p.LastSnapshotTS = txnCtx.StartTS - p.IsStaleness = txnCtx.IsStaleness - p.initedLastSnapshotTS = true - return - } - } - scope := config.GetTxnScopeFromConfig() - if p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && scope != kv.GlobalReplicaScope { - p.ReadReplicaScope = scope - } - // If the statement is in auto-commit mode, we will check whether there exists read_ts, if exists, - // we will directly use it. The txnScope will be defined by the zone label, if it is not set, we will use - // global txnScope directly. - readTS := p.ctx.GetSessionVars().TxnReadTS.UseTxnReadTS() - readStaleness := p.ctx.GetSessionVars().ReadStaleness - var ts uint64 - switch { - case readTS > 0: - ts = readTS - if node != nil { - p.err = ErrAsOf.FastGenWithCause("can't use select as of while already set transaction as of") - return - } - if !p.initedLastSnapshotTS { - p.SnapshotTSEvaluator = func(sessionctx.Context) (uint64, error) { - return ts, nil - } - p.LastSnapshotTS = ts - p.IsStaleness = true - } - case readTS == 0 && node != nil: - // If we didn't use read_ts, and node isn't nil, it means we use 'select table as of timestamp ... ' - // for stale read - // It means we meet following case: - // select statement with as of timestamp - ts, p.err = calculateTsExpr(p.ctx, node) - if p.err != nil { - return - } - if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil { - p.err = errors.Trace(err) - return - } - if !p.initedLastSnapshotTS { - p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) { - return calculateTsExpr(ctx, node) - } - p.LastSnapshotTS = ts - p.IsStaleness = true - } - case readTS == 0 && node == nil && readStaleness != 0: - // If both readTS and node is empty while the readStaleness isn't, it means we meet following situation: - // set @@tidb_read_staleness='-5'; - // select * from t; - // Then the following select statement should be affected by the tidb_read_staleness in session. - ts, p.err = calculateTsWithReadStaleness(p.ctx, readStaleness) - if p.err != nil { - return - } - if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil { - p.err = errors.Trace(err) - return - } - if !p.initedLastSnapshotTS { - p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) { - return calculateTsWithReadStaleness(p.ctx, readStaleness) - } - p.LastSnapshotTS = ts - p.IsStaleness = true - } - case readTS == 0 && node == nil && readStaleness == 0: - // If both readTS and node is empty while the readStaleness is empty, - // setting p.ReadReplicaScope is necessary to verify the txn scope later - // because we may be in a local txn without using the Stale Read. - p.ReadReplicaScope = scope + if p.err = p.staleReadProcessor.OnSelectTable(tn); p.err != nil { + return } - // If the select statement is related to multi tables, we should grantee that all tables use the same timestamp - if p.LastSnapshotTS != ts { - p.err = ErrAsOf.GenWithStack("can not set different time in the as of") + if p.initedLastSnapshotTS { return } - if p.LastSnapshotTS != 0 { - dom := domain.GetDomain(p.ctx) - is, err := dom.GetSnapshotInfoSchema(p.LastSnapshotTS) - // if infoschema is empty, LastSnapshotTS init failed - if err != nil { - p.err = err - return - } - if is == nil { - p.err = fmt.Errorf("can not get any information schema based on snapshotTS: %d", p.LastSnapshotTS) - return - } - p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.ctx, is) + + if p.IsStaleness = p.staleReadProcessor.IsStaleness(); p.IsStaleness { + p.initedLastSnapshotTS = true + p.LastSnapshotTS = p.staleReadProcessor.GetStalenessReadTS() + p.SnapshotTSEvaluator = p.staleReadProcessor.GetStalenessTSEvaluatorForPrepare() + p.InfoSchema = p.staleReadProcessor.GetStalenessInfoSchema() } + + if p.IsStaleness || p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() { + p.ReadReplicaScope = config.GetTxnScopeFromConfig() + } else { + p.ReadReplicaScope = kv.GlobalReplicaScope + } + p.initedLastSnapshotTS = true } diff --git a/sessiontxn/staleread/errors.go b/sessiontxn/staleread/errors.go new file mode 100644 index 0000000000000..1d89fa632ccd6 --- /dev/null +++ b/sessiontxn/staleread/errors.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread + +import ( + mysql "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/util/dbterror" +) + +var ( + errAsOf = dbterror.ClassOptimizer.NewStd(mysql.ErrAsOf) +) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go new file mode 100644 index 0000000000000..4fcd8a202b142 --- /dev/null +++ b/sessiontxn/staleread/processor.go @@ -0,0 +1,202 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/table/temptable" +) + +// StalenessTSEvaluator is a function to get staleness ts +type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) + +// Processor is an interface used to process stale read +type Processor interface { + // IsStaleness indicates that whether we should use the staleness + IsStaleness() bool + // GetStalenessInfoSchema returns the information schema if it is stale read, otherwise returns nil + GetStalenessInfoSchema() infoschema.InfoSchema + // GetStalenessReadTS returns the ts if it is stale read, otherwise returns 0 + GetStalenessReadTS() uint64 + // GetStalenessTSEvaluatorForPrepare returns a function that will be used by prepare to evaluate ts + GetStalenessTSEvaluatorForPrepare() StalenessTSEvaluator + + // OnSelectTable will be called when process table in select statement + OnSelectTable(tn *ast.TableName) error +} + +type baseProcessor struct { + sctx sessionctx.Context + txnManager sessiontxn.TxnManager + + evaluated bool + ts uint64 + tsEvaluator StalenessTSEvaluator + is infoschema.InfoSchema +} + +func (p *baseProcessor) init(sctx sessionctx.Context) { + p.sctx = sctx + p.txnManager = sessiontxn.GetTxnManager(sctx) +} + +func (p *baseProcessor) IsStaleness() bool { + return p.ts != 0 +} + +func (p *baseProcessor) GetStalenessInfoSchema() infoschema.InfoSchema { + return p.is +} + +func (p *baseProcessor) GetStalenessReadTS() uint64 { + return p.ts +} + +func (p *baseProcessor) GetStalenessTSEvaluatorForPrepare() StalenessTSEvaluator { + return p.tsEvaluator +} + +func (p *baseProcessor) OnSelectTable(_ *ast.TableName) error { + return errors.New("not supported") +} + +func (p *baseProcessor) setEvaluatedTS(ts uint64, tsEvaluator StalenessTSEvaluator) error { + if p.evaluated { + if ts != p.ts { + return errAsOf.GenWithStack("can not set different time in the as of") + } + return nil + } + + if ts != 0 { + is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(ts) + if err != nil { + return err + } + p.is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, is) + } + + p.ts = ts + p.evaluated = true + p.tsEvaluator = tsEvaluator + return nil +} + +func (p *baseProcessor) useStmtOrTxnReadTS(stmtTS uint64) (ts uint64, err error) { + staleReadTS := p.sctx.GetSessionVars().TxnReadTS.UseTxnReadTS() + if staleReadTS != 0 && stmtTS != 0 { + return 0, errAsOf.FastGenWithCause("can't use select as of while already set transaction as of") + } + + if staleReadTS == 0 { + staleReadTS = stmtTS + } + + return staleReadTS, nil +} + +type staleReadProcessor struct { + baseProcessor +} + +// NewStaleReadProcessor creates a new stale read processor +func NewStaleReadProcessor(sctx sessionctx.Context) Processor { + p := &staleReadProcessor{} + p.init(sctx) + return p +} + +// OnSelectTable will be called when process table in select statement +func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { + // Try to get ts from '... as of timestamp ...' + ts, err := parseAndValidateAsOf(p.sctx, tn.AsOf) + if err != nil { + return err + } + + if p.sctx.GetSessionVars().InTxn() { + // When in explicit txn, it is not allowed to declare stale read in statement + // and the sys variables should also be ignored no matter it is set or not + if ts != 0 { + return errAsOf.FastGenWithCause("as of timestamp can't be set in transaction.") + } + + if txnCtx := p.sctx.GetSessionVars().TxnCtx; txnCtx.IsStaleness { + ts = txnCtx.StartTS + } + + return p.setEvaluatedTS(0, nil) + } + + // Try to get ts from variable `txn_read_ts`, when it is present 'as of' clause in statement should not be allowed + if txnReadTS := p.sctx.GetSessionVars().TxnReadTS.UseTxnReadTS(); txnReadTS != 0 { + if ts != 0 { + return errAsOf.FastGenWithCause("can't use select as of while already set transaction as of") + } + ts = txnReadTS + } + + if ts != 0 { + return p.setEvaluatedTS(0, func(sctx sessionctx.Context) (uint64, error) { + return ts, nil + }) + } + + // Try to get ts from variable `tidb_read_staleness` + evaluator := getTsEvaluatorFromReadStaleness(p.sctx) + if evaluator != nil { + ts, err = evaluator(p.sctx) + if err != nil { + return err + } + } + + return p.setEvaluatedTS(ts, evaluator) +} + +func parseAndValidateAsOf(sctx sessionctx.Context, asOf *ast.AsOfClause) (uint64, error) { + if asOf == nil { + return 0, nil + } + + ts, err := CalculateAsOfTsExpr(sctx, asOf) + if err != nil { + return 0, err + } + + if err = sessionctx.ValidateSnapshotReadTS(context.TODO(), sctx, ts); err != nil { + return 0, err + } + + return ts, nil +} + +func getTsEvaluatorFromReadStaleness(sctx sessionctx.Context) StalenessTSEvaluator { + readStaleness := sctx.GetSessionVars().ReadStaleness + if readStaleness == 0 { + return nil + } + + return func(sctx sessionctx.Context) (uint64, error) { + return CalculateTsWithReadStaleness(sctx, readStaleness) + } +} diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go new file mode 100644 index 0000000000000..ab2b03057476a --- /dev/null +++ b/sessiontxn/staleread/util.go @@ -0,0 +1,56 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread + +import ( + "time" + + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/tikv/client-go/v2/oracle" +) + +// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS. +func CalculateAsOfTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) { + tsVal, err := expression.EvalAstExpr(sctx, asOfClause.TsExpr) + if err != nil { + return 0, err + } + toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) + // We need at least the millionsecond here, so set fsp to 3. + toTypeTimestamp.Decimal = 3 + tsTimestamp, err := tsVal.ConvertTo(sctx.GetSessionVars().StmtCtx, toTypeTimestamp) + if err != nil { + return 0, err + } + tsTime, err := tsTimestamp.GetMysqlTime().GoTime(sctx.GetSessionVars().Location()) + if err != nil { + return 0, err + } + return oracle.GoTimeToTS(tsTime), nil +} + +func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { + nowVal, err := expression.GetStmtTimestamp(sctx) + if err != nil { + return 0, err + } + tsVal := nowVal.Add(readStaleness) + minTsVal := expression.GetMinSafeTime(sctx) + return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil +} From 0134b551ba627b650a108db333b11be94340596f Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 1 Mar 2022 17:14:52 +0800 Subject: [PATCH 02/14] update --- planner/core/preprocess.go | 1 - 1 file changed, 1 deletion(-) diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 37fbb8b47a706..ac391459e6a68 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -1633,7 +1633,6 @@ func (p *preprocessor) handleAsOfAndReadTS(tn *ast.TableName) { } if p.IsStaleness = p.staleReadProcessor.IsStaleness(); p.IsStaleness { - p.initedLastSnapshotTS = true p.LastSnapshotTS = p.staleReadProcessor.GetStalenessReadTS() p.SnapshotTSEvaluator = p.staleReadProcessor.GetStalenessTSEvaluatorForPrepare() p.InfoSchema = p.staleReadProcessor.GetStalenessInfoSchema() From 0c52cb1b6370282c8e173109372ee265cba7948f Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 1 Mar 2022 17:27:05 +0800 Subject: [PATCH 03/14] build --- planner/core/planbuilder.go | 4 +++- sessiontxn/staleread/util.go | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 84b3370a2691f..355c1bf16ae1a 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -24,6 +24,8 @@ import ( "strings" "time" + "github.com/pingcap/tidb/sessiontxn/staleread" + "github.com/pingcap/errors" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -3079,7 +3081,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, case *ast.BeginStmt: readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS() if raw.AsOf != nil { - startTS, err := calculateTsExpr(b.ctx, raw.AsOf) + startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf) if err != nil { return nil, err } diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go index ab2b03057476a..fe7062a60d7ce 100644 --- a/sessiontxn/staleread/util.go +++ b/sessiontxn/staleread/util.go @@ -45,6 +45,7 @@ func CalculateAsOfTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (u return oracle.GoTimeToTS(tsTime), nil } +// CalculateTsWithReadStaleness calculates the TsExpr for readStaleness duration func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { nowVal, err := expression.GetStmtTimestamp(sctx) if err != nil { From 970b651dff3f969e8e3a0743111717b80c8a67c4 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 1 Mar 2022 17:49:09 +0800 Subject: [PATCH 04/14] update --- sessiontxn/staleread/processor.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 4fcd8a202b142..a196511a57bee 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -101,19 +101,6 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64, tsEvaluator StalenessTSEvaluat return nil } -func (p *baseProcessor) useStmtOrTxnReadTS(stmtTS uint64) (ts uint64, err error) { - staleReadTS := p.sctx.GetSessionVars().TxnReadTS.UseTxnReadTS() - if staleReadTS != 0 && stmtTS != 0 { - return 0, errAsOf.FastGenWithCause("can't use select as of while already set transaction as of") - } - - if staleReadTS == 0 { - staleReadTS = stmtTS - } - - return staleReadTS, nil -} - type staleReadProcessor struct { baseProcessor } From 76c2a42b73a5ed62c9c968ef6817a7b5d7efd7ca Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 1 Mar 2022 18:23:27 +0800 Subject: [PATCH 05/14] fix --- sessiontxn/staleread/processor.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index a196511a57bee..dbb1e26dcc6a4 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -127,11 +127,12 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { return errAsOf.FastGenWithCause("as of timestamp can't be set in transaction.") } + p.evaluated = true if txnCtx := p.sctx.GetSessionVars().TxnCtx; txnCtx.IsStaleness { - ts = txnCtx.StartTS + p.ts = txnCtx.StartTS + p.is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, txnCtx.InfoSchema.(infoschema.InfoSchema)) } - - return p.setEvaluatedTS(0, nil) + return nil } // Try to get ts from variable `txn_read_ts`, when it is present 'as of' clause in statement should not be allowed @@ -143,7 +144,7 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } if ts != 0 { - return p.setEvaluatedTS(0, func(sctx sessionctx.Context) (uint64, error) { + return p.setEvaluatedTS(ts, func(sctx sessionctx.Context) (uint64, error) { return ts, nil }) } @@ -170,7 +171,7 @@ func parseAndValidateAsOf(sctx sessionctx.Context, asOf *ast.AsOfClause) (uint64 return 0, err } - if err = sessionctx.ValidateSnapshotReadTS(context.TODO(), sctx, ts); err != nil { + if err = sessionctx.ValidateStaleReadTS(context.TODO(), sctx, ts); err != nil { return 0, err } From 4ff98bf7a26b5245ebd7d4284f16ece224cb34ce Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 1 Mar 2022 19:06:14 +0800 Subject: [PATCH 06/14] fix --- planner/core/preprocess.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index ac391459e6a68..6ac07f8c7301b 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -25,7 +25,6 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" @@ -1641,7 +1640,7 @@ func (p *preprocessor) handleAsOfAndReadTS(tn *ast.TableName) { if p.IsStaleness || p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() { p.ReadReplicaScope = config.GetTxnScopeFromConfig() } else { - p.ReadReplicaScope = kv.GlobalReplicaScope + p.ReadReplicaScope = p.ctx.GetSessionVars().TxnCtx.TxnScope } p.initedLastSnapshotTS = true From c911be86d722fab0784dc9f34c2f880cba313cf5 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 1 Mar 2022 19:11:41 +0800 Subject: [PATCH 07/14] update --- planner/core/planbuilder.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 355c1bf16ae1a..cccc4d7e26a13 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -24,8 +24,7 @@ import ( "strings" "time" - "github.com/pingcap/tidb/sessiontxn/staleread" - + "github.com/cznic/mathutil" "github.com/pingcap/errors" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -47,8 +46,10 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -63,9 +64,6 @@ import ( "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/tikv" - - "github.com/cznic/mathutil" - "github.com/pingcap/tidb/table/tables" "go.uber.org/zap" ) From 7b529127ee1c066c6b4ab38227784a4fdca7a355 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Wed, 2 Mar 2022 15:28:01 +0800 Subject: [PATCH 08/14] add tests --- sessiontxn/staleread/main_test.go | 30 ++++ sessiontxn/staleread/processor.go | 101 +++++++---- sessiontxn/staleread/processor_test.go | 226 +++++++++++++++++++++++++ 3 files changed, 324 insertions(+), 33 deletions(-) create mode 100644 sessiontxn/staleread/main_test.go create mode 100644 sessiontxn/staleread/processor_test.go diff --git a/sessiontxn/staleread/main_test.go b/sessiontxn/staleread/main_test.go new file mode 100644 index 0000000000000..bf62a08c3c3c2 --- /dev/null +++ b/sessiontxn/staleread/main_test.go @@ -0,0 +1,30 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread + +import ( + "testing" + + "github.com/pingcap/tidb/util/testbridge" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + testbridge.SetupForCommonTest() + goleak.VerifyTestMain(m, opts...) +} diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index dbb1e26dcc6a4..938d8d546ba5d 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -79,12 +79,37 @@ func (p *baseProcessor) OnSelectTable(_ *ast.TableName) error { return errors.New("not supported") } -func (p *baseProcessor) setEvaluatedTS(ts uint64, tsEvaluator StalenessTSEvaluator) error { +func (p *baseProcessor) setAsNonStaleRead() error { + return p.setEvaluatedValues(0, nil, nil) +} + +func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) { + is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(ts) + if err != nil { + return err + } + return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) { + return ts, nil + }) +} + +func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) error { + ts, err := evaluator(p.sctx) + if err != nil { + return err + } + + is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(ts) + if err != nil { + return err + } + + return p.setEvaluatedValues(ts, is, evaluator) +} + +func (p *baseProcessor) setEvaluatedValues(ts uint64, is infoschema.InfoSchema, tsEvaluator StalenessTSEvaluator) error { if p.evaluated { - if ts != p.ts { - return errAsOf.GenWithStack("can not set different time in the as of") - } - return nil + return errors.New("already evaluated") } if ts != 0 { @@ -103,6 +128,7 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64, tsEvaluator StalenessTSEvaluat type staleReadProcessor struct { baseProcessor + stmtAsOfTs uint64 } // NewStaleReadProcessor creates a new stale read processor @@ -114,51 +140,60 @@ func NewStaleReadProcessor(sctx sessionctx.Context) Processor { // OnSelectTable will be called when process table in select statement func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { - // Try to get ts from '... as of timestamp ...' - ts, err := parseAndValidateAsOf(p.sctx, tn.AsOf) - if err != nil { - return err - } - if p.sctx.GetSessionVars().InTxn() { // When in explicit txn, it is not allowed to declare stale read in statement // and the sys variables should also be ignored no matter it is set or not - if ts != 0 { + if tn.AsOf != nil { return errAsOf.FastGenWithCause("as of timestamp can't be set in transaction.") } - p.evaluated = true + if p.evaluated { + return nil + } + if txnCtx := p.sctx.GetSessionVars().TxnCtx; txnCtx.IsStaleness { - p.ts = txnCtx.StartTS - p.is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, txnCtx.InfoSchema.(infoschema.InfoSchema)) + return p.setEvaluatedValues( + txnCtx.StartTS, + temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, txnCtx.InfoSchema.(infoschema.InfoSchema)), + nil, + ) } - return nil + return p.setAsNonStaleRead() } - // Try to get ts from variable `txn_read_ts`, when it is present 'as of' clause in statement should not be allowed - if txnReadTS := p.sctx.GetSessionVars().TxnReadTS.UseTxnReadTS(); txnReadTS != 0 { - if ts != 0 { - return errAsOf.FastGenWithCause("can't use select as of while already set transaction as of") + stmtAsOfTS, err := parseAndValidateAsOf(p.sctx, tn.AsOf) + if err != nil { + return err + } + + if p.evaluated { + if p.stmtAsOfTs != stmtAsOfTS { + return errAsOf.GenWithStack("can not set different time in the as of") } - ts = txnReadTS + return nil } - if ts != 0 { - return p.setEvaluatedTS(ts, func(sctx sessionctx.Context) (uint64, error) { - return ts, nil - }) + txnReadTS := p.sctx.GetSessionVars().TxnReadTS.UseTxnReadTS() + if txnReadTS > 0 && stmtAsOfTS > 0 { + // `as of` and `@@tx_read_ts` cannot be set in the same time + return errAsOf.FastGenWithCause("can't use select as of while already set transaction as of") } - // Try to get ts from variable `tidb_read_staleness` - evaluator := getTsEvaluatorFromReadStaleness(p.sctx) - if evaluator != nil { - ts, err = evaluator(p.sctx) - if err != nil { - return err - } + if stmtAsOfTS > 0 { + p.stmtAsOfTs = stmtAsOfTS + return p.setEvaluatedTS(stmtAsOfTS) + } + + if txnReadTS > 0 { + return p.setEvaluatedTS(txnReadTS) + } + + // set ts from `@@tidb_read_staleness` when both `as of` and `@@tx_read_ts` are not set + if evaluator := getTsEvaluatorFromReadStaleness(p.sctx); evaluator != nil { + return p.setEvaluatedEvaluator(evaluator) } - return p.setEvaluatedTS(ts, evaluator) + return p.setAsNonStaleRead() } func parseAndValidateAsOf(sctx sessionctx.Context, asOf *ast.AsOfClause) (uint64, error) { diff --git a/sessiontxn/staleread/processor_test.go b/sessiontxn/staleread/processor_test.go new file mode 100644 index 0000000000000..fde4c37124804 --- /dev/null +++ b/sessiontxn/staleread/processor_test.go @@ -0,0 +1,226 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread_test + +import ( + "fmt" + "testing" + "time" + + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn/staleread" + "github.com/pingcap/tidb/table/temptable" + "github.com/pingcap/tidb/testkit" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" +) + +type staleReadPoint struct { + tk *testkit.TestKit + ts uint64 + dt string + tm time.Time + is infoschema.InfoSchema + tn *ast.TableName +} + +func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.Processor, hasEvaluator bool) { + require.True(t, processor.IsStaleness()) + require.Equal(t, p.ts, processor.GetStalenessReadTS()) + require.Equal(t, p.is.SchemaMetaVersion(), processor.GetStalenessInfoSchema().SchemaMetaVersion()) + require.IsTypef(t, processor.GetStalenessInfoSchema(), temptable.AttachLocalTemporaryTableInfoSchema(p.tk.Session(), p.is), "") + evaluator := processor.GetStalenessTSEvaluatorForPrepare() + if hasEvaluator { + require.NotNil(t, evaluator) + ts, err := evaluator(p.tk.Session()) + require.NoError(t, err) + require.Equal(t, processor.GetStalenessReadTS(), ts) + } else { + require.Nil(t, evaluator) + } +} + +func genStaleReadPoint(t *testing.T, tk *testkit.TestKit) *staleReadPoint { + tk.MustExec("create table if not exists test.t(a bigint)") + tk.MustExec(fmt.Sprintf("alter table test.t alter column a set default %d", time.Now().UnixNano())) + time.Sleep(time.Millisecond * 20) + is := domain.GetDomain(tk.Session()).InfoSchema() + dt := tk.MustQuery("select now(3)").Rows()[0][0].(string) + tm, err := time.ParseInLocation("2006-01-02 15:04:05.999999", dt, tk.Session().GetSessionVars().Location()) + require.NoError(t, err) + ts := oracle.GoTimeToTS(tm) + tn := astTableWithAsOf(t, dt) + return &staleReadPoint{ + tk: tk, + ts: ts, + dt: dt, + tm: tm, + is: is, + tn: tn, + } +} + +func astTableWithAsOf(t *testing.T, dt string) *ast.TableName { + p := parser.New() + var sql string + if dt == "" { + sql = "select * from test.t" + } else { + sql = fmt.Sprintf("select * from test.t as of timestamp '%s'", dt) + } + + stmt, err := p.ParseOneStmt(sql, "", "") + require.NoError(t, err) + sel := stmt.(*ast.SelectStmt) + return sel.From.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName) +} + +func TestStaleReadProcessorWithSelectTable(t *testing.T) { + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tn := astTableWithAsOf(t, "") + p1 := genStaleReadPoint(t, tk) + p2 := genStaleReadPoint(t, tk) + + // create local temporary table to check processor's infoschema will consider temporary table + tk.MustExec("create temporary table test.t2(a int)") + + // no sys variable just select ... as of ... + processor := createProcessor(t, tk.Session()) + err := processor.OnSelectTable(p1.tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + err = processor.OnSelectTable(p1.tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + err = processor.OnSelectTable(p2.tn) + require.Error(t, err) + require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error()) + p1.checkMatchProcessor(t, processor, true) + + // the first select has not 'as of' + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + require.NoError(t, err) + require.False(t, processor.IsStaleness()) + err = processor.OnSelectTable(p1.tn) + require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error()) + require.False(t, processor.IsStaleness()) + + // 'as of' is not allowed when @@txn_read_ts is set + tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(p1.tn) + require.Error(t, err) + require.Equal(t, "[planner:8135]invalid as of timestamp: can't use select as of while already set transaction as of", err.Error()) + tk.MustExec("set @@tx_read_ts=''") + + // no 'as of' will consume @txn_read_ts + tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + p1.checkMatchProcessor(t, processor, true) + tk.Session().GetSessionVars().CleanupTxnReadTSIfUsed() + require.Equal(t, uint64(0), tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS()) + tk.MustExec("set @@tx_read_ts=''") + + // `@@tidb_read_staleness` + tk.MustExec("set @@tidb_read_staleness=-5") + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + require.True(t, processor.IsStaleness()) + require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) + expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + require.NoError(t, err) + require.Equal(t, expectedTS, processor.GetStalenessReadTS()) + evaluator := processor.GetStalenessTSEvaluatorForPrepare() + evaluatorTS, err := evaluator(tk.Session()) + require.NoError(t, err) + require.Equal(t, expectedTS, evaluatorTS) + tk.MustExec("set @@tidb_read_staleness=''") + + tk.MustExec("do sleep(0.01)") + evaluatorTS, err = evaluator(tk.Session()) + require.NoError(t, err) + expectedTS2, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + require.NoError(t, err) + require.Equal(t, expectedTS2, evaluatorTS) + + // `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts` + tk.MustExec("set @@tidb_read_staleness=-5") + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(p1.tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + + tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + tk.MustExec("set @@tidb_read_staleness=''") +} + +func TestStaleReadProcessorInTxn(t *testing.T) { + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tn := astTableWithAsOf(t, "") + p1 := genStaleReadPoint(t, tk) + _ = genStaleReadPoint(t, tk) + + tk.MustExec("begin") + + // no error when there is no 'as of' + processor := createProcessor(t, tk.Session()) + err := processor.OnSelectTable(tn) + require.NoError(t, err) + require.False(t, processor.IsStaleness()) + + // return an error when 'as of' is set + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(p1.tn) + require.Error(t, err) + require.Equal(t, "[planner:8135]invalid as of timestamp: as of timestamp can't be set in transaction.", err.Error()) + tk.MustExec("rollback") + + tk.MustExec(fmt.Sprintf("start transaction read only as of timestamp '%s'", p1.dt)) + + // processor will use the transaction's stale read context + err = processor.OnSelectTable(tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, false) + + // sys variables will be ignored in txn + tk.MustExec("set @@tidb_read_staleness=-5") + err = processor.OnSelectTable(tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, false) + tk.MustExec("set @@tidb_read_staleness=''") +} + +func createProcessor(t *testing.T, se sessionctx.Context) staleread.Processor { + processor := staleread.NewStaleReadProcessor(se) + require.False(t, processor.IsStaleness()) + require.Equal(t, uint64(0), processor.GetStalenessReadTS()) + require.Nil(t, processor.GetStalenessTSEvaluatorForPrepare()) + require.Nil(t, processor.GetStalenessInfoSchema()) + return processor +} From 7fbd50cf38c0a8ebb29eb7e6037bec5602a42df1 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 3 Mar 2022 14:00:47 +0800 Subject: [PATCH 09/14] update --- sessiontxn/staleread/processor.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 938d8d546ba5d..dee9c6026bae8 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -88,6 +88,7 @@ func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) { if err != nil { return err } + is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, is) return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) { return ts, nil }) @@ -100,6 +101,7 @@ func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) er } is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(ts) + is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, is) if err != nil { return err } @@ -112,15 +114,8 @@ func (p *baseProcessor) setEvaluatedValues(ts uint64, is infoschema.InfoSchema, return errors.New("already evaluated") } - if ts != 0 { - is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(ts) - if err != nil { - return err - } - p.is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, is) - } - p.ts = ts + p.is = is p.evaluated = true p.tsEvaluator = tsEvaluator return nil From 7b44a264e4b1b218aee6a09e9ced3c2c381b0aeb Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 3 Mar 2022 16:56:56 +0800 Subject: [PATCH 10/14] comments --- planner/core/preprocess.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index 279870664f144..e183c0b85a32e 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -1641,9 +1641,14 @@ func (p *preprocessor) handleAsOfAndReadTS(tn *ast.TableName) { p.InfoSchema = p.staleReadProcessor.GetStalenessInfoSchema() } + // It is a little hacking for the below codes. `ReadReplicaScope` is used both by stale read's closest read and local txn. + // They are different features and the value for `ReadReplicaScope` will be conflicted in some scenes. + // But because local txn is still an experimental feature, we should make stale read work first. if p.IsStaleness || p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() { + // When stale read or closet read is set, we read the tidb's locality as the read replica scope p.ReadReplicaScope = config.GetTxnScopeFromConfig() } else { + // Otherwise, use the scope from TxnCtx for local txn validation p.ReadReplicaScope = p.ctx.GetSessionVars().TxnCtx.TxnScope } From 8dbfa197dbfe31b54b52d3703e8b9e1bcecea4bc Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Fri, 4 Mar 2022 12:33:14 +0800 Subject: [PATCH 11/14] update --- sessiontxn/staleread/processor.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index dee9c6026bae8..9a911ceb098c1 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -26,6 +26,9 @@ import ( "github.com/pingcap/tidb/table/temptable" ) +// enforce implement Processor interface +var _ Processor = &staleReadProcessor{} + // StalenessTSEvaluator is a function to get staleness ts type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) From adfe2684fb1778bc477afaffab19e8c3a65891b4 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 8 Mar 2022 10:41:15 +0800 Subject: [PATCH 12/14] address comments --- sessiontxn/staleread/main_test.go | 30 -------------------------- sessiontxn/staleread/processor.go | 14 +++++++++++- sessiontxn/staleread/processor_test.go | 10 +++++++++ 3 files changed, 23 insertions(+), 31 deletions(-) delete mode 100644 sessiontxn/staleread/main_test.go diff --git a/sessiontxn/staleread/main_test.go b/sessiontxn/staleread/main_test.go deleted file mode 100644 index bf62a08c3c3c2..0000000000000 --- a/sessiontxn/staleread/main_test.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2021 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package staleread - -import ( - "testing" - - "github.com/pingcap/tidb/util/testbridge" - "go.uber.org/goleak" -) - -func TestMain(m *testing.M) { - opts := []goleak.Option{ - goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), - } - testbridge.SetupForCommonTest() - goleak.VerifyTestMain(m, opts...) -} diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 9a911ceb098c1..1c41e24c7156c 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -150,6 +150,9 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { } if txnCtx := p.sctx.GetSessionVars().TxnCtx; txnCtx.IsStaleness { + // It means we meet following case: + // 1. start transaction read only as of timestamp ts + // 2. select statement return p.setEvaluatedValues( txnCtx.StartTS, temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, txnCtx.InfoSchema.(infoschema.InfoSchema)), @@ -159,18 +162,23 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { return p.setAsNonStaleRead() } + // If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...' stmtAsOfTS, err := parseAndValidateAsOf(p.sctx, tn.AsOf) if err != nil { return err } if p.evaluated { + // If the select statement is related to multi tables, we should guarantee that all tables use the same timestamp if p.stmtAsOfTs != stmtAsOfTS { return errAsOf.GenWithStack("can not set different time in the as of") } return nil } + // If `txnReadTS` is not 0, it means we meet following situation: + // start transaction read only as of timestamp ... + // select from table txnReadTS := p.sctx.GetSessionVars().TxnReadTS.UseTxnReadTS() if txnReadTS > 0 && stmtAsOfTS > 0 { // `as of` and `@@tx_read_ts` cannot be set in the same time @@ -186,11 +194,15 @@ func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { return p.setEvaluatedTS(txnReadTS) } - // set ts from `@@tidb_read_staleness` when both `as of` and `@@tx_read_ts` are not set + // If both txnReadTS and stmtAsOfTS is empty while the readStaleness isn't, it means we meet following situation: + // set @@tidb_read_staleness='-5'; + // select from table + // Then the following select statement should be affected by the tidb_read_staleness in session. if evaluator := getTsEvaluatorFromReadStaleness(p.sctx); evaluator != nil { return p.setEvaluatedEvaluator(evaluator) } + // Otherwise, it means we should not use stale read. return p.setAsNonStaleRead() } diff --git a/sessiontxn/staleread/processor_test.go b/sessiontxn/staleread/processor_test.go index fde4c37124804..55a8270b612b5 100644 --- a/sessiontxn/staleread/processor_test.go +++ b/sessiontxn/staleread/processor_test.go @@ -27,10 +27,20 @@ import ( "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/testbridge" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/oracle" + "go.uber.org/goleak" ) +func TestMain(m *testing.M) { + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + testbridge.SetupForCommonTest() + goleak.VerifyTestMain(m, opts...) +} + type staleReadPoint struct { tk *testkit.TestKit ts uint64 From d6a3e03f18fc76f044c7917fc20b9f7f71e87c88 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 8 Mar 2022 18:43:45 +0800 Subject: [PATCH 13/14] update --- sessiontxn/staleread/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 1c41e24c7156c..77fb17cf9fa0d 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -34,7 +34,7 @@ type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) // Processor is an interface used to process stale read type Processor interface { - // IsStaleness indicates that whether we should use the staleness + // IsStaleness indicates that whether we should use the staleness. IsStaleness() bool // GetStalenessInfoSchema returns the information schema if it is stale read, otherwise returns nil GetStalenessInfoSchema() infoschema.InfoSchema From c350f1eee8aee91a0c86c1e549f574e96073138d Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 8 Mar 2022 18:43:54 +0800 Subject: [PATCH 14/14] update --- sessiontxn/staleread/processor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go index 77fb17cf9fa0d..1c41e24c7156c 100644 --- a/sessiontxn/staleread/processor.go +++ b/sessiontxn/staleread/processor.go @@ -34,7 +34,7 @@ type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) // Processor is an interface used to process stale read type Processor interface { - // IsStaleness indicates that whether we should use the staleness. + // IsStaleness indicates that whether we should use the staleness IsStaleness() bool // GetStalenessInfoSchema returns the information schema if it is stale read, otherwise returns nil GetStalenessInfoSchema() infoschema.InfoSchema