Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: clean the memory usage of MemTracker when a query ends (#10893) #10898

Merged
merged 5 commits into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")).
Build()
c.Assert(err, IsNil)

Expand Down Expand Up @@ -106,13 +107,29 @@ func (s *testSuite) TestSelectNormal(c *C) {
c.Assert(numAllRows, Equals, 2)
err := response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectMemTracker(c *C) {
response, colTypes := s.createSelectNormal(2, 6, c, nil)
response.Fetch(context.TODO())

// Test Next.
chk := chunk.New(colTypes, 3, 3)
err := response.Next(context.TODO(), chk)
c.Assert(err, IsNil)
c.Assert(chk.IsFull(), Equals, true)
err = response.Close()
c.Assert(err, IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectNormalChunkSize(c *C) {
response, colTypes := s.createSelectNormal(100, 1000000, c, nil)
response.Fetch(context.TODO())
s.testChunkSize(response, colTypes, c)
c.Assert(response.Close(), IsNil)
c.Assert(response.memTracker.BytesConsumed(), Equals, int64(0))
}

func (s *testSuite) TestSelectWithRuntimeStats(c *C) {
Expand Down
5 changes: 5 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"
)
Expand All @@ -52,6 +54,9 @@ type testSuite struct {

func (s *testSuite) SetUpSuite(c *C) {
ctx := mock.NewContext()
ctx.GetSessionVars().StmtCtx = &stmtctx.StatementContext{
MemTracker: memory.NewTracker(stringutil.StringerStr("testSuite"), variable.DefTiDBMemQuotaDistSQL),
}
ctx.Store = &mock.Store{
Client: &mock.Client{
MockResponse: &mockResponse{
Expand Down
45 changes: 31 additions & 14 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type selectResult struct {
fieldTypes []*types.FieldType
ctx sessionctx.Context

selectResp *tipb.SelectResponse
respChkIdx int
selectResp *tipb.SelectResponse
selectRespSize int // record the selectResp.Size() when it is initialized.
respChkIdx int

feedback *statistics.QueryFeedback
partialCount int64 // number of partial results.
Expand Down Expand Up @@ -103,20 +104,25 @@ func (r *selectResult) fetch(ctx context.Context) {
if err != nil {
result.err = err
} else if resultSubset == nil {
// If the result is drained, the resultSubset would be nil
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check in here?

Copy link
Contributor Author

@SunRunAway SunRunAway Jun 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we should. Per https://godoc.org/github.com/pingcap/tidb/kv#Response, resultSubset may be nil even if no error take place.

	// Next returns a resultSubset from a single storage unit.
	// When full result set is returned, nil is returned.
	Next(ctx context.Context) (resultSubset ResultSubset, err error)

I will add a comment here.

} else {
result.result = resultSubset
if r.memTracker != nil {
r.memTracker.Consume(int64(resultSubset.MemSize()))
}
r.memConsume(int64(resultSubset.MemSize()))
}

select {
case r.results <- result:
case <-r.closed:
// If selectResult called Close() already, make fetch goroutine exit.
if resultSubset != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to check nil again

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if line 102 occurs error, resultSubset would be nil.

r.memConsume(-int64(resultSubset.MemSize()))
}
return
case <-ctx.Done():
if resultSubset != nil {
r.memConsume(-int64(resultSubset.MemSize()))
}
return
}
}
Expand Down Expand Up @@ -161,24 +167,21 @@ func (r *selectResult) getSelectResp() error {
if re.err != nil {
return errors.Trace(re.err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(-int64(r.selectResp.Size()))
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
if re.result == nil {
r.selectResp = nil
return nil
}
if r.memTracker != nil {
r.memTracker.Consume(-int64(re.result.MemSize()))
}
r.memConsume(-int64(re.result.MemSize()))
r.selectResp = new(tipb.SelectResponse)
err := r.selectResp.Unmarshal(re.result.GetData())
if err != nil {
return errors.Trace(err)
}
if r.memTracker != nil && r.selectResp != nil {
r.memTracker.Consume(int64(r.selectResp.Size()))
}
r.selectRespSize = r.selectResp.Size()
r.memConsume(int64(r.selectRespSize))
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
Expand Down Expand Up @@ -234,13 +237,27 @@ func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
return nil
}

func (r *selectResult) memConsume(bytes int64) {
if r.memTracker != nil {
r.memTracker.Consume(bytes)
}
}

// Close closes selectResult.
func (r *selectResult) Close() error {
// Close this channel tell fetch goroutine to exit.
if r.feedback.Actual() >= 0 {
metrics.DistSQLScanKeysHistogram.Observe(float64(r.feedback.Actual()))
}
metrics.DistSQLPartialCountHistogram.Observe(float64(r.partialCount))
// Close this channel to tell the fetch goroutine to exit.
close(r.closed)
for re := range r.results {
if re.result != nil {
r.memConsume(-int64(re.result.MemSize()))
}
}
if r.selectResp != nil {
r.memConsume(-int64(r.selectRespSize))
}
return r.resp.Close()
}