Skip to content

Commit

Permalink
*: fix memory leak introduced by timer.After (#6720)
Browse files Browse the repository at this point in the history
close #6719

Signed-off-by: lhy1024 <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] authored Jul 3, 2023
1 parent 05f71e0 commit ff67696
Show file tree
Hide file tree
Showing 21 changed files with 253 additions and 53 deletions.
4 changes: 3 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,14 +526,16 @@ func newClientWithKeyspaceName(

func (c *client) initRetry(f func(s string) error, str string) error {
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i := 0; i < c.option.maxRetryTimes; i++ {
if err = f(str); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(time.Second):
case <-ticker.C:
}
}
return errors.WithStack(err)
Expand Down
4 changes: 3 additions & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,16 @@ func (c *pdServiceDiscovery) Init() error {

func (c *pdServiceDiscovery) initRetry(f func() error) error {
var err error
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for i := 0; i < c.option.maxRetryTimes; i++ {
if err = f(); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(time.Second):
case <-ticker.C:
}
}
return errors.WithStack(err)
Expand Down
4 changes: 3 additions & 1 deletion client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,8 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
err error
stream rmpb.ResourceManager_AcquireTokenBucketsClient
)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
cc, err := c.resourceManagerClient()
if err != nil {
Expand All @@ -406,7 +408,7 @@ func (c *client) tryResourceManagerConnect(ctx context.Context, connection *reso
select {
case <-ctx.Done():
return err
case <-time.After(retryInterval):
case <-ticker.C:
}
}
return err
Expand Down
43 changes: 43 additions & 0 deletions client/timerpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"sync"
"time"
)

// GlobalTimerPool is the shared, package-level TimerPool for callers that do
// not want to manage a pool of their own.
var GlobalTimerPool TimerPool

// TimerPool recycles *time.Timer values through a sync.Pool so that
// short-lived timers can be reused instead of allocated on every use.
// The zero value is ready to use.
type TimerPool struct {
	pool sync.Pool
}

// Get hands out a timer that will fire after d. A recycled timer is re-armed
// with Reset when the pool has one; otherwise a brand-new timer is created.
func (tp *TimerPool) Get(d time.Duration) *time.Timer {
	recycled, ok := tp.pool.Get().(*time.Timer)
	if !ok {
		// Nothing cached (the pool returned nil): allocate a fresh timer.
		return time.NewTimer(d)
	}
	recycled.Reset(d)
	return recycled
}

// Put stops timer and returns it to the pool for later reuse.
//
// When Stop reports false the timer has already expired or been stopped, so a
// tick may still be sitting in its channel. That tick is discarded with a
// non-blocking receive so the next user of the timer does not observe it.
func (tp *TimerPool) Put(timer *time.Timer) {
	if timer.Stop() {
		// Stopped before firing: the channel cannot hold a tick.
		tp.pool.Put(timer)
		return
	}
	select {
	case <-timer.C:
		// Dropped a leftover tick.
	default:
		// Channel was already empty.
	}
	tp.pool.Put(timer)
}
70 changes: 70 additions & 0 deletions client/timerpool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"testing"
"time"
)

// TestTimerPool repeatedly borrows a timer from a TimerPool and checks that it
// neither has a tick pending right after Get nor fails to fire within the
// allowed window, then returns it to the pool for the next round.
func TestTimerPool(t *testing.T) {
	const (
		rounds   = 100
		duration = 20 * time.Millisecond
		limit    = 100 * time.Millisecond
	)

	var pool TimerPool
	for round := 0; round < rounds; round++ {
		tm := pool.Get(duration)

		// A timer that was just armed must not already have a tick queued,
		// even if it is a recycled one.
		select {
		case <-tm.C:
			t.Errorf("timer expired too early")
			continue
		default:
		}

		// The timer has to fire before the upper bound elapses.
		select {
		case <-tm.C:
		case <-time.After(limit):
			t.Errorf("timer didn't expire on time")
		}

		pool.Put(tm)
	}
}

// timeout is the duration every timer created by the benchmarks below is armed with.
const timeout = 10 * time.Millisecond

