From 285a3f7b5bd6d8251666f30ea79dd7b5636ddc05 Mon Sep 17 00:00:00 2001
From: "Zhuomin(Charming) Liu"
Date: Mon, 30 Mar 2020 22:37:34 +0800
Subject: [PATCH 1/2] cherry pick #15258 to release-3.1

Signed-off-by: sre-bot
---
 distsql/request_builder.go                  | 10 +++
 distsql/request_builder_test.go             | 90 +++++++++++++++++++
 infoschema/infoschema.go                    | 32 ++++++-
 store/mockstore/mocktikv/cop_handler_dag.go | 99 +++++++++++++++++++++
 4 files changed, 230 insertions(+), 1 deletion(-)

diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index 61e668d059dd8..fdaeca24c3cbe 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -17,6 +17,7 @@ import (
 	"math"

 	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -165,8 +166,17 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
 	builder.Request.IsolationLevel = builder.getIsolationLevel()
 	builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
 	builder.Request.Priority = builder.getKVPriority(sv)
+<<<<<<< HEAD
 	builder.Request.ReplicaRead = sv.ReplicaRead
 	builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
+=======
+	builder.Request.ReplicaRead = sv.GetReplicaRead()
+	if sv.SnapshotInfoschema != nil {
+		builder.Request.SchemaVar = infoschema.GetInfoSchemaBySessionVars(sv).SchemaMetaVersion()
+	} else {
+		builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
+	}
+>>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
 	return builder
 }

diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go
index 7b353d8f29c82..4b9a764fb0be1 100644
--- a/distsql/request_builder_test.go
+++ b/distsql/request_builder_test.go
@@ -18,6 +18,11 @@ import (
 	"testing"

 	. "github.com/pingcap/check"
+<<<<<<< HEAD
+=======
+	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tidb/infoschema"
+>>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
@@ -587,3 +592,88 @@ func (s *testSuite) TestRequestBuilder7(c *C) {

 	c.Assert(actual, DeepEquals, expect)
 }
+<<<<<<< HEAD
+=======
+
+func (s *testSuite) TestRequestBuilder8(c *C) {
+	sv := variable.NewSessionVars()
+	sv.SnapshotInfoschema = infoschema.MockInfoSchemaWithSchemaVer(nil, 10000)
+	actual, err := (&RequestBuilder{}).
+		SetFromSessionVars(sv).
+		Build()
+	c.Assert(err, IsNil)
+	expect := &kv.Request{
+		Tp:             0,
+		StartTs:        0x0,
+		Data:           []uint8(nil),
+		Concurrency:    15,
+		IsolationLevel: 0,
+		Priority:       0,
+		MemTracker:     (*memory.Tracker)(nil),
+		ReplicaRead:    0x1,
+		SchemaVar:      10000,
+	}
+	c.Assert(actual, DeepEquals, expect)
+}
+
+func (s *testSuite) TestTableRangesToKVRangesWithFbs(c *C) {
+	ranges := []*ranger.Range{
+		{
+			LowVal:  []types.Datum{types.NewIntDatum(1)},
+			HighVal: []types.Datum{types.NewIntDatum(4)},
+		},
+	}
+	hist := statistics.NewHistogram(1, 30, 30, 0, types.NewFieldType(mysql.TypeLonglong), chunk.InitialCapacity, 0)
+	for i := 0; i < 10; i++ {
+		hist.Bounds.AppendInt64(0, int64(i))
+		hist.Bounds.AppendInt64(0, int64(i+2))
+		hist.Buckets = append(hist.Buckets, statistics.Bucket{Repeat: 10, Count: int64(i + 30)})
+	}
+	fb := statistics.NewQueryFeedback(0, hist, 0, false)
+	lower, upper := types.NewIntDatum(2), types.NewIntDatum(3)
+	fb.Feedback = []statistics.Feedback{
+		{Lower: &lower, Upper: &upper, Count: 1, Repeat: 1},
+	}
+	actual := TableRangesToKVRanges(0, ranges, fb)
+	expect := []kv.KeyRange{
+		{
+			StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
+			EndKey:   kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5},
+		},
+	}
+	for i := 0; i < len(actual); i++ {
+		c.Assert(actual[i], DeepEquals, expect[i])
+	}
+}
+
+func (s *testSuite) TestIndexRangesToKVRangesWithFbs(c *C) {
+	ranges := []*ranger.Range{
+		{
+			LowVal:  []types.Datum{types.NewIntDatum(1)},
+			HighVal: []types.Datum{types.NewIntDatum(4)},
+		},
+	}
+	hist := statistics.NewHistogram(1, 30, 30, 0, types.NewFieldType(mysql.TypeLonglong), chunk.InitialCapacity, 0)
+	for i := 0; i < 10; i++ {
+		hist.Bounds.AppendInt64(0, int64(i))
+		hist.Bounds.AppendInt64(0, int64(i+2))
+		hist.Buckets = append(hist.Buckets, statistics.Bucket{Repeat: 10, Count: int64(i + 30)})
+	}
+	fb := statistics.NewQueryFeedback(0, hist, 0, false)
+	lower, upper := types.NewIntDatum(2), types.NewIntDatum(3)
+	fb.Feedback = []statistics.Feedback{
+		{Lower: &lower, Upper: &upper, Count: 1, Repeat: 1},
+	}
+	actual, err := IndexRangesToKVRanges(new(stmtctx.StatementContext), 0, 0, ranges, fb)
+	c.Assert(err, IsNil)
+	expect := []kv.KeyRange{
+		{
+			StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
+			EndKey:   kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5},
+		},
+	}
+	for i := 0; i < len(actual); i++ {
+		c.Assert(actual[i], DeepEquals, expect[i])
+	}
+}
+>>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go
index 03757c068de4f..5f4efc87f65ec 100644
--- a/infoschema/infoschema.go
+++ b/infoschema/infoschema.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/logutil"
@@ -164,6 +165,30 @@ func MockInfoSchema(tbList []*model.TableInfo) InfoSchema {
 	return result
 }

