Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Control the number of rows returned by SelectResult #9273

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 99 additions & 10 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/cznic/mathutil"
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/charset"
Expand All @@ -34,7 +35,7 @@ import (
"github.com/pingcap/tipb/go-tipb"
)

func (s *testSuite) TestSelectNormal(c *C) {
func (s *testSuite) createSelectNormal(batch, totalRows int, c *C) (*selectResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
Expand Down Expand Up @@ -67,25 +68,41 @@ func (s *testSuite) TestSelectNormal(c *C) {
c.Assert(result.sqlType, Equals, "general")
c.Assert(result.rowLen, Equals, len(colTypes))

resp, ok := result.resp.(*mockResponse)
c.Assert(ok, IsTrue)
resp.total = totalRows
resp.batch = batch

return result, colTypes
}

func (s *testSuite) TestSelectNormal(c *C) {
response, colTypes := s.createSelectNormal(1, 2, c)
response.Fetch(context.TODO())

// Test Next.
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
break
}
}
c.Assert(numAllRows, Equals, 2)
err = response.Close()
err := response.Close()
c.Assert(err, IsNil)
}

func (s *testSuite) TestSelectStreaming(c *C) {
func (s *testSuite) TestSelectNormalBatchSize(c *C) {
response, colTypes := s.createSelectNormal(100, 1000000, c)
response.Fetch(context.TODO())
s.testBatchSize(response, colTypes, c)
}

func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetDAGRequest(&tipb.DAGRequest{}).
SetDesc(false).
Expand All @@ -112,31 +129,94 @@ func (s *testSuite) TestSelectStreaming(c *C) {

s.sctx.GetSessionVars().EnableStreaming = true

// Test Next.
response, err := Select(context.TODO(), s.sctx, request, colTypes, statistics.NewQueryFeedback(0, nil, 0, false))
c.Assert(err, IsNil)
result, ok := response.(*streamResult)
c.Assert(ok, IsTrue)
c.Assert(result.rowLen, Equals, len(colTypes))

resp, ok := result.resp.(*mockResponse)
c.Assert(ok, IsTrue)
resp.total = totalRows
resp.batch = batch

return result, colTypes
}

func (s *testSuite) TestSelectStreaming(c *C) {
response, colTypes := s.createSelectStreaming(1, 2, c)
response.Fetch(context.TODO())

// Test Next.
chk := chunk.New(colTypes, 32, 32)
numAllRows := 0
for {
err = response.Next(context.TODO(), chk)
err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
numAllRows += chk.NumRows()
if chk.NumRows() == 0 {
break
}
}
c.Assert(numAllRows, Equals, 2)
err = response.Close()
err := response.Close()
c.Assert(err, IsNil)
}

func (s *testSuite) TestSelectStreamingBatchSize(c *C) {
response, colTypes := s.createSelectStreaming(100, 1000000, c)
response.Fetch(context.TODO())
s.testBatchSize(response, colTypes, c)
}

func (s *testSuite) testBatchSize(response SelectResult, colTypes []*types.FieldType, c *C) {
chk := chunk.New(colTypes, 32, 32)
batch := chunk.NewRecordBatch(chk)

err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
c.Assert(chk.NumRows(), Equals, 32)

err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 32)

batch.SetRequiredRows(1)
err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 1)

batch.SetRequiredRows(2)
err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 2)

batch.SetRequiredRows(17)
err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 17)

batch.SetRequiredRows(170)
err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 32)

batch.SetRequiredRows(32)
err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 32)

batch.SetRequiredRows(0)
err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 32)

batch.SetRequiredRows(-1)
err = response.NextBatch(context.TODO(), batch)
c.Assert(err, IsNil)
c.Assert(batch.NumRows(), Equals, 32)
}

