Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/release-5.1' into release-5.1-…
Browse files Browse the repository at this point in the history
…92ddceb6a4ab

Signed-off-by: Yilin Chen <[email protected]>
  • Loading branch information
sticnarf committed Jul 20, 2021
2 parents 582b3a1 + 8f54e03 commit e49f989
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 1 deletion.
2 changes: 2 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,8 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
}
// Reset DurationParse due to the next statement may not need to be parsed (not a text protocol query).
sessVars.DurationParse = 0
// Clean the stale read flag when statement execution finish
sessVars.StmtCtx.IsStaleness = false
}

// CloseRecordSet will finish the execution of current statement and do some record work
Expand Down
10 changes: 10 additions & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ package executor

import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -63,6 +65,14 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
return nil, err
}

failpoint.Inject("assertStmtCtxIsStaleness", func(val failpoint.Value) {
expected := val.(bool)
got := c.Ctx.GetSessionVars().StmtCtx.IsStaleness
if got != expected {
panic(fmt.Sprintf("stmtctx isStaleness wrong, expected:%v, got:%v", expected, got))
}
})

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
var lowerPriority bool
if c.Ctx.GetSessionVars().StmtCtx.Priority == mysql.NoPriority {
Expand Down
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
DiskTracker: disk.NewTracker(memory.LabelForSQLText, -1),
TaskID: stmtctx.AllocateTaskID(),
CTEStorageMap: map[int]*CTEStorages{},
IsStaleness: false,
}
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
globalConfig := config.GetGlobalConfig()
Expand Down
95 changes: 95 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ func (s *testStaleTxnSerialSuite) TestSelectAsOf(c *C) {
c.Assert(tk.Se.GetSessionVars().TxnReadTS.PeakTxnReadTS(), Equals, uint64(0))
}
}
failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSO")
failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleTSOWithTolerance")
}

