Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement chunk rpc encoding for unistore #35114

Merged
merged 15 commits into from
Jun 6, 2022
Merged
32 changes: 10 additions & 22 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
if resp == nil {
return nil, errors.New("client returns nil response")
}
encodeType := tipb.EncodeType_TypeDefault
if canUseChunkRPC(sctx) {
encodeType = tipb.EncodeType_TypeChunk
}
// TODO: Add metric label and set open tracing.
return &selectResult{
label: "mpp",
Expand All @@ -58,7 +54,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
fieldTypes: fieldTypes,
ctx: sctx,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
encodeType: encodeType,
copPlanIDs: planIDs,
rootPlanID: rootID,
storeType: kv.TiFlash,
Expand Down Expand Up @@ -130,10 +125,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
}, nil
}
encodetype := tipb.EncodeType_TypeDefault
if canUseChunkRPC(sctx) {
encodetype = tipb.EncodeType_TypeChunk
}
return &selectResult{
label: "dag",
resp: resp,
Expand All @@ -143,7 +134,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
sqlType: label,
memTracker: kvReq.MemTracker,
encodeType: encodetype,
storeType: kvReq.StoreType,
paging: kvReq.Paging,
}, nil
Expand Down Expand Up @@ -186,12 +176,11 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars inte
label = metrics.LblInternal
}
result := &selectResult{
label: "analyze",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
label: "analyze",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
storeType: kvReq.StoreType,
}
return result, nil
}
Expand All @@ -205,12 +194,11 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars int
return nil, errors.New("client returns nil response")
}
result := &selectResult{
label: "checksum",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
label: "checksum",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
storeType: kvReq.StoreType,
}
return result, nil
}
Expand Down
6 changes: 3 additions & 3 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ type selectResult struct {
feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
sqlType string
encodeType tipb.EncodeType
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already a encodeType in this.selectResp.DAGRequest
This field is duplicated...


// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
Expand Down Expand Up @@ -269,13 +268,14 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
}
}
// TODO(Shenghui Wu): add metrics
switch r.selectResp.GetEncodeType() {
encodeType := r.selectResp.GetEncodeType()
switch encodeType {
case tipb.EncodeType_TypeDefault:
return r.readFromDefault(ctx, chk)
case tipb.EncodeType_TypeChunk:
return r.readFromChunk(ctx, chk)
}
return errors.Errorf("unsupported encode type:%v", r.encodeType)
return errors.Errorf("unsupported encode type:%v", encodeType)
}

// NextRaw returns the next raw partial result.
Expand Down
6 changes: 6 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,12 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, txn.Commit(tk.ctx))

ctx, hook := withLogHook(tk.ctx, t, "inconsistency")

// TODO(tiancaiamao): admin check doesn't support the chunk protocol.
// Remove this after https://github.com/pingcap/tidb/issues/35156
_, err = tk.Exec(ctx, "set @@tidb_enable_chunk_rpc = off")
require.NoError(t, err)

_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, `[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 282574488403969, index-values:"handle: 282574488403969, values: [KindInt64 282578800083201 KindInt64 282574488403969]" != record-values:""`, err.Error())
Expand Down
4 changes: 4 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,10 @@ func TestDefaultValForAnalyze(t *testing.T) {
for i := 1; i < 4; i++ {
tk.MustExec("insert into t values (?)", i)
}

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

tk.MustQuery("select @@tidb_enable_fast_analyze").Check(testkit.Rows("0"))
tk.MustQuery("select @@session.tidb_enable_fast_analyze").Check(testkit.Rows("0"))
tk.MustExec("analyze table t with 0 topn;")
Expand Down
3 changes: 3 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ func TestCheckActRowsWithUnistore(t *testing.T) {
},
}

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

