From ef016d9052ac2153a12ff72877aced306793e072 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 1 Jul 2019 13:53:16 -0400 Subject: [PATCH 1/3] util/quotapool: add Len() method to return the current queue length This code is being pulled from ongoing work on admission control but this change seemed isolated enough to warrant a separate PR. Release note: None --- pkg/util/quotapool/intpool.go | 5 +++ pkg/util/quotapool/intpool_test.go | 57 ++++++++++++++++++++++++++++++ pkg/util/quotapool/quotapool.go | 15 +++++++- 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/pkg/util/quotapool/intpool.go b/pkg/util/quotapool/intpool.go index db548a756dcf..b9644e2a5e44 100644 --- a/pkg/util/quotapool/intpool.go +++ b/pkg/util/quotapool/intpool.go @@ -188,6 +188,11 @@ func (p *IntPool) AcquireFunc(ctx context.Context, f IntRequestFunc) (*IntAlloc, return p.newIntAlloc(r.took), nil } +// Len returns the current length of the queue for this IntPool. +func (p *IntPool) Len() int { + return p.qp.Len() +} + // ApproximateQuota will report approximately the amount of quota available in // the pool. It is precise if there are no ongoing acquisitions. If there are, // the return value can be up to 'v' less than actual available quota where 'v' diff --git a/pkg/util/quotapool/intpool_test.go b/pkg/util/quotapool/intpool_test.go index adbd6d774932..8badf070b311 100644 --- a/pkg/util/quotapool/intpool_test.go +++ b/pkg/util/quotapool/intpool_test.go @@ -20,6 +20,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" "golang.org/x/sync/errgroup" ) @@ -447,6 +449,61 @@ func BenchmarkConcurrentIntQuotaPool(b *testing.B) { } } +// TestLen verifies that the Len() method of the IntPool works as expected. +func TestLen(t *testing.T) { + qp := quotapool.NewIntPool("test", 1, quotapool.LogSlowAcquisition) + ctx := context.Background() + allocCh := make(chan *quotapool.IntAlloc) + doAcquire := func(ctx context.Context) { + alloc, err := qp.Acquire(ctx, 1) + if ctx.Err() == nil && assert.Nil(t, err) { + allocCh <- alloc + } + } + assertLenSoon := func(exp int) { + testutils.SucceedsSoon(t, func() error { + if got := qp.Len(); got != exp { + return errors.Errorf("expected queue len to be %d, got %d", got, exp) + } + return nil + }) + } + // Initially qp should have a length of 0. + assert.Equal(t, 0, qp.Len()) + // Acquire all of the quota from the pool. + alloc, err := qp.Acquire(ctx, 1) + assert.Nil(t, err) + // The length should still be 0. + assert.Equal(t, 0, qp.Len()) + // Launch a goroutine to acquire quota, ensure that the length increases. + go doAcquire(ctx) + assertLenSoon(1) + // Create more goroutines which will block to be canceled later in order to + // ensure that cancelations deduct from the length. + const numToCancel = 12 // an arbitrary number + ctxToCancel, cancel := context.WithCancel(ctx) + for i := 0; i < numToCancel; i++ { + go doAcquire(ctxToCancel) + } + // Ensure that all of the new goroutines are reflected in the length. + assertLenSoon(numToCancel + 1) + // Launch another goroutine with the default context. + go doAcquire(ctx) + assertLenSoon(numToCancel + 2) + // Cancel some of the goroutines. + cancel() + // Ensure that they are soon not reflected in the length. + assertLenSoon(2) + // Unblock the first goroutine. + alloc.Release() + alloc = <-allocCh + assert.Equal(t, 1, qp.Len()) + // Unblock the second goroutine. + alloc.Release() + <-allocCh + assert.Equal(t, 0, qp.Len()) +} + // BenchmarkIntQuotaPoolFunc benchmarks the common case where we have sufficient // quota available in the pool and we repeatedly acquire and release quota. func BenchmarkIntQuotaPoolFunc(b *testing.B) { diff --git a/pkg/util/quotapool/quotapool.go b/pkg/util/quotapool/quotapool.go index 403af32eb157..04c3f2430376 100644 --- a/pkg/util/quotapool/quotapool.go +++ b/pkg/util/quotapool/quotapool.go @@ -118,6 +118,11 @@ type QuotaPool struct { // channel buffer. q notifyQueue + // numCanceled is the number of members of q which have been canceled. + // It is used to determine the current number of active waiters in the queue + // which is q.len() less this value. + numCanceled int + // closed is set to true when the quota pool is closed (see // QuotaPool.Close). closed bool @@ -230,8 +235,8 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) { // Goroutines are not a risk of getting notified and finding // out they're not first in line. notifyCh <- struct{}{} + qp.mu.numCanceled++ } - qp.mu.Unlock() return ctx.Err() case <-qp.done: @@ -312,6 +317,7 @@ func (qp *QuotaPool) notifyNextLocked() { // shifting the queue. <-ch qp.chanSyncPool.Put(qp.mu.q.dequeue()) + qp.mu.numCanceled-- continue } break @@ -336,6 +342,13 @@ func (qp *QuotaPool) ApproximateQuota(f func(Resource)) { } } +// Len returns the current length of the queue for this QuotaPool. +func (qp *QuotaPool) Len() int { + qp.mu.Lock() + defer qp.mu.Unlock() + return int(qp.mu.q.len) - qp.mu.numCanceled +} + // Close signals to all ongoing and subsequent acquisitions that they are // free to return to their callers. They will receive an *ErrClosed which // contains this reason. From 3f0ccceee6c222aa591c40ff0642819dcd0bb741 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 1 Jul 2019 16:25:07 -0400 Subject: [PATCH 2/3] util/quotapool: fix name propagation --- pkg/util/quotapool/config.go | 1 - pkg/util/quotapool/intpool_test.go | 6 ++++-- pkg/util/quotapool/quotapool.go | 5 +++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/util/quotapool/config.go b/pkg/util/quotapool/config.go index e7ecbc11ff2a..0e6227d4f49a 100644 --- a/pkg/util/quotapool/config.go +++ b/pkg/util/quotapool/config.go @@ -58,7 +58,6 @@ type optionFunc func(cfg *config) func (f optionFunc) apply(cfg *config) { f(cfg) } type config struct { - name string onSlowAcquisition SlowAcquisitionFunc slowAcquisitionThreshold time.Duration } diff --git a/pkg/util/quotapool/intpool_test.go b/pkg/util/quotapool/intpool_test.go index 8badf070b311..ba0053157e66 100644 --- a/pkg/util/quotapool/intpool_test.go +++ b/pkg/util/quotapool/intpool_test.go @@ -313,7 +313,9 @@ func TestSlowAcquisition(t *testing.T) { firstKey := ctxKey(1) firstCtx := context.WithValue(ctx, firstKey, "foo") slowCalled, acquiredCalled := make(chan struct{}), make(chan struct{}) - f := func(ctx context.Context, _ string, _ quotapool.Request, _ time.Time) func() { + const poolName = "test" + f := func(ctx context.Context, name string, _ quotapool.Request, _ time.Time) func() { + assert.Equal(t, poolName, name) if ctx.Value(firstKey) != nil { return func() {} } @@ -322,7 +324,7 @@ func TestSlowAcquisition(t *testing.T) { close(acquiredCalled) } } - qp := quotapool.NewIntPool("test", 1, quotapool.OnSlowAcquisition(time.Microsecond, f)) + qp := quotapool.NewIntPool(poolName, 1, quotapool.OnSlowAcquisition(time.Microsecond, f)) alloc, err := qp.Acquire(firstCtx, 1) if err != nil { t.Fatal(err) diff --git a/pkg/util/quotapool/quotapool.go b/pkg/util/quotapool/quotapool.go index 04c3f2430376..e966d1a8d2c5 100644 --- a/pkg/util/quotapool/quotapool.go +++ b/pkg/util/quotapool/quotapool.go @@ -86,6 +86,10 @@ func (ec *ErrClosed) Error() string { type QuotaPool struct { config + // name is used for logging purposes and is passed to functions used to report + // acquistions or slow acqusitions. + name string + // chanSyncPool is used to pool allocations of the channels used to notify // goroutines waiting in Acquire. chanSyncPool sync.Pool @@ -134,6 +138,7 @@ type QuotaPool struct { // acquired without ever making more than the quota capacity available. func New(name string, initialResource Resource, options ...Option) *QuotaPool { qp := &QuotaPool{ + name: name, quota: make(chan Resource, 1), done: make(chan struct{}), chanSyncPool: sync.Pool{ From 8dcbaa38d34f4f5b61908200ec5b8b434477b482 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Mon, 1 Jul 2019 16:22:28 -0400 Subject: [PATCH 3/3] util/quotapool: add optional OnAcquire function This change is used in upcoming work on admission control but felt sufficiently isolated to warrant its own PR. This function is especially useful for recording metrics after an acquisition has occurred. Release note: None --- pkg/util/quotapool/config.go | 15 +++++++++++++++ pkg/util/quotapool/intpool_test.go | 16 ++++++++++++++++ pkg/util/quotapool/quotapool.go | 12 ++++++++++-- 3 files changed, 41 insertions(+), 2 deletions(-) diff --git a/pkg/util/quotapool/config.go b/pkg/util/quotapool/config.go index 0e6227d4f49a..c4a2998f00ed 100644 --- a/pkg/util/quotapool/config.go +++ b/pkg/util/quotapool/config.go @@ -24,6 +24,20 @@ type Option interface { apply(*config) } +// AcquisitionFunc is used to configure a quotapool to call a function after +// an acquisition has occurred. +type AcquisitionFunc func( + ctx context.Context, poolName string, r Request, start time.Time, +) + +// OnAcquisition creates an Option to configure a callback upon acquisition. +// It is often useful for recording metrics. +func OnAcquisition(f AcquisitionFunc) Option { + return optionFunc(func(cfg *config) { + cfg.onAcquisition = f + }) +} + // OnSlowAcquisition creates an Option to configure a callback upon slow // acquisitions. Only one OnSlowAcquisition may be used. If multiple are // specified only the last will be used. @@ -58,6 +72,7 @@ type optionFunc func(cfg *config) func (f optionFunc) apply(cfg *config) { f(cfg) } type config struct { + onAcquisition AcquisitionFunc onSlowAcquisition SlowAcquisitionFunc slowAcquisitionThreshold time.Duration } diff --git a/pkg/util/quotapool/intpool_test.go b/pkg/util/quotapool/intpool_test.go index ba0053157e66..73a668b87452 100644 --- a/pkg/util/quotapool/intpool_test.go +++ b/pkg/util/quotapool/intpool_test.go @@ -298,6 +298,22 @@ func TestQuotaPoolCappedAcquisition(t *testing.T) { } } +func TestOnAcquisition(t *testing.T) { + const quota = 100 + var called bool + qp := quotapool.NewIntPool("test", quota, + quotapool.OnAcquisition(func(ctx context.Context, poolName string, _ quotapool.Request, start time.Time, + ) { + assert.Equal(t, poolName, "test") + called = true + })) + ctx := context.Background() + alloc, err := qp.Acquire(ctx, 1) + assert.Nil(t, err) + assert.True(t, called) + alloc.Release() +} + // TestSlowAcquisition ensures that the SlowAcquisition callback is called // when an Acquire call takes longer than the configured timeout. func TestSlowAcquisition(t *testing.T) { diff --git a/pkg/util/quotapool/quotapool.go b/pkg/util/quotapool/quotapool.go index e966d1a8d2c5..ac821eb856ad 100644 --- a/pkg/util/quotapool/quotapool.go +++ b/pkg/util/quotapool/quotapool.go @@ -196,12 +196,20 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) { if closeErr != nil { return closeErr } + start := timeutil.Now() + // Set up onAcquisition if we have one. + if qp.config.onAcquisition != nil { + defer func() { + if err == nil { + qp.config.onAcquisition(ctx, qp.name, r, start) + } + }() + } + // Set up the infrastructure to report slow requests. var slowTimer *timeutil.Timer var slowTimerC <-chan time.Time - var start time.Time if qp.onSlowAcquisition != nil { - start = timeutil.Now() slowTimer = timeutil.NewTimer() defer slowTimer.Stop() // Intentionally reset only once, for we care more about the select duration in