// BenchmarkTimerUtilization compares, as two sub-benchmarks, the cost of a
// Get/Put round trip through GlobalTimerPool against creating and stopping a
// fresh time.Timer each iteration.
func BenchmarkTimerUtilization(b *testing.B) {
	// Borrow a timer from the global pool and give it straight back.
	pooled := func(b *testing.B) {
		for n := b.N; n > 0; n-- {
			GlobalTimerPool.Put(GlobalTimerPool.Get(timeout))
		}
	}
	// Allocate a new timer and stop it right away.
	native := func(b *testing.B) {
		for n := b.N; n > 0; n-- {
			time.NewTimer(timeout).Stop()
		}
	}

	b.Run("TimerWithPool", pooled)
	b.Run("TimerWithoutPool", native)
}

// BenchmarkTimerPoolParallel measures Get/Put round trips through
// GlobalTimerPool when driven concurrently by b.RunParallel.
func BenchmarkTimerPoolParallel(b *testing.B) {
	worker := func(iter *testing.PB) {
		for iter.Next() {
			GlobalTimerPool.Put(GlobalTimerPool.Get(timeout))
		}
	}
	b.RunParallel(worker)
}

// BenchmarkTimerNativeParallel measures creating and immediately stopping a
// fresh time.Timer when driven concurrently by b.RunParallel; it is the
// non-pooled counterpart of the pooled parallel benchmark.
func BenchmarkTimerNativeParallel(b *testing.B) {
	worker := func(iter *testing.PB) {
		for iter.Next() {
			time.NewTimer(timeout).Stop()
		}
	}
	b.RunParallel(worker)
}
58 changes: 44 additions & 14 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/timerpool"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -139,11 +140,24 @@ func (c *tsoClient) updateTSODispatcher() {
}

type deadline struct {
timer <-chan time.Time
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}

// newTSDeadline builds a deadline whose timer is borrowed from the global
// timer pool and armed with timeout. The done channel and cancel function are
// stored as given.
func newTSDeadline(
	timeout time.Duration,
	done chan struct{},
	cancel context.CancelFunc,
) *deadline {
	d := new(deadline)
	// Borrow a pooled timer rather than allocating one per deadline.
	d.timer = timerpool.GlobalTimerPool.Get(timeout)
	d.done = done
	d.cancel = cancel
	return d
}

