Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Lockless ratelimiter #2988

Merged
merged 11 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions src/aggregator/aggregator/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ import (
metricid "github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -179,6 +181,7 @@ type Entry struct {
// The entry keeps a decompressor to reuse the bitset in it, so we can
// save some heap allocations.
decompressor aggregation.IDDecompressor
nowFn clock.NowFn
}

// NewEntry creates a new entry.
Expand All @@ -188,6 +191,8 @@ func NewEntry(lists *metricLists, runtimeOpts runtime.Options, opts Options) *En
aggregations: make(aggregationValues, 0, initialAggregationCapacity),
metrics: newEntryMetrics(scope),
decompressor: aggregation.NewPooledIDDecompressor(opts.AggregationTypesOptions().TypesPool()),
rateLimiter: rate.NewLimiter(0),
nowFn: opts.ClockOptions().NowFn(),
}
e.ResetSetData(lists, runtimeOpts, opts)
return e
Expand All @@ -211,7 +216,7 @@ func (e *Entry) ResetSetData(lists *metricLists, runtimeOpts runtime.Options, op
e.cutoverNanos = uninitializedCutoverNanos
e.lists = lists
e.numWriters = 0
e.recordLastAccessed(e.opts.ClockOptions().NowFn()())
e.recordLastAccessed(e.nowFn())
e.Unlock()
}