func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
Expand Down Expand Up @@ -1012,3 +1014,96 @@ func (s *testStaleTxnSerialSuite) TestStaleReadPrepare(c *C) {
tk.MustExec(fmt.Sprintf(`set transaction read only as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000")))
c.Assert("execute p1", NotNil)
}

func (s *testStaleTxnSuite) TestStmtCtxStaleFlag(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")
time.Sleep(2 * time.Second)
time1 := time.Now().Format("2006-1-2 15:04:05")
testcases := []struct {
sql string
hasStaleFlag bool
}{
// assert select as of statement
{
sql: fmt.Sprintf("select * from t as of timestamp '%v'", time1),
hasStaleFlag: true,
},
// assert select statement
{
sql: "select * from t",
hasStaleFlag: false,
},
// assert select statement in stale transaction
{
sql: fmt.Sprintf("start transaction read only as of timestamp '%v'", time1),
hasStaleFlag: false,
},
{
sql: "select * from t",
hasStaleFlag: true,
},
{
sql: "commit",
hasStaleFlag: false,
},
// assert select statement after set transaction
{
sql: fmt.Sprintf("set transaction read only as of timestamp '%v'", time1),
hasStaleFlag: false,
},
{
sql: "select * from t",
hasStaleFlag: true,
},
// assert select statement after consumed set transaction
{
sql: "select * from t",
hasStaleFlag: false,
},
// assert prepare statement with select as of statement
{
sql: fmt.Sprintf(`prepare p from 'select * from t as of timestamp "%v"'`, time1),
hasStaleFlag: false,
},
// assert execute statement with select as of statement
{
sql: "execute p",
hasStaleFlag: true,
},
// assert prepare common select statement
{
sql: "prepare p1 from 'select * from t'",
hasStaleFlag: false,
},
{
sql: "execute p1",
hasStaleFlag: false,
},
// assert execute select statement in stale transaction
{
sql: fmt.Sprintf("start transaction read only as of timestamp '%v'", time1),
hasStaleFlag: false,
},
{
sql: "execute p1",
hasStaleFlag: true,
},
{
sql: "commit",
hasStaleFlag: false,
},
}

for _, testcase := range testcases {
failpoint.Enable("github.com/pingcap/tidb/exector/assertStmtCtxIsStaleness",
fmt.Sprintf("return(%v)", testcase.hasStaleFlag))
tk.MustExec(testcase.sql)
failpoint.Disable("github.com/pingcap/tidb/exector/assertStmtCtxIsStaleness")
// assert stale read flag should be false after each statement execution
c.Assert(tk.Se.GetSessionVars().StmtCtx.IsStaleness, IsFalse)
}
}
36 changes: 36 additions & 0 deletions executor/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package executor_test

import (
"bytes"
"fmt"
"math/rand"
"strings"
Expand Down Expand Up @@ -855,3 +856,38 @@ func (s *tiflashTestSuite) TestTiFlashPartitionTableBroadcastJoin(c *C) {
}
}
}

func (s *tiflashTestSuite) TestForbidTiflashDuringStaleRead(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a bigint(20))")
tk.MustExec("alter table t set tiflash replica 1")
tb := testGetTableByName(c, tk.Se, "test", "t")
err := domain.GetDomain(tk.Se).DDL().UpdateTableReplicaInfo(tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
time.Sleep(2 * time.Second)
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
tk.MustExec("insert into t values (9223372036854775807)")
rows := tk.MustQuery("explain select avg(a) from t").Rows()
resBuff := bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res := resBuff.String()
c.Assert(strings.Contains(res, "tiflash"), IsTrue)
c.Assert(strings.Contains(res, "tikv"), IsFalse)
tk.MustExec("set transaction read only as of timestamp now(1)")
rows = tk.MustQuery("explain select avg(a) from t").Rows()
resBuff = bytes.NewBufferString("")
for _, row := range rows {
fmt.Fprintf(resBuff, "%s\n", row)
}
res = resBuff.String()
c.Assert(strings.Contains(res, "tiflash"), IsFalse)
c.Assert(strings.Contains(res, "tikv"), IsTrue)
}
5 changes: 4 additions & 1 deletion expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,10 @@ func scalarExprSupportedByFlash(function *ScalarFunction) bool {
}
case ast.Cast:
switch function.Function.PbCode() {
case tipb.ScalarFuncSig_CastIntAsInt, tipb.ScalarFuncSig_CastIntAsReal, tipb.ScalarFuncSig_CastIntAsDecimal, tipb.ScalarFuncSig_CastIntAsString, tipb.ScalarFuncSig_CastIntAsTime,
case tipb.ScalarFuncSig_CastIntAsTime:
// ban the function of casting year type as time type pushing down to tiflash because of https://github.com/pingcap/tidb/issues/26215
return function.GetArgs()[0].GetType().Tp != mysql.TypeYear
case tipb.ScalarFuncSig_CastIntAsInt, tipb.ScalarFuncSig_CastIntAsReal, tipb.ScalarFuncSig_CastIntAsDecimal, tipb.ScalarFuncSig_CastIntAsString,
tipb.ScalarFuncSig_CastRealAsInt, tipb.ScalarFuncSig_CastRealAsReal, tipb.ScalarFuncSig_CastRealAsDecimal, tipb.ScalarFuncSig_CastRealAsString, tipb.ScalarFuncSig_CastRealAsTime,
tipb.ScalarFuncSig_CastStringAsInt, tipb.ScalarFuncSig_CastStringAsReal, tipb.ScalarFuncSig_CastStringAsDecimal, tipb.ScalarFuncSig_CastStringAsString, tipb.ScalarFuncSig_CastStringAsTime,
tipb.ScalarFuncSig_CastDecimalAsInt /*, tipb.ScalarFuncSig_CastDecimalAsReal*/, tipb.ScalarFuncSig_CastDecimalAsDecimal, tipb.ScalarFuncSig_CastDecimalAsString, tipb.ScalarFuncSig_CastDecimalAsTime,
Expand Down
1 change: 1 addition & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
if err != nil {
return errors.Trace(err)
}
sctx.GetSessionVars().StmtCtx.IsStaleness = true
}
if prepared.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
Expand Down
15 changes: 15 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,10 @@ func getPossibleAccessPaths(ctx sessionctx.Context, tableHints *tableHintInfo, i
}

available = removeIgnoredPaths(available, ignored, tblInfo)
if ctx.GetSessionVars().StmtCtx.IsStaleness {
// skip tiflash if the statement is for stale read until tiflash support stale read
available = removeTiflashDuringStaleRead(available)
}

// If we have got "FORCE" or "USE" index hint but got no available index,
// we have to use table scan.
Expand Down Expand Up @@ -1141,6 +1145,17 @@ func removeIgnoredPaths(paths, ignoredPaths []*util.AccessPath, tblInfo *model.T
return remainedPaths
}

func removeTiflashDuringStaleRead(paths []*util.AccessPath) []*util.AccessPath {
n := 0
for _, path := range paths {
if path.StoreType != kv.TiFlash {
paths[n] = path
n++
}
}
return paths[:n]
}

func (b *PlanBuilder) buildSelectLock(src LogicalPlan, lock *ast.SelectLockInfo) (*LogicalLock, error) {
selectLock := LogicalLock{
Lock: lock,
Expand Down
11 changes: 11 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,9 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
}
txnCtx := p.ctx.GetSessionVars().TxnCtx
p.TxnScope = txnCtx.TxnScope
// It means we meet following case:
// 1. start transaction read only as of timestamp ts
// 2. select statement
if txnCtx.IsStaleness {
p.LastSnapshotTS = txnCtx.StartTS
p.ExplicitStaleness = txnCtx.IsStaleness
Expand All @@ -1510,6 +1513,9 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
p.err = ErrAsOf.FastGenWithCause("can't use select as of while already set transaction as of")
return
}
// it means we meet following case:
// 1. set transaction read only as of timestamp ts
// 2. select statement
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(sessionctx.Context) (uint64, error) {
return ts, nil
Expand All @@ -1527,6 +1533,8 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
p.err = errors.Trace(err)
return
}
// It means we meet following case:
// select statement with as of timestamp
if !p.initedLastSnapshotTS {
p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
return calculateTsExpr(ctx, node)
Expand All @@ -1547,6 +1555,9 @@ func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
}
p.ExplicitStaleness = true
}
if p.flag&inPrepare == 0 {
p.ctx.GetSessionVars().StmtCtx.IsStaleness = p.ExplicitStaleness
}
p.initedLastSnapshotTS = true
}

Expand Down
1 change: 1 addition & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type StatementContext struct {
IgnoreNoPartition bool
OptimDependOnMutableConst bool
IgnoreExplainIDSuffix bool
IsStaleness bool

// mu struct holds variables that change during execution.
mu struct {
Expand Down

0 comments on commit e49f989

Please sign in to comment.