func (c *tsoClient) tsCancelLoop() {
defer c.wg.Done()

Expand Down Expand Up @@ -172,19 +186,21 @@ func (c *tsoClient) tsCancelLoop() {

func (c *tsoClient) watchTSDeadline(ctx context.Context, dcLocation string) {
if _, exist := c.tsDeadline.Load(dcLocation); !exist {
tsDeadlineCh := make(chan deadline, 1)
tsDeadlineCh := make(chan *deadline, 1)
c.tsDeadline.Store(dcLocation, tsDeadlineCh)
go func(dc string, tsDeadlineCh <-chan deadline) {
go func(dc string, tsDeadlineCh <-chan *deadline) {
for {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
case <-d.timer.C:
log.Error("[tso] tso request is canceled due to timeout", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
timerpool.GlobalTimerPool.Put(d.timer)
case <-d.done:
continue
timerpool.GlobalTimerPool.Put(d.timer)
case <-ctx.Done():
timerpool.GlobalTimerPool.Put(d.timer)
return
}
case <-ctx.Done():
Expand Down Expand Up @@ -234,6 +250,8 @@ func (c *tsoClient) checkAllocator(
}()
cc, u := c.GetTSOAllocatorClientConnByDCLocation(dc)
healthCli := healthpb.NewHealthClient(cc)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
// the pd/allocator leader change, we need to re-establish the stream
if u != url {
Expand All @@ -259,7 +277,7 @@ func (c *tsoClient) checkAllocator(
select {
case <-dispatcherCtx.Done():
return
case <-time.After(time.Second):
case <-ticker.C:
// To ensure we can get the latest allocator leader
// and once the leader is changed, we can exit this function.
_, u = c.GetTSOAllocatorClientConnByDCLocation(dc)
Expand Down Expand Up @@ -366,6 +384,7 @@ func (c *tsoClient) handleDispatcher(

// Loop through each batch of TSO requests and send them for processing.
streamLoopTimer := time.NewTimer(c.option.timeout)
defer streamLoopTimer.Stop()
tsoBatchLoop:
for {
select {
Expand All @@ -389,6 +408,15 @@ tsoBatchLoop:
if maxBatchWaitInterval >= 0 {
tbc.adjustBestBatchSize()
}
// Stop the timer if it's not stopped.
if !streamLoopTimer.Stop() {
select {
case <-streamLoopTimer.C: // try to drain from the channel
default:
}
}
// We need to be careful here; see the comments of Timer.Reset for more details.
// https://pkg.go.dev/time@master#Timer.Reset
streamLoopTimer.Reset(c.option.timeout)
// Choose a stream to send the TSO gRPC request.
streamChoosingLoop:
Expand All @@ -403,16 +431,20 @@ tsoBatchLoop:
if c.updateTSOConnectionCtxs(dispatcherCtx, dc, &connectionCtxs) {
continue streamChoosingLoop
}
timer := time.NewTimer(retryInterval)
select {
case <-dispatcherCtx.Done():
timer.Stop()
return
case <-streamLoopTimer.C:
err = errs.ErrClientCreateTSOStream.FastGenByArgs(errs.RetryTimeoutErr)
log.Error("[tso] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
c.svcDiscovery.ScheduleCheckMemberChanged()
c.finishRequest(tbc.getCollectedRequests(), 0, 0, 0, errors.WithStack(err))
timer.Stop()
continue tsoBatchLoop
case <-time.After(retryInterval):
case <-timer.C:
timer.Stop()
continue streamChoosingLoop
}
}
Expand All @@ -429,11 +461,7 @@ tsoBatchLoop:
}
}
done := make(chan struct{})
dl := deadline{
timer: time.After(c.option.timeout),
done: done,
cancel: cancel,
}
dl := newTSDeadline(c.option.timeout, done, cancel)
tsDeadlineCh, ok := c.tsDeadline.Load(dc)
for !ok || tsDeadlineCh == nil {
c.scheduleCheckTSDeadline()
Expand All @@ -443,7 +471,7 @@ tsoBatchLoop:
select {
case <-dispatcherCtx.Done():
return
case tsDeadlineCh.(chan deadline) <- dl:
case tsDeadlineCh.(chan *deadline) <- dl:
}
opts = extractSpanReference(tbc, opts[:0])
err = c.processRequests(stream, dc, tbc, opts)
Expand Down Expand Up @@ -558,6 +586,8 @@ func (c *tsoClient) tryConnectToTSO(
}
// retry several times before falling back to the follower when the network problem happens

ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
c.svcDiscovery.ScheduleCheckMemberChanged()
cc, url = c.GetTSOAllocatorClientConnByDCLocation(dc)
Expand Down Expand Up @@ -587,7 +617,7 @@ func (c *tsoClient) tryConnectToTSO(
select {
case <-dispatcherCtx.Done():
return err
case <-time.After(retryInterval):
case <-ticker.C:
}
}

Expand Down
8 changes: 6 additions & 2 deletions client/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,16 @@ func (c *tsoServiceDiscovery) retry(
maxRetryTimes int, retryInterval time.Duration, f func() error,
) error {
var err error
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()
for i := 0; i < maxRetryTimes; i++ {
if err = f(); err == nil {
return nil
}
select {
case <-c.ctx.Done():
return err
case <-time.After(retryInterval):
case <-ticker.C:
}
}
return errors.WithStack(err)
Expand Down Expand Up @@ -245,11 +247,13 @@ func (c *tsoServiceDiscovery) startCheckMemberLoop() {

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

for {
select {
case <-c.checkMembershipCh:
case <-time.After(memberUpdateInterval):
case <-ticker.C:
case <-ctx.Done():
log.Info("[tso] exit check member loop")
return
Expand Down
4 changes: 3 additions & 1 deletion client/tso_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ func (b *tsoTSOStreamBuilder) build(
}

func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done chan struct{}, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-done:
return
case <-time.After(timeout):
case <-timer.C:
cancel()
case <-ctx.Done():
}
Expand Down
Loading

0 comments on commit ff67696

Please sign in to comment.