diff --git a/pkg/util/quotapool/config.go b/pkg/util/quotapool/config.go index 0e6227d4f49a..c4a2998f00ed 100644 --- a/pkg/util/quotapool/config.go +++ b/pkg/util/quotapool/config.go @@ -24,6 +24,20 @@ type Option interface { apply(*config) } +// AcquisitionFunc is used to configure a quotapool to call a function after +// an acquisition has occurred. +type AcquisitionFunc func( + ctx context.Context, poolName string, r Request, start time.Time, +) + +// OnAcquisition creates an Option to configure a callback upon acquisition. +// It is often useful for recording metrics. +func OnAcquisition(f AcquisitionFunc) Option { + return optionFunc(func(cfg *config) { + cfg.onAcquisition = f + }) +} + // OnSlowAcquisition creates an Option to configure a callback upon slow // acquisitions. Only one OnSlowAcquisition may be used. If multiple are // specified only the last will be used. @@ -58,6 +72,7 @@ type optionFunc func(cfg *config) func (f optionFunc) apply(cfg *config) { f(cfg) } type config struct { + onAcquisition AcquisitionFunc onSlowAcquisition SlowAcquisitionFunc slowAcquisitionThreshold time.Duration } diff --git a/pkg/util/quotapool/intpool_test.go b/pkg/util/quotapool/intpool_test.go index ba0053157e66..73a668b87452 100644 --- a/pkg/util/quotapool/intpool_test.go +++ b/pkg/util/quotapool/intpool_test.go @@ -298,6 +298,22 @@ func TestQuotaPoolCappedAcquisition(t *testing.T) { } } +func TestOnAcquisition(t *testing.T) { + const quota = 100 + var called bool + qp := quotapool.NewIntPool("test", quota, + quotapool.OnAcquisition(func(ctx context.Context, poolName string, _ quotapool.Request, start time.Time, + ) { + assert.Equal(t, poolName, "test") + called = true + })) + ctx := context.Background() + alloc, err := qp.Acquire(ctx, 1) + assert.Nil(t, err) + assert.True(t, called) + alloc.Release() +} + // TestSlowAcquisition ensures that the SlowAcquisition callback is called // when an Acquire call takes longer than the configured timeout. func TestSlowAcquisition(t *testing.T) { diff --git a/pkg/util/quotapool/quotapool.go b/pkg/util/quotapool/quotapool.go index e966d1a8d2c5..ac821eb856ad 100644 --- a/pkg/util/quotapool/quotapool.go +++ b/pkg/util/quotapool/quotapool.go @@ -196,12 +196,20 @@ func (qp *QuotaPool) Acquire(ctx context.Context, r Request) (err error) { if closeErr != nil { return closeErr } + start := timeutil.Now() + // Set up onAcquisition if we have one. + if qp.config.onAcquisition != nil { + defer func() { + if err == nil { + qp.config.onAcquisition(ctx, qp.name, r, start) + } + }() + } + // Set up the infrastructure to report slow requests. var slowTimer *timeutil.Timer var slowTimerC <-chan time.Time - var start time.Time if qp.onSlowAcquisition != nil { - start = timeutil.Now() slowTimer = timeutil.NewTimer() defer slowTimer.Stop() // Intentionally reset only once, for we care more about the select duration in