Skip to content

Commit

Permalink
*: implement chunk rpc encoding for unistore (#35114)
Browse files Browse the repository at this point in the history
close #35113
  • Loading branch information
tiancaiamao authored Jun 6, 2022
1 parent 1c53446 commit 85967fd
Show file tree
Hide file tree
Showing 15 changed files with 146 additions and 47 deletions.
32 changes: 10 additions & 22 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
if resp == nil {
return nil, errors.New("client returns nil response")
}
encodeType := tipb.EncodeType_TypeDefault
if canUseChunkRPC(sctx) {
encodeType = tipb.EncodeType_TypeChunk
}
// TODO: Add metric label and set open tracing.
return &selectResult{
label: "mpp",
Expand All @@ -58,7 +54,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
fieldTypes: fieldTypes,
ctx: sctx,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
encodeType: encodeType,
copPlanIDs: planIDs,
rootPlanID: rootID,
storeType: kv.TiFlash,
Expand Down Expand Up @@ -130,10 +125,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
}, nil
}
encodetype := tipb.EncodeType_TypeDefault
if canUseChunkRPC(sctx) {
encodetype = tipb.EncodeType_TypeChunk
}
return &selectResult{
label: "dag",
resp: resp,
Expand All @@ -143,7 +134,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
sqlType: label,
memTracker: kvReq.MemTracker,
encodeType: encodetype,
storeType: kvReq.StoreType,
paging: kvReq.Paging,
}, nil
Expand Down Expand Up @@ -186,12 +176,11 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars inte
label = metrics.LblInternal
}
result := &selectResult{
label: "analyze",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
label: "analyze",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
storeType: kvReq.StoreType,
}
return result, nil
}
Expand All @@ -205,12 +194,11 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars int
return nil, errors.New("client returns nil response")
}
result := &selectResult{
label: "checksum",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
label: "checksum",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
storeType: kvReq.StoreType,
}
return result, nil
}
Expand Down
6 changes: 3 additions & 3 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ type selectResult struct {
feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
sqlType string
encodeType tipb.EncodeType

// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
Expand Down Expand Up @@ -269,13 +268,14 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
}
}
// TODO(Shenghui Wu): add metrics
switch r.selectResp.GetEncodeType() {
encodeType := r.selectResp.GetEncodeType()
switch encodeType {
case tipb.EncodeType_TypeDefault:
return r.readFromDefault(ctx, chk)
case tipb.EncodeType_TypeChunk:
return r.readFromChunk(ctx, chk)
}
return errors.Errorf("unsupported encode type:%v", r.encodeType)
return errors.Errorf("unsupported encode type:%v", encodeType)
}

