Skip to content

Commit

Permalink
distsql: clean the memory usage of MemTracker when a query end… (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
SunRunAway committed Jun 27, 2019
1 parent 94d8127 commit 69dd7a1
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 14 deletions.
17 changes: 17 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")).
Build()
c.Assert(err, IsNil)

Expand Down Expand Up @@ -106,13 +107,29 @@ func (s *testSuite) TestSelectNormal(c *C) {
c.Assert(numAllRows, Equals, 2)
err := response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectMemTracker(c *C) {
response, colTypes := s.createSelectNormal(2, 6, c, nil)
response.Fetch(context.TODO())

// Test Next.
chk := chunk.New(colTypes, 3, 3)
err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
c.Assert(chk.IsFull(), Equals, true)
err = response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectNormalChunkSize(c *C) {
response, colTypes := s.createSelectNormal(100, 1000000, c, nil)
response.Fetch(context.TODO())
s.testChunkSize(response, colTypes, c)
c.Assert(response.Close(), IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
Expand Down
5 changes: 5 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -49,6 +51,9 @@ type testSuite struct {

func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
MockResponse: &mockResponse{
Expand Down
45 changes: 31 additions & 14 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type selectResult struct {
fieldTypes []*types.FieldType
ctx sessionctx.Context

selectResp *tipb.SelectResponse
respChkIdx int
selectResp *tipb.SelectResponse
selectRespSize int // record the selectResp.Size() when it is initialized.
respChkIdx int

feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
Expand Down Expand Up @@ -103,20 +104,25 @@ func (r *selectResult) fetch(ctx context.Context) {
if err != nil {
result.err = err
} else if resultSubset == nil {
// If the result is drained, the resultSubset would be nil
return
} else {
result.result = resultSubset
if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
}
r.memConsume(int64(resultSubset.MemSize()))
}

select {
case r.results <- result:
case <-r.closed:
// If selectResult called Close() already, make fetch goroutine exit.
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
case <-ctx.Done():
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
}
}
Expand Down Expand Up @@ -161,24 +167,21 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.memConsume(-int64(re.result.MemSize()))
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
r.selectRespSize = r.selectResp.Size()
r.memConsume(int64(r.selectRespSize))
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
Expand Down Expand Up @@ -234,13 +237,27 @@ func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
return nil
}

func (r *selectResult) memConsume(bytes int64) {
if r.memTracker != nil {
r.memTracker.Consume(bytes)
}
}

// Close closes selectResult.
func (r *selectResult) Close() error {
// Close this channel tell fetch goroutine to exit.
if r.feedback.Actual() >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual()))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
// Close this channel to tell the fetch goroutine to exit.
close(r.closed)
for re := range r.results {
if re.result != nil {
r.memConsume(-int64(re.result.MemSize()))
}
}
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
return r.resp.Close()
}

0 comments on commit 69dd7a1

Please sign in to comment.