Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
38587: util/quotapool: various features and fixes r=ajwerner a=ajwerner

This code is being pulled from ongoing work on admission control but this
change seemed isolated enough to warrant a separate PR.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jul 9, 2019
2 parents 8c6fdc6 + 8dcbaa3 commit 5d37afc
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 6 deletions.
16 changes: 15 additions & 1 deletion pkg/util/quotapool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ type Option interface {
apply(*config)
}

// AcquisitionFunc is used to configure a quotapool to call a function after
// an acquisition has occurred.
type AcquisitionFunc func(
ctx context.Context, poolName string, r Request, start time.Time,
)

// OnAcquisition creates an Option to configure a callback upon acquisition.
// It is often useful for recording metrics.
func OnAcquisition(f AcquisitionFunc) Option {
return optionFunc(func(cfg *config) {
cfg.onAcquisition = f
})
}

// OnSlowAcquisition creates an Option to configure a callback upon slow
// acquisitions. Only one OnSlowAcquisition may be used. If multiple are
// specified only the last will be used.
Expand Down Expand Up @@ -58,7 +72,7 @@ type optionFunc func(cfg *config)
func (f optionFunc) apply(cfg *config) { f(cfg) }

type config struct {
name string
onAcquisition AcquisitionFunc
onSlowAcquisition SlowAcquisitionFunc
slowAcquisitionThreshold time.Duration
}
5 changes: 5 additions & 0 deletions pkg/util/quotapool/intpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc,
return p.newIntAlloc(r.took), nil
}

// Len returns the current length of the queue for this IntPool.
func (p *IntPool) Len() int {
return p.qp.Len()
}

// ApproximateQuota will report approximately the amount of quota available in
// the pool. It is precise if there are no ongoing acquisitions. If there are,
// the return value can be up to 'v' less than actual available quota where 'v'
Expand Down
79 changes: 77 additions & 2 deletions pkg/util/quotapool/intpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -296,6 +298,22 @@ func TestQuotaPoolCappedAcquisition(t *testing.T) {
}
}

func TestOnAcquisition(t *testing.T) {
const quota = 100
var called bool
qp := quotapool.NewIntPool("test", quota,
quotapool.OnAcquisition(func(ctx context.Context, poolName string, _ quotapool.Request, start time.Time,
) {
assert.Equal(t, poolName, "test")
called = true
}))
ctx := context.Background()
alloc, err := qp.Acquire(ctx, 1)
assert.Nil(t, err)
assert.True(t, called)
alloc.Release()
}

