Skip to content

Commit

Permalink
distsql: clean the memory usage of MemTracker when a query end… (#10898
Browse files Browse the repository at this point in the history
…) (#10971)
  • Loading branch information
SunRunAway authored and jackysp committed Jun 28, 2019
1 parent daea7cb commit b65f81f
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 20 deletions.
17 changes: 17 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C) (*selectResul
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(s.sctx, "testSuite.createSelectNormal").
Build()
c.Assert(err, IsNil)

Expand Down Expand Up @@ -94,13 +95,29 @@ func (s *testSuite) TestSelectNormal(c *C) {
c.Assert(numAllRows, Equals, 2)
err := response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectMemTracker(c *C) {
response, colTypes := s.createSelectNormal(2, 6, c)
response.Fetch(context.TODO())

// Test Next.
chk := chunk.New(colTypes, 3, 3)
err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
c.Assert(chk.IsFull(), Equals, true)
err = response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectNormalChunkSize(c *C) {
response, colTypes := s.createSelectNormal(100, 1000000, c)
response.Fetch(context.TODO())
s.testChunkSize(response, colTypes, c)
c.Assert(response.Close(), IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) createSelectStreaming(batch, totalRows int, c *C) (*streamResult, []*types.FieldType) {
Expand Down
4 changes: 4 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/testleak"
Expand All @@ -49,6 +50,9 @@ type testSuite struct {

func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker("testSuite", variable.DefTiDBMemQuotaDistSQL),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
MockResponse: &mockResponse{
Expand Down
57 changes: 37 additions & 20 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type selectResult struct {
fieldTypes []*types.FieldType
ctx sessionctx.Context

selectResp *tipb.SelectResponse
respChkIdx int
selectResp *tipb.SelectResponse
selectRespSize int // record the selectResp.Size() when it is initialized.
respChkIdx int

feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
Expand All @@ -94,25 +95,30 @@ func (r *selectResult) fetch(ctx context.Context) {
metrics.DistSQLQueryHistgram.WithLabelValues(r.label, r.sqlType).Observe(duration.Seconds())
}()
for {
var result resultWithErr
resultSubset, err := r.resp.Next(ctx)
if err != nil {
r.results <- resultWithErr{err: errors.Trace(err)}
result.err = err
} else if resultSubset == nil {
// If the result is drained, the resultSubset would be nil
return
}
if resultSubset == nil {
return
}

if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
} else {
result.result = resultSubset
r.memConsume(int64(resultSubset.MemSize()))
}

select {
case r.results <- resultWithErr{result: resultSubset}:
case r.results <- result:
case <-r.closed:
// If selectResult called Close() already, make fetch goroutine exit.
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
case <-ctx.Done():
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
}
}
Expand Down Expand Up @@ -157,24 +163,21 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.memConsume(-int64(re.result.MemSize()))
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
r.selectRespSize = r.selectResp.Size()
r.memConsume(int64(r.selectRespSize))
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
Expand Down Expand Up @@ -207,13 +210,27 @@ func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
return nil
}

func (r *selectResult) memConsume(bytes int64) {
if r.memTracker != nil {
r.memTracker.Consume(bytes)
}
}

// Close closes selectResult.
func (r *selectResult) Close() error {
// Close this channel tell fetch goroutine to exit.
if r.feedback.Actual() >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual()))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
// Close this channel to tell the fetch goroutine to exit.
close(r.closed)
for re := range r.results {
if re.result != nil {
r.memConsume(-int64(re.result.MemSize()))
}
}
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
return r.resp.Close()
}

0 comments on commit b65f81f

Please sign in to comment.