diff --git a/.gitmodules b/.gitmodules index f4515068c..2b44c1ae4 100644 --- a/.gitmodules +++ b/.gitmodules @@ -153,4 +153,7 @@ url = https://github.com/jmoiron/sqlx [submodule "src/github.com/go-sql-driver/mysql"] path = src/github.com/go-sql-driver/mysql - url = https://github.com/go-sql-driver/mysql \ No newline at end of file + url = https://github.com/go-sql-driver/mysql +[submodule "src/golang.org/x/time"] + path = src/golang.org/x/time + url = https://github.com/golang/time diff --git a/src/autoscaler/ratelimiter/rate_limiter.go b/src/autoscaler/ratelimiter/rate_limiter.go index b94d0528e..67b4721c0 100644 --- a/src/autoscaler/ratelimiter/rate_limiter.go +++ b/src/autoscaler/ratelimiter/rate_limiter.go @@ -12,11 +12,6 @@ const ( defaultExpireCheckInterval = 30 * time.Second ) -type Stats []Stat -type Stat struct { - Key string `json:"key"` - Available int `json:"available"` -} type Limiter interface { ExceedsLimit(string) bool @@ -39,20 +34,10 @@ func NewRateLimiter(bucketCapacity int, maxAmount int, validDuration time.Durati } func (r *RateLimiter) ExceedsLimit(key string) bool { - if _, err := r.store.Increment(key); err != nil { + if err := r.store.Increment(key); err != nil { return true } return false } -func (r *RateLimiter) GetStats() Stats { - s := Stats{} - for k, v := range r.store.Stats() { - s = append(s, Stat{ - Key: k, - Available: v, - }) - } - return s -} diff --git a/src/autoscaler/ratelimiter/rate_limiter_test.go b/src/autoscaler/ratelimiter/rate_limiter_test.go index 9e701f238..b7a235cd2 100644 --- a/src/autoscaler/ratelimiter/rate_limiter_test.go +++ b/src/autoscaler/ratelimiter/rate_limiter_test.go @@ -70,25 +70,6 @@ var _ = Describe("RateLimiter", func() { }) - Describe("GetStats", func() { - BeforeEach(func() { - limiter = NewRateLimiter(bucketCapacity, maxAmount, validDuration, expireDuration, expireCheckInterval, NewLogger("ratelimiter")) - }) - - It("reports stats ", func() { - for i := 5; i < bucketCapacity; i++ { - key := "192.168.1.100" - Expect(limiter.ExceedsLimit(key)).To(BeFalse()) - } - for i := 7; i < bucketCapacity; i++ { - key := "192.168.1.101" - Expect(limiter.ExceedsLimit(key)).To(BeFalse()) - } - - stats := limiter.GetStats() - Expect(len(stats)).To(Equal(2)) - }) - }) Describe("Expire", func() { BeforeEach(func() { @@ -101,11 +82,10 @@ var _ = Describe("RateLimiter", func() { Expect(limiter.ExceedsLimit(key)).To(BeFalse()) } Expect(limiter.ExceedsLimit(key)).To(BeTrue()) - Expect(len(limiter.GetStats())).To(Equal(1)) Sleep(expireDuration + expireCheckInterval) - Expect(len(limiter.GetStats())).To(Equal(0)) Expect(limiter.ExceedsLimit(key)).To(BeFalse()) }) }) }) + diff --git a/src/autoscaler/ratelimiter/store.go b/src/autoscaler/ratelimiter/store.go index 8c244cd9d..c9e25c610 100644 --- a/src/autoscaler/ratelimiter/store.go +++ b/src/autoscaler/ratelimiter/store.go @@ -6,12 +6,11 @@ import ( "time" "code.cloudfoundry.org/lager" - "github.com/juju/ratelimit" + "golang.org/x/time/rate" ) type Store interface { - Increment(string) (int, error) - Stats() map[string]int + Increment(string) ( error) } type InMemoryStore struct { @@ -26,7 +25,7 @@ type InMemoryStore struct { } type entry struct { - bucket *ratelimit.Bucket + limiter *rate.Limiter expiredAt time.Time sync.RWMutex } @@ -59,24 +58,24 @@ func NewStore(bucketCapacity int, maxAmount int, validDuration time.Duration, ex } func newEntry(validDuration time.Duration, bucketCapacity int, maxAmount int) *entry { + limit := 1e9 * float64(maxAmount)/float64(validDuration) return &entry{ - bucket: ratelimit.NewBucketWithQuantum(validDuration, int64(bucketCapacity), int64(maxAmount)), + limiter: rate.NewLimiter(rate.Limit(limit), bucketCapacity ), } } -func (s *InMemoryStore) Increment(key string) (int, error) { +func (s *InMemoryStore) Increment(key string) (error) { v, ok := s.get(key) if !ok { v = newEntry(s.validDuration, s.bucketCapacity, s.maxAmount) } v.SetExpire(time.Now().Add(s.expireDuration)) - if avail := v.bucket.Available(); avail == 0 { + if v.limiter.Allow() == false { s.set(key, v) - return int(avail), errors.New("empty bucket") + return errors.New("empty bucket") } - v.bucket.Take(1) s.set(key, v) - return int(v.bucket.Available()), nil + return nil } func (s *InMemoryStore) get(key string) (*entry, bool) { @@ -107,21 +106,3 @@ func (s *InMemoryStore) expiryCycle() { } }() } - -func (s *InMemoryStore) Available(key string) int { - v, ok := s.get(key) - if !ok { - return 0 - } - return int(v.bucket.Available()) -} - -func (s *InMemoryStore) Stats() map[string]int { - m := make(map[string]int) - s.Lock() - for k, v := range s.storage { - m[k] = int(v.bucket.Available()) - } - s.Unlock() - return m -} diff --git a/src/autoscaler/ratelimiter/store_test.go b/src/autoscaler/ratelimiter/store_test.go index 84175a55f..35e27b883 100644 --- a/src/autoscaler/ratelimiter/store_test.go +++ b/src/autoscaler/ratelimiter/store_test.go @@ -24,6 +24,7 @@ var _ = Describe("Store", func() { var ( store Store + exceedErrMessage = "empty bucket" ) Describe("Increment", func() { @@ -34,23 +35,21 @@ var _ = Describe("Store", func() { It("shows available", func() { for i := 1; i < bucketCapacity+1; i++ { - avail, err := store.Increment("foo") + err := store.Increment("foo") Expect(err).ToNot(HaveOccurred()) - Expect(avail).To(Equal(bucketCapacity - i)) } - avail, err := store.Increment("foo") + err := store.Increment("foo") Expect(err).To(HaveOccurred()) - Expect(avail).To(Equal(0)) + Expect(err.Error()).To(ContainSubstring(exceedErrMessage)) Sleep(validDuration) for i := 1; i < maxAmount+1; i++ { - avail, err := store.Increment("foo") + err := store.Increment("foo") Expect(err).ToNot(HaveOccurred()) - Expect(avail).To(Equal(maxAmount - i)) } - avail, err = store.Increment("foo") + err = store.Increment("foo") Expect(err).To(HaveOccurred()) - Expect(avail).To(Equal(0)) + Expect(err.Error()).To(ContainSubstring(exceedErrMessage)) }) }) @@ -61,73 +60,44 @@ var _ = Describe("Store", func() { It("shows available", func() { for i := 1; i < bucketCapacity+1; i++ { - avail, err := store.Increment("foo") + err := store.Increment("foo") Expect(err).ToNot(HaveOccurred()) - Expect(avail).To(Equal(bucketCapacity - i)) } - avail, err := store.Increment("foo") + err := store.Increment("foo") Expect(err).To(HaveOccurred()) - Expect(avail).To(Equal(0)) + Expect(err.Error()).To(ContainSubstring(exceedErrMessage)) Sleep(longerValidDuration) for i := 1; i < moreMaxAmount+1; i++ { - avail, err := store.Increment("foo") + err := store.Increment("foo") Expect(err).ToNot(HaveOccurred()) - Expect(avail).To(Equal(moreMaxAmount - i)) } - avail, err = store.Increment("foo") + err = store.Increment("foo") Expect(err).To(HaveOccurred()) - Expect(avail).To(Equal(0)) + Expect(err.Error()).To(ContainSubstring(exceedErrMessage)) }) }) }) - Describe("Stats", func() { - BeforeEach(func() { - store = NewStore(bucketCapacity, maxAmount, validDuration, expireDuration, expireCheckInterval, NewLogger("ratelimiter")) - }) - - It("get stats ", func() { - key1 := "foo" - key2 := "bar" - for i := 5; i < bucketCapacity; i++ { - store.Increment(key1) - } - for i := 7; i < bucketCapacity; i++ { - store.Increment(key2) - } - stats1 := store.Stats() - Expect(len(stats1)).To(Equal(2)) - Expect(stats1[key1]).To(Equal(5)) - Expect(stats1[key2]).To(Equal(7)) - - // should increase maxAmount * 2 tokens in each bucket - Sleep(validDuration * 2) - stats2 := store.Stats() - Expect(len(stats2)).To(Equal(2)) - Expect(stats2[key1]).To(Equal(5 + maxAmount * 2)) - Expect(stats2[key2]).To(Equal(7 + maxAmount * 2)) - }) - }) - Describe("expiryCycle", func() { BeforeEach(func() { store = NewStore(bucketCapacity, maxAmount, validDuration, expireDuration, expireCheckInterval, NewLogger("ratelimiter")) }) It("clean the bucket after expire ", func() { - avail := 0 for i := 0; i < bucketCapacity; i++ { - avail, _ = store.Increment("foo") + err := store.Increment("foo") + Expect(err).ToNot(HaveOccurred()) } - Expect(avail).To(Equal(0)) - Expect(len(store.Stats())).To(Equal(1)) + err := store.Increment("foo") + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring(exceedErrMessage)) Sleep(expireDuration + expireCheckInterval) - Expect(len(store.Stats())).To(Equal(0)) - avail, _ = store.Increment("foo") - Expect(avail).To(Equal(bucketCapacity - 1)) + err = store.Increment("foo") + Expect(err).ToNot(HaveOccurred()) }) }) }) + diff --git a/src/golang.org/x/time b/src/golang.org/x/time new file mode 160000 index 000000000..3af7569d3 --- /dev/null +++ b/src/golang.org/x/time @@ -0,0 +1 @@ +Subproject commit 3af7569d3a1e776fc2a3c1cec133b43105ea9c2e