for _, test := range tests {
checkActRows(t, tk, test.sql, test.expected)
}
Expand Down
1 change: 1 addition & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,7 @@ func TestCompareBuiltin(t *testing.T) {
tk.MustQuery(`select 1 < 17666000000000000000, 1 > 17666000000000000000, 1 = 17666000000000000000`).Check(testkit.Rows("1 0 0"))

tk.MustExec("drop table if exists t")

// insert value at utc timezone
tk.MustExec("set time_zone = '+00:00'")
tk.MustExec("create table t(a timestamp)")
Expand Down
3 changes: 3 additions & 0 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func TestIndexRead(t *testing.T) {
testKit.MustExec("create index ts on t (ts)")
testKit.MustExec("create table t1 (a int, b int, index idx(a), index idxx(b))")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
testKit.MustExec("set @@tidb_enable_chunk_rpc = on")

// This stats is generated by following format:
// fill (a, b, c, e) as (i*100+j, i, i+j, i*100+j), i and j is dependent and range of this two are [0, 99].
require.NoError(t, loadTableStats("analyzesSuiteTestIndexReadT.json", dom))
Expand Down
3 changes: 3 additions & 0 deletions planner/core/enforce_mpp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func TestEnforceMPP(t *testing.T) {
tk.MustExec("create table t(a int, b int)")
tk.MustExec("create index idx on t(a)")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
Expand Down
4 changes: 4 additions & 0 deletions planner/core/expression_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ func TestMultiColInExpression(t *testing.T) {
Plan []string
Res []string
}

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

expressionRewriterSuiteData := plannercore.GetExpressionRewriterSuiteData()
expressionRewriterSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
Expand Down
15 changes: 15 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ func TestVerboseExplain(t *testing.T) {
tk.MustExec("analyze table t2")
tk.MustExec("analyze table t3")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
Expand Down Expand Up @@ -5394,6 +5397,9 @@ func TestIndexJoinCost(t *testing.T) {
tk.MustExec(`create table t_inner_pk (a int primary key)`)
tk.MustExec(`create table t_inner_idx (a int, b int, key(a))`)

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

tk.MustQuery(`explain format=verbose select /*+ TIDB_INLJ(t_outer, t_inner_pk) */ * from t_outer, t_inner_pk where t_outer.a=t_inner_pk.a`).Check(testkit.Rows( // IndexJoin with inner TableScan
`IndexJoin_11 12487.50 206368.09 root inner join, inner:TableReader_8, outer key:test.t_outer.a, inner key:test.t_inner_pk.a, equal cond:eq(test.t_outer.a, test.t_inner_pk.a)`,
`├─TableReader_18(Build) 9990.00 36412.58 root data:Selection_17`,
Expand Down Expand Up @@ -5448,6 +5454,9 @@ func TestHeuristicIndexSelection(t *testing.T) {
tk.MustExec("create table t3(a bigint, b varchar(255), c bigint, primary key(a, b) clustered)")
tk.MustExec("create table t4(a bigint, b varchar(255), c bigint, primary key(a, b) nonclustered)")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -5475,6 +5484,9 @@ func TestOutputSkylinePruningInfo(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, d int, e int, f int, g int, primary key (a), unique key c_d_e (c, d, e), unique key f (f), unique key f_g (f, g), key g (g))")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -5506,6 +5518,9 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) {
require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table t")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down
3 changes: 3 additions & 0 deletions planner/core/partition_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ func TestListColumnsPartitionPruner(t *testing.T) {
tk2.MustExec("insert into t1 (id,a,b) values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),(null,10,null)")
tk2.MustExec("insert into t2 (id,a,b) values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),(null,null,null)")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk1.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []struct {
SQL string
Pruner string
Expand Down
4 changes: 4 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func TestPreferRangeScan(t *testing.T) {
tk.MustExec("insert into test(name,age,addr) select name,age,addr from test;")
tk.MustExec("insert into test(name,age,addr) select name,age,addr from test;")
tk.MustExec("analyze table test;")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down
3 changes: 3 additions & 0 deletions planner/core/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func TestNDVGroupCols(t *testing.T) {
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down
13 changes: 12 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"encoding/json"
stderrs "errors"
"fmt"
"math/rand"
"runtime/pprof"
"runtime/trace"
"strconv"
Expand Down Expand Up @@ -2743,7 +2744,17 @@ func (s *session) RefreshVars(ctx context.Context) error {

// CreateSession4Test creates a new session environment for test.
func CreateSession4Test(store kv.Storage) (Session, error) {
return CreateSession4TestWithOpt(store, nil)
se, err := CreateSession4TestWithOpt(store, nil)
if err == nil {
// Cover both chunk rpc encoding and default encoding.
// nolint:gosec
if rand.Intn(2) == 0 {
se.GetSessionVars().EnableChunkRPC = false
} else {
se.GetSessionVars().EnableChunkRPC = true
}
}
return se, err
}

// Opt describes the option for creating session
Expand Down
Loading