// TestSlowAcquisition ensures that the SlowAcquisition callback is called
// when an Acquire call takes longer than the configured timeout.
func TestSlowAcquisition(t *testing.T) {
Expand All @@ -311,7 +329,9 @@ func TestSlowAcquisition(t *testing.T) {
firstKey := ctxKey(1)
firstCtx := context.WithValue(ctx, firstKey, "foo")
slowCalled, acquiredCalled := make(chan struct{}), make(chan struct{})
f := func(ctx context.Context, _ string, _ quotapool.Request, _ time.Time) func() {
const poolName = "test"
f := func(ctx context.Context, name string, _ quotapool.Request, _ time.Time) func() {
assert.Equal(t, poolName, name)
if ctx.Value(firstKey) != nil {
return func() {}
}
Expand All @@ -320,7 +340,7 @@ func TestSlowAcquisition(t *testing.T) {
close(acquiredCalled)
}
}
qp := quotapool.NewIntPool("test", 1, quotapool.OnSlowAcquisition(time.Microsecond, f))
qp := quotapool.NewIntPool(poolName, 1, quotapool.OnSlowAcquisition(time.Microsecond, f))
alloc, err := qp.Acquire(firstCtx, 1)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -447,6 +467,61 @@ func BenchmarkConcurrentIntQuotaPool(b *testing.B) {
}
}

// TestLen verifies that the Len() method of the IntPool works as expected.
func TestLen(t *testing.T) {
qp := quotapool.NewIntPool("test", 1, quotapool.LogSlowAcquisition)
ctx := context.Background()
allocCh := make(chan *quotapool.IntAlloc)
doAcquire := func(ctx context.Context) {
alloc, err := qp.Acquire(ctx, 1)
if ctx.Err() == nil && assert.Nil(t, err) {
allocCh <- alloc
}
}
assertLenSoon := func(exp int) {
testutils.SucceedsSoon(t, func() error {
if got := qp.Len(); got != exp {
return errors.Errorf("expected queue len to be %d, got %d", got, exp)
}
return nil
})
}
// Initially qp should have a length of 0.
assert.Equal(t, 0, qp.Len())
// Acquire all of the quota from the pool.
alloc, err := qp.Acquire(ctx, 1)
assert.Nil(t, err)
// The length should still be 0.
assert.Equal(t, 0, qp.Len())
// Launch a goroutine to acquire quota, ensure that the length increases.
go doAcquire(ctx)
assertLenSoon(1)
// Create more goroutines which will block to be canceled later in order to
// ensure that cancelations deduct from the length.
const numToCancel = 12 // an arbitrary number
ctxToCancel, cancel := context.WithCancel(ctx)
for i := 0; i < numToCancel; i++ {
go doAcquire(ctxToCancel)
}
// Ensure that all of the new goroutines are reflected in the length.
assertLenSoon(numToCancel + 1)
// Launch another goroutine with the default context.
go doAcquire(ctx)
assertLenSoon(numToCancel + 2)
// Cancel some of the goroutines.
cancel()
// Ensure that they are soon not reflected in the length.
assertLenSoon(2)
// Unblock the first goroutine.
alloc.Release()
alloc = <-allocCh
assert.Equal(t, 1, qp.Len())
// Unblock the second goroutine.
alloc.Release()
<-allocCh
assert.Equal(t, 0, qp.Len())
}

// BenchmarkIntQuotaPoolFunc benchmarks the common case where we have sufficient
// quota available in the pool and we repeatedly acquire and release quota.
func BenchmarkIntQuotaPoolFunc(b *testing.B) {
Expand Down
32 changes: 29 additions & 3 deletions pkg/util/quotapool/quotapool.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (ec *ErrClosed) Error() string {
type QuotaPool struct {
config

// name is used for logging purposes and is passed to functions used to report
// acquistions or slow acqusitions.
name string

// chanSyncPool is used to pool allocations of the channels used to notify
// goroutines waiting in Acquire.
chanSyncPool sync.Pool
Expand Down Expand Up @@ -118,6 +122,11 @@ type QuotaPool struct {
// channel buffer.
q notifyQueue

// numCanceled is the number of members of q which have been canceled.
// It is used to determine the current number of active waiters in the queue
// which is q.len() less this value.
numCanceled int

// closed is set to true when the quota pool is closed (see
// QuotaPool.Close).
closed bool
Expand All @@ -129,6 +138,7 @@ type QuotaPool struct {
// acquired without ever making more than the quota capacity available.
func New(name string, initialResource Resource, options ...Option) *QuotaPool {
qp := &QuotaPool{
name: name,
quota: make(chan Resource, 1),
done: make(chan struct{}),
chanSyncPool: sync.Pool{
Expand Down Expand Up @@ -186,12 +196,20 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) {
if closeErr != nil {
return closeErr
}
start := timeutil.Now()
// Set up onAcquisition if we have one.
if qp.config.onAcquisition != nil {
defer func() {
if err == nil {
qp.config.onAcquisition(ctx, qp.name, r, start)
}
}()
}

// Set up the infrastructure to report slow requests.
var slowTimer *timeutil.Timer
var slowTimerC <-chan time.Time
var start time.Time
if qp.onSlowAcquisition != nil {
start = timeutil.Now()
slowTimer = timeutil.NewTimer()
defer slowTimer.Stop()
// Intentionally reset only once, for we care more about the select duration in
Expand Down Expand Up @@ -230,8 +248,8 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) {
// Goroutines are not a risk of getting notified and finding
// out they're not first in line.
notifyCh <- struct{}{}
qp.mu.numCanceled++
}

qp.mu.Unlock()
return ctx.Err()
case <-qp.done:
Expand Down Expand Up @@ -312,6 +330,7 @@ func (qp *QuotaPool) notifyNextLocked() {
// shifting the queue.
<-ch
qp.chanSyncPool.Put(qp.mu.q.dequeue())
qp.mu.numCanceled--
continue
}
break
Expand All @@ -336,6 +355,13 @@ func (qp *QuotaPool) ApproximateQuota(f func(Resource)) {
}
}

// Len returns the current length of the queue for this QuotaPool.
func (qp *QuotaPool) Len() int {
qp.mu.Lock()
defer qp.mu.Unlock()
return int(qp.mu.q.len) - qp.mu.numCanceled
}

// Close signals to all ongoing and subsequent acquisitions that they are
// free to return to their callers. They will receive an *ErrClosed which
// contains this reason.
Expand Down

0 comments on commit 5d37afc

Please sign in to comment.