Skip to content

Commit

Permalink
kvstreamer: fix a possible deadlock with high concurrency
Browse files Browse the repository at this point in the history
Previously, it was possible for the `Streamer` to get into a deadlock
situation when async requests have exhausted the concurrency limit. This
could happen because:
- the worker coordinator goroutine is holding the budget's mutex when it
is issuing new async requests. When the semaphore is at its limit, the
worker coordinator would block trying to acquire the quota of 1 for
spinning up a new goroutine;
- however, the quota would never open up because all concurrent async
requests could get stuck trying to release some memory back to the
budget.

This is now fixed by teaching the worker coordinator to proactively
check how many requests it can issue (i.e. how much quota it can get
from the semaphore) and issuing no more than that number.

Additionally, a minor change to the concurrency cluster setting is made
to allow only positive values (0 should be prohibited).

Release note: None
  • Loading branch information
yuzefovich authored and maryliag committed Feb 28, 2022
1 parent 6c9f670 commit f5153e4
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 8 deletions.
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvstreamer/large_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func TestLargeKeys(t *testing.T) {
// here.
_, err := db.Exec("SET distsql_workmem='100KiB'")
require.NoError(t, err)
// To improve the test coverage, occasionally lower the maximum number of
// concurrent requests.
if rng.Float64() < 0.25 {
_, err = db.Exec("SET CLUSTER SETTING kv.streamer.concurrency_limit = $1", rng.Intn(10)+1)
require.NoError(t, err)
}
// In both engines, the index joiner will buffer input rows up to a quarter
// of workmem limit, so we have several interesting options for the blob
// size:
Expand Down
65 changes: 57 additions & 8 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ var streamerConcurrencyLimit = settings.RegisterIntSetting(
"kv.streamer.concurrency_limit",
"maximum number of asynchronous requests by a single streamer",
max(128, int64(8*runtime.GOMAXPROCS(0))),
settings.NonNegativeInt,
settings.PositiveInt,
)

func max(a, b int64) int64 {
Expand Down Expand Up @@ -822,7 +822,13 @@ func (w *workerCoordinator) mainLoop(ctx context.Context) {
return
}

err := w.issueRequestsForAsyncProcessing(ctx, requestsToServe, avgResponseSize)
// Now check how many requests we can issue.
maxNumRequestsToIssue, shouldExit := w.getMaxNumRequestsToIssue(ctx)
if shouldExit {
return
}

err := w.issueRequestsForAsyncProcessing(ctx, requestsToServe, maxNumRequestsToIssue, avgResponseSize)
if err != nil {
w.s.setError(err)
return
Expand Down Expand Up @@ -914,19 +920,58 @@ func (w *workerCoordinator) waitUntilEnoughBudget(
return false
}

// getMaxNumRequestsToIssue returns how many new async requests the worker
// coordinator is allowed to issue without going over the
// streamerConcurrencyLimit limit. It blocks until at least one request can be
// issued.
//
// The point of computing this number up front is that the spin-up of a new
// async task in performRequestAsync must never block on w.asyncSem: the main
// worker coordinator goroutine holds the budget's mutex while issuing
// requests, and every in-flight async request that could return quota to the
// semaphore might itself be waiting on that same mutex — blocking there would
// deadlock.
//
// The returned boolean indicates whether the coordinator should exit.
func (w *workerCoordinator) getMaxNumRequestsToIssue(ctx context.Context) (_ int, shouldExit bool) {
	// The worker coordinator is the sole acquirer on this semaphore, so the
	// "approximate" quota is, in fact, exact at this moment.
	if quota := w.asyncSem.ApproximateQuota(); quota > 0 {
		return int(quota), false
	}
	// No quota is available right now, so block until a single unit frees up,
	// then return it right away — this guarantees that the subsequent
	// non-blocking acquisition of 1 in performRequestAsync will succeed.
	alloc, err := w.asyncSem.Acquire(ctx, 1)
	if err != nil {
		w.s.setError(err)
		return 0, true
	}
	alloc.Release()
	return 1, false
}

// issueRequestsForAsyncProcessing iterates over the given requests and issues
// them to be served asynchronously while there is enough budget available to
// receive the responses. Once the budget is exhausted, no new requests are
// issued, the only exception is made for the case when there are no requests in
// progress (both requests in flight as well as unreleased results), and in that
// scenario, a single request will be issued.
//
// maxNumRequestsToIssue specifies the maximum number of requests that can be
// issued as part of this call. The caller guarantees that w.asyncSem has at
// least that much quota available.
//
// It is assumed that requestsToServe is a prefix of w.s.mu.requestsToServe
// (i.e. it is possible that some other requests have been appended to
// w.s.mu.requestsToServe after requestsToServe have been grabbed). All issued
requests are removed from w.s.mu.requestsToServe.
func (w *workerCoordinator) issueRequestsForAsyncProcessing(
ctx context.Context, requestsToServe []singleRangeBatch, avgResponseSize int64,
ctx context.Context,
requestsToServe []singleRangeBatch,
maxNumRequestsToIssue int,
avgResponseSize int64,
) error {
var numRequestsIssued int
defer func() {
Expand All @@ -950,7 +995,7 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(

headOfLine := w.s.getNumRequestsInProgress() == 0
var budgetIsExhausted bool
for numRequestsIssued < len(requestsToServe) && !budgetIsExhausted {
for numRequestsIssued < len(requestsToServe) && numRequestsIssued < maxNumRequestsToIssue && !budgetIsExhausted {
singleRangeReqs := requestsToServe[numRequestsIssued]
availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used()
// minAcceptableBudget is the minimum TargetBytes limit with which it
Expand Down Expand Up @@ -1103,6 +1148,9 @@ const AsyncRequestOp = "streamer-lookup-async"
// memory limitBytes), the "resume" single-range batch will be added into
// requestsToServe, and mainLoop will pick that up to process later.
//
// The caller is responsible for ensuring that there is enough quota in
// w.asyncSem to spin up a new goroutine for this request.
//
// targetBytes specifies the memory budget that this single-range batch should
// be issued with. targetBytes bytes have already been consumed from the budget,
// and this amount of memory is owned by the goroutine that is spun up to
Expand All @@ -1125,10 +1173,11 @@ func (w *workerCoordinator) performRequestAsync(
if err := w.s.stopper.RunAsyncTaskEx(
ctx,
stop.TaskOpts{
TaskName: AsyncRequestOp,
SpanOpt: stop.ChildSpan,
Sem: w.asyncSem,
WaitForSem: true,
TaskName: AsyncRequestOp,
SpanOpt: stop.ChildSpan,
// Note that we don't wait for the semaphore since it's the caller's
// responsibility to ensure that a new goroutine can be spun up.
Sem: w.asyncSem,
},
func(ctx context.Context) {
defer w.asyncRequestCleanup(false /* budgetMuAlreadyLocked */)
Expand Down

0 comments on commit f5153e4

Please sign in to comment.