// NextRaw returns the next raw partial result.
Expand Down
6 changes: 6 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,12 @@ func TestCheckFailReport(t *testing.T) {
require.NoError(t, txn.Commit(tk.ctx))

ctx, hook := withLogHook(tk.ctx, t, "inconsistency")

// TODO(tiancaiamao): admin check doesn't support the chunk protocol.
// Remove this after https://github.com/pingcap/tidb/issues/35156
_, err = tk.Exec(ctx, "set @@tidb_enable_chunk_rpc = off")
require.NoError(t, err)

_, err = tk.Exec(ctx, "admin check table admin_test")
require.Error(t, err)
require.Equal(t, `[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 282574488403969, index-values:"handle: 282574488403969, values: [KindInt64 282578800083201 KindInt64 282574488403969]" != record-values:""`, err.Error())
Expand Down
4 changes: 4 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,10 @@ func TestDefaultValForAnalyze(t *testing.T) {
for i := 1; i < 4; i++ {
tk.MustExec("insert into t values (?)", i)
}

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

tk.MustQuery("select @@tidb_enable_fast_analyze").Check(testkit.Rows("0"))
tk.MustQuery("select @@session.tidb_enable_fast_analyze").Check(testkit.Rows("0"))
tk.MustExec("analyze table t with 0 topn;")
Expand Down
3 changes: 3 additions & 0 deletions executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,9 @@ func TestCheckActRowsWithUnistore(t *testing.T) {
},
}

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

for _, test := range tests {
checkActRows(t, tk, test.sql, test.expected)
}
Expand Down
1 change: 1 addition & 0 deletions expression/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1919,6 +1919,7 @@ func TestCompareBuiltin(t *testing.T) {
tk.MustQuery(`select 1 < 17666000000000000000, 1 > 17666000000000000000, 1 = 17666000000000000000`).Check(testkit.Rows("1 0 0"))

tk.MustExec("drop table if exists t")

// insert value at utc timezone
tk.MustExec("set time_zone = '+00:00'")
tk.MustExec("create table t(a timestamp)")
Expand Down
3 changes: 3 additions & 0 deletions planner/core/cbo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ func TestIndexRead(t *testing.T) {
testKit.MustExec("create index ts on t (ts)")
testKit.MustExec("create table t1 (a int, b int, index idx(a), index idxx(b))")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
testKit.MustExec("set @@tidb_enable_chunk_rpc = on")

// This stats is generated by following format:
// fill (a, b, c, e) as (i*100+j, i, i+j, i*100+j), i and j is dependent and range of this two are [0, 99].
require.NoError(t, loadTableStats("analyzesSuiteTestIndexReadT.json", dom))
Expand Down
3 changes: 3 additions & 0 deletions planner/core/enforce_mpp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func TestEnforceMPP(t *testing.T) {
tk.MustExec("create table t(a int, b int)")
tk.MustExec("create index idx on t(a)")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
Expand Down
4 changes: 4 additions & 0 deletions planner/core/expression_rewriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,10 @@ func TestMultiColInExpression(t *testing.T) {
Plan []string
Res []string
}

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

expressionRewriterSuiteData := plannercore.GetExpressionRewriterSuiteData()
expressionRewriterSuiteData.GetTestCases(t, &input, &output)
for i, tt := range input {
Expand Down
15 changes: 15 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,9 @@ func TestVerboseExplain(t *testing.T) {
tk.MustExec("analyze table t2")
tk.MustExec("analyze table t3")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

// Create virtual tiflash replica info.
dom := domain.GetDomain(tk.Session())
is := dom.InfoSchema()
Expand Down Expand Up @@ -5394,6 +5397,9 @@ func TestIndexJoinCost(t *testing.T) {
tk.MustExec(`create table t_inner_pk (a int primary key)`)
tk.MustExec(`create table t_inner_idx (a int, b int, key(a))`)

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

tk.MustQuery(`explain format=verbose select /*+ TIDB_INLJ(t_outer, t_inner_pk) */ * from t_outer, t_inner_pk where t_outer.a=t_inner_pk.a`).Check(testkit.Rows( // IndexJoin with inner TableScan
`IndexJoin_11 12487.50 206368.09 root inner join, inner:TableReader_8, outer key:test.t_outer.a, inner key:test.t_inner_pk.a, equal cond:eq(test.t_outer.a, test.t_inner_pk.a)`,
`├─TableReader_18(Build) 9990.00 36412.58 root data:Selection_17`,
Expand Down Expand Up @@ -5448,6 +5454,9 @@ func TestHeuristicIndexSelection(t *testing.T) {
tk.MustExec("create table t3(a bigint, b varchar(255), c bigint, primary key(a, b) clustered)")
tk.MustExec("create table t4(a bigint, b varchar(255), c bigint, primary key(a, b) nonclustered)")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -5475,6 +5484,9 @@ func TestOutputSkylinePruningInfo(t *testing.T) {
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, c int, d int, e int, f int, g int, primary key (a), unique key c_d_e (c, d, e), unique key f (f), unique key f_g (f, g), key g (g))")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down Expand Up @@ -5506,6 +5518,9 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) {
require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll))
tk.MustExec("analyze table t")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down
3 changes: 3 additions & 0 deletions planner/core/partition_pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,9 @@ func TestListColumnsPartitionPruner(t *testing.T) {
tk2.MustExec("insert into t1 (id,a,b) values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),(null,10,null)")
tk2.MustExec("insert into t2 (id,a,b) values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),(null,null,null)")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk1.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []struct {
SQL string
Pruner string
Expand Down
4 changes: 4 additions & 0 deletions planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func TestPreferRangeScan(t *testing.T) {
tk.MustExec("insert into test(name,age,addr) select name,age,addr from test;")
tk.MustExec("insert into test(name,age,addr) select name,age,addr from test;")
tk.MustExec("analyze table test;")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down
3 changes: 3 additions & 0 deletions planner/core/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func TestNDVGroupCols(t *testing.T) {
tk.MustExec("analyze table t1")
tk.MustExec("analyze table t2")

// Default RPC encoding may cause statistics explain result differ and then the test unstable.
tk.MustExec("set @@tidb_enable_chunk_rpc = on")

var input []string
var output []struct {
SQL string
Expand Down
13 changes: 12 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"encoding/json"
stderrs "errors"
"fmt"
"math/rand"
"runtime/pprof"
"runtime/trace"
"strconv"
Expand Down Expand Up @@ -2743,7 +2744,17 @@ func (s *session) RefreshVars(ctx context.Context) error {

// CreateSession4Test creates a new session environment for test.
func CreateSession4Test(store kv.Storage) (Session, error) {
return CreateSession4TestWithOpt(store, nil)
se, err := CreateSession4TestWithOpt(store, nil)
if err == nil {
// Cover both chunk rpc encoding and default encoding.
// nolint:gosec
if rand.Intn(2) == 0 {
se.GetSessionVars().EnableChunkRPC = false
} else {
se.GetSessionVars().EnableChunkRPC = true
}
}
return se, err
}

// Opt describes the option for creating session
Expand Down
Loading

0 comments on commit 85967fd

Please sign in to comment.