Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: fix memory leak introduced by timer.After #6730

Merged
merged 2 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions pkg/timerpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"sync"
"time"
)

// GlobalTimerPool is a global pool for reusing *time.Timer.
var GlobalTimerPool TimerPool

// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse.
type TimerPool struct {
pool sync.Pool
}

// Get returns a timer with a given duration.
func (tp *TimerPool) Get(d time.Duration) *time.Timer {
if v := tp.pool.Get(); v != nil {
timer := v.(*time.Timer)
timer.Reset(d)
return timer
}
return time.NewTimer(d)
}

// Put tries to call timer.Stop() before putting it back into pool,
// if the timer.Stop() returns false (it has either already expired or been stopped),
// have a shot at draining the channel with residual time if there is one.
func (tp *TimerPool) Put(timer *time.Timer) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
tp.pool.Put(timer)
}
70 changes: 70 additions & 0 deletions pkg/timerpool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"testing"
"time"
)

func TestTimerPool(t *testing.T) {
var tp TimerPool

for i := 0; i < 100; i++ {
timer := tp.Get(20 * time.Millisecond)

select {
case <-timer.C:
t.Errorf("timer expired too early")
continue
default:
}

select {
case <-time.After(100 * time.Millisecond):
t.Errorf("timer didn't expire on time")
case <-timer.C:
}

tp.Put(timer)
}
}

const timeout = 10 * time.Millisecond

func BenchmarkTimerUtilization(b *testing.B) {
b.Run("TimerWithPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
t := GlobalTimerPool.Get(timeout)
GlobalTimerPool.Put(t)
}
})
b.Run("TimerWithoutPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
t := time.NewTimer(timeout)
t.Stop()
}
})
}

func BenchmarkTimerPoolParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t := GlobalTimerPool.Get(timeout)
GlobalTimerPool.Put(t)
}
})
}

func BenchmarkTimerNativeParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t := time.NewTimer(timeout)
t.Stop()
}
})
}
17 changes: 11 additions & 6 deletions pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/timerpool"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -197,7 +198,7 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical

// TSDeadline is used to watch the deadline of each tso request.
type TSDeadline struct {
timer <-chan time.Time
timer *time.Timer
done chan struct{}
cancel context.CancelFunc
}
Expand All @@ -208,8 +209,9 @@ func NewTSDeadline(
done chan struct{},
cancel context.CancelFunc,
) *TSDeadline {
timer := timerpool.GlobalTimerPool.Get(timeout)
return &TSDeadline{
timer: time.After(timeout),
timer: timer,
done: done,
cancel: cancel,
}
Expand All @@ -224,13 +226,15 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
case <-d.timer.C:
log.Error("tso proxy request processing is canceled due to timeout",
errs.ZapError(errs.ErrProxyTSOTimeout))
d.cancel()
timerpool.GlobalTimerPool.Put(d.timer)
case <-d.done:
continue
timerpool.GlobalTimerPool.Put(d.timer)
case <-ctx.Done():
timerpool.GlobalTimerPool.Put(d.timer)
return
}
case <-ctx.Done():
Expand All @@ -241,11 +245,12 @@ func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) {

func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
defer logutil.LogPanic()

timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
select {
case <-done:
return
case <-time.After(3 * time.Second):
case <-timer.C:
cancel()
case <-streamCtx.Done():
}
Expand Down
12 changes: 9 additions & 3 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,13 +606,15 @@ func (s *tsoServer) Send(m *pdpb.TsoResponse) error {
})
done <- s.stream.Send(m)
}()
timer := time.NewTimer(tsoutil.DefaultTSOProxyTimeout)
defer timer.Stop()
select {
case err := <-done:
if err != nil {
atomic.StoreInt32(&s.closed, 1)
}
return errors.WithStack(err)
case <-time.After(tsoutil.DefaultTSOProxyTimeout):
case <-timer.C:
atomic.StoreInt32(&s.closed, 1)
return ErrForwardTSOTimeout
}
Expand All @@ -633,14 +635,16 @@ func (s *tsoServer) Recv(timeout time.Duration) (*pdpb.TsoRequest, error) {
request, err := s.stream.Recv()
requestCh <- &pdpbTSORequest{request: request, err: err}
}()
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case req := <-requestCh:
if req.err != nil {
atomic.StoreInt32(&s.closed, 1)
return nil, errors.WithStack(req.err)
}
return req.request, nil
case <-time.After(timeout):
case <-timer.C:
atomic.StoreInt32(&s.closed, 1)
return nil, ErrTSOProxyRecvFromClientTimeout
}
Expand Down Expand Up @@ -2173,10 +2177,12 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient
// TODO: If goroutine here timeout when tso stream created successfully, we need to handle it correctly.
func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
defer logutil.LogPanic()
timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
select {
case <-done:
return
case <-time.After(3 * time.Second):
case <-timer.C:
cancel()
case <-streamCtx.Done():
}
Expand Down