Expand Down Expand Up @@ -370,7 +375,7 @@ func (e *Entry) addUntimed(
// so it is guaranteed that actions before when a write lock is acquired
// must have all completed. This is used to ensure we never write metrics
// for times that have already been flushed.
currTime := e.opts.ClockOptions().NowFn()()
currTime := e.nowFn()
e.recordLastAccessed(currTime)

e.RLock()
Expand Down Expand Up @@ -652,7 +657,7 @@ func (e *Entry) addTimed(
// so it is guaranteed that actions before when a write lock is acquired
// must have all completed. This is used to ensure we never write metrics
// for times that have already been flushed.
currTime := e.opts.ClockOptions().NowFn()()
currTime := e.nowFn()
e.recordLastAccessed(currTime)

e.RLock()
Expand Down Expand Up @@ -888,7 +893,7 @@ func (e *Entry) addForwarded(
// so it is guaranteed that actions before when a write lock is acquired
// must have all completed. This is used to ensure we never write metrics
// for times that have already been flushed.
currTime := e.opts.ClockOptions().NowFn()()
currTime := e.nowFn()
e.recordLastAccessed(currTime)

e.RLock()
Expand Down Expand Up @@ -1047,26 +1052,13 @@ func (e *Entry) shouldExpire(now time.Time) bool {

func (e *Entry) resetRateLimiterWithLock(runtimeOpts runtime.Options) {
newLimit := runtimeOpts.WriteValuesPerMetricLimitPerSecond()
if newLimit <= 0 {
e.rateLimiter = nil
return
}
if e.rateLimiter == nil {
nowFn := e.opts.ClockOptions().NowFn()
e.rateLimiter = rate.NewLimiter(newLimit, nowFn)
return
}
e.rateLimiter.Reset(newLimit)
}

func (e *Entry) applyValueRateLimit(numValues int64, m rateLimitEntryMetrics) error {
e.RLock()
rateLimiter := e.rateLimiter
e.RUnlock()
if rateLimiter == nil {
return nil
}
if rateLimiter.IsAllowed(numValues) {

if rateLimiter.IsAllowed(numValues, xtime.ToUnixNano(e.nowFn())) {
return nil
}
m.valueRateLimitExceeded.Inc(1)
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/aggregator/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func TestEntryResetSetData(t *testing.T) {
e, lists, now := testEntry(ctrl, testEntryOptions{})

require.False(t, e.closed)
require.Nil(t, e.rateLimiter)
require.Equal(t, int64(0), e.rateLimiter.Limit())
require.False(t, e.hasDefaultMetadatas)
require.Equal(t, int64(uninitializedCutoverNanos), e.cutoverNanos)
require.Equal(t, lists, e.lists)
Expand Down
2 changes: 1 addition & 1 deletion src/aggregator/aggregator/forwarded_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestForwardedWriterUnregisterWriterClosed(t *testing.T) {
aggKey = testForwardedWriterAggregationKey
)

w.Close()
require.NoError(t, w.Close())
require.Equal(t, errForwardedWriterClosed, w.Unregister(mt, mid, aggKey))
}

Expand Down
16 changes: 3 additions & 13 deletions src/aggregator/aggregator/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/metrics/metric/unaggregated"
"github.com/m3db/m3/src/x/clock"
xresource "github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
)
Expand Down Expand Up @@ -117,6 +118,7 @@ func newMetricMap(shard uint32, opts Options) *metricMap {
metricLists := newMetricLists(shard, opts)
scope := opts.InstrumentOptions().MetricsScope().SubScope("map")
m := &metricMap{
rateLimiter: rate.NewLimiter(0),
shard: shard,
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
Expand Down Expand Up @@ -460,30 +462,18 @@ func (m *metricMap) forEachEntry(entryFn hashedEntryFn) {

func (m *metricMap) resetRateLimiterWithLock(runtimeOpts runtime.Options) {
newLimit := runtimeOpts.WriteNewMetricLimitPerShardPerSecond()
if newLimit <= 0 {
m.rateLimiter = nil
return
}
if m.rateLimiter == nil {
nowFn := m.opts.ClockOptions().NowFn()
m.rateLimiter = rate.NewLimiter(newLimit, nowFn)
return
}
m.rateLimiter.Reset(newLimit)
}

func (m *metricMap) applyNewMetricRateLimitWithLock(now time.Time) error {
if m.rateLimiter == nil {
return nil
}
// If we are still in the warmup phase and possibly ingesting a large amount
// of new metrics, no rate limit is applied.
noLimitWarmupDuration := m.runtimeOpts.WriteNewMetricNoLimitWarmupDuration()
if warmupEnd := m.firstInsertAt.Add(noLimitWarmupDuration); now.Before(warmupEnd) {
m.metrics.noRateLimitWarmup.Inc(1)
return nil
}
if m.rateLimiter.IsAllowed(1) {
if m.rateLimiter.IsAllowed(1, xtime.ToUnixNano(now)) {
return nil
}
m.metrics.newMetricRateLimitExceeded.Inc(1)
Expand Down
4 changes: 2 additions & 2 deletions src/aggregator/aggregator/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func TestMetricMapSetRuntimeOptions(t *testing.T) {
require.NoError(t, m.AddUntimed(testBatchTimer, testDefaultStagedMetadatas))
require.NoError(t, m.AddUntimed(testGauge, testDefaultStagedMetadatas))

// Assert no entries have rate limiters.
// Assert no entries have rate limits.
runtimeOpts := runtime.NewOptions()
require.Equal(t, runtimeOpts, m.runtimeOpts)
for elem := m.entryList.Front(); elem != nil; elem = elem.Next() {
require.Nil(t, elem.Value.(hashedEntry).entry.rateLimiter)
require.Equal(t, int64(0), elem.Value.(hashedEntry).entry.rateLimiter.Limit())
}

// Update runtime options and assert all entries now have rate limiters.
Expand Down
84 changes: 39 additions & 45 deletions src/aggregator/rate/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,77 +21,71 @@
package rate

import (
"sync"
"sync/atomic"
"time"

"github.com/m3db/m3/src/x/clock"
"go.uber.org/atomic"
"golang.org/x/sys/cpu"

xtime "github.com/m3db/m3/src/x/time"
)

var (
zeroTime = time.Unix(0, 0)
zeroTime = xtime.UnixNano(0)
)

// Limiter is a simple rate limiter to control how frequently events are allowed to happen.
// It trades some accuracy on resets for speed.
type Limiter struct {
sync.RWMutex

limitPerSecond int64
nowFn clock.NowFn

alignedLast time.Time
allowed int64
limitPerSecond atomic.Int64
alignedLast atomic.Int64
_ cpu.CacheLinePad // prevent false sharing
allowed atomic.Int64
}

// NewLimiter creates a new rate limiter.
func NewLimiter(l int64, fn clock.NowFn) *Limiter {
return &Limiter{
limitPerSecond: l,
nowFn: fn,
}
func NewLimiter(l int64) *Limiter {
limiter := &Limiter{}
limiter.limitPerSecond.Store(l)
return limiter
}

// Limit returns the current limit.
// Limit returns the current limit. A zero limit means no limit is set.
func (l *Limiter) Limit() int64 {
l.RLock()
limit := l.limitPerSecond
l.RUnlock()
return limit
return l.limitPerSecond.Load()
}

// IsAllowed returns whether n events may happen now.
// NB(xichen): If a large request comes in, this could potentially block following
// requests in the same second from going through. This is a non-issue if the limit
// is much bigger than the typical batch size, which is indeed the case in the aggregation
// layer as each batch size is usually fairly small.
func (l *Limiter) IsAllowed(n int64) bool {
alignedNow := l.nowFn().Truncate(time.Second)
l.RLock()
if !alignedNow.After(l.alignedLast) {
isAllowed := atomic.AddInt64(&l.allowed, n) <= l.limitPerSecond
l.RUnlock()
return isAllowed
// The limiter is racy on window changes, and can be overly aggressive rejecting requests.
// As the limits are usually at least 10k+, the error is worth the speedup.
func (l *Limiter) IsAllowed(n int64, now xtime.UnixNano) bool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside, IsAllowed probably isn't the right verbiage here (i know you didn't write it, but just pointing it out) - "is"-style names sound like accessors or readonly evaluations, whereas an "Add" or something would make it clear that the call may mutate internal state.

not asking you to change it here, might be a rainy day thing we clean up later when we're doing other hygiene things.

limit := l.Limit()
if limit <= 0 {
return true
}
l.RUnlock()

l.Lock()
if !alignedNow.After(l.alignedLast) {
isAllowed := atomic.AddInt64(&l.allowed, n) <= l.limitPerSecond
l.Unlock()
return isAllowed
var (
allowed = l.allowed.Add(n)
alignedNow = now - now%xtime.UnixNano(time.Second) // truncate to second boundary
alignedLast = xtime.UnixNano(l.alignedLast.Load())
)

if alignedNow > alignedLast && l.alignedLast.CAS(int64(alignedLast), int64(alignedNow)) {
l.allowed.Store(n)
allowed = n
}
l.alignedLast = alignedNow
l.allowed = n
isAllowed := l.allowed <= l.limitPerSecond
l.Unlock()
return isAllowed

return allowed <= limit
}

// Reset resets the internal state.
// Reset resets the internal state. It does not reset all the values atomically,
// but it should not be a problem, as dynamic rate limits in aggregator
// are usually reset under a lock.
func (l *Limiter) Reset(limit int64) {
l.Lock()
l.alignedLast = zeroTime
l.allowed = 0
l.limitPerSecond = limit
l.Unlock()
l.allowed.Store(0)
l.alignedLast.Store(int64(zeroTime))
l.limitPerSecond.Store(limit)
}
82 changes: 62 additions & 20 deletions src/aggregator/rate/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,48 +21,90 @@
package rate

import (
"math"
"testing"
"time"

"github.com/stretchr/testify/require"

xtime "github.com/m3db/m3/src/x/time"
)

func xnow() xtime.UnixNano {
return xtime.ToUnixNano(time.Now().Truncate(time.Second))
}

func BenchmarkLimiter(b *testing.B) {
var (
allowedPerSecond int64 = 100
now = xnow()
limiter = NewLimiter(allowedPerSecond)
)

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
var allowed bool
for i := int64(0); i <= allowedPerSecond+1; i++ {
allowed = limiter.IsAllowed(1, now)
}

if allowed {
b.Fatalf("expected limit to be hit")
}
}
})
require.Equal(b, allowedPerSecond, limiter.Limit())
}

func TestLimiterLimit(t *testing.T) {
allowedPerSecond := int64(10)
now := time.Now().Truncate(time.Second)
nowFn := func() time.Time { return now }
var allowedPerSecond int64 = 10

limiter := NewLimiter(allowedPerSecond, nowFn)
limiter := NewLimiter(allowedPerSecond)
require.Equal(t, allowedPerSecond, limiter.Limit())
}

func TestLimiterIsAllowed(t *testing.T) {
allowedPerSecond := int64(10)
now := time.Now().Truncate(time.Second)
nowFn := func() time.Time { return now }
var (
allowedPerSecond int64 = 10
now = xnow()
)

limiter := NewLimiter(allowedPerSecond, nowFn)
require.True(t, limiter.IsAllowed(5))
limiter := NewLimiter(allowedPerSecond)
require.True(t, limiter.IsAllowed(5, now))
for i := 0; i < 5; i++ {
now = now.Add(100 * time.Millisecond)
require.True(t, limiter.IsAllowed(1))
now += xtime.UnixNano(100 * time.Millisecond)
require.True(t, limiter.IsAllowed(1, now))
}
require.False(t, limiter.IsAllowed(1))
require.False(t, limiter.IsAllowed(1, now))

// Advance time to the next second and confirm the quota is reset.
now = now.Add(time.Second)
require.True(t, limiter.IsAllowed(5))
now += xtime.UnixNano(time.Second)
require.True(t, limiter.IsAllowed(5, now))
}

func TestLimiterUnlimited(t *testing.T) {
var (
unlimitedLimit int64 = 0
now = xnow()
)

limiter := NewLimiter(unlimitedLimit)
require.True(t, limiter.IsAllowed(math.MaxInt64, now))

limiter.Reset(1)
require.False(t, limiter.IsAllowed(2, now))
}

func TestLimiterReset(t *testing.T) {
allowedPerSecond := int64(10)
now := time.Now().Truncate(time.Second)
nowFn := func() time.Time { return now }
var (
allowedPerSecond int64 = 10
now = xnow()
)

limiter := NewLimiter(allowedPerSecond, nowFn)
require.False(t, limiter.IsAllowed(20))
limiter := NewLimiter(allowedPerSecond)
require.False(t, limiter.IsAllowed(20, now))

// Resetting to a higher limit and confirm all requests are allowed.
limiter.Reset(20)
require.True(t, limiter.IsAllowed(20))
require.True(t, limiter.IsAllowed(20, now))
}
Loading