-
Notifications
You must be signed in to change notification settings - Fork 46
/
ratelimit.go
245 lines (216 loc) · 7.76 KB
/
ratelimit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
/*
Copyright 2018 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"time"
)
// RateLimitKey is a key identifying the operation to be rate limited. The rate limit
// queue will be determined based on the contents of RateKey.
//
// This type will be removed in a future release. Please change all
// references to CallContextKey.
type RateLimitKey = CallContextKey
// RateLimiter is the interface for a rate limiting policy.
type RateLimiter interface {
// Accept uses the RateLimitKey to derive a sleep time for the calling
// goroutine. This call will block until the operation is ready for
// execution.
//
// Accept returns an error if the given context ctx was canceled
// while waiting for acceptance into the queue.
Accept(ctx context.Context, key *RateLimitKey) error
// Observe uses the RateLimitKey to handle response results, which may affect
// the sleep time for the Accept function.
Observe(ctx context.Context, err error, key *RateLimitKey)
}
// acceptor is an object which blocks within Accept until a call is allowed to run.
// Accept is a behavior of the flowcontrol.RateLimiter interface.
type acceptor interface {
// Accept blocks until a call is allowed to run.
Accept()
}
// AcceptRateLimiter wraps an Acceptor with RateLimiter parameters.
type AcceptRateLimiter struct {
// Acceptor is the underlying rate limiter.
Acceptor acceptor
}
// Accept wraps an Acceptor and blocks on Accept or context.Done(). Key is ignored.
func (rl *AcceptRateLimiter) Accept(ctx context.Context, _ *RateLimitKey) error {
ch := make(chan struct{})
go func() {
rl.Acceptor.Accept()
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// Observe does nothing.
func (rl *AcceptRateLimiter) Observe(context.Context, error, *RateLimitKey) {
}
// NopRateLimiter is a rate limiter that performs no rate limiting.
type NopRateLimiter struct {
}
// Accept everything immediately.
func (*NopRateLimiter) Accept(context.Context, *RateLimitKey) error {
return nil
}
// Observe does nothing.
func (*NopRateLimiter) Observe(context.Context, error, *RateLimitKey) {
}
// MinimumRateLimiter wraps a RateLimiter and will only call its Accept until the minimum
// duration has been met or the context is cancelled.
type MinimumRateLimiter struct {
// RateLimiter is the underlying ratelimiter which is called after the mininum time is reacehd.
RateLimiter RateLimiter
// Minimum is the minimum wait time before the underlying ratelimiter is called.
Minimum time.Duration
}
// Accept blocks on the minimum duration and context. Once the minimum duration is met,
// the func is blocked on the underlying ratelimiter.
func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
t := time.NewTimer(m.Minimum)
select {
case <-t.C:
return m.RateLimiter.Accept(ctx, key)
case <-ctx.Done():
t.Stop()
return ctx.Err()
}
}
// Observe just passes error to the underlying ratelimiter.
func (m *MinimumRateLimiter) Observe(ctx context.Context, err error, key *RateLimitKey) {
m.RateLimiter.Observe(ctx, err, key)
}
// TickerRateLimiter uses time.Ticker to spread Accepts over time.
//
// Concurrent calls to Accept will block on the same channel. It is not
// guaranteed what caller will be unblocked first.
type TickerRateLimiter struct {
ticker *time.Ticker
}
// NewTickerRateLimiter creates a new TickerRateLimiter which will space Accept
// calls at least interval/limit time apart.
//
// For example, limit=4 interval=time.Minute will unblock a single Accept call
// every 15 seconds.
func NewTickerRateLimiter(limit int, interval time.Duration) *TickerRateLimiter {
return &TickerRateLimiter{
ticker: time.NewTicker(interval / time.Duration(limit)),
}
}
// Accept will block until a time, specified when creating TickerRateLimiter,
// passes since the last call to Accept.
func (t *TickerRateLimiter) Accept(ctx context.Context, rlk *RateLimitKey) error {
select {
case <-t.ticker.C:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
// Observe does nothing.
func (*TickerRateLimiter) Observe(context.Context, error, *RateLimitKey) {
}
// Make sure that TickerRateLimiter implements RateLimiter.
var _ RateLimiter = new(TickerRateLimiter)
// CompositeRateLimiter combines rate limiters based on RateLimitKey.
type CompositeRateLimiter struct {
// map[resource name]map[operation name]RateLimiter
rateLimiters map[string]map[string]RateLimiter
// defaultRL is used when no matching RateLimiter was found.
defaultRL RateLimiter
}
// NewCompositeRateLimiter creates a new CompositeRateLimiter that will use
// provided default rate limiter if no better match is found. It is intended to
// be used for a single project.
//
// # Example
//
// bsDefaultRL := /* backend service default rate limiter */
// bsGetListRL := /* backend service rate limiter for get and list operations */
//
// rl := NewCompositeRateLimiter(defaultRL)
// rl.Register("BackendServices", "", bsDefaultRL)
// rl.Register("BackendServices", "Get", bsGetListRL)
// rl.Register("BackendServices", "List", bsGetListRL)
//
// This rate limiter is not nesting. Only one rate limiter is used for any
// particular combination of: resource, operation. For the case above, rate
// limiter registered at ("BackendServices", "") won't be applied to operation
// ("BackendServices", "Get"), because a more specific rate limiter was
// registered.
func NewCompositeRateLimiter(defaultRL RateLimiter) *CompositeRateLimiter {
m := map[string]map[string]RateLimiter{
"": {
"": defaultRL,
},
}
return &CompositeRateLimiter{
rateLimiters: m,
defaultRL: defaultRL,
}
}
// ensureExists creates sub-maps as needed.
func (c *CompositeRateLimiter) ensureExists(service string) {
if _, ok := c.rateLimiters[service]; !ok {
c.rateLimiters[service] = map[string]RateLimiter{}
}
}
// fillMissing finds all combinations where resource and/or operation name
// could be omitted and sets it to defaultRL.
func (c *CompositeRateLimiter) fillMissing() {
for _, subService := range c.rateLimiters {
if subService[""] == nil {
subService[""] = c.defaultRL
}
}
}
// Register adds provided rl to the composite rate limiter. Service, operation
// can be omitted by providing an empty string. In this case, the provided rate
// limiter will be used only when there is no other rate limiter matching a
// particular resource, or operation.
//
// It replaces previous rate limiter provided for the same service, operation
// combination. Once a rate limiter is added, it can't be removed.
//
// Same rate limiter can be used for multiple Register calls.
func (c *CompositeRateLimiter) Register(service, operation string, rl RateLimiter) {
c.ensureExists(service)
c.rateLimiters[service][operation] = rl
c.fillMissing()
}
// Accept either calls underlying rate limiter matching rlk or a default rate
// limiter when none is found.
func (c *CompositeRateLimiter) Accept(ctx context.Context, rlk *RateLimitKey) error {
if rlk == nil {
return c.defaultRL.Accept(ctx, rlk)
}
service := rlk.Service
if _, ok := c.rateLimiters[service]; !ok {
service = ""
}
operation := rlk.Operation
if _, ok := c.rateLimiters[service][operation]; !ok {
operation = ""
}
return c.rateLimiters[service][operation].Accept(ctx, rlk)
}
// Observe does nothing.
func (*CompositeRateLimiter) Observe(context.Context, error, *RateLimitKey) {
}