diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go
index af3310fb3917..730aacf7afe8 100644
--- a/pkg/ccl/storageccl/export.go
+++ b/pkg/ccl/storageccl/export.go
@@ -85,10 +85,11 @@ func evalExport(
 		reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
 	}
 
-	if err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx); err != nil {
+	q, err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx)
+	if err != nil {
 		return result.Result{}, err
 	}
-	defer cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Finish()
+	defer q.Release()
 
 	makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} ||
 		(args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0)
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index 55bd4ea6fa75..8861496df518 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -177,8 +177,8 @@ func (r *Replica) RangeFeed(
 	var iterSemRelease func()
 	if !args.Timestamp.IsEmpty() {
 		usingCatchupIter = true
-		lim := &r.store.limiters.ConcurrentRangefeedIters
-		if err := lim.Begin(ctx); err != nil {
+		alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx)
+		if err != nil {
 			return roachpb.NewError(err)
 		}
 		// Finish the iterator limit if we exit before the iterator finishes.
@@ -191,7 +191,7 @@ func (r *Replica) RangeFeed(
 		// scan.
 		var iterSemReleaseOnce sync.Once
 		iterSemRelease = func() {
-			iterSemReleaseOnce.Do(lim.Finish)
+			iterSemReleaseOnce.Do(alloc.Release)
 		}
 		defer iterSemRelease()
 	}
diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go
index ecfef3bb9e94..b2cb95b105e5 100644
--- a/pkg/kv/kvserver/store_send.go
+++ b/pkg/kv/kvserver/store_send.go
@@ -56,10 +56,11 @@ func (s *Store) Send(
 	// and block all other writes to the same span.
 	if ba.IsSingleAddSSTableRequest() {
 		before := timeutil.Now()
-		if err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx); err != nil {
+		alloc, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
+		if err != nil {
 			return nil, roachpb.NewError(err)
 		}
-		defer s.limiters.ConcurrentAddSSTableRequests.Finish()
+		defer alloc.Release()
 
 		beforeEngineDelay := timeutil.Now()
 		s.engine.PreIngestDelay(ctx)
diff --git a/pkg/util/limit/BUILD.bazel b/pkg/util/limit/BUILD.bazel
index ab94c6d41c22..3b350f3723b2 100644
--- a/pkg/util/limit/BUILD.bazel
+++ b/pkg/util/limit/BUILD.bazel
@@ -6,8 +6,9 @@ go_library(
     importpath = "github.com/cockroachdb/cockroach/pkg/util/limit",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/util/quotapool",
         "//pkg/util/tracing",
-        "@com_github_marusama_semaphore//:semaphore",
+        "@com_github_cockroachdb_errors//:errors",
     ],
 )
 
diff --git a/pkg/util/limit/limiter.go b/pkg/util/limit/limiter.go
index 962a1003d9d8..0a68748324bc 100644
--- a/pkg/util/limit/limiter.go
+++ b/pkg/util/limit/limiter.go
@@ -13,48 +13,45 @@ package limit
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
-	"github.com/marusama/semaphore"
+	"github.com/cockroachdb/errors"
 )
 
 // ConcurrentRequestLimiter wraps a simple semaphore, adding a tracing span when
 // a request is forced to wait.
 type ConcurrentRequestLimiter struct {
 	spanName string
-	sem      semaphore.Semaphore
+	sem      *quotapool.IntPool
 }
 
 // MakeConcurrentRequestLimiter creates a ConcurrentRequestLimiter.
 func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter {
-	return ConcurrentRequestLimiter{spanName: spanName, sem: semaphore.New(limit)}
+	return ConcurrentRequestLimiter{
+		spanName: spanName,
+		sem:      quotapool.NewIntPool(spanName, uint64(limit)),
+	}
 }
 
 // Begin attempts to reserve a spot in the pool, blocking if needed until the
 // one is available or the context is canceled and adding a tracing span if it
 // is forced to block.
-func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) error {
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	default:
+func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (*quotapool.IntAlloc, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, err
 	}
 
-	if l.sem.TryAcquire(1) {
-		return nil
+	alloc, err := l.sem.TryAcquire(ctx, 1)
+	if errors.Is(err, quotapool.ErrNotEnoughQuota) {
+		var span *tracing.Span
+		ctx, span = tracing.ChildSpan(ctx, l.spanName)
+		defer span.Finish()
+		alloc, err = l.sem.Acquire(ctx, 1)
 	}
-	// If not, start a span and begin waiting.
-	ctx, span := tracing.ChildSpan(ctx, l.spanName)
-	defer span.Finish()
-	return l.sem.Acquire(ctx, 1)
-}
-
-// Finish indicates a concurrent request has completed and its reservation can
-// be returned to the pool.
-func (l *ConcurrentRequestLimiter) Finish() {
-	l.sem.Release(1)
+	return alloc, err
 }
 
 // SetLimit adjusts the size of the pool.
 func (l *ConcurrentRequestLimiter) SetLimit(newLimit int) {
-	l.sem.SetLimit(newLimit)
+	l.sem.UpdateCapacity(uint64(newLimit))
 }
diff --git a/pkg/util/limit/limiter_test.go b/pkg/util/limit/limiter_test.go
index ec80c573dccb..1deda00912a0 100644
--- a/pkg/util/limit/limiter_test.go
+++ b/pkg/util/limit/limiter_test.go
@@ -40,7 +40,8 @@ func TestConcurrentRequestLimiter(t *testing.T) {
 			req := 0
 			for {
 				//t.Logf("waiting to make request %d... (%d / %d)", req+1, l.sem.GetCount(), l.sem.GetLimit())
-				if err := l.Begin(ctx); err != nil {
+				alloc, err := l.Begin(ctx)
+				if err != nil {
 					if errors.Is(err, ctx.Err()) {
 						break
 					} else {
@@ -52,7 +53,7 @@ func TestConcurrentRequestLimiter(t *testing.T) {
 					cancel()
 				}
 				req++
-				l.Finish()
+				alloc.Release()
 			}
 			t.Logf("thread done after handling %d requests", req)
 			return nil
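
For reference, here is a minimal, hypothetical caller sketch (not part of the patch) showing the reworked API surface from the diff above: Begin now returns a *quotapool.IntAlloc that the caller releases, replacing the removed Finish method. The limiter name and the limit of 2 are made up for illustration.

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/limit"
)

func main() {
	ctx := context.Background()

	// Allow at most 2 concurrent requests; callers that have to wait do so
	// under a tracing child span, per the new Begin implementation.
	limiter := limit.MakeConcurrentRequestLimiter("example-limiter", 2)

	handle := func(i int) error {
		// Begin fails fast if ctx is already canceled, otherwise it tries to
		// acquire quota and blocks (with a span) when the pool is exhausted.
		alloc, err := limiter.Begin(ctx)
		if err != nil {
			return err
		}
		// Releasing the allocation returns the slot to the pool; deferring it
		// mirrors the pattern used by the updated call sites in this patch.
		defer alloc.Release()

		fmt.Println("handling request", i)
		return nil
	}

	for i := 0; i < 4; i++ {
		if err := handle(i); err != nil {
			fmt.Println("error:", err)
		}
	}
}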