diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 8133976912756..a9eeaab3051e7 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -24,6 +24,7 @@ import ( "strings" "time" + "github.com/cznic/mathutil" "github.com/pingcap/errors" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -45,8 +46,10 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/types" driver "github.com/pingcap/tidb/types/parser_driver" @@ -60,11 +63,7 @@ import ( "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/sqlexec" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" - - "github.com/cznic/mathutil" - "github.com/pingcap/tidb/table/tables" "go.uber.org/zap" ) @@ -3121,7 +3120,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, case *ast.BeginStmt: readTS := b.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS() if raw.AsOf != nil { - startTS, err := calculateTsExpr(b.ctx, raw.AsOf) + startTS, err := staleread.CalculateAsOfTsExpr(b.ctx, raw.AsOf) if err != nil { return nil, err } @@ -3138,36 +3137,6 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, return p, nil } -// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS. -func calculateTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) { - tsVal, err := evalAstExpr(sctx, asOfClause.TsExpr) - if err != nil { - return 0, err - } - toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) - // We need at least the millionsecond here, so set fsp to 3. - toTypeTimestamp.Decimal = 3 - tsTimestamp, err := tsVal.ConvertTo(sctx.GetSessionVars().StmtCtx, toTypeTimestamp) - if err != nil { - return 0, err - } - tsTime, err := tsTimestamp.GetMysqlTime().GoTime(sctx.GetSessionVars().Location()) - if err != nil { - return 0, err - } - return oracle.GoTimeToTS(tsTime), nil -} - -func calculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { - nowVal, err := expression.GetStmtTimestamp(sctx) - if err != nil { - return 0, err - } - tsVal := nowVal.Add(readStaleness) - minTsVal := expression.GetMinSafeTime(sctx) - return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil -} - func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) ([]visitInfo, error) { // To use REVOKE, you must have the GRANT OPTION privilege, // and you must have the privileges that you are granting. diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go index ecc6cffa9d3f0..e183c0b85a32e 100644 --- a/planner/core/preprocess.go +++ b/planner/core/preprocess.go @@ -15,7 +15,6 @@ package core import ( - "context" "fmt" "math" "strings" @@ -26,7 +25,6 @@ import ( "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser" "github.com/pingcap/tidb/parser/ast" @@ -39,6 +37,7 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/temptable" "github.com/pingcap/tidb/types" @@ -113,7 +112,12 @@ func TryAddExtraLimit(ctx sessionctx.Context, node ast.StmtNode) ast.StmtNode { // Preprocess resolves table names of the node, and checks some statements' validation. // preprocessReturn used to extract the infoschema for the tableName and the timestamp from the asof clause. func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...PreprocessOpt) error { - v := preprocessor{ctx: ctx, tableAliasInJoin: make([]map[string]interface{}, 0), withName: make(map[string]interface{})} + v := preprocessor{ + ctx: ctx, + tableAliasInJoin: make([]map[string]interface{}, 0), + withName: make(map[string]interface{}), + staleReadProcessor: staleread.NewStaleReadProcessor(ctx), + } for _, optFn := range preprocessOpt { optFn(&v) } @@ -184,6 +188,8 @@ type preprocessor struct { tableAliasInJoin []map[string]interface{} withName map[string]interface{} + staleReadProcessor staleread.Processor + // values that may be returned *PreprocessorReturn *PreprocessExecuteISUpdate @@ -1446,7 +1452,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) { return } - p.handleAsOfAndReadTS(tn.AsOf) + p.handleAsOfAndReadTS(tn) if p.err != nil { return } @@ -1608,7 +1614,7 @@ func (p *preprocessor) checkFuncCastExpr(node *ast.FuncCastExpr) { } // handleAsOfAndReadTS tries to handle as of closure, or possibly read_ts. -func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { +func (p *preprocessor) handleAsOfAndReadTS(tn *ast.TableName) { if p.stmtTp != TypeSelect { return } @@ -1620,117 +1626,32 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) { p.ctx.GetSessionVars().StmtCtx.IsStaleness = true } }() - // When statement is during the Txn, we check whether there exists AsOfClause. If exists, we will return error, - // otherwise we should directly set the return param from TxnCtx. - p.ReadReplicaScope = kv.GlobalReplicaScope - if p.ctx.GetSessionVars().InTxn() { - if node != nil { - p.err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.") - return - } - txnCtx := p.ctx.GetSessionVars().TxnCtx - p.ReadReplicaScope = txnCtx.TxnScope - // It means we meet following case: - // 1. start transaction read only as of timestamp ts - // 2. select statement - if txnCtx.IsStaleness { - p.LastSnapshotTS = txnCtx.StartTS - p.IsStaleness = txnCtx.IsStaleness - p.initedLastSnapshotTS = true - return - } - } - scope := config.GetTxnScopeFromConfig() - if p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && scope != kv.GlobalReplicaScope { - p.ReadReplicaScope = scope - } - // If the statement is in auto-commit mode, we will check whether there exists read_ts, if exists, - // we will directly use it. The txnScope will be defined by the zone label, if it is not set, we will use - // global txnScope directly. - readTS := p.ctx.GetSessionVars().TxnReadTS.UseTxnReadTS() - readStaleness := p.ctx.GetSessionVars().ReadStaleness - var ts uint64 - switch { - case readTS > 0: - ts = readTS - if node != nil { - p.err = ErrAsOf.FastGenWithCause("can't use select as of while already set transaction as of") - return - } - if !p.initedLastSnapshotTS { - p.SnapshotTSEvaluator = func(sessionctx.Context) (uint64, error) { - return ts, nil - } - p.LastSnapshotTS = ts - p.IsStaleness = true - } - case readTS == 0 && node != nil: - // If we didn't use read_ts, and node isn't nil, it means we use 'select table as of timestamp ... ' - // for stale read - // It means we meet following case: - // select statement with as of timestamp - ts, p.err = calculateTsExpr(p.ctx, node) - if p.err != nil { - return - } - if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil { - p.err = errors.Trace(err) - return - } - if !p.initedLastSnapshotTS { - p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) { - return calculateTsExpr(ctx, node) - } - p.LastSnapshotTS = ts - p.IsStaleness = true - } - case readTS == 0 && node == nil && readStaleness != 0: - // If both readTS and node is empty while the readStaleness isn't, it means we meet following situation: - // set @@tidb_read_staleness='-5'; - // select * from t; - // Then the following select statement should be affected by the tidb_read_staleness in session. - ts, p.err = calculateTsWithReadStaleness(p.ctx, readStaleness) - if p.err != nil { - return - } - if err := sessionctx.ValidateStaleReadTS(context.Background(), p.ctx, ts); err != nil { - p.err = errors.Trace(err) - return - } - if !p.initedLastSnapshotTS { - p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) { - return calculateTsWithReadStaleness(p.ctx, readStaleness) - } - p.LastSnapshotTS = ts - p.IsStaleness = true - } - case readTS == 0 && node == nil && readStaleness == 0: - // If both readTS and node is empty while the readStaleness is empty, - // setting p.ReadReplicaScope is necessary to verify the txn scope later - // because we may be in a local txn without using the Stale Read. - p.ReadReplicaScope = scope + if p.err = p.staleReadProcessor.OnSelectTable(tn); p.err != nil { + return } - // If the select statement is related to multi tables, we should grantee that all tables use the same timestamp - if p.LastSnapshotTS != ts { - p.err = ErrAsOf.GenWithStack("can not set different time in the as of") + if p.initedLastSnapshotTS { return } - if p.LastSnapshotTS != 0 { - dom := domain.GetDomain(p.ctx) - is, err := dom.GetSnapshotInfoSchema(p.LastSnapshotTS) - // if infoschema is empty, LastSnapshotTS init failed - if err != nil { - p.err = err - return - } - if is == nil { - p.err = fmt.Errorf("can not get any information schema based on snapshotTS: %d", p.LastSnapshotTS) - return - } - p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.ctx, is) + + if p.IsStaleness = p.staleReadProcessor.IsStaleness(); p.IsStaleness { + p.LastSnapshotTS = p.staleReadProcessor.GetStalenessReadTS() + p.SnapshotTSEvaluator = p.staleReadProcessor.GetStalenessTSEvaluatorForPrepare() + p.InfoSchema = p.staleReadProcessor.GetStalenessInfoSchema() + } + + // It is a little hacking for the below codes. `ReadReplicaScope` is used both by stale read's closest read and local txn. + // They are different features and the value for `ReadReplicaScope` will be conflicted in some scenes. + // But because local txn is still an experimental feature, we should make stale read work first. + if p.IsStaleness || p.ctx.GetSessionVars().GetReplicaRead().IsClosestRead() { + // When stale read or closet read is set, we read the tidb's locality as the read replica scope + p.ReadReplicaScope = config.GetTxnScopeFromConfig() + } else { + // Otherwise, use the scope from TxnCtx for local txn validation + p.ReadReplicaScope = p.ctx.GetSessionVars().TxnCtx.TxnScope } + p.initedLastSnapshotTS = true } diff --git a/sessiontxn/staleread/errors.go b/sessiontxn/staleread/errors.go new file mode 100644 index 0000000000000..1d89fa632ccd6 --- /dev/null +++ b/sessiontxn/staleread/errors.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread + +import ( + mysql "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/util/dbterror" +) + +var ( + errAsOf = dbterror.ClassOptimizer.NewStd(mysql.ErrAsOf) +) diff --git a/sessiontxn/staleread/processor.go b/sessiontxn/staleread/processor.go new file mode 100644 index 0000000000000..1c41e24c7156c --- /dev/null +++ b/sessiontxn/staleread/processor.go @@ -0,0 +1,235 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/table/temptable" +) + +// enforce implement Processor interface +var _ Processor = &staleReadProcessor{} + +// StalenessTSEvaluator is a function to get staleness ts +type StalenessTSEvaluator func(sctx sessionctx.Context) (uint64, error) + +// Processor is an interface used to process stale read +type Processor interface { + // IsStaleness indicates that whether we should use the staleness + IsStaleness() bool + // GetStalenessInfoSchema returns the information schema if it is stale read, otherwise returns nil + GetStalenessInfoSchema() infoschema.InfoSchema + // GetStalenessReadTS returns the ts if it is stale read, otherwise returns 0 + GetStalenessReadTS() uint64 + // GetStalenessTSEvaluatorForPrepare returns a function that will be used by prepare to evaluate ts + GetStalenessTSEvaluatorForPrepare() StalenessTSEvaluator + + // OnSelectTable will be called when process table in select statement + OnSelectTable(tn *ast.TableName) error +} + +type baseProcessor struct { + sctx sessionctx.Context + txnManager sessiontxn.TxnManager + + evaluated bool + ts uint64 + tsEvaluator StalenessTSEvaluator + is infoschema.InfoSchema +} + +func (p *baseProcessor) init(sctx sessionctx.Context) { + p.sctx = sctx + p.txnManager = sessiontxn.GetTxnManager(sctx) +} + +func (p *baseProcessor) IsStaleness() bool { + return p.ts != 0 +} + +func (p *baseProcessor) GetStalenessInfoSchema() infoschema.InfoSchema { + return p.is +} + +func (p *baseProcessor) GetStalenessReadTS() uint64 { + return p.ts +} + +func (p *baseProcessor) GetStalenessTSEvaluatorForPrepare() StalenessTSEvaluator { + return p.tsEvaluator +} + +func (p *baseProcessor) OnSelectTable(_ *ast.TableName) error { + return errors.New("not supported") +} + +func (p *baseProcessor) setAsNonStaleRead() error { + return p.setEvaluatedValues(0, nil, nil) +} + +func (p *baseProcessor) setEvaluatedTS(ts uint64) (err error) { + is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(ts) + if err != nil { + return err + } + is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, is) + return p.setEvaluatedValues(ts, is, func(sctx sessionctx.Context) (uint64, error) { + return ts, nil + }) +} + +func (p *baseProcessor) setEvaluatedEvaluator(evaluator StalenessTSEvaluator) error { + ts, err := evaluator(p.sctx) + if err != nil { + return err + } + + is, err := domain.GetDomain(p.sctx).GetSnapshotInfoSchema(ts) + is = temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, is) + if err != nil { + return err + } + + return p.setEvaluatedValues(ts, is, evaluator) +} + +func (p *baseProcessor) setEvaluatedValues(ts uint64, is infoschema.InfoSchema, tsEvaluator StalenessTSEvaluator) error { + if p.evaluated { + return errors.New("already evaluated") + } + + p.ts = ts + p.is = is + p.evaluated = true + p.tsEvaluator = tsEvaluator + return nil +} + +type staleReadProcessor struct { + baseProcessor + stmtAsOfTs uint64 +} + +// NewStaleReadProcessor creates a new stale read processor +func NewStaleReadProcessor(sctx sessionctx.Context) Processor { + p := &staleReadProcessor{} + p.init(sctx) + return p +} + +// OnSelectTable will be called when process table in select statement +func (p *staleReadProcessor) OnSelectTable(tn *ast.TableName) error { + if p.sctx.GetSessionVars().InTxn() { + // When in explicit txn, it is not allowed to declare stale read in statement + // and the sys variables should also be ignored no matter it is set or not + if tn.AsOf != nil { + return errAsOf.FastGenWithCause("as of timestamp can't be set in transaction.") + } + + if p.evaluated { + return nil + } + + if txnCtx := p.sctx.GetSessionVars().TxnCtx; txnCtx.IsStaleness { + // It means we meet following case: + // 1. start transaction read only as of timestamp ts + // 2. select statement + return p.setEvaluatedValues( + txnCtx.StartTS, + temptable.AttachLocalTemporaryTableInfoSchema(p.sctx, txnCtx.InfoSchema.(infoschema.InfoSchema)), + nil, + ) + } + return p.setAsNonStaleRead() + } + + // If `stmtAsOfTS` is not 0, it means we use 'select ... from xxx as of timestamp ...' + stmtAsOfTS, err := parseAndValidateAsOf(p.sctx, tn.AsOf) + if err != nil { + return err + } + + if p.evaluated { + // If the select statement is related to multi tables, we should guarantee that all tables use the same timestamp + if p.stmtAsOfTs != stmtAsOfTS { + return errAsOf.GenWithStack("can not set different time in the as of") + } + return nil + } + + // If `txnReadTS` is not 0, it means we meet following situation: + // start transaction read only as of timestamp ... + // select from table + txnReadTS := p.sctx.GetSessionVars().TxnReadTS.UseTxnReadTS() + if txnReadTS > 0 && stmtAsOfTS > 0 { + // `as of` and `@@tx_read_ts` cannot be set in the same time + return errAsOf.FastGenWithCause("can't use select as of while already set transaction as of") + } + + if stmtAsOfTS > 0 { + p.stmtAsOfTs = stmtAsOfTS + return p.setEvaluatedTS(stmtAsOfTS) + } + + if txnReadTS > 0 { + return p.setEvaluatedTS(txnReadTS) + } + + // If both txnReadTS and stmtAsOfTS is empty while the readStaleness isn't, it means we meet following situation: + // set @@tidb_read_staleness='-5'; + // select from table + // Then the following select statement should be affected by the tidb_read_staleness in session. + if evaluator := getTsEvaluatorFromReadStaleness(p.sctx); evaluator != nil { + return p.setEvaluatedEvaluator(evaluator) + } + + // Otherwise, it means we should not use stale read. + return p.setAsNonStaleRead() +} + +func parseAndValidateAsOf(sctx sessionctx.Context, asOf *ast.AsOfClause) (uint64, error) { + if asOf == nil { + return 0, nil + } + + ts, err := CalculateAsOfTsExpr(sctx, asOf) + if err != nil { + return 0, err + } + + if err = sessionctx.ValidateStaleReadTS(context.TODO(), sctx, ts); err != nil { + return 0, err + } + + return ts, nil +} + +func getTsEvaluatorFromReadStaleness(sctx sessionctx.Context) StalenessTSEvaluator { + readStaleness := sctx.GetSessionVars().ReadStaleness + if readStaleness == 0 { + return nil + } + + return func(sctx sessionctx.Context) (uint64, error) { + return CalculateTsWithReadStaleness(sctx, readStaleness) + } +} diff --git a/sessiontxn/staleread/processor_test.go b/sessiontxn/staleread/processor_test.go new file mode 100644 index 0000000000000..55a8270b612b5 --- /dev/null +++ b/sessiontxn/staleread/processor_test.go @@ -0,0 +1,236 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread_test + +import ( + "fmt" + "testing" + "time" + + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessiontxn/staleread" + "github.com/pingcap/tidb/table/temptable" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/testbridge" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/goleak" +) + +func TestMain(m *testing.M) { + opts := []goleak.Option{ + goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"), + } + testbridge.SetupForCommonTest() + goleak.VerifyTestMain(m, opts...) +} + +type staleReadPoint struct { + tk *testkit.TestKit + ts uint64 + dt string + tm time.Time + is infoschema.InfoSchema + tn *ast.TableName +} + +func (p *staleReadPoint) checkMatchProcessor(t *testing.T, processor staleread.Processor, hasEvaluator bool) { + require.True(t, processor.IsStaleness()) + require.Equal(t, p.ts, processor.GetStalenessReadTS()) + require.Equal(t, p.is.SchemaMetaVersion(), processor.GetStalenessInfoSchema().SchemaMetaVersion()) + require.IsTypef(t, processor.GetStalenessInfoSchema(), temptable.AttachLocalTemporaryTableInfoSchema(p.tk.Session(), p.is), "") + evaluator := processor.GetStalenessTSEvaluatorForPrepare() + if hasEvaluator { + require.NotNil(t, evaluator) + ts, err := evaluator(p.tk.Session()) + require.NoError(t, err) + require.Equal(t, processor.GetStalenessReadTS(), ts) + } else { + require.Nil(t, evaluator) + } +} + +func genStaleReadPoint(t *testing.T, tk *testkit.TestKit) *staleReadPoint { + tk.MustExec("create table if not exists test.t(a bigint)") + tk.MustExec(fmt.Sprintf("alter table test.t alter column a set default %d", time.Now().UnixNano())) + time.Sleep(time.Millisecond * 20) + is := domain.GetDomain(tk.Session()).InfoSchema() + dt := tk.MustQuery("select now(3)").Rows()[0][0].(string) + tm, err := time.ParseInLocation("2006-01-02 15:04:05.999999", dt, tk.Session().GetSessionVars().Location()) + require.NoError(t, err) + ts := oracle.GoTimeToTS(tm) + tn := astTableWithAsOf(t, dt) + return &staleReadPoint{ + tk: tk, + ts: ts, + dt: dt, + tm: tm, + is: is, + tn: tn, + } +} + +func astTableWithAsOf(t *testing.T, dt string) *ast.TableName { + p := parser.New() + var sql string + if dt == "" { + sql = "select * from test.t" + } else { + sql = fmt.Sprintf("select * from test.t as of timestamp '%s'", dt) + } + + stmt, err := p.ParseOneStmt(sql, "", "") + require.NoError(t, err) + sel := stmt.(*ast.SelectStmt) + return sel.From.TableRefs.Left.(*ast.TableSource).Source.(*ast.TableName) +} + +func TestStaleReadProcessorWithSelectTable(t *testing.T) { + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tn := astTableWithAsOf(t, "") + p1 := genStaleReadPoint(t, tk) + p2 := genStaleReadPoint(t, tk) + + // create local temporary table to check processor's infoschema will consider temporary table + tk.MustExec("create temporary table test.t2(a int)") + + // no sys variable just select ... as of ... + processor := createProcessor(t, tk.Session()) + err := processor.OnSelectTable(p1.tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + err = processor.OnSelectTable(p1.tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + err = processor.OnSelectTable(p2.tn) + require.Error(t, err) + require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error()) + p1.checkMatchProcessor(t, processor, true) + + // the first select has not 'as of' + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + require.NoError(t, err) + require.False(t, processor.IsStaleness()) + err = processor.OnSelectTable(p1.tn) + require.Equal(t, "[planner:8135]can not set different time in the as of", err.Error()) + require.False(t, processor.IsStaleness()) + + // 'as of' is not allowed when @@txn_read_ts is set + tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(p1.tn) + require.Error(t, err) + require.Equal(t, "[planner:8135]invalid as of timestamp: can't use select as of while already set transaction as of", err.Error()) + tk.MustExec("set @@tx_read_ts=''") + + // no 'as of' will consume @txn_read_ts + tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + p1.checkMatchProcessor(t, processor, true) + tk.Session().GetSessionVars().CleanupTxnReadTSIfUsed() + require.Equal(t, uint64(0), tk.Session().GetSessionVars().TxnReadTS.PeakTxnReadTS()) + tk.MustExec("set @@tx_read_ts=''") + + // `@@tidb_read_staleness` + tk.MustExec("set @@tidb_read_staleness=-5") + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + require.True(t, processor.IsStaleness()) + require.Equal(t, int64(0), processor.GetStalenessInfoSchema().SchemaMetaVersion()) + expectedTS, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + require.NoError(t, err) + require.Equal(t, expectedTS, processor.GetStalenessReadTS()) + evaluator := processor.GetStalenessTSEvaluatorForPrepare() + evaluatorTS, err := evaluator(tk.Session()) + require.NoError(t, err) + require.Equal(t, expectedTS, evaluatorTS) + tk.MustExec("set @@tidb_read_staleness=''") + + tk.MustExec("do sleep(0.01)") + evaluatorTS, err = evaluator(tk.Session()) + require.NoError(t, err) + expectedTS2, err := staleread.CalculateTsWithReadStaleness(tk.Session(), -5*time.Second) + require.NoError(t, err) + require.Equal(t, expectedTS2, evaluatorTS) + + // `@@tidb_read_staleness` will be ignored when `as of` or `@@tx_read_ts` + tk.MustExec("set @@tidb_read_staleness=-5") + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(p1.tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + + tk.MustExec(fmt.Sprintf("SET TRANSACTION READ ONLY AS OF TIMESTAMP '%s'", p1.dt)) + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, true) + tk.MustExec("set @@tidb_read_staleness=''") +} + +func TestStaleReadProcessorInTxn(t *testing.T) { + store, _, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tn := astTableWithAsOf(t, "") + p1 := genStaleReadPoint(t, tk) + _ = genStaleReadPoint(t, tk) + + tk.MustExec("begin") + + // no error when there is no 'as of' + processor := createProcessor(t, tk.Session()) + err := processor.OnSelectTable(tn) + require.NoError(t, err) + require.False(t, processor.IsStaleness()) + + // return an error when 'as of' is set + processor = createProcessor(t, tk.Session()) + err = processor.OnSelectTable(p1.tn) + require.Error(t, err) + require.Equal(t, "[planner:8135]invalid as of timestamp: as of timestamp can't be set in transaction.", err.Error()) + tk.MustExec("rollback") + + tk.MustExec(fmt.Sprintf("start transaction read only as of timestamp '%s'", p1.dt)) + + // processor will use the transaction's stale read context + err = processor.OnSelectTable(tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, false) + + // sys variables will be ignored in txn + tk.MustExec("set @@tidb_read_staleness=-5") + err = processor.OnSelectTable(tn) + require.NoError(t, err) + p1.checkMatchProcessor(t, processor, false) + tk.MustExec("set @@tidb_read_staleness=''") +} + +func createProcessor(t *testing.T, se sessionctx.Context) staleread.Processor { + processor := staleread.NewStaleReadProcessor(se) + require.False(t, processor.IsStaleness()) + require.Equal(t, uint64(0), processor.GetStalenessReadTS()) + require.Nil(t, processor.GetStalenessTSEvaluatorForPrepare()) + require.Nil(t, processor.GetStalenessInfoSchema()) + return processor +} diff --git a/sessiontxn/staleread/util.go b/sessiontxn/staleread/util.go new file mode 100644 index 0000000000000..fe7062a60d7ce --- /dev/null +++ b/sessiontxn/staleread/util.go @@ -0,0 +1,57 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package staleread + +import ( + "time" + + "github.com/pingcap/tidb/expression" + "github.com/pingcap/tidb/parser/ast" + "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/types" + "github.com/tikv/client-go/v2/oracle" +) + +// CalculateAsOfTsExpr calculates the TsExpr of AsOfClause to get a StartTS. +func CalculateAsOfTsExpr(sctx sessionctx.Context, asOfClause *ast.AsOfClause) (uint64, error) { + tsVal, err := expression.EvalAstExpr(sctx, asOfClause.TsExpr) + if err != nil { + return 0, err + } + toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp) + // We need at least the millionsecond here, so set fsp to 3. + toTypeTimestamp.Decimal = 3 + tsTimestamp, err := tsVal.ConvertTo(sctx.GetSessionVars().StmtCtx, toTypeTimestamp) + if err != nil { + return 0, err + } + tsTime, err := tsTimestamp.GetMysqlTime().GoTime(sctx.GetSessionVars().Location()) + if err != nil { + return 0, err + } + return oracle.GoTimeToTS(tsTime), nil +} + +// CalculateTsWithReadStaleness calculates the TsExpr for readStaleness duration +func CalculateTsWithReadStaleness(sctx sessionctx.Context, readStaleness time.Duration) (uint64, error) { + nowVal, err := expression.GetStmtTimestamp(sctx) + if err != nil { + return 0, err + } + tsVal := nowVal.Add(readStaleness) + minTsVal := expression.GetMinSafeTime(sctx) + return oracle.GoTimeToTS(expression.CalAppropriateTime(tsVal, nowVal, minTsVal)), nil +}