diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go
index 7ceebecf4def..55c3102f4848 100644
--- a/pkg/kv/kvclient/kvstreamer/results_buffer.go
+++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -64,10 +64,26 @@ type resultsBuffer interface {
 	//
 	//
 	///////////////////////////////////////////////////////////////////////////
-	// add adds the provided Results into the buffer. If any Results are
-	// available to be returned to the client and there is a goroutine blocked
-	// in wait(), the goroutine is woken up.
-	add([]Result)
+	// Lock and Unlock expose methods on the mutex of the resultsBuffer. If the
+	// Streamer's mutex needs to be locked, then the Streamer's mutex must be
+	// acquired first.
+	Lock()
+	Unlock()
+
+	// addLocked adds the provided Result into the buffer. Note that if the
+	// Result is available to be returned to the client and there is a goroutine
+	// blocked in wait(), the goroutine is **not** woken up - doneAddingLocked()
+	// has to be called.
+	//
+	// The combination of multiple addLocked() calls followed by a single
+	// doneAddingLocked() call allows us to simulate adding many Results at
+	// once, without having to allocate a slice for that.
+	addLocked(Result)
+	// doneAddingLocked notifies the resultsBuffer that the worker goroutine
+	// added all Results it could, and the resultsBuffer checks whether any
+	// Results are available to be returned to the client. If there is a
+	// goroutine blocked in wait(), the goroutine is woken up.
+	doneAddingLocked()
 	///////////////////////////////////////////////////////////////////////////
 	//
 	//
@@ -155,11 +171,10 @@ func (b *resultsBufferBase) initLocked(isEmpty bool, numExpectedResponses int) e
 	return nil
 }
 
-func (b *resultsBufferBase) findCompleteResponses(results []Result) {
-	for i := range results {
-		if results[i].GetResp != nil || results[i].scanComplete {
-			b.numCompleteResponses++
-		}
+func (b *resultsBufferBase) checkIfCompleteLocked(r Result) {
+	b.Mutex.AssertHeld()
+	if r.GetResp != nil || r.scanComplete {
+		b.numCompleteResponses++
 	}
 }
 
@@ -244,12 +259,15 @@ func (b *outOfOrderResultsBuffer) init(_ context.Context, numExpectedResponses i
 	return nil
 }
 
-func (b *outOfOrderResultsBuffer) add(results []Result) {
-	b.Lock()
-	defer b.Unlock()
-	b.results = append(b.results, results...)
-	b.findCompleteResponses(results)
-	b.numUnreleasedResults += len(results)
+func (b *outOfOrderResultsBuffer) addLocked(r Result) {
+	b.Mutex.AssertHeld()
+	b.results = append(b.results, r)
+	b.checkIfCompleteLocked(r)
+	b.numUnreleasedResults++
+}
+
+func (b *outOfOrderResultsBuffer) doneAddingLocked() {
+	b.Mutex.AssertHeld()
 	b.signal()
 }
 
@@ -397,35 +415,32 @@ func (b *inOrderResultsBuffer) init(ctx context.Context, numExpectedResponses in
 	return nil
 }
 
-func (b *inOrderResultsBuffer) add(results []Result) {
-	b.Lock()
-	defer b.Unlock()
+func (b *inOrderResultsBuffer) addLocked(r Result) {
+	b.Mutex.AssertHeld()
 	// Note that we don't increase b.numUnreleasedResults because all these
 	// results are "buffered".
-	b.findCompleteResponses(results)
-	foundHeadOfLine := false
-	for _, r := range results {
-		if debug {
-			subRequestIdx := ""
-			if !b.singleRowLookup {
-				subRequestIdx = fmt.Sprintf(" (%d)", r.subRequestIdx)
-			}
-			fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease)
-		}
-		// All the Results have already been registered with the budget, so
-		// we're keeping them in-memory.
-		heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter})
-		if r.Position == b.headOfLinePosition && r.subRequestIdx == b.headOfLineSubRequestIdx {
-			foundHeadOfLine = true
+	b.checkIfCompleteLocked(r)
+	if debug {
+		subRequestIdx := ""
+		if !b.singleRowLookup {
+			subRequestIdx = fmt.Sprintf(" (%d)", r.subRequestIdx)
 		}
+		fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease)
 	}
-	if foundHeadOfLine {
+	// All the Results have already been registered with the budget, so we're
+	// keeping them in-memory.
+	heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter})
+	b.addCounter++
+}
+
+func (b *inOrderResultsBuffer) doneAddingLocked() {
+	b.Mutex.AssertHeld()
+	if len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx {
 		if debug {
 			fmt.Println("found head-of-the-line")
 		}
 		b.signal()
 	}
-	b.addCounter++
 }
 
 func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) {
diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
index ede110f30360..63f31baee688 100644
--- a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -113,13 +113,14 @@ func TestInOrderResultsBuffer(t *testing.T) {
 			break
 		}
 
+		b.Lock()
 		numToAdd := rng.Intn(len(addOrder)) + 1
-		toAdd := make([]Result, numToAdd)
-		for i := range toAdd {
-			toAdd[i] = results[addOrder[0]]
+		for i := 0; i < numToAdd; i++ {
+			b.addLocked(results[addOrder[0]])
 			addOrder = addOrder[1:]
 		}
-		b.add(toAdd)
+		b.doneAddingLocked()
+		b.Unlock()
 
 		// With 50% probability, try spilling some of the buffered results
 		// to disk.
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index d798c28e4948..d2b2c43a2aa9 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -241,7 +241,8 @@ type Streamer struct {
 	mu struct {
 		// If the budget's mutex also needs to be locked, the budget's mutex
-		// must be acquired first.
+		// must be acquired first. If the results' mutex needs to be locked,
+		// then this mutex must be acquired first.
 		syncutil.Mutex
 
 		avgResponseEstimator avgResponseEstimator
 
@@ -1178,10 +1179,17 @@ type singleRangeBatchResponseFootprint struct {
 	memoryFootprintBytes int64
 	// resumeReqsMemUsage tracks the memory usage of the requests for the
 	// ResumeSpans.
-	resumeReqsMemUsage int64
+	resumeReqsMemUsage int64
+	// numGetResults and numScanResults indicate how many Result objects will
+	// need to be created for Get and Scan responses, respectively.
+	numGetResults, numScanResults         int
 	numIncompleteGets, numIncompleteScans int
 }
 
+func (fp singleRangeBatchResponseFootprint) hasResults() bool {
+	return fp.numGetResults > 0 || fp.numScanResults > 0
+}
+
 func (fp singleRangeBatchResponseFootprint) hasIncomplete() bool {
 	return fp.numIncompleteGets > 0 || fp.numIncompleteScans > 0
 }
@@ -1218,6 +1226,7 @@ func calculateFootprint(
 		} else {
 			// This Get was completed.
 			fp.memoryFootprintBytes += getResponseSize(get)
+			fp.numGetResults++
 		}
 	case *roachpb.ScanRequest:
 		scan := reply.(*roachpb.ScanResponse)
@@ -1234,6 +1243,9 @@ func calculateFootprint(
 			if len(scan.BatchResponses) > 0 {
 				fp.memoryFootprintBytes += scanResponseSize(scan)
 			}
+			if len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil {
+				fp.numScanResults++
+			}
 			if scan.ResumeSpan != nil {
 				// This Scan wasn't completed.
 				scanRequestScratch.SetSpan(*scan.ResumeSpan)
@@ -1276,8 +1288,46 @@ func processSingleRangeResults(
 	br *roachpb.BatchResponse,
 	fp singleRangeBatchResponseFootprint,
 ) {
-	var results []Result
-	var hasNonEmptyScanResponse bool
+	// If there are no results, this function has nothing to do.
+	if !fp.hasResults() {
+		return
+	}
+
+	// We will add some Results into the results buffer, and doneAddingLocked()
+	// call below requires that the budget's mutex is held. It also must be
+	// acquired before the streamer's mutex is locked, so we have to do this
+	// right away.
+	// TODO(yuzefovich): check whether the lock contention on this mutex is
+	// noticeable and possibly refactor the code so that the budget's mutex is
+	// only acquired for the duration of doneAddingLocked().
+	s.budget.mu.Lock()
+	defer s.budget.mu.Unlock()
+	s.mu.Lock()
+
+	// TODO(yuzefovich): some of the responses might be partial, yet the
+	// estimator doesn't distinguish the footprint of the full response vs
+	// the partial one. Think more about this.
+	s.mu.avgResponseEstimator.update(
+		fp.memoryFootprintBytes, int64(fp.numGetResults+fp.numScanResults),
+	)
+
+	// If we have any Scan results to create and the Scan requests can return
+	// multiple rows, we'll need to consult s.mu.numRangesPerScanRequest, so
+	// we'll defer unlocking the streamer's mutex. However, if only Get results
+	// or Scan results of single rows will be created, we can unlock the
+	// streamer's mutex right away.
+	if fp.numScanResults > 0 && !s.hints.SingleRowLookup {
+		defer s.mu.Unlock()
+	} else {
+		s.mu.Unlock()
+	}
+
+	// Now we can get the resultsBuffer's mutex - it must be acquired after
+	// the Streamer's one.
+	s.results.Lock()
+	defer s.results.Unlock()
+	defer s.results.doneAddingLocked()
+
 	// memoryTokensBytes accumulates all reservations that are made for all
 	// Results created below. The accounting for these reservations has already
 	// been performed, and memoryTokensBytes should be exactly equal to
@@ -1307,7 +1357,14 @@ func processSingleRangeResults(
 			result.memoryTok.streamer = s
 			result.memoryTok.toRelease = getResponseSize(get)
 			memoryTokensBytes += result.memoryTok.toRelease
-			results = append(results, result)
+			if buildutil.CrdbTestBuild {
+				if fp.numGetResults == 0 {
+					panic(errors.AssertionFailedf(
+						"unexpectedly found a non-empty GetResponse when numGetResults is zero",
+					))
+				}
+			}
+			s.results.addLocked(result)
 
 		case *roachpb.ScanRequest:
 			scan := reply.(*roachpb.ScanResponse)
@@ -1332,12 +1389,30 @@ func processSingleRangeResults(
 			memoryTokensBytes += result.memoryTok.toRelease
 			result.ScanResp = scan
 			if s.hints.SingleRowLookup {
-				// When SingleRowLookup is false, scanComplete field will be
-				// set in finalizeSingleRangeResults().
 				result.scanComplete = true
+			} else if scan.ResumeSpan == nil {
+				// The scan within the range is complete.
+				if s.mode == OutOfOrder {
+					s.mu.numRangesPerScanRequest[position]--
+					result.scanComplete = s.mu.numRangesPerScanRequest[position] == 0
+				} else {
+					// In InOrder mode, the scan is marked as complete when the
+					// last sub-request is satisfied. Note that it is ok if the
+					// previous sub-requests haven't been satisfied yet - the
+					// inOrderResultsBuffer will not emit this Result until the
+					// previous sub-requests are responded to.
+					numSubRequests := s.mu.numRangesPerScanRequest[position]
+					result.scanComplete = result.subRequestIdx+1 == numSubRequests
+				}
+			}
+			if buildutil.CrdbTestBuild {
+				if fp.numScanResults == 0 {
+					panic(errors.AssertionFailedf(
+						"unexpectedly found a ScanResponse when numScanResults is zero",
+					))
+				}
 			}
-			results = append(results, result)
-			hasNonEmptyScanResponse = true
+			s.results.addLocked(result)
 		}
 	}
@@ -1349,86 +1424,6 @@ func processSingleRangeResults(
 			))
 		}
 	}
-
-	if len(results) > 0 {
-		finalizeSingleRangeResults(
-			s, results, fp.memoryFootprintBytes, hasNonEmptyScanResponse,
-		)
-	}
-}
-
-// finalizeSingleRangeResults "finalizes" the results of evaluation of a
-// singleRangeBatch. By "finalization" we mean setting scanComplete field of
-// ScanResp to correct value for all scan responses (when Hints.SingleRowLookup
-// is false), updating the estimate of an average response size, and telling the
-// Streamer about these results.
-//
-// This method assumes that results has length greater than zero.
-func finalizeSingleRangeResults(
-	s *Streamer, results []Result, actualMemoryReservation int64, hasNonEmptyScanResponse bool,
-) {
-	if buildutil.CrdbTestBuild {
-		if len(results) == 0 {
-			panic(errors.AssertionFailedf("finalizeSingleRangeResults is called with no results"))
-		}
-	}
-	s.mu.Lock()
-	defer s.mu.Unlock()
-
-	// If we have non-empty scan response, it might be complete. This will be
-	// the case when a scan response doesn't have a resume span and there are no
-	// other scan requests in flight (involving other ranges) that are part of
-	// the same original ScanRequest.
-	//
-	// We need to do this check as well as adding the results to be returned to
-	// the client as an atomic operation so that scanComplete is set to true
-	// only on the last partial scan response.
-	//
-	// However, if we got a hint that each lookup produces a single row, then we
-	// know that no original ScanRequest can span multiple ranges, so
-	// scanComplete field has already been set correctly.
-	if hasNonEmptyScanResponse && !s.hints.SingleRowLookup {
-		for i := range results {
-			if results[i].ScanResp != nil {
-				if results[i].ScanResp.ResumeSpan == nil {
-					// The scan within the range is complete.
-					if s.mode == OutOfOrder {
-						s.mu.numRangesPerScanRequest[results[i].Position]--
-						if s.mu.numRangesPerScanRequest[results[i].Position] == 0 {
-							// The scan across all ranges is now complete too.
-							results[i].scanComplete = true
-						}
-					} else {
-						// In InOrder mode, the scan is marked as complete when
-						// the last sub-request is satisfied. Note that it is ok
-						// if the previous sub-requests haven't been satisfied
-						// yet - the inOrderResultsBuffer will not emit this
-						// Result until the previous sub-requests are responded
-						// to.
-						numSubRequests := s.mu.numRangesPerScanRequest[results[i].Position]
-						results[i].scanComplete = results[i].subRequestIdx+1 == numSubRequests
-					}
-				} else {
-					// Unset the ResumeSpan on the result in order to not
-					// confuse the user of the Streamer. Non-nil resume span was
-					// already included into resumeReq populated in
-					// performRequestAsync.
-					results[i].ScanResp.ResumeSpan = nil
-				}
-			}
-		}
-	}
-
-	// Update the average response size based on this batch.
-	// TODO(yuzefovich): some of the responses might be partial, yet the
-	// estimator doesn't distinguish the footprint of the full response vs the
-	// partial one. Think more about this.
-	s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results)))
-	if debug {
-		printSubRequestIdx := s.mode == InOrder && !s.hints.SingleRowLookup
-		fmt.Printf("created %s with total size %d\n", resultsToString(results, printSubRequestIdx), actualMemoryReservation)
-	}
-	s.results.add(results)
 }
 
 // buildResumeSingleRangeBatch consumes a BatchResponse for a singleRangeBatch
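
A note on the interface change in results_buffer.go: replacing add([]Result) with the Lock() / addLocked(Result) / doneAddingLocked() / Unlock() sequence removes the temporary slice from the handoff entirely - the worker goroutine hands over any number of Results inside one critical section and signals waiters once at the end. The sketch below shows that pattern in isolation; it is a minimal stand-in under assumed names (miniBuffer, string results, a channel-based wake-up), not the actual resultsBuffer implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBuffer is a hypothetical, stripped-down analogue of resultsBuffer:
// callers lock it, add results one at a time, and signal once, so no
// temporary []Result-style slice is ever allocated for the handoff.
type miniBuffer struct {
	sync.Mutex
	results []string      // stands in for []Result
	hasWork chan struct{} // capacity-1 channel used as a wake-up signal
}

func newMiniBuffer() *miniBuffer {
	return &miniBuffer{hasWork: make(chan struct{}, 1)}
}

// addLocked appends a single result. The caller must hold the mutex.
// Waiters are deliberately NOT woken up here.
func (b *miniBuffer) addLocked(r string) {
	b.results = append(b.results, r)
}

// doneAddingLocked wakes up a goroutine blocked in wait(), if any. The
// caller must hold the mutex.
func (b *miniBuffer) doneAddingLocked() {
	select {
	case b.hasWork <- struct{}{}:
	default: // a wake-up is already pending
	}
}

// wait blocks until doneAddingLocked signals, then drains the buffer.
func (b *miniBuffer) wait() []string {
	<-b.hasWork
	b.Lock()
	defer b.Unlock()
	res := b.results
	b.results = nil
	return res
}

func main() {
	b := newMiniBuffer()
	go func() {
		// Many addLocked() calls followed by a single doneAddingLocked(),
		// all under one critical section, mirroring the new interface.
		b.Lock()
		for _, r := range []string{"a", "b", "c"} {
			b.addLocked(r)
		}
		b.doneAddingLocked()
		b.Unlock()
	}()
	fmt.Println(b.wait()) // [a b c]
}
```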
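The diff also extends the lock-ordering contract: the budget's mutex is acquired before the Streamer's mutex (the pre-existing rule), and the Streamer's mutex before the results buffer's mutex (the new rule stated in both the Lock() comment and the Streamer.mu comment), which is exactly the order processSingleRangeResults follows. Below is a hedged sketch of that discipline with hypothetical stand-in types, not the real Streamer:

```go
package main

import "sync"

// Hypothetical stand-ins for the three lock levels named in the diff.
type budget struct{ mu sync.Mutex }
type resultsBuf struct{ sync.Mutex }
type streamer struct {
	budget  *budget
	mu      sync.Mutex
	results *resultsBuf
}

// processBatch sketches the acquisition order used by
// processSingleRangeResults: budget.mu, then Streamer.mu, then the results
// buffer's mutex. The order constrains acquisition only, so releasing
// Streamer.mu early (as the diff does when numRangesPerScanRequest is not
// needed) stays within the contract.
func (s *streamer) processBatch() {
	s.budget.mu.Lock()
	defer s.budget.mu.Unlock()

	s.mu.Lock()
	// ... update avgResponseEstimator, read numRangesPerScanRequest ...
	s.mu.Unlock()

	s.results.Lock()
	defer s.results.Unlock()
	// ... addLocked() calls followed by doneAddingLocked() ...
}

func main() {
	s := &streamer{budget: &budget{}, results: &resultsBuf{}}
	s.processBatch()
}
```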
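Finally, the scanComplete bookkeeping that moved from finalizeSingleRangeResults into processSingleRangeResults is easiest to see with concrete numbers: OutOfOrder mode decrements a per-request range counter and marks the scan complete when it reaches zero, while InOrder mode pins completion to the last sub-request index. A toy trace of both rules follows (hypothetical variables, not the Streamer's actual state):

```go
package main

import "fmt"

func main() {
	// OutOfOrder mode: request 0 spans 3 ranges; each range-local scan
	// that finishes without a resume span decrements the counter, and
	// the whole scan is complete when the counter hits zero.
	numRangesPerScanRequest := map[int]int{0: 3}
	const position = 0
	for i := 0; i < 3; i++ {
		numRangesPerScanRequest[position]--
		scanComplete := numRangesPerScanRequest[position] == 0
		fmt.Printf("OutOfOrder: range-local scan %d done, scanComplete=%t\n", i, scanComplete)
	}

	// InOrder mode: the counter stays fixed and completion is pinned to
	// the last sub-request (subRequestIdx+1 == numSubRequests); earlier
	// sub-requests may still be outstanding, which is fine because the
	// inOrderResultsBuffer holds this Result back until they arrive.
	const numSubRequests = 3
	for subRequestIdx := 0; subRequestIdx < numSubRequests; subRequestIdx++ {
		scanComplete := subRequestIdx+1 == numSubRequests
		fmt.Printf("InOrder: sub-request %d, scanComplete=%t\n", subRequestIdx, scanComplete)
	}
}
```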