Skip to content

Commit

Permalink
*: fix weak consistency not work (pingcap#31935) (pingcap#31949)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jan 25, 2022
1 parent 3a44f6d commit 55f3b24
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 57 deletions.
12 changes: 5 additions & 7 deletions distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,6 @@ func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *Req
return builder
}

// SetIsolationLevel sets "IsolationLevel" for "kv.Request".
func (builder *RequestBuilder) SetIsolationLevel(level kv.IsoLevel) *RequestBuilder {
builder.Request.IsolationLevel = level
return builder
}

const estimatedRegionRowCount = 100000

// SetDAGRequest sets the request type to "ReqTypeDAG" and construct request data.
Expand Down Expand Up @@ -250,7 +244,11 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
// Concurrency may be set to 1 by SetDAGRequest
builder.Request.Concurrency = sv.DistSQLScanConcurrency()
}
builder.Request.IsolationLevel = builder.getIsolationLevel()
if sv.StmtCtx.WeakConsistency {
builder.Request.IsolationLevel = kv.RC
} else {
builder.Request.IsolationLevel = builder.getIsolationLevel()
}
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.TaskID = sv.StmtCtx.TaskID
builder.Request.Priority = builder.getKVPriority(sv)
Expand Down
3 changes: 0 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3936,9 +3936,6 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
if err != nil {
return nil, err
}
if builder.ctx.GetSessionVars().StmtCtx.WeakConsistency {
reqBuilderWithRange.SetIsolationLevel(kv.RC)
}
kvReq, err := reqBuilderWithRange.
SetDAGRequest(e.dagPB).
SetStartTS(startTS).
Expand Down
6 changes: 0 additions & 6 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,9 +289,6 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down Expand Up @@ -559,9 +556,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
PushedLimit: e.PushedLimit,
}
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
SetDesc(e.desc).
Expand Down
3 changes: 0 additions & 3 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,6 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
}

var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
builder.SetDAGRequest(e.dagPBs[workID]).
SetStartTS(e.startTS).
SetDesc(e.descs[workID]).
Expand Down
6 changes: 0 additions & 6 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,6 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
return nil, err
}
var builder distsql.RequestBuilder
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
builder.SetIsolationLevel(kv.RC)
}
reqBuilder := builder.SetKeyRanges(kvRange)
kvReq, err := reqBuilder.
SetDAGRequest(e.dagPB).
Expand Down Expand Up @@ -357,9 +354,6 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
} else {
reqBuilder = builder.SetHandleRanges(e.ctx.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges, e.feedback)
}
if e.ctx.GetSessionVars().StmtCtx.WeakConsistency {
reqBuilder.SetIsolationLevel(kv.RC)
}
reqBuilder.
SetDAGRequest(e.dagPB).
SetStartTS(e.startTS).
Expand Down
61 changes: 29 additions & 32 deletions sessionctx/stmtctx/stmtctx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package stmtctx_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -96,49 +98,44 @@ func TestWeakConsistencyRead(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()

lastWeakConsistency := func(tk *testkit.TestKit) bool {
return tk.Session().GetSessionVars().StmtCtx.WeakConsistency
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(id int primary key, c int, c1 int, unique index i(c))")

execAndCheck := func(sql string, rows [][]interface{}, isolationLevel kv.IsoLevel) {
ctx := context.WithValue(context.Background(), "CheckSelectRequestHook", func(req *kv.Request) {
require.Equal(t, req.IsolationLevel, isolationLevel)
})
tk.Session().Execute(ctx, sql)
if rows != nil {
tk.MustQuery(sql).Check(rows)
}
lastWeakConsistency := tk.Session().GetSessionVars().StmtCtx.WeakConsistency
require.Equal(t, lastWeakConsistency, isolationLevel == kv.RC)
}

// strict
tk.MustExec("insert into t values(1, 1, 1)")
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 1"))
require.False(t, lastWeakConsistency(tk))
execAndCheck("insert into t values(1, 1, 1)", nil, kv.SI)
execAndCheck("select * from t", testkit.Rows("1 1 1"), kv.SI)
tk.MustExec("prepare s from 'select * from t'")
tk.MustExec("prepare u from 'update t set c1 = id + 1'")
tk.MustQuery("execute s").Check(testkit.Rows("1 1 1"))
require.False(t, lastWeakConsistency(tk))
tk.MustExec("execute u")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("admin check table t")
require.False(t, lastWeakConsistency(tk))
execAndCheck("execute s", testkit.Rows("1 1 1"), kv.SI)
execAndCheck("execute u", nil, kv.SI)
execAndCheck("admin check table t", nil, kv.SI)
// weak
tk.MustExec("set tidb_read_consistency = weak")
tk.MustExec("insert into t values(2, 2, 2)")
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2", "2 2 2"))
require.True(t, lastWeakConsistency(tk))
tk.MustQuery("execute s").Check(testkit.Rows("1 1 2", "2 2 2"))
require.True(t, lastWeakConsistency(tk))
tk.MustExec("execute u")
require.False(t, lastWeakConsistency(tk))
execAndCheck("insert into t values(2, 2, 2)", nil, kv.SI)
execAndCheck("select * from t", testkit.Rows("1 1 2", "2 2 2"), kv.RC)
execAndCheck("execute s", testkit.Rows("1 1 2", "2 2 2"), kv.RC)
execAndCheck("execute u", nil, kv.SI)
// non-read-only queries should be strict
tk.MustExec("admin check table t")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("update t set c = c + 1 where id = 2")
require.False(t, lastWeakConsistency(tk))
tk.MustExec("delete from t where id = 2")
require.False(t, lastWeakConsistency(tk))
execAndCheck("admin check table t", nil, kv.SI)
execAndCheck("update t set c = c + 1 where id = 2", nil, kv.SI)
execAndCheck("delete from t where id = 2", nil, kv.SI)
// in-transaction queries should be strict
tk.MustExec("begin")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1 2"))
require.False(t, lastWeakConsistency(tk))
tk.MustQuery("execute s").Check(testkit.Rows("1 1 2"))
require.False(t, lastWeakConsistency(tk))
execAndCheck("select * from t", testkit.Rows("1 1 2"), kv.SI)
execAndCheck("execute s", testkit.Rows("1 1 2"), kv.SI)
tk.MustExec("rollback")
}

0 comments on commit 55f3b24

Please sign in to comment.