Skip to content

Commit

Permalink
fix(serverless/appsec): goroutine leak in TokenTicker tests
Browse files Browse the repository at this point in the history
The tests for `TokenTicker` rely on a chanel to synchronize with token
bucket updates (each update sends one message to that channel). There
was an issue when closing those test tickers, where it is possible the
ticker goroutine may process the closing of the ticker channel before
the stop signal; leading to one last message being sent to the
synchronization channel. This resulted in the bucket updater goroutine
blocking on the channel send indefinitely, as this last send was not
paired with a receive on the other side.

This could result in those tests leaking a modest amount of goroutines,
affecting the goroutine scheduler's performance. When running tests with
`-count`, this could reliably cause tests to time out, once enough
leaked goroutines have accumulated that the scheduler wastes most of its
time on these.
  • Loading branch information
RomainMuller committed Feb 19, 2024
1 parent 2537525 commit 58736bf
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ require (
go.opentelemetry.io/collector/extension v0.91.0
go.opentelemetry.io/collector/otelcol v0.91.0
go.opentelemetry.io/collector/processor v0.91.0
go.uber.org/goleak v1.3.0
go4.org/intern v0.0.0-20230525184215-6c62f75575cb
gotest.tools v2.2.0+incompatible
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/serverless/appsec/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (t *TokenTicker) updateBucket(ticksChan <-chan time.Time, startTime time.Ti
func (t *TokenTicker) Start() {
timeNow := time.Now()
t.ticker = time.NewTicker(500 * time.Microsecond)
t.start(t.ticker.C, timeNow, false)
_ = t.start(t.ticker.C, timeNow, false)
}

// start is used for internal testing. Controlling the ticker means being able to test per-tick
Expand Down
62 changes: 36 additions & 26 deletions pkg/serverless/appsec/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"go.uber.org/goleak"
)

func TestLimiterUnit(t *testing.T) {
Expand All @@ -22,7 +23,7 @@ func TestLimiterUnit(t *testing.T) {
t.Run("no-ticks-1", func(t *testing.T) {
l := NewTestTicker(1, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)
// No ticks between the requests
require.True(t, l.Allow(), "First call to limiter.Allow() should return True")
require.False(t, l.Allow(), "Second call to limiter.Allow() should return False")
Expand All @@ -31,7 +32,7 @@ func TestLimiterUnit(t *testing.T) {
t.Run("no-ticks-2", func(t *testing.T) {
l := NewTestTicker(100, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)
// No ticks between the requests
for i := 0; i < 100; i++ {
require.True(t, l.Allow())
Expand All @@ -42,7 +43,7 @@ func TestLimiterUnit(t *testing.T) {
t.Run("10ms-ticks", func(t *testing.T) {
l := NewTestTicker(1, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)
require.True(t, l.Allow(), "First call to limiter.Allow() should return True")
require.False(t, l.Allow(), "Second call to limiter.Allow() should return false")
l.tick(startTime.Add(10 * time.Millisecond))
Expand All @@ -52,7 +53,7 @@ func TestLimiterUnit(t *testing.T) {
t.Run("9ms-ticks", func(t *testing.T) {
l := NewTestTicker(1, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)
require.True(t, l.Allow(), "First call to limiter.Allow() should return True")
l.tick(startTime.Add(9 * time.Millisecond))
require.False(t, l.Allow(), "Second call to limiter.Allow() after 9ms should return False")
Expand All @@ -63,7 +64,7 @@ func TestLimiterUnit(t *testing.T) {
t.Run("1s-rate", func(t *testing.T) {
l := NewTestTicker(1, 1)
l.start(startTime)
defer l.stop()
defer l.stop(t)
require.True(t, l.Allow(), "First call to limiter.Allow() should return True with 1s per token")
l.tick(startTime.Add(500 * time.Millisecond))
require.False(t, l.Allow(), "Second call to limiter.Allow() should return False with 1s per Token")
Expand All @@ -74,7 +75,7 @@ func TestLimiterUnit(t *testing.T) {
t.Run("100-requests-burst", func(t *testing.T) {
l := NewTestTicker(100, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)
for i := 0; i < 100; i++ {
require.Truef(t, l.Allow(),
"Burst call %d to limiter.Allow() should return True with 100 initial tokens", i)
Expand All @@ -86,7 +87,7 @@ func TestLimiterUnit(t *testing.T) {
t.Run("101-requests-burst", func(t *testing.T) {
l := NewTestTicker(100, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)
for i := 0; i < 100; i++ {
require.Truef(t, l.Allow(),
"Burst call %d to limiter.Allow() should return True with 100 initial tokens", i)
Expand All @@ -100,32 +101,32 @@ func TestLimiterUnit(t *testing.T) {
t.Run("bucket-refill-short", func(t *testing.T) {
l := NewTestTicker(100, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)

for i := 0; i < 1000; i++ {
startTime = startTime.Add(time.Millisecond)
l.tick(startTime)
require.Equalf(t, int64(100), l.t.tokens.Load(), "Bucket should have exactly 100 tokens")
require.Equalf(t, int64(100), l.tokenTicker.tokens.Load(), "Bucket should have exactly 100 tokens")
}
})

t.Run("bucket-refill-long", func(t *testing.T) {
l := NewTestTicker(100, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)

for i := 0; i < 1000; i++ {
startTime = startTime.Add(3 * time.Second)
l.tick(startTime)
}
require.Equalf(t, int64(100), l.t.tokens.Load(), "Bucket should have exactly 100 tokens")
require.Equalf(t, int64(100), l.tokenTicker.tokens.Load(), "Bucket should have exactly 100 tokens")
})

t.Run("allow-after-stop", func(t *testing.T) {
l := NewTestTicker(3, 3)
l.start(startTime)
require.True(t, l.Allow())
l.stop()
l.stop(t)
// The limiter keeps allowing until there's no more tokens
require.True(t, l.Allow())
require.True(t, l.Allow())
Expand All @@ -144,7 +145,7 @@ func TestLimiterUnit(t *testing.T) {
l.tick(startTime.Add(10 * time.Millisecond))
// The limiter has started refilling its tokens
require.True(t, l.Allow())
l.stop()
l.stop(t)
})
}

Expand All @@ -154,6 +155,8 @@ func TestLimiter(t *testing.T) {
// Each goroutine will continuously call the rate limiter for 1 second
for nbUsers := 1; nbUsers <= 10; nbUsers *= 10 {
t.Run(fmt.Sprintf("continuous-requests-%d-users", nbUsers), func(t *testing.T) {
defer goleak.VerifyNone(t)

var startBarrier, stopBarrier sync.WaitGroup
// Create a start barrier to synchronize every goroutine's launch and
// increase the chances of parallel accesses
Expand Down Expand Up @@ -197,11 +200,13 @@ func TestLimiter(t *testing.T) {
// Simulate sporadic bursts during up to 1 minute
for burstAmount := 1; burstAmount <= 10; burstAmount++ {
t.Run(fmt.Sprintf("requests-bursts-%d-iterations", burstAmount), func(t *testing.T) {
defer goleak.VerifyNone(t)

skipped := 0
kept := 0
l := NewTestTicker(100, 100)
l.start(startTime)
defer l.stop()
defer l.stop(t)

for c := 0; c < burstAmount; c++ {
for i := 0; i < burstSize; i++ {
Expand Down Expand Up @@ -270,33 +275,38 @@ func BenchmarkLimiter(b *testing.B) {
// TestTicker is a utility struct used to send hand-crafted ticks to the rate limiter for controlled testing
// It also makes sure to give time to the bucket update goroutine by using the optional sync channel
type TestTicker struct {
C chan time.Time
syncChan <-chan struct{}
t *TokenTicker
ticks chan time.Time
syncChan <-chan struct{}
tokenTicker *TokenTicker
}

func NewTestTicker(tokens, maxTokens int64) *TestTicker {
return &TestTicker{
C: make(chan time.Time),
t: NewTokenTicker(tokens, maxTokens),
ticks: make(chan time.Time),
tokenTicker: NewTokenTicker(tokens, maxTokens),
}
}

func (t *TestTicker) start(timeStamp time.Time) {
t.syncChan = t.t.start(t.C, timeStamp, true)
t.syncChan = t.tokenTicker.start(t.ticks, timeStamp, true)
}

func (t *TestTicker) stop() {
t.t.Stop()
close(t.C)
// syncChan is closed by the token ticker when sure that nothing else will be sent on it
func (t *TestTicker) stop(test *testing.T) {
defer goleak.VerifyNone(test)

t.tokenTicker.Stop()
close(t.ticks)
// syncChan is closed by the token ticker when sure that nothing else will be sent on it. We read from it to avoid
// possibly leaking a goroutine in case the `select` internal to the `TokenTicker` picks up the closure of `t.ticks`
// before/instead of its stop signal.
<-t.syncChan
}

func (t *TestTicker) tick(timeStamp time.Time) {
t.C <- timeStamp
t.ticks <- timeStamp
<-t.syncChan
}

func (t *TestTicker) Allow() bool {
return t.t.Allow()
return t.tokenTicker.Allow()
}

0 comments on commit 58736bf

Please sign in to comment.