+// MockInfoSchemaWithSchemaVer only serves for test.
+func MockInfoSchemaWithSchemaVer(tbList []*model.TableInfo, schemaVer int64) InfoSchema {
+	result := &infoSchema{}
+	result.schemaMap = make(map[string]*schemaTables)
+	result.sortedTablesBuckets = make([]sortedTables, bucketCount)
+	dbInfo := &model.DBInfo{ID: 0, Name: model.NewCIStr("test"), Tables: tbList}
+	tableNames := &schemaTables{
+		dbInfo: dbInfo,
+		tables: make(map[string]table.Table),
+	}
+	result.schemaMap["test"] = tableNames
+	for _, tb := range tbList {
+		tbl := table.MockTableFromMeta(tb)
+		tableNames.tables[tb.Name.L] = tbl
+		bucketIdx := tableBucketIdx(tb.ID)
+		result.sortedTablesBuckets[bucketIdx] = append(result.sortedTablesBuckets[bucketIdx], tbl)
+	}
+	for i := range result.sortedTablesBuckets {
+		sort.Sort(result.sortedTablesBuckets[i])
+	}
+	result.schemaMetaVersion = schemaVer
+	return result
+}
+
 var _ InfoSchema = (*infoSchema)(nil)

 func (is *infoSchema) SchemaByName(schema model.CIStr) (val *model.DBInfo, ok bool) {
@@ -443,7 +468,12 @@ func HasAutoIncrementColumn(tbInfo *model.TableInfo) (bool, string) {
 // GetInfoSchema gets TxnCtx InfoSchema if snapshot schema is not set,
 // Otherwise, snapshot schema is returned.
 func GetInfoSchema(ctx sessionctx.Context) InfoSchema {
-	sessVar := ctx.GetSessionVars()
+	return GetInfoSchemaBySessionVars(ctx.GetSessionVars())
+}
+
+// GetInfoSchemaBySessionVars gets TxnCtx InfoSchema if snapshot schema is not set,
+// Otherwise, snapshot schema is returned.
+func GetInfoSchemaBySessionVars(sessVar *variable.SessionVars) InfoSchema {
 	var is InfoSchema
 	if snap := sessVar.SnapshotInfoschema; snap != nil {
 		is = snap.(InfoSchema)
diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go
index bdb914118ff23..acef350cf83c9 100644
--- a/store/mockstore/mocktikv/cop_handler_dag.go
+++ b/store/mockstore/mocktikv/cop_handler_dag.go
@@ -556,6 +556,105 @@ func buildResp(chunks []tipb.Chunk, counts []int64, execDetails []*execDetail, e
 		Chunks:       chunks,
 		OutputCounts: counts,
 	}
+<<<<<<< HEAD
+=======
+	for i := range warnings {
+		selResp.Warnings = append(selResp.Warnings, toPBError(warnings[i].Err))
+	}
+	return selResp
+}
+
+func (h *rpcHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dagReq *tipb.DAGRequest, dagCtx *dagContext, rows [][][]byte) error {
+	switch dagReq.EncodeType {
+	case tipb.EncodeType_TypeDefault:
+		h.encodeDefault(selResp, rows, dagReq.OutputOffsets)
+	case tipb.EncodeType_TypeChunk:
+		colTypes := h.constructRespSchema(dagCtx)
+		loc := dagCtx.evalCtx.sc.TimeZone
+		err := h.encodeChunk(selResp, rows, colTypes, dagReq.OutputOffsets, loc)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (h *rpcHandler) constructRespSchema(dagCtx *dagContext) []*types.FieldType {
+	root := dagCtx.dagReq.Executors[len(dagCtx.dagReq.Executors)-1]
+	agg := root.Aggregation
+	if root.StreamAgg != nil {
+		agg = root.StreamAgg
+	}
+	if agg == nil {
+		return dagCtx.evalCtx.fieldTps
+	}
+
+	schema := make([]*types.FieldType, 0, len(agg.AggFunc)+len(agg.GroupBy))
+	for i := range agg.AggFunc {
+		if agg.AggFunc[i].Tp == tipb.ExprType_Avg {
+			// Avg function requests two columns : Count , Sum
+			// This line addend the Count(TypeLonglong) to the schema.
+			schema = append(schema, types.NewFieldType(mysql.TypeLonglong))
+		}
+		schema = append(schema, expression.PbTypeToFieldType(agg.AggFunc[i].FieldType))
+	}
+	for i := range agg.GroupBy {
+		schema = append(schema, expression.PbTypeToFieldType(agg.GroupBy[i].FieldType))
+	}
+	return schema
+}
+
+func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte, colOrdinal []uint32) {
+	var chunks []tipb.Chunk
+	for i := range rows {
+		requestedRow := dummySlice
+		for _, ordinal := range colOrdinal {
+			requestedRow = append(requestedRow, rows[i][ordinal]...)
+		}
+		chunks = appendRow(chunks, requestedRow, i)
+	}
+	selResp.Chunks = chunks
+	selResp.EncodeType = tipb.EncodeType_TypeDefault
+}
+
+func (h *rpcHandler) encodeChunk(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
+	var chunks []tipb.Chunk
+	respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
+	for _, ordinal := range colOrdinal {
+		respColTypes = append(respColTypes, colTypes[ordinal])
+	}
+	chk := chunk.NewChunkWithCapacity(respColTypes, rowsPerChunk)
+	encoder := chunk.NewCodec(respColTypes)
+	decoder := codec.NewDecoder(chk, loc)
+	for i := range rows {
+		for j, ordinal := range colOrdinal {
+			_, err := decoder.DecodeOne(rows[i][ordinal], j, colTypes[ordinal])
+			if err != nil {
+				return err
+			}
+		}
+		if i%rowsPerChunk == rowsPerChunk-1 {
+			chunks = append(chunks, tipb.Chunk{})
+			cur := &chunks[len(chunks)-1]
+			cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
+			chk.Reset()
+		}
+	}
+	if chk.NumRows() > 0 {
+		chunks = append(chunks, tipb.Chunk{})
+		cur := &chunks[len(chunks)-1]
+		cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
+		chk.Reset()
+	}
+	selResp.Chunks = chunks
+	selResp.EncodeType = tipb.EncodeType_TypeChunk
+	return nil
+}
+
+func buildResp(selResp *tipb.SelectResponse, execDetails []*execDetail, err error) *coprocessor.Response {
+	resp := &coprocessor.Response{}
+
+>>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
 	if len(execDetails) > 0 {
 		execSummary := make([]*tipb.ExecutorExecutionSummary, 0, len(execDetails))
 		for _, d := range execDetails {

From 31adaf8145bb704da8b4f9e8384812e4dba1a183 Mon Sep 17 00:00:00 2001
From: lzmhhh123
Date: Tue, 31 Mar 2020 17:33:39 +0800
Subject: [PATCH 2/2] fix conflicts

---
 distsql/request_builder.go                  |  5 --
 distsql/request_builder_test.go             | 68 --------------
 store/mockstore/mocktikv/cop_handler_dag.go | 99 ---------------------
 3 files changed, 172 deletions(-)

diff --git a/distsql/request_builder.go b/distsql/request_builder.go
index fdaeca24c3cbe..c5e82a62a6312 100644
--- a/distsql/request_builder.go
+++ b/distsql/request_builder.go
@@ -166,17 +166,12 @@ func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *Req
 	builder.Request.IsolationLevel = builder.getIsolationLevel()
 	builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
 	builder.Request.Priority = builder.getKVPriority(sv)
-<<<<<<< HEAD
 	builder.Request.ReplicaRead = sv.ReplicaRead
-	builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
-=======
-	builder.Request.ReplicaRead = sv.GetReplicaRead()
 	if sv.SnapshotInfoschema != nil {
 		builder.Request.SchemaVar = infoschema.GetInfoSchemaBySessionVars(sv).SchemaMetaVersion()
 	} else {
 		builder.Request.SchemaVar = sv.TxnCtx.SchemaVersion
 	}
->>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
 	return builder
 }

diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go
index 4b9a764fb0be1..e185d8b7a01c9 100644
--- a/distsql/request_builder_test.go
+++ b/distsql/request_builder_test.go
@@ -18,11 +18,7 @@ import (
 	"testing"

 	. "github.com/pingcap/check"
-<<<<<<< HEAD
-=======
-	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/infoschema"
->>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
@@ -592,8 +588,6 @@ func (s *testSuite) TestRequestBuilder7(c *C) {

 	c.Assert(actual, DeepEquals, expect)
 }
-<<<<<<< HEAD
-=======

 func (s *testSuite) TestRequestBuilder8(c *C) {
 	sv := variable.NewSessionVars()
@@ -615,65 +609,3 @@ func (s *testSuite) TestRequestBuilder8(c *C) {
 	}
 	c.Assert(actual, DeepEquals, expect)
 }
-
-func (s *testSuite) TestTableRangesToKVRangesWithFbs(c *C) {
-	ranges := []*ranger.Range{
-		{
-			LowVal:  []types.Datum{types.NewIntDatum(1)},
-			HighVal: []types.Datum{types.NewIntDatum(4)},
-		},
-	}
-	hist := statistics.NewHistogram(1, 30, 30, 0, types.NewFieldType(mysql.TypeLonglong), chunk.InitialCapacity, 0)
-	for i := 0; i < 10; i++ {
-		hist.Bounds.AppendInt64(0, int64(i))
-		hist.Bounds.AppendInt64(0, int64(i+2))
-		hist.Buckets = append(hist.Buckets, statistics.Bucket{Repeat: 10, Count: int64(i + 30)})
-	}
-	fb := statistics.NewQueryFeedback(0, hist, 0, false)
-	lower, upper := types.NewIntDatum(2), types.NewIntDatum(3)
-	fb.Feedback = []statistics.Feedback{
-		{Lower: &lower, Upper: &upper, Count: 1, Repeat: 1},
-	}
-	actual := TableRangesToKVRanges(0, ranges, fb)
-	expect := []kv.KeyRange{
-		{
-			StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
-			EndKey:   kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5},
-		},
-	}
-	for i := 0; i < len(actual); i++ {
-		c.Assert(actual[i], DeepEquals, expect[i])
-	}
-}
-
-func (s *testSuite) TestIndexRangesToKVRangesWithFbs(c *C) {
-	ranges := []*ranger.Range{
-		{
-			LowVal:  []types.Datum{types.NewIntDatum(1)},
-			HighVal: []types.Datum{types.NewIntDatum(4)},
-		},
-	}
-	hist := statistics.NewHistogram(1, 30, 30, 0, types.NewFieldType(mysql.TypeLonglong), chunk.InitialCapacity, 0)
-	for i := 0; i < 10; i++ {
-		hist.Bounds.AppendInt64(0, int64(i))
-		hist.Bounds.AppendInt64(0, int64(i+2))
-		hist.Buckets = append(hist.Buckets, statistics.Bucket{Repeat: 10, Count: int64(i + 30)})
-	}
-	fb := statistics.NewQueryFeedback(0, hist, 0, false)
-	lower, upper := types.NewIntDatum(2), types.NewIntDatum(3)
-	fb.Feedback = []statistics.Feedback{
-		{Lower: &lower, Upper: &upper, Count: 1, Repeat: 1},
-	}
-	actual, err := IndexRangesToKVRanges(new(stmtctx.StatementContext), 0, 0, ranges, fb)
-	c.Assert(err, IsNil)
-	expect := []kv.KeyRange{
-		{
-			StartKey: kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
-			EndKey:   kv.Key{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5f, 0x69, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x3, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5},
-		},
-	}
-	for i := 0; i < len(actual); i++ {
-		c.Assert(actual[i], DeepEquals, expect[i])
-	}
-}
->>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
diff --git a/store/mockstore/mocktikv/cop_handler_dag.go b/store/mockstore/mocktikv/cop_handler_dag.go
index acef350cf83c9..bdb914118ff23 100644
--- a/store/mockstore/mocktikv/cop_handler_dag.go
+++ b/store/mockstore/mocktikv/cop_handler_dag.go
@@ -556,105 +556,6 @@ func buildResp(chunks []tipb.Chunk, counts []int64, execDetails []*execDetail, e
 		Chunks:       chunks,
 		OutputCounts: counts,
 	}
-<<<<<<< HEAD
-=======
-	for i := range warnings {
-		selResp.Warnings = append(selResp.Warnings, toPBError(warnings[i].Err))
-	}
-	return selResp
-}
-
-func (h *rpcHandler) fillUpData4SelectResponse(selResp *tipb.SelectResponse, dagReq *tipb.DAGRequest, dagCtx *dagContext, rows [][][]byte) error {
-	switch dagReq.EncodeType {
-	case tipb.EncodeType_TypeDefault:
-		h.encodeDefault(selResp, rows, dagReq.OutputOffsets)
-	case tipb.EncodeType_TypeChunk:
-		colTypes := h.constructRespSchema(dagCtx)
-		loc := dagCtx.evalCtx.sc.TimeZone
-		err := h.encodeChunk(selResp, rows, colTypes, dagReq.OutputOffsets, loc)
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-func (h *rpcHandler) constructRespSchema(dagCtx *dagContext) []*types.FieldType {
-	root := dagCtx.dagReq.Executors[len(dagCtx.dagReq.Executors)-1]
-	agg := root.Aggregation
-	if root.StreamAgg != nil {
-		agg = root.StreamAgg
-	}
-	if agg == nil {
-		return dagCtx.evalCtx.fieldTps
-	}
-
-	schema := make([]*types.FieldType, 0, len(agg.AggFunc)+len(agg.GroupBy))
-	for i := range agg.AggFunc {
-		if agg.AggFunc[i].Tp == tipb.ExprType_Avg {
-			// Avg function requests two columns : Count , Sum
-			// This line addend the Count(TypeLonglong) to the schema.
-			schema = append(schema, types.NewFieldType(mysql.TypeLonglong))
-		}
-		schema = append(schema, expression.PbTypeToFieldType(agg.AggFunc[i].FieldType))
-	}
-	for i := range agg.GroupBy {
-		schema = append(schema, expression.PbTypeToFieldType(agg.GroupBy[i].FieldType))
-	}
-	return schema
-}
-
-func (h *rpcHandler) encodeDefault(selResp *tipb.SelectResponse, rows [][][]byte, colOrdinal []uint32) {
-	var chunks []tipb.Chunk
-	for i := range rows {
-		requestedRow := dummySlice
-		for _, ordinal := range colOrdinal {
-			requestedRow = append(requestedRow, rows[i][ordinal]...)
-		}
-		chunks = appendRow(chunks, requestedRow, i)
-	}
-	selResp.Chunks = chunks
-	selResp.EncodeType = tipb.EncodeType_TypeDefault
-}
-
-func (h *rpcHandler) encodeChunk(selResp *tipb.SelectResponse, rows [][][]byte, colTypes []*types.FieldType, colOrdinal []uint32, loc *time.Location) error {
-	var chunks []tipb.Chunk
-	respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
-	for _, ordinal := range colOrdinal {
-		respColTypes = append(respColTypes, colTypes[ordinal])
-	}
-	chk := chunk.NewChunkWithCapacity(respColTypes, rowsPerChunk)
-	encoder := chunk.NewCodec(respColTypes)
-	decoder := codec.NewDecoder(chk, loc)
-	for i := range rows {
-		for j, ordinal := range colOrdinal {
-			_, err := decoder.DecodeOne(rows[i][ordinal], j, colTypes[ordinal])
-			if err != nil {
-				return err
-			}
-		}
-		if i%rowsPerChunk == rowsPerChunk-1 {
-			chunks = append(chunks, tipb.Chunk{})
-			cur := &chunks[len(chunks)-1]
-			cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
-			chk.Reset()
-		}
-	}
-	if chk.NumRows() > 0 {
-		chunks = append(chunks, tipb.Chunk{})
-		cur := &chunks[len(chunks)-1]
-		cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
-		chk.Reset()
-	}
-	selResp.Chunks = chunks
-	selResp.EncodeType = tipb.EncodeType_TypeChunk
-	return nil
-}
-
-func buildResp(selResp *tipb.SelectResponse, execDetails []*execDetail, err error) *coprocessor.Response {
-	resp := &coprocessor.Response{}
-
->>>>>>> 1637c42... distsql: fix wrong schema version when snapshot has been set (#15258)
 	if len(execDetails) > 0 {
 		execSummary := make([]*tipb.ExecutorExecutionSummary, 0, len(execDetails))
 		for _, d := range execDetails {