diff --git a/distsql/distsql.go b/distsql/distsql.go index 3219098b646e9..7840f5470cd0e 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -46,10 +46,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv. if resp == nil { return nil, errors.New("client returns nil response") } - encodeType := tipb.EncodeType_TypeDefault - if canUseChunkRPC(sctx) { - encodeType = tipb.EncodeType_TypeChunk - } // TODO: Add metric label and set open tracing. return &selectResult{ label: "mpp", @@ -58,7 +54,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv. fieldTypes: fieldTypes, ctx: sctx, feedback: statistics.NewQueryFeedback(0, nil, 0, false), - encodeType: encodeType, copPlanIDs: planIDs, rootPlanID: rootID, storeType: kv.TiFlash, @@ -130,10 +125,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie feedback: fb, }, nil } - encodetype := tipb.EncodeType_TypeDefault - if canUseChunkRPC(sctx) { - encodetype = tipb.EncodeType_TypeChunk - } return &selectResult{ label: "dag", resp: resp, @@ -143,7 +134,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie feedback: fb, sqlType: label, memTracker: kvReq.MemTracker, - encodeType: encodetype, storeType: kvReq.StoreType, paging: kvReq.Paging, }, nil @@ -186,12 +176,11 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars inte label = metrics.LblInternal } result := &selectResult{ - label: "analyze", - resp: resp, - feedback: statistics.NewQueryFeedback(0, nil, 0, false), - sqlType: label, - encodeType: tipb.EncodeType_TypeDefault, - storeType: kvReq.StoreType, + label: "analyze", + resp: resp, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + sqlType: label, + storeType: kvReq.StoreType, } return result, nil } @@ -205,12 +194,11 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars int return nil, errors.New("client returns nil response") } result := &selectResult{ - label: "checksum", - resp: resp, - feedback: statistics.NewQueryFeedback(0, nil, 0, false), - sqlType: metrics.LblGeneral, - encodeType: tipb.EncodeType_TypeDefault, - storeType: kvReq.StoreType, + label: "checksum", + resp: resp, + feedback: statistics.NewQueryFeedback(0, nil, 0, false), + sqlType: metrics.LblGeneral, + storeType: kvReq.StoreType, } return result, nil } diff --git a/distsql/select_result.go b/distsql/select_result.go index 8ae67446022e7..645ae3d175e9f 100644 --- a/distsql/select_result.go +++ b/distsql/select_result.go @@ -140,7 +140,6 @@ type selectResult struct { feedback *statistics.QueryFeedback partialCount int64 // number of partial results. sqlType string - encodeType tipb.EncodeType // copPlanIDs contains all copTasks' planIDs, // which help to collect copTasks' runtime stats. @@ -269,13 +268,14 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error { } } // TODO(Shenghui Wu): add metrics - switch r.selectResp.GetEncodeType() { + encodeType := r.selectResp.GetEncodeType() + switch encodeType { case tipb.EncodeType_TypeDefault: return r.readFromDefault(ctx, chk) case tipb.EncodeType_TypeChunk: return r.readFromChunk(ctx, chk) } - return errors.Errorf("unsupported encode type:%v", r.encodeType) + return errors.Errorf("unsupported encode type:%v", encodeType) } // NextRaw returns the next raw partial result. diff --git a/executor/admin_test.go b/executor/admin_test.go index 151340eebe12b..ffb31b5c46e32 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -1316,6 +1316,12 @@ func TestCheckFailReport(t *testing.T) { require.NoError(t, txn.Commit(tk.ctx)) ctx, hook := withLogHook(tk.ctx, t, "inconsistency") + + // TODO(tiancaiamao): admin check doesn't support the chunk protocol. + // Remove this after https://github.com/pingcap/tidb/issues/35156 + _, err = tk.Exec(ctx, "set @@tidb_enable_chunk_rpc = off") + require.NoError(t, err) + _, err = tk.Exec(ctx, "admin check table admin_test") require.Error(t, err) require.Equal(t, `[admin:8223]data inconsistency in table: admin_test, index: uk1, handle: 282574488403969, index-values:"handle: 282574488403969, values: [KindInt64 282578800083201 KindInt64 282574488403969]" != record-values:""`, err.Error()) diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 2aff291c5a818..736fb1bc6bf0b 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -830,6 +830,10 @@ func TestDefaultValForAnalyze(t *testing.T) { for i := 1; i < 4; i++ { tk.MustExec("insert into t values (?)", i) } + + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + tk.MustQuery("select @@tidb_enable_fast_analyze").Check(testkit.Rows("0")) tk.MustQuery("select @@session.tidb_enable_fast_analyze").Check(testkit.Rows("0")) tk.MustExec("analyze table t with 0 topn;") diff --git a/executor/explain_test.go b/executor/explain_test.go index 6c5d28a4cad02..acfe8f1d241c9 100644 --- a/executor/explain_test.go +++ b/executor/explain_test.go @@ -355,6 +355,9 @@ func TestCheckActRowsWithUnistore(t *testing.T) { }, } + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + for _, test := range tests { checkActRows(t, tk, test.sql, test.expected) } diff --git a/expression/integration_test.go b/expression/integration_test.go index c3734a1a5c184..53be8080bb6fd 100644 --- a/expression/integration_test.go +++ b/expression/integration_test.go @@ -1919,6 +1919,7 @@ func TestCompareBuiltin(t *testing.T) { tk.MustQuery(`select 1 < 17666000000000000000, 1 > 17666000000000000000, 1 = 17666000000000000000`).Check(testkit.Rows("1 0 0")) tk.MustExec("drop table if exists t") + // insert value at utc timezone tk.MustExec("set time_zone = '+00:00'") tk.MustExec("create table t(a timestamp)") diff --git a/planner/core/cbo_test.go b/planner/core/cbo_test.go index a0aa419b95b86..12d0555ae31f7 100644 --- a/planner/core/cbo_test.go +++ b/planner/core/cbo_test.go @@ -230,6 +230,9 @@ func TestIndexRead(t *testing.T) { testKit.MustExec("create index ts on t (ts)") testKit.MustExec("create table t1 (a int, b int, index idx(a), index idxx(b))") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + testKit.MustExec("set @@tidb_enable_chunk_rpc = on") + // This stats is generated by following format: // fill (a, b, c, e) as (i*100+j, i, i+j, i*100+j), i and j is dependent and range of this two are [0, 99]. require.NoError(t, loadTableStats("analyzesSuiteTestIndexReadT.json", dom)) diff --git a/planner/core/enforce_mpp_test.go b/planner/core/enforce_mpp_test.go index 1b7f1792ea609..dab97226f436e 100644 --- a/planner/core/enforce_mpp_test.go +++ b/planner/core/enforce_mpp_test.go @@ -96,6 +96,9 @@ func TestEnforceMPP(t *testing.T) { tk.MustExec("create table t(a int, b int)") tk.MustExec("create index idx on t(a)") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + // Create virtual tiflash replica info. dom := domain.GetDomain(tk.Session()) is := dom.InfoSchema() diff --git a/planner/core/expression_rewriter_test.go b/planner/core/expression_rewriter_test.go index ee5f20d3f824c..93d94a3750410 100644 --- a/planner/core/expression_rewriter_test.go +++ b/planner/core/expression_rewriter_test.go @@ -409,6 +409,10 @@ func TestMultiColInExpression(t *testing.T) { Plan []string Res []string } + + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + expressionRewriterSuiteData := plannercore.GetExpressionRewriterSuiteData() expressionRewriterSuiteData.GetTestCases(t, &input, &output) for i, tt := range input { diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 096c7afab6051..ac8a18beb381f 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -479,6 +479,9 @@ func TestVerboseExplain(t *testing.T) { tk.MustExec("analyze table t2") tk.MustExec("analyze table t3") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + // Create virtual tiflash replica info. dom := domain.GetDomain(tk.Session()) is := dom.InfoSchema() @@ -5394,6 +5397,9 @@ func TestIndexJoinCost(t *testing.T) { tk.MustExec(`create table t_inner_pk (a int primary key)`) tk.MustExec(`create table t_inner_idx (a int, b int, key(a))`) + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + tk.MustQuery(`explain format=verbose select /*+ TIDB_INLJ(t_outer, t_inner_pk) */ * from t_outer, t_inner_pk where t_outer.a=t_inner_pk.a`).Check(testkit.Rows( // IndexJoin with inner TableScan `IndexJoin_11 12487.50 206368.09 root inner join, inner:TableReader_8, outer key:test.t_outer.a, inner key:test.t_inner_pk.a, equal cond:eq(test.t_outer.a, test.t_inner_pk.a)`, `├─TableReader_18(Build) 9990.00 36412.58 root data:Selection_17`, @@ -5448,6 +5454,9 @@ func TestHeuristicIndexSelection(t *testing.T) { tk.MustExec("create table t3(a bigint, b varchar(255), c bigint, primary key(a, b) clustered)") tk.MustExec("create table t4(a bigint, b varchar(255), c bigint, primary key(a, b) nonclustered)") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + var input []string var output []struct { SQL string @@ -5475,6 +5484,9 @@ func TestOutputSkylinePruningInfo(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, b int, c int, d int, e int, f int, g int, primary key (a), unique key c_d_e (c, d, e), unique key f (f), unique key f_g (f, g), key g (g))") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + var input []string var output []struct { SQL string @@ -5506,6 +5518,9 @@ func TestPreferRangeScanForUnsignedIntHandle(t *testing.T) { require.Nil(t, do.StatsHandle().DumpStatsDeltaToKV(handle.DumpAll)) tk.MustExec("analyze table t") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + var input []string var output []struct { SQL string diff --git a/planner/core/partition_pruner_test.go b/planner/core/partition_pruner_test.go index 45f9e1f8f5529..2e7d704c8663e 100644 --- a/planner/core/partition_pruner_test.go +++ b/planner/core/partition_pruner_test.go @@ -361,6 +361,9 @@ func TestListColumnsPartitionPruner(t *testing.T) { tk2.MustExec("insert into t1 (id,a,b) values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),(null,10,null)") tk2.MustExec("insert into t2 (id,a,b) values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),(null,null,null)") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk1.MustExec("set @@tidb_enable_chunk_rpc = on") + var input []struct { SQL string Pruner string diff --git a/planner/core/plan_test.go b/planner/core/plan_test.go index 7b45db4c4bb7c..f0435241adece 100644 --- a/planner/core/plan_test.go +++ b/planner/core/plan_test.go @@ -58,6 +58,10 @@ func TestPreferRangeScan(t *testing.T) { tk.MustExec("insert into test(name,age,addr) select name,age,addr from test;") tk.MustExec("insert into test(name,age,addr) select name,age,addr from test;") tk.MustExec("analyze table test;") + + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + var input []string var output []struct { SQL string diff --git a/planner/core/stats_test.go b/planner/core/stats_test.go index 2959aae593e08..a7fc3d60fa779 100644 --- a/planner/core/stats_test.go +++ b/planner/core/stats_test.go @@ -133,6 +133,9 @@ func TestNDVGroupCols(t *testing.T) { tk.MustExec("analyze table t1") tk.MustExec("analyze table t2") + // Default RPC encoding may cause statistics explain result differ and then the test unstable. + tk.MustExec("set @@tidb_enable_chunk_rpc = on") + var input []string var output []struct { SQL string diff --git a/session/session.go b/session/session.go index 6e3320f9ddd67..e6b5997b78c4d 100644 --- a/session/session.go +++ b/session/session.go @@ -26,6 +26,7 @@ import ( "encoding/json" stderrs "errors" "fmt" + "math/rand" "runtime/pprof" "runtime/trace" "strconv" @@ -2743,7 +2744,17 @@ func (s *session) RefreshVars(ctx context.Context) error { // CreateSession4Test creates a new session environment for test. func CreateSession4Test(store kv.Storage) (Session, error) { - return CreateSession4TestWithOpt(store, nil) + se, err := CreateSession4TestWithOpt(store, nil) + if err == nil { + // Cover both chunk rpc encoding and default encoding. + // nolint:gosec + if rand.Intn(2) == 0 { + se.GetSessionVars().EnableChunkRPC = false + } else { + se.GetSessionVars().EnableChunkRPC = true + } + } + return se, err } // Opt describes the option for creating session diff --git a/store/mockstore/unistore/cophandler/cop_handler.go b/store/mockstore/unistore/cophandler/cop_handler.go index fd0e2b81ecf54..b5fbeaeb0841b 100644 --- a/store/mockstore/unistore/cophandler/cop_handler.go +++ b/store/mockstore/unistore/cophandler/cop_handler.go @@ -149,9 +149,9 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt resp.OtherError = err.Error() return resp } - return buildRespWithMPPExec(nil, nil, nil, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) + return genRespWithMPPExec(nil, nil, nil, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) } - return buildRespWithMPPExec(chunks, counts, ndvs, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) + return genRespWithMPPExec(chunks, counts, ndvs, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime)) } func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (mppExec, []tipb.Chunk, []int64, []int64, error) { @@ -194,8 +194,7 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest) (chun if err != nil { return } - var buf []byte - var datums []types.Datum + var chk *chunk.Chunk fields := exec.getFieldTypes() for { @@ -203,25 +202,66 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest) (chun if err != nil || chk == nil || chk.NumRows() == 0 { return } - numRows := chk.NumRows() - for i := 0; i < numRows; i++ { - datums = datums[:0] - if dagReq.OutputOffsets != nil { - for _, j := range dagReq.OutputOffsets { - datums = append(datums, chk.GetRow(i).GetDatum(int(j), fields[j])) - } - } else { - for j, ft := range fields { - datums = append(datums, chk.GetRow(i).GetDatum(j, ft)) - } + + switch dagReq.EncodeType { + case tipb.EncodeType_TypeDefault: + chunks, err = useDefaultEncoding(chk, dagCtx, dagReq, fields, chunks) + case tipb.EncodeType_TypeChunk: + chunks = useChunkEncoding(chk, dagReq, fields, chunks) + default: + err = fmt.Errorf("unsupported DAG request encode type %s", dagReq.EncodeType) + } + if err != nil { + return + } + } +} + +func useDefaultEncoding(chk *chunk.Chunk, dagCtx *dagContext, dagReq *tipb.DAGRequest, + fields []*types.FieldType, chunks []tipb.Chunk) ([]tipb.Chunk, error) { + var buf []byte + var datums []types.Datum + var err error + numRows := chk.NumRows() + for i := 0; i < numRows; i++ { + datums = datums[:0] + if dagReq.OutputOffsets != nil { + for _, j := range dagReq.OutputOffsets { + datums = append(datums, chk.GetRow(i).GetDatum(int(j), fields[j])) } - buf, err = codec.EncodeValue(dagCtx.sc, buf[:0], datums...) - if err != nil { - return nil, errors.Trace(err) + } else { + for j, ft := range fields { + datums = append(datums, chk.GetRow(i).GetDatum(j, ft)) } - chunks = appendRow(chunks, buf, i) } + buf, err = codec.EncodeValue(dagCtx.sc, buf[:0], datums...) + if err != nil { + return nil, errors.Trace(err) + } + chunks = appendRow(chunks, buf, i) } + return chunks, nil +} + +func useChunkEncoding(chk *chunk.Chunk, dagReq *tipb.DAGRequest, fields []*types.FieldType, chunks []tipb.Chunk) []tipb.Chunk { + if dagReq.OutputOffsets != nil { + offsets := make([]int, len(dagReq.OutputOffsets)) + newFields := make([]*types.FieldType, len(dagReq.OutputOffsets)) + for i := 0; i < len(dagReq.OutputOffsets); i++ { + offset := dagReq.OutputOffsets[i] + offsets[i] = int(offset) + newFields[i] = fields[offset] + } + chk = chk.Prune(offsets) + fields = newFields + } + + c := chunk.NewCodec(fields) + buffer := c.Encode(chk) + chunks = append(chunks, tipb.Chunk{ + RowsData: buffer, + }) + return chunks } func buildDAG(reader *dbreader.DBReader, lockStore *lockstore.MemStore, req *coprocessor.Request) (*dagContext, *tipb.DAGRequest, error) { @@ -238,7 +278,17 @@ func buildDAG(reader *dbreader.DBReader, lockStore *lockstore.MemStore, req *cop return nil, nil, errors.Trace(err) } sc := flagsToStatementContext(dagReq.Flags) - sc.TimeZone = time.FixedZone("UTC", int(dagReq.TimeZoneOffset)) + switch dagReq.TimeZoneName { + case "": + sc.TimeZone = time.FixedZone("UTC", int(dagReq.TimeZoneOffset)) + case "System": + sc.TimeZone = time.Local + default: + sc.TimeZone, err = time.LoadLocation(dagReq.TimeZoneName) + if err != nil { + return nil, nil, errors.Trace(err) + } + } ctx := &dagContext{ evalContext: &evalContext{sc: sc}, dbReader: reader, @@ -394,13 +444,14 @@ func (e *ErrLocked) Error() string { return fmt.Sprintf("key is locked, key: %q, Type: %v, primary: %q, startTS: %v", e.Key, e.LockType, e.Primary, e.StartTS) } -func buildRespWithMPPExec(chunks []tipb.Chunk, counts, ndvs []int64, exec mppExec, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response { +func genRespWithMPPExec(chunks []tipb.Chunk, counts, ndvs []int64, exec mppExec, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response { resp := &coprocessor.Response{} selResp := &tipb.SelectResponse{ Error: toPBError(err), Chunks: chunks, OutputCounts: counts, Ndvs: ndvs, + EncodeType: dagReq.EncodeType, } executors := dagReq.Executors if dagReq.CollectExecutionSummaries != nil && *dagReq.CollectExecutionSummaries {