Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: implement chunk rpc encoding for unistore #35114

Merged
merged 15 commits into from
Jun 6, 2022
Merged
32 changes: 10 additions & 22 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
if resp == nil {
return nil, errors.New("client returns nil response")
}
encodeType := tipb.EncodeType_TypeDefault
if canUseChunkRPC(sctx) {
encodeType = tipb.EncodeType_TypeChunk
}
// TODO: Add metric label and set open tracing.
return &selectResult{
label: "mpp",
Expand All @@ -58,7 +54,6 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.
fieldTypes: fieldTypes,
ctx: sctx,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
encodeType: encodeType,
copPlanIDs: planIDs,
rootPlanID: rootID,
storeType: kv.TiFlash,
Expand Down Expand Up @@ -130,10 +125,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
}, nil
}
encodetype := tipb.EncodeType_TypeDefault
if canUseChunkRPC(sctx) {
encodetype = tipb.EncodeType_TypeChunk
}
return &selectResult{
label: "dag",
resp: resp,
Expand All @@ -143,7 +134,6 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
feedback: fb,
sqlType: label,
memTracker: kvReq.MemTracker,
encodeType: encodetype,
storeType: kvReq.StoreType,
paging: kvReq.Paging,
}, nil
Expand Down Expand Up @@ -186,12 +176,11 @@ func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars inte
label = metrics.LblInternal
}
result := &selectResult{
label: "analyze",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
label: "analyze",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: label,
storeType: kvReq.StoreType,
}
return result, nil
}
Expand All @@ -205,12 +194,11 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars int
return nil, errors.New("client returns nil response")
}
result := &selectResult{
label: "checksum",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
encodeType: tipb.EncodeType_TypeDefault,
storeType: kvReq.StoreType,
label: "checksum",
resp: resp,
feedback: statistics.NewQueryFeedback(0, nil, 0, false),
sqlType: metrics.LblGeneral,
storeType: kvReq.StoreType,
}
return result, nil
}
Expand Down
6 changes: 3 additions & 3 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ type selectResult struct {
feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
sqlType string
encodeType tipb.EncodeType
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already an encodeType in this.selectResp.DAGRequest.
This field is duplicated...


// copPlanIDs contains all copTasks' planIDs,
// which help to collect copTasks' runtime stats.
Expand Down Expand Up @@ -269,13 +268,14 @@ func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
}
}
// TODO(Shenghui Wu): add metrics
switch r.selectResp.GetEncodeType() {
encodeType := r.selectResp.GetEncodeType()
switch encodeType {
case tipb.EncodeType_TypeDefault:
return r.readFromDefault(ctx, chk)
case tipb.EncodeType_TypeChunk:
return r.readFromChunk(ctx, chk)
}
return errors.Errorf("unsupported encode type:%v", r.encodeType)
return errors.Errorf("unsupported encode type:%v", encodeType)
}

// NextRaw returns the next raw partial result.
Expand Down
10 changes: 9 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"encoding/json"
stderrs "errors"
"fmt"
"math/rand"
"runtime/pprof"
"runtime/trace"
"strconv"
Expand Down Expand Up @@ -2737,7 +2738,14 @@ func (s *session) RefreshVars(ctx context.Context) error {

// CreateSession4Test creates a new session environment for test.
func CreateSession4Test(store kv.Storage) (Session, error) {
	se, err := CreateSession4TestWithOpt(store, nil)
	if err != nil {
		return se, err
	}
	// Randomly fall back to the default encoding so that tests cover both
	// the chunk RPC encoding and the default encoding.
	if rand.Intn(2) == 0 {
		se.GetSessionVars().EnableChunkRPC = false
	}
	return se, nil
}

// Opt describes the option for creating session
Expand Down
81 changes: 61 additions & 20 deletions store/mockstore/unistore/cophandler/cop_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ func handleCopDAGRequest(dbReader *dbreader.DBReader, lockStore *lockstore.MemSt
resp.OtherError = err.Error()
return resp
}
return buildRespWithMPPExec(nil, nil, nil, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime))
return genRespWithMPPExec(nil, nil, nil, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime))
}
return buildRespWithMPPExec(chunks, counts, ndvs, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime))
return genRespWithMPPExec(chunks, counts, ndvs, exec, dagReq, err, dagCtx.sc.GetWarnings(), time.Since(startTime))
}

func buildAndRunMPPExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (mppExec, []tipb.Chunk, []int64, []int64, error) {
Expand Down Expand Up @@ -194,34 +194,74 @@ func mppExecute(exec mppExec, dagCtx *dagContext, dagReq *tipb.DAGRequest) (chun
if err != nil {
return
}
var buf []byte
var datums []types.Datum

var chk *chunk.Chunk
fields := exec.getFieldTypes()
for {
chk, err = exec.next()
if err != nil || chk == nil || chk.NumRows() == 0 {
return
}
numRows := chk.NumRows()
for i := 0; i < numRows; i++ {
datums = datums[:0]
if dagReq.OutputOffsets != nil {
for _, j := range dagReq.OutputOffsets {
datums = append(datums, chk.GetRow(i).GetDatum(int(j), fields[j]))
}
} else {
for j, ft := range fields {
datums = append(datums, chk.GetRow(i).GetDatum(j, ft))
}

switch dagReq.EncodeType {
case tipb.EncodeType_TypeDefault:
chunks, err = useDefaultEncoding(chk, dagCtx, dagReq, fields, chunks)
case tipb.EncodeType_TypeChunk:
chunks = useChunkEncoding(chk, dagReq, fields, chunks)
default:
err = fmt.Errorf("unsupported DAG request encode type %s", dagReq.EncodeType)
}
if err != nil {
return
}
}
}

func useDefaultEncoding(chk *chunk.Chunk, dagCtx *dagContext, dagReq *tipb.DAGRequest,
fields []*types.FieldType, chunks []tipb.Chunk) ([]tipb.Chunk, error) {
var buf []byte
var datums []types.Datum
var err error
numRows := chk.NumRows()
for i := 0; i < numRows; i++ {
datums = datums[:0]
if dagReq.OutputOffsets != nil {
for _, j := range dagReq.OutputOffsets {
datums = append(datums, chk.GetRow(i).GetDatum(int(j), fields[j]))
}
buf, err = codec.EncodeValue(dagCtx.sc, buf[:0], datums...)
if err != nil {
return nil, errors.Trace(err)
} else {
for j, ft := range fields {
datums = append(datums, chk.GetRow(i).GetDatum(j, ft))
}
chunks = appendRow(chunks, buf, i)
}
buf, err = codec.EncodeValue(dagCtx.sc, buf[:0], datums...)
if err != nil {
return nil, errors.Trace(err)
}
chunks = appendRow(chunks, buf, i)
}
return chunks, nil
}

// useChunkEncoding encodes chk with the chunk wire format and appends the
// encoded bytes to chunks as a single tipb.Chunk. When dagReq.OutputOffsets
// is set, the chunk is first pruned down to the projected columns.
func useChunkEncoding(chk *chunk.Chunk, dagReq *tipb.DAGRequest, fields []*types.FieldType, chunks []tipb.Chunk) []tipb.Chunk {
	if outputOffsets := dagReq.OutputOffsets; outputOffsets != nil {
		prunedIdx := make([]int, 0, len(outputOffsets))
		prunedFields := make([]*types.FieldType, 0, len(outputOffsets))
		for _, off := range outputOffsets {
			prunedIdx = append(prunedIdx, int(off))
			prunedFields = append(prunedFields, fields[off])
		}
		chk = chk.Prune(prunedIdx)
		fields = prunedFields
	}

	rowsData := chunk.NewCodec(fields).Encode(chk)
	return append(chunks, tipb.Chunk{RowsData: rowsData})
}

func buildDAG(reader *dbreader.DBReader, lockStore *lockstore.MemStore, req *coprocessor.Request) (*dagContext, *tipb.DAGRequest, error) {
Expand Down Expand Up @@ -394,13 +434,14 @@ func (e *ErrLocked) Error() string {
return fmt.Sprintf("key is locked, key: %q, Type: %v, primary: %q, startTS: %v", e.Key, e.LockType, e.Primary, e.StartTS)
}

func buildRespWithMPPExec(chunks []tipb.Chunk, counts, ndvs []int64, exec mppExec, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response {
func genRespWithMPPExec(chunks []tipb.Chunk, counts, ndvs []int64, exec mppExec, dagReq *tipb.DAGRequest, err error, warnings []stmtctx.SQLWarn, dur time.Duration) *coprocessor.Response {
resp := &coprocessor.Response{}
selResp := &tipb.SelectResponse{
Error: toPBError(err),
Chunks: chunks,
OutputCounts: counts,
Ndvs: ndvs,
EncodeType: dagReq.EncodeType,
}
executors := dagReq.Executors
if dagReq.CollectExecutionSummaries != nil && *dagReq.CollectExecutionSummaries {
Expand Down
2 changes: 1 addition & 1 deletion util/chunk/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (c *Codec) Encode(chk *Chunk) []byte {
return buffer
}

func (c *Codec) encodeColumn(buffer []byte, col *Column) []byte {
func (_ *Codec) encodeColumn(buffer []byte, col *Column) []byte {
var lenBuffer [4]byte
// encode length.
binary.LittleEndian.PutUint32(lenBuffer[:], uint32(col.length))
Expand Down