func (s *testSuite) TestAnalyze(c *C) {
request, err := (&RequestBuilder{}).SetKeyRanges(nil).
SetAnalyzeRequest(&tipb.AnalyzeReq{}).
Expand Down Expand Up @@ -166,6 +246,8 @@ func (s *testSuite) TestAnalyze(c *C) {
// Used only for test.
type mockResponse struct {
count int
total int
batch int
sync.Mutex
}

Expand All @@ -183,17 +265,24 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
resp.Lock()
defer resp.Unlock()

if resp.count == 2 {
if resp.count >= resp.total {
return nil, nil
}
defer func() { resp.count++ }()
numRows := mathutil.Min(resp.batch, resp.total-resp.count)
resp.count += numRows

datum := types.NewIntDatum(1)
bytes := make([]byte, 0, 100)
bytes, _ = codec.EncodeValue(nil, bytes, datum, datum, datum, datum)
chunks := make([]tipb.Chunk, numRows)
for i := range chunks {
chkData := make([]byte, len(bytes))
copy(chkData, bytes)
chunks[i] = tipb.Chunk{RowsData: chkData}
}

respPB := &tipb.SelectResponse{
Chunks: []tipb.Chunk{{RowsData: bytes}},
Chunks: chunks,
OutputCounts: []int64{1},
}
respBytes, err := respPB.Marshal()
Expand Down
10 changes: 8 additions & 2 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.Store = &mock.Store{
Client: &mock.Client{
MockResponse: &mockResponse{},
MockResponse: &mockResponse{
batch: 1,
total: 2,
},
},
}
s.sctx = ctx
Expand All @@ -67,7 +70,10 @@ func (s *testSuite) SetUpTest(c *C) {
ctx := s.sctx.(*mock.Context)
store := ctx.Store.(*mock.Store)
store.Client = &mock.Client{
MockResponse: &mockResponse{},
MockResponse: &mockResponse{
batch: 1,
total: 2,
},
}
}

Expand Down
23 changes: 15 additions & 8 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ type SelectResult interface {
// NextRaw gets the next raw result.
NextRaw(context.Context) ([]byte, error)
// Next reads the data into chunk.
Next(context.Context, *chunk.Chunk) error
// TODO: replace all calls of Next to NextBatch and remove this Next method
Next(ctx context.Context, chk *chunk.Chunk) error
// NextBatch reads the data into batch.
NextBatch(ctx context.Context, batch *chunk.RecordBatch) error
// Close closes the iterator.
Close() error
}
Expand Down Expand Up @@ -115,15 +118,20 @@ func (r *selectResult) NextRaw(ctx context.Context) ([]byte, error) {

// Next reads data to the chunk.
func (r *selectResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
for chk.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
return r.NextBatch(ctx, chunk.NewRecordBatch(chk))
}

// NextBatch reads the data into batch.
func (r *selectResult) NextBatch(ctx context.Context, batch *chunk.RecordBatch) error {
batch.Reset()
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
if r.selectResp == nil || r.respChkIdx == len(r.selectResp.Chunks) {
err := r.getSelectResp()
if err != nil || r.selectResp == nil {
return errors.Trace(err)
}
}
err := r.readRowsData(chk)
err := r.readRowsData(batch)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -167,11 +175,10 @@ func (r *selectResult) getSelectResp() error {
}
}

func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
func (r *selectResult) readRowsData(batch *chunk.RecordBatch) (err error) {
rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location())
for chk.NumRows() < maxChunkSize && len(rowsData) > 0 {
decoder := codec.NewDecoder(batch.Chunk, r.ctx.GetSessionVars().Location())
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize && len(rowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i])
if err != nil {
Expand Down
20 changes: 12 additions & 8 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ type streamResult struct {

func (r *streamResult) Fetch(context.Context) {}

// Next reads data to the chunk.
func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
for chk.NumRows() < maxChunkSize {
return r.NextBatch(ctx, chunk.NewRecordBatch(chk))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about setting the required rows of that newly created batch to max chunk size?

}

// NextBatch reads the data into batch.
func (r *streamResult) NextBatch(ctx context.Context, batch *chunk.RecordBatch) error {
batch.Reset()
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize {
err := r.readDataIfNecessary(ctx)
if err != nil {
return errors.Trace(err)
Expand All @@ -55,7 +60,7 @@ func (r *streamResult) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}

err = r.flushToChunk(chk)
err = r.flushToBatch(batch)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -113,11 +118,10 @@ func (r *streamResult) readDataIfNecessary(ctx context.Context) error {
return nil
}

func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) {
func (r *streamResult) flushToBatch(batch *chunk.RecordBatch) (err error) {
remainRowsData := r.curr.RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().Location())
for chk.NumRows() < maxChunkSize && len(remainRowsData) > 0 {
decoder := codec.NewDecoder(batch.Chunk, r.ctx.GetSessionVars().Location())
for !batch.IsFull() && batch.NumRows() < r.ctx.GetSessionVars().MaxChunkSize && len(remainRowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i])
if err != nil {
Expand Down
36 changes: 35 additions & 1 deletion util/chunk/recordbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,46 @@

package chunk

// UnspecifiedNumRows represents requiredRows is not specified.
const UnspecifiedNumRows = 0

// RecordBatch is input parameter of Executor.Next` method.
type RecordBatch struct {
*Chunk

// requiredRows indicates how many rows is required by the parent executor.
// Child executor should stop populating rows immediately if there are at
// least required rows in the Chunk.
requiredRows int
}

// NewRecordBatch is used to construct a RecordBatch.
func NewRecordBatch(chk *Chunk) *RecordBatch {
return &RecordBatch{chk}
return &RecordBatch{chk, UnspecifiedNumRows}
}

// SetRequiredRows sets the number of rows the parent executor want.
func (rb *RecordBatch) SetRequiredRows(numRows int) *RecordBatch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we allocate a RecordBatch with zero default values, rb := &RecordBatch{}, and without calling SetRequiredRows(), 0 is not treated like unspecified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnspecifiedNumRows has been updated to zero value of int.

if numRows <= 0 {
numRows = UnspecifiedNumRows
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If numRows <= 0, then set the requiredRows to ctx.MaxChunkSize?

}
rb.requiredRows = numRows
return rb
}

// RequiredRows returns how many rows the parent executor want.
func (rb *RecordBatch) RequiredRows() int {
return rb.requiredRows
}

// IsFull returns if this batch can be considered full.
// IsFull only takes requiredRows into account, the caller of this method should
// also consider maxChunkSize, then it should behave like:
// if !batch.IsFull() && batch.NumRows() < maxChunkSize { ... }
func (rb *RecordBatch) IsFull() bool {
if rb.requiredRows == UnspecifiedNumRows {
return false
}

return rb.NumRows() >= rb.requiredRows
}
Loading