kvstreamer: minor cleanup
This commit replaces the last buffered channel we had in the `Streamer`
code with a condition variable, which simplifies the control flow a
bit.
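
For context, the sketch below illustrates the general shape of the new
approach; it is a minimal, hypothetical example (the `resultQueue`,
`numEnqueued`, and other names are invented for illustration and are not the
actual `Streamer` code). The condition variable shares the mutex that guards
the results, and the waiter re-checks its condition in a loop after every
wake-up.

    package main

    import (
    	"fmt"
    	"sync"
    )

    // resultQueue is an illustrative stand-in for a streamer-style result
    // buffer; every name in this sketch is invented for the example.
    type resultQueue struct {
    	mu          sync.Mutex
    	hasResults  *sync.Cond
    	results     []string
    	numComplete int
    	numEnqueued int
    }

    func newResultQueue(numEnqueued int) *resultQueue {
    	q := &resultQueue{numEnqueued: numEnqueued}
    	// The condition variable shares the mutex that guards the results, so
    	// a waiter atomically releases the lock while blocked in Wait.
    	q.hasResults = sync.NewCond(&q.mu)
    	return q
    }

    // get blocks until there is at least one result to hand back or all
    // enqueued requests have completed. Wait must be wrapped in a loop: it can
    // wake up with the condition still false, so the condition is re-checked
    // every time.
    func (q *resultQueue) get() []string {
    	q.mu.Lock()
    	defer q.mu.Unlock()
    	for len(q.results) == 0 && q.numComplete < q.numEnqueued {
    		q.hasResults.Wait()
    	}
    	res := q.results
    	q.results = nil
    	return res
    }

    func main() {
    	q := newResultQueue(1)
    	go func() {
    		q.mu.Lock()
    		q.results = append(q.results, "result-1")
    		q.numComplete++
    		q.hasResults.Signal() // wake the consumer blocked in get
    		q.mu.Unlock()
    	}()
    	fmt.Println(q.get()) // blocks until the producer signals
    }

Compared to a buffered channel of capacity one, the condition variable needs
no non-blocking drain before waiting, which is what simplifies the control
flow in `GetResults`.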

Additionally, this commit refactors the code so that empty Get responses
are not returned, which allows us to clean up `TxnKVStreamer` a bit. Care
has to be taken to still signal the client's goroutine waiting for
results when an empty Get response comes back (similar to the bug fixed
by the previous commit).
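
The subtle case mentioned above is an empty Get response: it adds nothing to
the results slice, yet it still advances the "all enqueued requests are
complete" condition that the waiting goroutine checks, so the worker must wake
it up anyway. Extending the hypothetical `resultQueue` sketch above (again,
invented names rather than the actual kvstreamer code), the producer side
might look roughly like this:

    // publish hands a batch of results to the consumer. completed counts
    // every finished request, including Gets that came back empty and
    // therefore contributed nothing to results.
    func (q *resultQueue) publish(results []string, completed int) {
    	q.mu.Lock()
    	defer q.mu.Unlock()
    	q.results = append(q.results, results...)
    	q.numComplete += completed
    	// Signal even when results is empty: the consumer also stops waiting
    	// once numComplete reaches numEnqueued, so it must be woken up to
    	// observe that.
    	if len(results) > 0 || completed > 0 {
    		q.hasResults.Signal()
    	}
    }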

Also, the tracking of the "number of outstanding requests" in
`TxnKVStreamer` has been removed since it provided little value (and was
probably broken anyway).

Release note: None
yuzefovich committed Feb 9, 2022
1 parent d65a236 commit 4cc956f
Showing 3 changed files with 70 additions and 114 deletions.
118 changes: 53 additions & 65 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -215,10 +215,6 @@ type Streamer struct {

enqueueKeys []int

// waitForResults is used to block GetResults() call until some results are
// available.
waitForResults chan struct{}

mu struct {
// If the budget's mutex also needs to be locked, the budget's mutex
// must be acquired first.
@@ -261,6 +257,10 @@ type Streamer struct {
// by GetResults() to the caller which the caller hasn't processed yet.
numUnreleasedResults int

// hasResults is used by the client's goroutine to block until there are
// some results to be picked up.
hasResults *sync.Cond

// results are the results of already completed requests that haven't
// been returned by GetResults() yet.
results []Result
@@ -321,6 +321,7 @@ func NewStreamer(
budget: newBudget(acc, limitBytes),
}
s.mu.hasWork = sync.NewCond(&s.mu.Mutex)
s.mu.hasResults = sync.NewCond(&s.mu.Mutex)
s.coordinator = workerCoordinator{
s: s,
txn: txn,
@@ -362,7 +363,6 @@ func (s *Streamer) Init(mode OperationMode, hints Hints, maxKeysPerRow int) {
}
s.hints = hints
s.maxKeysPerRow = int32(maxKeysPerRow)
s.waitForResults = make(chan struct{}, 1)
}

// Enqueue dispatches multiple requests for execution. Results are delivered
@@ -583,45 +583,23 @@ func (s *Streamer) enqueueMemoryAccountingLocked(
// result slice is returned once all enqueued requests have been responded to.
func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
s.mu.Lock()
results := s.mu.results
err := s.mu.err
s.mu.results = nil
allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests
// Non-blockingly clear the waitForResults channel in case we've just picked
// up some results. We do so while holding the mutex so that new results
// aren't appended.
select {
case <-s.waitForResults:
default:
}
s.mu.Unlock()

if len(results) > 0 || allComplete || err != nil {
return results, err
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-s.waitForResults:
s.mu.Lock()
results = s.mu.results
err = s.mu.err
defer s.mu.Unlock()
for {
results := s.mu.results
s.mu.results = nil
s.mu.Unlock()
return results, err
}
}

// notifyGetResultsLocked non-blockingly sends a message on waitForResults
// channel. This method should be called only while holding the lock of s.mu so
// that other results couldn't be appended which would cause us to miss the
// notification about that.
func (s *Streamer) notifyGetResultsLocked() {
s.mu.AssertHeld()
select {
case s.waitForResults <- struct{}{}:
default:
allComplete := s.mu.numCompleteRequests == s.mu.numEnqueuedRequests
if len(results) > 0 || allComplete || s.mu.err != nil {
return results, s.mu.err
}
s.mu.hasResults.Wait()
// Check whether the Streamer has been canceled or closed while we were
// waiting for the results.
if err := ctx.Err(); err != nil {
// No need to use setErrorLocked here because the current goroutine
// is the only one blocking on hasResults condition variable.
s.mu.err = err
return nil, err
}
}
}

@@ -642,7 +620,7 @@ func (s *Streamer) setErrorLocked(err error) {
if s.mu.err == nil {
s.mu.err = err
}
s.notifyGetResultsLocked()
s.mu.hasResults.Signal()
}

// Close cancels all in-flight operations and releases all of the resources of
@@ -655,6 +633,11 @@ func (s *Streamer) Close() {
s.mu.done = true
// Unblock the coordinator in case it is waiting for more work.
s.mu.hasWork.Signal()
// Note that only the client's goroutine can be blocked waiting for the
// results, and Close() is called only by the same goroutine, so
// signaling the hasResults condition variable isn't necessary. However, we
// choose to be safe and do it anyway.
s.mu.hasResults.Signal()
s.mu.Unlock()
// Unblock the coordinator in case it is waiting for the budget.
s.budget.mu.waitForBudget.Signal()
@@ -1265,17 +1248,22 @@ func (w *workerCoordinator) processSingleRangeResults(
resumeReqIdx++
} else {
// This Get was completed.
result := Result{
GetResp: get,
// This currently only works because all requests
// are unique.
EnqueueKeysSatisfied: []int{enqueueKey},
position: req.positions[i],
if get.Value != nil {
// Create a Result only for non-empty Get responses.
result := Result{
GetResp: get,
// This currently only works because all requests
// are unique.
EnqueueKeysSatisfied: []int{enqueueKey},
position: req.positions[i],
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = getResponseSize(get)
memoryTokensBytes += result.memoryTok.toRelease
results = append(results, result)
}
result.memoryTok.streamer = w.s
result.memoryTok.toRelease = getResponseSize(get)
memoryTokensBytes += result.memoryTok.toRelease
results = append(results, result)
// Note that we count this Get response as complete regardless
// of whether it is empty or not.
numCompleteGetResponses++
}

@@ -1329,13 +1317,9 @@ func (w *workerCoordinator) processSingleRangeResults(
}
}

// If we have any results, finalize them.
if len(results) > 0 {
w.finalizeSingleRangeResults(
results, memoryFootprintBytes, hasNonEmptyScanResponse,
numCompleteGetResponses,
)
}
w.finalizeSingleRangeResults(
results, memoryFootprintBytes, hasNonEmptyScanResponse, numCompleteGetResponses,
)

// If we have any incomplete requests, add them back into the work
// pool.
@@ -1348,8 +1332,6 @@ func (w *workerCoordinator) processSingleRangeResults(
// singleRangeBatch. By "finalization" we mean setting Complete field of
// ScanResp to correct value for all scan responses, updating the estimate of an
// average response size, and telling the Streamer about these results.
//
// This method assumes that results has length greater than zero.
func (w *workerCoordinator) finalizeSingleRangeResults(
results []Result,
actualMemoryReservation int64,
@@ -1397,9 +1379,15 @@ func (w *workerCoordinator) finalizeSingleRangeResults(
w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results)))
w.s.mu.numCompleteRequests += numCompleteResponses
w.s.mu.numUnreleasedResults += len(results)
// Store the results and non-blockingly notify the Streamer about them.
w.s.mu.results = append(w.s.mu.results, results...)
w.s.notifyGetResultsLocked()
if len(results) > 0 || numCompleteResponses > 0 {
// We want to signal the condition variable when either we have some
// results to return to the client or we received some empty responses.
// The latter is needed so that the client doesn't block forever
// thinking there are more requests in flight when, in fact, all
// responses have already come back empty.
w.s.mu.hasResults.Signal()
}
}

var zeroIntSlice []int
8 changes: 5 additions & 3 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
@@ -92,10 +92,12 @@ func TestStreamerLimitations(t *testing.T) {
streamer := getStreamer()
defer streamer.Close()
streamer.Init(OutOfOrder, Hints{UniqueRequests: true}, 1 /* maxKeysPerRow */)
get := roachpb.NewGet(roachpb.Key("key"), false /* forUpdate */)
// Use a Scan request for this test case because Gets of non-existent
// keys aren't added to the results.
scan := roachpb.NewScan(roachpb.Key("key"), roachpb.Key("key1"), false /* forUpdate */)
reqs := []roachpb.RequestUnion{{
Value: &roachpb.RequestUnion_Get{
Get: get.(*roachpb.GetRequest),
Value: &roachpb.RequestUnion_Scan{
Scan: scan.(*roachpb.ScanRequest),
},
}}
require.NoError(t, streamer.Enqueue(ctx, reqs, nil /* enqueueKeys */))
58 changes: 12 additions & 46 deletions pkg/sql/row/kv_batch_streamer.go
@@ -46,10 +46,6 @@ type TxnKVStreamer struct {
streamer *kvstreamer.Streamer
spans roachpb.Spans

// numOutstandingRequests tracks the number of requests that haven't been
// fully responded to yet.
numOutstandingRequests int

// getResponseScratch is reused to return the result of Get requests.
getResponseScratch [1]roachpb.KeyValue

@@ -82,9 +78,8 @@ func NewTxnKVStreamer(
return nil, err
}
return &TxnKVStreamer{
streamer: streamer,
spans: spans,
numOutstandingRequests: len(spans),
streamer: streamer,
spans: spans,
}, nil
}

@@ -93,46 +88,30 @@ func NewTxnKVStreamer(
// GetResponses).
func (f *TxnKVStreamer) proceedWithLastResult(
ctx context.Context,
) (skip bool, kvs []roachpb.KeyValue, batchResp []byte, err error) {
) (kvs []roachpb.KeyValue, batchResp []byte, err error) {
result := f.lastResultState.Result
if get := result.GetResp; get != nil {
if get.IntentValue != nil {
return false, nil, nil, errors.AssertionFailedf(
return nil, nil, errors.AssertionFailedf(
"unexpectedly got an IntentValue back from a SQL GetRequest %v", *get.IntentValue,
)
}
if get.Value == nil {
// Nothing found in this particular response, so we skip it.
f.releaseLastResult(ctx)
return true, nil, nil, nil
}
pos := result.EnqueueKeysSatisfied[f.lastResultState.numEmitted]
origSpan := f.spans[pos]
f.lastResultState.numEmitted++
f.numOutstandingRequests--
f.getResponseScratch[0] = roachpb.KeyValue{Key: origSpan.Key, Value: *get.Value}
return false, f.getResponseScratch[:], nil, nil
return f.getResponseScratch[:], nil, nil
}
scan := result.ScanResp
if len(scan.BatchResponses) > 0 {
batchResp, f.lastResultState.remainingBatches = scan.BatchResponses[0], scan.BatchResponses[1:]
}
if len(f.lastResultState.remainingBatches) == 0 {
f.processedScanResponse()
f.lastResultState.numEmitted++
}
// Note that scan.Rows and batchResp might be nil when the ScanResponse is
// empty, and the caller (the KVFetcher) will skip over it.
return false, scan.Rows, batchResp, nil
}

// processedScanResponse updates the lastResultState before emitting the last
// part of the ScanResponse. This method should be called for each request that
// the ScanResponse satisfies.
func (f *TxnKVStreamer) processedScanResponse() {
f.lastResultState.numEmitted++
if f.lastResultState.ScanResp.Complete {
f.numOutstandingRequests--
}
return scan.Rows, batchResp, nil
}

func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) {
@@ -145,17 +124,11 @@ func (f *TxnKVStreamer) releaseLastResult(ctx context.Context) {
func (f *TxnKVStreamer) nextBatch(
ctx context.Context,
) (ok bool, kvs []roachpb.KeyValue, batchResp []byte, err error) {
if f.numOutstandingRequests == 0 {
// All requests have already been responded to.
f.releaseLastResult(ctx)
return false, nil, nil, nil
}

// Check whether there are more batches in the current ScanResponse.
if len(f.lastResultState.remainingBatches) > 0 {
batchResp, f.lastResultState.remainingBatches = f.lastResultState.remainingBatches[0], f.lastResultState.remainingBatches[1:]
if len(f.lastResultState.remainingBatches) == 0 {
f.processedScanResponse()
f.lastResultState.numEmitted++
}
return true, nil, batchResp, nil
}
@@ -164,7 +137,7 @@ func (f *TxnKVStreamer) nextBatch(
if f.lastResultState.numEmitted < len(f.lastResultState.EnqueueKeysSatisfied) {
// Note that we should never get an error here since we're processing
// the same result again.
_, kvs, batchResp, err = f.proceedWithLastResult(ctx)
kvs, batchResp, err = f.proceedWithLastResult(ctx)
return true, kvs, batchResp, err
}

@@ -174,7 +147,7 @@ func (f *TxnKVStreamer) nextBatch(
}

// Process the next result we have already received from the streamer.
for len(f.results) > 0 {
if len(f.results) > 0 {
// Peel off the next result and set it into lastResultState.
f.lastResultState.Result = f.results[0]
f.lastResultState.numEmitted = 0
@@ -183,15 +156,8 @@ func (f *TxnKVStreamer) nextBatch(
// the next iteration.
f.results[0] = kvstreamer.Result{}
f.results = f.results[1:]
var skip bool
skip, kvs, batchResp, err = f.proceedWithLastResult(ctx)
if err != nil {
return false, nil, nil, err
}
if skip {
continue
}
return true, kvs, batchResp, nil
kvs, batchResp, err = f.proceedWithLastResult(ctx)
return true, kvs, batchResp, err
}

// Get more results from the streamer. This call will block until some