Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: clean the memory usage of MemTracker when a query end… (#10898) #10971

Merged
merged 3 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C) (*selectResul
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(s.sctx, "testSuite.createSelectNormal").
Build()
c.Assert(err, IsNil)

Expand Down Expand Up @@ -94,13 +95,29 @@ func (s *testSuite) TestSelectNormal(c *C) {
c.Assert(numAllRows, Equals, 2)
err := response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectMemTracker(c *C) {
response, colTypes := s.createSelectNormal(2, 6, c)
response.Fetch(context.TODO())

// Test Next.
chk := chunk.New(colTypes, 3, 3)
err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
c.Assert(chk.IsFull(), Equals, true)
err = response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectNormalChunkSize(c *C) {
response, colTypes := s.createSelectNormal(100, 1000000, c)
response.Fetch(context.TODO())
s.testChunkSize(response, colTypes, c)
c.Assert(response.Close(), IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) {
Expand Down
4 changes: 4 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/testleak"
Expand All @@ -49,6 +50,9 @@ type testSuite struct {

func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker("testSuite", variable.DefTiDBMemQuotaDistSQL),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
MockResponse: &mockResponse{
Expand Down
57 changes: 37 additions & 20 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type selectResult struct {
fieldTypes []*types.FieldType
ctx sessionctx.Context

selectResp *tipb.SelectResponse
respChkIdx int
selectResp *tipb.SelectResponse
selectRespSize int // record the selectResp.Size() when it is initialized.
respChkIdx int

feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
Expand All @@ -94,25 +95,30 @@ func (r *selectResult) fetch(ctx context.Context) {
metrics.DistSQLQueryHistgram.WithLabelValues(r.label, r.sqlType).Observe(duration.Seconds())
}()
for {
var result resultWithErr
resultSubset, err := r.resp.Next(ctx)
if err != nil {
r.results <- resultWithErr{err: errors.Trace(err)}
result.err = err
} else if resultSubset == nil {
// If the result is drained, the resultSubset would be nil
return
}
if resultSubset == nil {
return
}

if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
} else {
result.result = resultSubset
r.memConsume(int64(resultSubset.MemSize()))
}

select {
case r.results <- resultWithErr{result: resultSubset}:
case r.results <- result:
case <-r.closed:
// If selectResult called Close() already, make fetch goroutine exit.
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
case <-ctx.Done():
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
}
}
Expand Down Expand Up @@ -157,24 +163,21 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.memConsume(-int64(re.result.MemSize()))
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
r.selectRespSize = r.selectResp.Size()
r.memConsume(int64(r.selectRespSize))
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
Expand Down Expand Up @@ -207,13 +210,27 @@ func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
return nil
}

func (r *selectResult) memConsume(bytes int64) {
if r.memTracker != nil {
r.memTracker.Consume(bytes)
}
}

// Close closes selectResult.
func (r *selectResult) Close() error {
// Close this channel tell fetch goroutine to exit.
if r.feedback.Actual() >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual()))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
// Close this channel to tell the fetch goroutine to exit.
close(r.closed)
for re := range r.results {
if re.result != nil {
r.memConsume(-int64(re.result.MemSize()))
}
}
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
return r.resp.Close()
}