diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 03f6034bc513e..da51a25ab0480 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -138,12 +138,6 @@ func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *Req return builder } -// SetIsolationLevel sets "IsolationLevel" for "kv.Request". -func (builder *RequestBuilder) SetIsolationLevel(level kv.IsoLevel) *RequestBuilder { - builder.Request.IsolationLevel = level - return builder -} - const estimatedRegionRowCount = 100000 // SetDAGRequest sets the request type to "ReqTypeDAG" and construct request data. @@ -250,7 +244,11 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req // Concurrency may be set to 1 by SetDAGRequest builder.Request.Concurrency = sv.DistSQLScanConcurrency() } - builder.Request.IsolationLevel = builder.getIsolationLevel() + if sv.StmtCtx.WeakConsistency { + builder.Request.IsolationLevel = kv.RC + } else { + builder.Request.IsolationLevel = builder.getIsolationLevel() + } builder.Request.NotFillCache = sv.StmtCtx.NotFillCache builder.Request.TaskID = sv.StmtCtx.TaskID builder.Request.Priority = builder.getKVPriority(sv) diff --git a/executor/builder.go b/executor/builder.go index ad216d44e3965..a718577cea424 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3936,9 +3936,6 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T if err != nil { return nil, err } - if builder.ctx.GetSessionVars().StmtCtx.WeakConsistency { - reqBuilderWithRange.SetIsolationLevel(kv.RC) - } kvReq, err := reqBuilderWithRange. SetDAGRequest(e.dagPB). SetStartTS(startTS). diff --git a/executor/distsql.go b/executor/distsql.go index 92cf0b8d60344..734074eef8baf 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -289,9 +289,6 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) e.memTracker = memory.NewTracker(e.id, -1) e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) var builder distsql.RequestBuilder - if e.ctx.GetSessionVars().StmtCtx.WeakConsistency { - builder.SetIsolationLevel(kv.RC) - } builder.SetKeyRanges(kvRanges). SetDAGRequest(e.dagPB). SetStartTS(e.startTS). @@ -559,9 +556,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan< PushedLimit: e.PushedLimit, } var builder distsql.RequestBuilder - if e.ctx.GetSessionVars().StmtCtx.WeakConsistency { - builder.SetIsolationLevel(kv.RC) - } builder.SetDAGRequest(e.dagPB). SetStartTS(e.startTS). SetDesc(e.desc). diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 38aa4a84f5bc5..a712e02f580bf 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -296,9 +296,6 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, } var builder distsql.RequestBuilder - if e.ctx.GetSessionVars().StmtCtx.WeakConsistency { - builder.SetIsolationLevel(kv.RC) - } builder.SetDAGRequest(e.dagPBs[workID]). SetStartTS(e.startTS). SetDesc(e.descs[workID]). diff --git a/executor/table_reader.go b/executor/table_reader.go index b31f07bb79d66..958b8cc442061 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -321,9 +321,6 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [ return nil, err } var builder distsql.RequestBuilder - if e.ctx.GetSessionVars().StmtCtx.WeakConsistency { - builder.SetIsolationLevel(kv.RC) - } reqBuilder := builder.SetKeyRanges(kvRange) kvReq, err := reqBuilder. SetDAGRequest(e.dagPB). @@ -357,9 +354,6 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R } else { reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback) } - if e.ctx.GetSessionVars().StmtCtx.WeakConsistency { - reqBuilder.SetIsolationLevel(kv.RC) - } reqBuilder. SetDAGRequest(e.dagPB). SetStartTS(e.startTS). diff --git a/sessionctx/stmtctx/stmtctx_test.go b/sessionctx/stmtctx/stmtctx_test.go index 7b27e2c62a653..5685e85e9033a 100644 --- a/sessionctx/stmtctx/stmtctx_test.go +++ b/sessionctx/stmtctx/stmtctx_test.go @@ -15,10 +15,12 @@ package stmtctx_test import ( + "context" "fmt" "testing" "time" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/util/execdetails" @@ -96,49 +98,44 @@ func TestWeakConsistencyRead(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() - lastWeakConsistency := func(tk *testkit.TestKit) bool { - return tk.Session().GetSessionVars().StmtCtx.WeakConsistency - } - tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(id int primary key, c int, c1 int, unique index i(c))") + + execAndCheck := func(sql string, rows [][]interface{}, isolationLevel kv.IsoLevel) { + ctx := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) { + require.Equal(t, req.IsolationLevel, isolationLevel) + }) + tk.Session().Execute(ctx, sql) + if rows != nil { + tk.MustQuery(sql).Check(rows) + } + lastWeakConsistency := tk.Session().GetSessionVars().StmtCtx.WeakConsistency + require.Equal(t, lastWeakConsistency, isolationLevel == kv.RC) + } + // strict - tk.MustExec("insert into t values(1, 1, 1)") - require.False(t, lastWeakConsistency(tk)) - tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1")) - require.False(t, lastWeakConsistency(tk)) + execAndCheck("insert into t values(1, 1, 1)", nil, kv.SI) + execAndCheck("select * from t", testkit.Rows("1 1 1"), kv.SI) tk.MustExec("prepare s from 'select * from t'") tk.MustExec("prepare u from 'update t set c1 = id + 1'") - tk.MustQuery("execute s").Check(testkit.Rows("1 1 1")) - require.False(t, lastWeakConsistency(tk)) - tk.MustExec("execute u") - require.False(t, lastWeakConsistency(tk)) - tk.MustExec("admin check table t") - require.False(t, lastWeakConsistency(tk)) + execAndCheck("execute s", testkit.Rows("1 1 1"), kv.SI) + execAndCheck("execute u", nil, kv.SI) + execAndCheck("admin check table t", nil, kv.SI) // weak tk.MustExec("set tidb_read_consistency = weak") - tk.MustExec("insert into t values(2, 2, 2)") - require.False(t, lastWeakConsistency(tk)) - tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2", "2 2 2")) - require.True(t, lastWeakConsistency(tk)) - tk.MustQuery("execute s").Check(testkit.Rows("1 1 2", "2 2 2")) - require.True(t, lastWeakConsistency(tk)) - tk.MustExec("execute u") - require.False(t, lastWeakConsistency(tk)) + execAndCheck("insert into t values(2, 2, 2)", nil, kv.SI) + execAndCheck("select * from t", testkit.Rows("1 1 2", "2 2 2"), kv.RC) + execAndCheck("execute s", testkit.Rows("1 1 2", "2 2 2"), kv.RC) + execAndCheck("execute u", nil, kv.SI) // non-read-only queries should be strict - tk.MustExec("admin check table t") - require.False(t, lastWeakConsistency(tk)) - tk.MustExec("update t set c = c + 1 where id = 2") - require.False(t, lastWeakConsistency(tk)) - tk.MustExec("delete from t where id = 2") - require.False(t, lastWeakConsistency(tk)) + execAndCheck("admin check table t", nil, kv.SI) + execAndCheck("update t set c = c + 1 where id = 2", nil, kv.SI) + execAndCheck("delete from t where id = 2", nil, kv.SI) // in-transaction queries should be strict tk.MustExec("begin") - tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2")) - require.False(t, lastWeakConsistency(tk)) - tk.MustQuery("execute s").Check(testkit.Rows("1 1 2")) - require.False(t, lastWeakConsistency(tk)) + execAndCheck("select * from t", testkit.Rows("1 1 2"), kv.SI) + execAndCheck("execute s", testkit.Rows("1 1 2"), kv.SI) tk.MustExec("rollback") }