Skip to content

Commit

Permalink
[aggregator] Lockless ratelimiter (#2988)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Dec 8, 2020
1 parent 8e3a0c3 commit 2de0ce0
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 104 deletions.
30 changes: 11 additions & 19 deletions src/aggregator/aggregator/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
metricid "github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -179,6 +181,7 @@ type Entry struct {
// The entry keeps a decompressor to reuse the bitset in it, so we can
// save some heap allocations.
decompressor aggregation.IDDecompressor
nowFn clock.NowFn
}

// NewEntry creates a new entry.
Expand All @@ -188,6 +191,8 @@ func NewEntry(lists *metricLists, runtimeOpts runtime.Options, opts Options) *En
aggregations: make(aggregationValues, 0, initialAggregationCapacity),
metrics: newEntryMetrics(scope),
decompressor: aggregation.NewPooledIDDecompressor(opts.AggregationTypesOptions().TypesPool()),
rateLimiter: rate.NewLimiter(0),
nowFn: opts.ClockOptions().NowFn(),
}
e.ResetSetData(lists, runtimeOpts, opts)
return e
Expand All @@ -211,7 +216,7 @@ func (e *Entry) ResetSetData(lists *metricLists, runtimeOpts runtime.Options, op
e.cutoverNanos = uninitializedCutoverNanos
e.lists = lists
e.numWriters = 0
e.recordLastAccessed(e.opts.ClockOptions().NowFn()())
e.recordLastAccessed(e.nowFn())
e.Unlock()
}

