Skip to content

Commit

Permalink
Merge #66092
Browse files Browse the repository at this point in the history
66092: util/limit: replace unfair semaphore with fair quotapool r=dt a=ajwerner

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Jun 4, 2021
2 parents eddb9dc + b419c90 commit 9cef2e9
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 31 deletions.
5 changes: 3 additions & 2 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func evalExport(
reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
}

if err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx); err != nil {
q, err := cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Begin(ctx)
if err != nil {
return result.Result{}, err
}
defer cArgs.EvalCtx.GetLimiters().ConcurrentExportRequests.Finish()
defer q.Release()

makeExternalStorage := !args.ReturnSST || args.Storage != roachpb.ExternalStorage{} ||
(args.StorageByLocalityKV != nil && len(args.StorageByLocalityKV) > 0)
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,8 @@ func (r *Replica) RangeFeed(
var iterSemRelease func()
if !args.Timestamp.IsEmpty() {
usingCatchupIter = true
lim := &r.store.limiters.ConcurrentRangefeedIters
if err := lim.Begin(ctx); err != nil {
alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx)
if err != nil {
return roachpb.NewError(err)
}
// Finish the iterator limit if we exit before the iterator finishes.
Expand All @@ -191,7 +191,7 @@ func (r *Replica) RangeFeed(
// scan.
var iterSemReleaseOnce sync.Once
iterSemRelease = func() {
iterSemReleaseOnce.Do(lim.Finish)
iterSemReleaseOnce.Do(alloc.Release)
}
defer iterSemRelease()
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ func (s *Store) Send(
// and block all other writes to the same span.
if ba.IsSingleAddSSTableRequest() {
before := timeutil.Now()
if err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx); err != nil {
alloc, err := s.limiters.ConcurrentAddSSTableRequests.Begin(ctx)
if err != nil {
return nil, roachpb.NewError(err)
}
defer s.limiters.ConcurrentAddSSTableRequests.Finish()
defer alloc.Release()

beforeEngineDelay := timeutil.Now()
s.engine.PreIngestDelay(ctx)
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/limit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/util/limit",
visibility = ["//visibility:public"],
deps = [
"//pkg/util/quotapool",
"//pkg/util/tracing",
"@com_github_marusama_semaphore//:semaphore",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
39 changes: 18 additions & 21 deletions pkg/util/limit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,48 +13,45 @@ package limit
import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/marusama/semaphore"
"github.com/cockroachdb/errors"
)

// ConcurrentRequestLimiter wraps a simple semaphore, adding a tracing span when
// a request is forced to wait.
type ConcurrentRequestLimiter struct {
spanName string
sem semaphore.Semaphore
sem *quotapool.IntPool
}

// MakeConcurrentRequestLimiter creates a ConcurrentRequestLimiter.
func MakeConcurrentRequestLimiter(spanName string, limit int) ConcurrentRequestLimiter {
return ConcurrentRequestLimiter{spanName: spanName, sem: semaphore.New(limit)}
return ConcurrentRequestLimiter{
spanName: spanName,
sem: quotapool.NewIntPool(spanName, uint64(limit)),
}
}

// Begin attempts to reserve a spot in the pool, blocking if needed until the
// one is available or the context is canceled and adding a tracing span if it
// is forced to block.
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
func (l *ConcurrentRequestLimiter) Begin(ctx context.Context) (*quotapool.IntAlloc, error) {
if err := ctx.Err(); err != nil {
return nil, err
}

if l.sem.TryAcquire(1) {
return nil
alloc, err := l.sem.TryAcquire(ctx, 1)
if errors.Is(err, quotapool.ErrNotEnoughQuota) {
var span *tracing.Span
ctx, span = tracing.ChildSpan(ctx, l.spanName)
defer span.Finish()
alloc, err = l.sem.Acquire(ctx, 1)
}
// If not, start a span and begin waiting.
ctx, span := tracing.ChildSpan(ctx, l.spanName)
defer span.Finish()
return l.sem.Acquire(ctx, 1)
}

// Finish indicates a concurrent request has completed and its reservation can
// be returned to the pool.
func (l *ConcurrentRequestLimiter) Finish() {
l.sem.Release(1)
return alloc, err
}

// SetLimit adjusts the size of the pool.
func (l *ConcurrentRequestLimiter) SetLimit(newLimit int) {
l.sem.SetLimit(newLimit)
l.sem.UpdateCapacity(uint64(newLimit))
}
5 changes: 3 additions & 2 deletions pkg/util/limit/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func TestConcurrentRequestLimiter(t *testing.T) {
req := 0
for {
//t.Logf("waiting to make request %d... (%d / %d)", req+1, l.sem.GetCount(), l.sem.GetLimit())
if err := l.Begin(ctx); err != nil {
alloc, err := l.Begin(ctx)
if err != nil {
if errors.Is(err, ctx.Err()) {
break
} else {
Expand All @@ -52,7 +53,7 @@ func TestConcurrentRequestLimiter(t *testing.T) {
cancel()
}
req++
l.Finish()
alloc.Release()
}
t.Logf("thread done after handling %d requests", req)
return nil
Expand Down

0 comments on commit 9cef2e9

Please sign in to comment.