-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
quota_pool.go
288 lines (267 loc) · 9.21 KB
/
quota_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
//
// Author: Irfan Sharif ([email protected])
//
// The code below is a modified version of a similar structure found in
// grpc-go (github.com/grpc/grpc-go/blob/b2fae0c/transport/control.go).
// Specifically we allow for arbitrarily sized acquisitions and avoid
// starvation by internally maintaining a FIFO structure.
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package storage
import (
"errors"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"golang.org/x/net/context"
)
type quotaPool struct {
syncutil.Mutex
// We service quota acquisitions in a first come, first serve basis. This
// is done so as to in order to prevent starvations of large acquisitions
// from a continuous stream of smaller ones. Acquisitions 'register'
// themselves for a notification that indicates they're now first in line.
// This is done by appending to the queue the channel they will then wait
// on. If a thread no longer needs to be notified, i.e. their acquisition
// context has been cancelled, the thread is responsible for blocking
// subsequent notifications to the channel by filling up the channel
// buffer.
queue []chan struct{}
// We use a channel to 'park' our quota value for easier composition with
// context cancellation and quotaPool closing (see quotaPool.acquire).
//
// Quota additions push a value into the channel whereas the acquisition
// first in line waits on the channel itself.
q chan int64
cap int64
// Ongoing acquisitions listen on quotaPool.done which is closed when the quota
// pool is closed (see quotaPool.close)
done chan struct{}
closed bool
}
// newQuotaPool returns a new quota pool initialized with a given quota, the quota
// is capped at this amount.
func newQuotaPool(q int64) *quotaPool {
qp := "aPool{
q: make(chan int64, 1),
done: make(chan struct{}),
cap: q,
}
qp.q <- q
return qp
}
// add adds the specified quota back to the pool. At no point does the total
// quota in the pool exceed the maximum capacity determined during
// initialization. Safe for concurrent use.
func (qp *quotaPool) add(v int64) {
qp.Lock()
select {
case q := <-qp.q:
v += q
default:
}
if v > qp.cap {
v = qp.cap
}
qp.q <- v
qp.Unlock()
}
// acquire acquires the specified amount of quota from the pool. On success,
// nil is returned and the caller must call add(v) or otherwise arrange for the
// quota to be returned to the pool. If 'v' is greater than the total capacity
// of the pool, we instead try to acquire quota equal to the maximum capacity.
// Safe for concurrent use.
func (qp *quotaPool) acquire(ctx context.Context, v int64) error {
if v > qp.cap {
v = qp.cap
}
notifyCh := make(chan struct{}, 1)
qp.Lock()
qp.queue = append(qp.queue, notifyCh)
// If we're first in line, we notify ourself immediately.
if len(qp.queue) == 1 {
notifyCh <- struct{}{}
}
qp.Unlock()
slowTimer := timeutil.NewTimer()
slowTimer.Reset(base.SlowRequestThreshold)
defer slowTimer.Stop()
for {
select {
case <-slowTimer.C:
log.Warningf(ctx, "have been waiting %s attempting to acquire quota",
base.SlowRequestThreshold)
continue
case <-ctx.Done():
qp.Lock()
// We no longer need to notified but we need to be careful and check
// whether or not we're first in queue. If so, we need to notify the
// next acquisition thread and clean up the waiting queue while doing
// so.
// Otherwise we simply 'unregister' ourselves from the queue by filling
// up the channel buffer. This is what is checked when a thread wishes
// to notify the next in line.
if qp.queue[0] == notifyCh {
// We're at the head of the queue. We traverse until we find a
// thread waiting to be notified, notify the thread and truncate
// our queue so to ensure the said thread is at the head of the queue.
// If we determine there are non threads waiting, we simply
// truncate the queue to reflect this.
//
// NB: Notifying the channel before moving it to the head of the
// queue is safe because the queue itself is guarded by a lock.
// Threads are not a risk of getting notified and finding out
// they're not first in line.
for i, ch := range qp.queue[1:] {
select {
case ch <- struct{}{}:
qp.queue = qp.queue[i+1:]
qp.Unlock()
return ctx.Err()
default:
}
}
qp.queue = qp.queue[:0]
} else {
notifyCh <- struct{}{}
}
qp.Unlock()
return ctx.Err()
case <-qp.done:
// We don't need to 'unregister' ourselves as in the case when the
// context is cancelled. By not doing so we in fact 'suck' up the
// notification (if any) and all acquisitions threads further along the
// queue will definitely receive on qp.done.
return errors.New("quota pool no longer in use")
case <-notifyCh:
}
break
}
// We're first in line to receive quota, we keep accumulating quota until
// we've acquired enough or determine we no longer need the acquisition.
// If we have acquired the quota needed or our context gets cancelled,
// we're sure to remove ourselves from the queue and notify the thread next
// in line (if any).
var acquired int64
for acquired < v {
select {
case <-slowTimer.C:
log.Warningf(ctx, "have been waiting %s attempting to acquire quota",
base.SlowRequestThreshold)
case <-ctx.Done():
if acquired > 0 {
qp.add(acquired)
}
qp.Lock()
for i, ch := range qp.queue[1:] {
select {
case ch <- struct{}{}:
qp.queue = qp.queue[i+1:]
qp.Unlock()
return ctx.Err()
default:
}
}
qp.queue = qp.queue[:0]
qp.Unlock()
return ctx.Err()
case <-qp.done:
// We don't need to release quota back as all ongoing and
// subsequent acquisitions will fail with an error indicating that
// the pool is now closed.
return errors.New("quota pool no longer in use")
case q := <-qp.q:
acquired += q
}
}
if acquired > v {
qp.add(acquired - v)
}
qp.Lock()
for i, ch := range qp.queue[1:] {
select {
case ch <- struct{}{}:
qp.queue = qp.queue[i+1:]
qp.Unlock()
return nil
default:
}
}
qp.queue = qp.queue[:0]
qp.Unlock()
return nil
}
// quota will correctly report the amount of quota available in the
// system only if there are no ongoing acquisition threads. If there are, it's
// return value can be up to 'v' less than currently available quota where 'v'
// is the value the acquisition thread first in line is attempting to acquire.
func (qp *quotaPool) quota() int64 {
qp.Lock()
defer qp.Unlock()
select {
case q := <-qp.q:
qp.q <- q
return q
default:
return 0
}
}
// close closes the quota pool and is safe for concurrent use.
//
// NB: This is best effort, we try to fail all ongoing and subsequent
// acquisitions with an error indicating so but acquisitions may still seep
// through. This is due to go's behaviour where if we're waiting on multiple
// channels in a select statement, if there are values ready for more than one
// channel the runtime will pseudo-randomly choose one to proceed.
func (qp *quotaPool) close() {
qp.Lock()
if !qp.closed {
qp.closed = true
close(qp.done)
}
qp.Unlock()
}