Expand Down Expand Up @@ -370,7 +375,7 @@ func (e *Entry) addUntimed(
// so it is guaranteed that actions before when a write lock is acquired
// must have all completed. This is used to ensure we never write metrics
// for times that have already been flushed.
currTime := e.opts.ClockOptions().NowFn()()
currTime := e.nowFn()
e.recordLastAccessed(currTime)

e.RLock()
Expand Down Expand Up @@ -652,7 +657,7 @@ func (e *Entry) addTimed(
// so it is guaranteed that actions before when a write lock is acquired
// must have all completed. This is used to ensure we never write metrics
// for times that have already been flushed.
currTime := e.opts.ClockOptions().NowFn()()
currTime := e.nowFn()
e.recordLastAccessed(currTime)

e.RLock()
Expand Down Expand Up @@ -888,7 +893,7 @@ func (e *Entry) addForwarded(
// so it is guaranteed that actions before when a write lock is acquired
// must have all completed. This is used to ensure we never write metrics
// for times that have already been flushed.
currTime := e.opts.ClockOptions().NowFn()()
currTime := e.nowFn()
e.recordLastAccessed(currTime)

e.RLock()
Expand Down Expand Up @@ -1047,26 +1052,13 @@ func (e *Entry) shouldExpire(now time.Time) bool {

func (e *Entry) resetRateLimiterWithLock(runtimeOpts runtime.Options) {
newLimit := runtimeOpts.WriteValuesPerMetricLimitPerSecond()
if newLimit <= 0 {
e.rateLimiter = nil
return
}
if e.rateLimiter == nil {
nowFn := e.opts.ClockOptions().NowFn()
e.rateLimiter = rate.NewLimiter(newLimit, nowFn)
return
}
e.rateLimiter.Reset(newLimit)
}

func (e *Entry) applyValueRateLimit(numValues int64, m rateLimitEntryMetrics) error {
e.RLock()
rateLimiter := e.rateLimiter
e.RUnlock()
if rateLimiter == nil {
return nil
}
if rateLimiter.IsAllowed(numValues) {

if rateLimiter.IsAllowed(numValues, xtime.ToUnixNano(e.nowFn())) {
return nil
}
m.valueRateLimitExceeded.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/aggregator/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestEntryResetSetData(t *testing.T) {
e, lists, now := testEntry(ctrl, testEntryOptions{})

require.False(t, e.closed)
require.Nil(t, e.rateLimiter)
require.Equal(t, int64(0), e.rateLimiter.Limit())
require.False(t, e.hasDefaultMetadatas)
require.Equal(t, int64(uninitializedCutoverNanos), e.cutoverNanos)
require.Equal(t, lists, e.lists)
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/aggregator/forwarded_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestForwardedWriterUnregisterWriterClosed(t *testing.T) {
aggKey = testForwardedWriterAggregationKey
)

w.Close()
require.NoError(t, w.Close())
require.Equal(t, errForwardedWriterClosed, w.Unregister(mt, mid, aggKey))
}

Expand Down
16 changes: 3 additions & 13 deletions src/aggregator/aggregator/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/x/clock"
xresource "github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -117,6 +118,7 @@ func newMetricMap(shard uint32, opts Options) *metricMap {
metricLists := newMetricLists(shard, opts)
scope := opts.InstrumentOptions().MetricsScope().SubScope("map")
m := &metricMap{
rateLimiter: rate.NewLimiter(0),
shard: shard,
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
Expand Down Expand Up @@ -460,30 +462,18 @@ func (m *metricMap) forEachEntry(entryFn hashedEntryFn) {

func (m *metricMap) resetRateLimiterWithLock(runtimeOpts runtime.Options) {
newLimit := runtimeOpts.WriteNewMetricLimitPerShardPerSecond()
if newLimit <= 0 {
m.rateLimiter = nil
return
}
if m.rateLimiter == nil {
nowFn := m.opts.ClockOptions().NowFn()
m.rateLimiter = rate.NewLimiter(newLimit, nowFn)
return
}
m.rateLimiter.Reset(newLimit)
}

func (m *metricMap) applyNewMetricRateLimitWithLock(now time.Time) error {
if m.rateLimiter == nil {
return nil
}
// If we are still in the warmup phase and possibly ingesting a large amount
// of new metrics, no rate limit is applied.
noLimitWarmupDuration := m.runtimeOpts.WriteNewMetricNoLimitWarmupDuration()
if warmupEnd := m.firstInsertAt.Add(noLimitWarmupDuration); now.Before(warmupEnd) {
m.metrics.noRateLimitWarmup.Inc(1)
return nil
}
if m.rateLimiter.IsAllowed(1) {
if m.rateLimiter.IsAllowed(1, xtime.ToUnixNano(now)) {
return nil
}
m.metrics.newMetricRateLimitExceeded.Inc(1)
Expand Down
4 changes: 2 additions & 2 deletions src/aggregator/aggregator/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func TestMetricMapSetRuntimeOptions(t *testing.T) {
require.NoError(t, m.AddUntimed(testBatchTimer, testDefaultStagedMetadatas))
require.NoError(t, m.AddUntimed(testGauge, testDefaultStagedMetadatas))

// Assert no entries have rate limiters.
// Assert no entries have rate limits.
runtimeOpts := runtime.NewOptions()
require.Equal(t, runtimeOpts, m.runtimeOpts)
for elem := m.entryList.Front(); elem != nil; elem = elem.Next() {
require.Nil(t, elem.Value.(hashedEntry).entry.rateLimiter)
require.Equal(t, int64(0), elem.Value.(hashedEntry).entry.rateLimiter.Limit())
}

// Update runtime options and assert all entries now have rate limiters.
Expand Down
84 changes: 39 additions & 45 deletions src/aggregator/rate/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,77 +21,71 @@
package rate

import (
"sync"
"sync/atomic"
"time"

"github.com/m3db/m3/src/x/clock"
"go.uber.org/atomic"
"golang.org/x/sys/cpu"

xtime "github.com/m3db/m3/src/x/time"
)

var (
zeroTime = time.Unix(0, 0)
zeroTime = xtime.UnixNano(0)
)

// Limiter is a simple rate limiter to control how frequently events are allowed to happen.
// It trades some accuracy on resets for speed.
type Limiter struct {
sync.RWMutex

limitPerSecond int64
nowFn clock.NowFn

alignedLast time.Time
allowed int64
limitPerSecond atomic.Int64
alignedLast atomic.Int64
_ cpu.CacheLinePad // prevent false sharing
allowed atomic.Int64
}

// NewLimiter creates a new rate limiter.
func NewLimiter(l int64, fn clock.NowFn) *Limiter {
return &Limiter{
limitPerSecond: l,
nowFn: fn,
}
func NewLimiter(l int64) *Limiter {
limiter := &Limiter{}
limiter.limitPerSecond.Store(l)
return limiter
}

// Limit returns the current limit.
// Limit returns the current limit. A zero limit means no limit is set.
func (l *Limiter) Limit() int64 {
l.RLock()
limit := l.limitPerSecond
l.RUnlock()
return limit
return l.limitPerSecond.Load()
}

// IsAllowed returns whether n events may happen now.
// NB(xichen): If a large request comes in, this could potentially block following
// requests in the same second from going through. This is a non-issue if the limit
// is much bigger than the typical batch size, which is indeed the case in the aggregation
// layer as each batch size is usually fairly small.
func (l *Limiter) IsAllowed(n int64) bool {
alignedNow := l.nowFn().Truncate(time.Second)
l.RLock()
if !alignedNow.After(l.alignedLast) {
isAllowed := atomic.AddInt64(&l.allowed, n) <= l.limitPerSecond
l.RUnlock()
return isAllowed
// The limiter is racy on window changes, and can be overly aggressive rejecting requests.
// As the limits are usually at least 10k+, the error is worth the speedup.
func (l *Limiter) IsAllowed(n int64, now xtime.UnixNano) bool {
limit := l.Limit()
if limit <= 0 {
return true
}
l.RUnlock()

l.Lock()
if !alignedNow.After(l.alignedLast) {
isAllowed := atomic.AddInt64(&l.allowed, n) <= l.limitPerSecond
l.Unlock()
return isAllowed
var (
allowed = l.allowed.Add(n)
alignedNow = now - now%xtime.UnixNano(time.Second) // truncate to second boundary
alignedLast = xtime.UnixNano(l.alignedLast.Load())
)

if alignedNow > alignedLast && l.alignedLast.CAS(int64(alignedLast), int64(alignedNow)) {
l.allowed.Store(n)
allowed = n
}
l.alignedLast = alignedNow
l.allowed = n
isAllowed := l.allowed <= l.limitPerSecond
l.Unlock()
return isAllowed

return allowed <= limit
}

// Reset resets the internal state.
// Reset resets the internal state. It does not reset all the values atomically,
// but it should not be a problem, as dynamic rate limits in aggregator
// are usually reset under a lock.
func (l *Limiter) Reset(limit int64) {
l.Lock()
l.alignedLast = zeroTime
l.allowed = 0
l.limitPerSecond = limit
l.Unlock()
l.allowed.Store(0)
l.alignedLast.Store(int64(zeroTime))
l.limitPerSecond.Store(limit)
}
82 changes: 62 additions & 20 deletions src/aggregator/rate/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,90 @@
package rate

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/require"

xtime "github.com/m3db/m3/src/x/time"
)

func xnow() xtime.UnixNano {
return xtime.ToUnixNano(time.Now().Truncate(time.Second))
}

func BenchmarkLimiter(b *testing.B) {
var (
allowedPerSecond int64 = 100
now = xnow()
limiter = NewLimiter(allowedPerSecond)
)

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var allowed bool
for i := int64(0); i <= allowedPerSecond+1; i++ {
allowed = limiter.IsAllowed(1, now)
}

if allowed {
b.Fatalf("expected limit to be hit")
}
}
})
require.Equal(b, allowedPerSecond, limiter.Limit())
}

func TestLimiterLimit(t *testing.T) {
allowedPerSecond := int64(10)
now := time.Now().Truncate(time.Second)
nowFn := func() time.Time { return now }
var allowedPerSecond int64 = 10

limiter := NewLimiter(allowedPerSecond, nowFn)
limiter := NewLimiter(allowedPerSecond)
require.Equal(t, allowedPerSecond, limiter.Limit())
}

func TestLimiterIsAllowed(t *testing.T) {
allowedPerSecond := int64(10)
now := time.Now().Truncate(time.Second)
nowFn := func() time.Time { return now }
var (
allowedPerSecond int64 = 10
now = xnow()
)

limiter := NewLimiter(allowedPerSecond, nowFn)
require.True(t, limiter.IsAllowed(5))
limiter := NewLimiter(allowedPerSecond)
require.True(t, limiter.IsAllowed(5, now))
for i := 0; i < 5; i++ {
now = now.Add(100 * time.Millisecond)
require.True(t, limiter.IsAllowed(1))
now += xtime.UnixNano(100 * time.Millisecond)
require.True(t, limiter.IsAllowed(1, now))
}
require.False(t, limiter.IsAllowed(1))
require.False(t, limiter.IsAllowed(1, now))

// Advance time to the next second and confirm the quota is reset.
now = now.Add(time.Second)
require.True(t, limiter.IsAllowed(5))
now += xtime.UnixNano(time.Second)
require.True(t, limiter.IsAllowed(5, now))
}

func TestLimiterUnlimited(t *testing.T) {
var (
unlimitedLimit int64 = 0
now = xnow()
)

limiter := NewLimiter(unlimitedLimit)
require.True(t, limiter.IsAllowed(math.MaxInt64, now))

limiter.Reset(1)
require.False(t, limiter.IsAllowed(2, now))
}

func TestLimiterReset(t *testing.T) {
allowedPerSecond := int64(10)
now := time.Now().Truncate(time.Second)
nowFn := func() time.Time { return now }
var (
allowedPerSecond int64 = 10
now = xnow()
)

limiter := NewLimiter(allowedPerSecond, nowFn)
require.False(t, limiter.IsAllowed(20))
limiter := NewLimiter(allowedPerSecond)
require.False(t, limiter.IsAllowed(20, now))

// Resetting to a higher limit and confirm all requests are allowed.
limiter.Reset(20)
require.True(t, limiter.IsAllowed(20))
require.True(t, limiter.IsAllowed(20, now))
}
Loading

0 comments on commit 2de0ce0

Please sign in to comment.