diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go
index 345c97129086..cb004ae44c53 100644
--- a/pkg/kv/kvclient/kvstreamer/requests_provider.go
+++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -24,8 +24,6 @@ import (
 // singleRangeBatch contains parts of the originally enqueued requests that have
 // been truncated to be within a single range. All requests within the
 // singleRangeBatch will be issued as a single BatchRequest.
-// TODO(yuzefovich): perform memory accounting for slices other than reqs in
-// singleRangeBatch.
 type singleRangeBatch struct {
 	reqs []roachpb.RequestUnion
 	// reqsKeys stores the start key of the corresponding request in reqs. It is
@@ -81,9 +79,10 @@ type singleRangeBatch struct {
 	// the memory usage of reqs, excluding the overhead.
 	reqsReservedBytes int64
 	// overheadAccountedFor tracks the memory reservation against the budget for
-	// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects).
-	// Since we reuse the same reqs slice for resume requests, this can be
-	// released only when the BatchResponse doesn't have any resume spans.
+	// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects) as
+	// well as the positions and the subRequestIdx slices. Since we reuse these
+	// slices for the resume requests, this can be released only when the
+	// BatchResponse doesn't have any resume spans.
 	//
 	// RequestUnion.Size() ignores the overhead of RequestUnion object, so we
 	// need to account for it separately.
@@ -212,6 +211,9 @@ type requestsProviderBase struct {
 	hasWork *sync.Cond
 	// requests contains all single-range sub-requests that have yet to be
 	// served.
+	// TODO(yuzefovich): this memory is not accounted for. However, the number
+	// of singleRangeBatch objects in flight is limited by the number of ranges
+	// of a single table, so it doesn't seem urgent to fix the accounting here.
 	requests []singleRangeBatch
 	// done is set to true once the Streamer is Close()'d.
 	done bool
diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go
index 857eed6b28d2..7e77fed21c78 100644
--- a/pkg/kv/kvclient/kvstreamer/results_buffer.go
+++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -13,6 +13,7 @@ package kvstreamer
 import (
 	"context"
 	"fmt"
+	"unsafe"

 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
 	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
@@ -84,7 +85,9 @@ type resultsBuffer interface {
 	// added all Results it could, and the resultsBuffer checks whether any
 	// Results are available to be returned to the client. If there is a
 	// goroutine blocked in wait(), the goroutine is woken up.
-	doneAddingLocked()
+	//
+	// It is assumed that the budget's mutex is already being held.
+	doneAddingLocked(context.Context)

 	///////////////////////////////////////////////////////////////////////////
 	//                                                                       //
@@ -143,7 +146,11 @@ type resultsBufferBase struct {
 	// hasResults is used in wait() to block until there are some results to be
 	// picked up.
 	hasResults chan struct{}
-	err        error
+	// overheadAccountedFor tracks how much overhead space for the Results in
+	// this results buffer has been consumed from the budget. Note that this
+	// does not include the footprint of Get and Scan responses.
+	overheadAccountedFor int64
+	err                  error
 }

 func newResultsBufferBase(budget *budget) *resultsBufferBase {
@@ -179,6 +186,23 @@ func (b *resultsBufferBase) checkIfCompleteLocked(r Result) {
 	}
 }

+func (b *resultsBufferBase) accountForOverheadLocked(ctx context.Context, overheadMemUsage int64) {
+	b.budget.mu.AssertHeld()
+	b.Mutex.AssertHeld()
+	if overheadMemUsage > b.overheadAccountedFor {
+		// We're allowing the budget to go into debt here since the results
+		// buffer doesn't have a way to push back on the Results. It would also
+		// be unfortunate to discard these Results - instead, we rely on the
+		// worker coordinator to make sure the budget gets out of debt.
+		if err := b.budget.consumeLocked(ctx, overheadMemUsage-b.overheadAccountedFor, true /* allowDebt */); err != nil {
+			b.setErrorLocked(err)
+		}
+	} else {
+		b.budget.releaseLocked(ctx, b.overheadAccountedFor-overheadMemUsage)
+	}
+	b.overheadAccountedFor = overheadMemUsage
+}
+
 // signal non-blockingly sends on hasResults channel.
 func (b *resultsBufferBase) signal() {
 	select {
@@ -267,8 +291,10 @@ func (b *outOfOrderResultsBuffer) addLocked(r Result) {
 	b.numUnreleasedResults++
 }

-func (b *outOfOrderResultsBuffer) doneAddingLocked() {
-	b.Mutex.AssertHeld()
+const resultSize = int64(unsafe.Sizeof(Result{}))
+
+func (b *outOfOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
+	b.accountForOverheadLocked(ctx, int64(cap(b.results))*resultSize)
 	b.signal()
 }
@@ -276,6 +302,9 @@ func (b *outOfOrderResultsBuffer) get(context.Context) ([]Result, bool, error) {
 	b.Lock()
 	defer b.Unlock()
 	results := b.results
+	// Note that although we're losing the reference to the Results slice, we
+	// still keep the overhead of the slice accounted for with the budget. This
+	// is done as a way of "amortizing" the reservation.
 	b.results = nil
 	allComplete := b.numCompleteResponses == b.numExpectedResponses
 	return results, allComplete, b.err
 }
@@ -470,8 +499,12 @@ func (b *inOrderResultsBuffer) addLocked(r Result) {
 	b.addCounter++
 }

-func (b *inOrderResultsBuffer) doneAddingLocked() {
-	b.Mutex.AssertHeld()
+const inOrderBufferedResultSize = int64(unsafe.Sizeof(inOrderBufferedResult{}))
+
+func (b *inOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
+	overhead := int64(cap(b.buffered))*inOrderBufferedResultSize + // b.buffered
+		int64(cap(b.resultScratch))*resultSize // b.resultScratch
+	b.accountForOverheadLocked(ctx, overhead)
 	if len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx {
 		if debug {
 			fmt.Println("found head-of-the-line")
diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
index 63f31baee688..07ec31748fbd 100644
--- a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
+++ b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -113,14 +113,16 @@ func TestInOrderResultsBuffer(t *testing.T) {
 			break
 		}

+		budget.mu.Lock()
 		b.Lock()
 		numToAdd := rng.Intn(len(addOrder)) + 1
 		for i := 0; i < numToAdd; i++ {
 			b.addLocked(results[addOrder[0]])
 			addOrder = addOrder[1:]
 		}
-		b.doneAddingLocked()
+		b.doneAddingLocked(ctx)
 		b.Unlock()
+		budget.mu.Unlock()

 		// With 50% probability, try spilling some of the buffered results
 		// to disk.
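An aside on the accounting pattern introduced above: doneAddingLocked() measures the footprint of its slices from cap() rather than len() (the backing array stays allocated either way), and accountForOverheadLocked() only moves the delta against what was previously reserved, going into debt rather than discarding Results. The following is a minimal standalone sketch of that reconciliation, using toy result and budget types rather than the real kvstreamer ones:

package main

import (
	"fmt"
	"unsafe"
)

// result is a hypothetical stand-in for kvstreamer.Result; only its size matters here.
type result struct {
	position      int
	subRequestIdx int32
	toRelease     int64
}

const resultSize = int64(unsafe.Sizeof(result{}))

// budget is a toy stand-in that only tracks the total reserved bytes.
type budget struct{ used int64 }

type buffer struct {
	budget               *budget
	results              []result
	overheadAccountedFor int64
}

// doneAdding re-measures the overhead of the results slice (cap, not len,
// since the backing array is what actually occupies memory) and reconciles
// only the delta against the previously accounted amount.
func (b *buffer) doneAdding() {
	overhead := int64(cap(b.results)) * resultSize
	if overhead > b.overheadAccountedFor {
		b.budget.used += overhead - b.overheadAccountedFor // consume the growth
	} else {
		b.budget.used -= b.overheadAccountedFor - overhead // release the shrinkage
	}
	b.overheadAccountedFor = overhead
}

func main() {
	b := &buffer{budget: &budget{}}
	for i := 0; i < 5; i++ {
		b.results = append(b.results, result{})
		b.doneAdding()
		fmt.Printf("len=%d cap=%d accounted=%dB\n", len(b.results), cap(b.results), b.overheadAccountedFor)
	}
}

Here the budget is only a counter; the real budget can refuse the reservation, which is why the buffer allows debt and relies on the worker coordinator to bring the budget back out of debt later.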
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index fd8c63175aa3..d7b1cf97eed5 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -234,6 +234,14 @@ type Streamer struct {
 	// been returned by GetResults() yet.
 	results resultsBuffer

+	// numRangesPerScanRequestAccountedFor tracks how much space has been
+	// consumed from the budget in order to account for the
+	// numRangesPerScanRequest slice.
+	//
+	// It is only accessed from the Streamer's user goroutine, so it doesn't
+	// need mutex protection.
+	numRangesPerScanRequestAccountedFor int64
+
 	mu struct {
 		// If the budget's mutex also needs to be locked, the budget's mutex
 		// must be acquired first. If the results' mutex needs to be locked,
@@ -252,7 +260,6 @@ type Streamer struct {
 		//
 		// It is allocated lazily if Hints.SingleRowLookup is false when the
 		// first ScanRequest is encountered in Enqueue.
-		// TODO(yuzefovich): perform memory accounting for this.
 		numRangesPerScanRequest []int32

 		// numRequestsInFlight tracks the number of single-range batches that
@@ -364,6 +371,14 @@ func (s *Streamer) Init(
 	s.maxKeysPerRow = int32(maxKeysPerRow)
 }

+const (
+	requestUnionSliceOverhead = int64(unsafe.Sizeof([]roachpb.RequestUnion{}))
+	intSliceOverhead          = int64(unsafe.Sizeof([]int{}))
+	intSize                   = int64(unsafe.Sizeof(int(0)))
+	int32SliceOverhead        = int64(unsafe.Sizeof([]int32{}))
+	int32Size                 = int64(unsafe.Sizeof(int32(0)))
+)
+
 // Enqueue dispatches multiple requests for execution. Results are delivered
 // through the GetResults call.
 //
@@ -431,6 +446,9 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 	// requestsProvider right away. This is needed in order for the worker
 	// coordinator to not pick up any work until we account for
 	// totalReqsMemUsage.
+	// TODO(yuzefovich): this memory is not accounted for. However, the number
+	// of singleRangeBatch objects in flight is limited by the number of ranges
+	// of a single table, so it doesn't seem urgent to fix the accounting here.
 	var requestsToServe []singleRangeBatch
 	seekKey := rs.Key
 	const scanDir = kvcoord.Ascending
@@ -447,6 +465,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 		}
 	}()
 	var reqsKeysScratch []roachpb.Key
+	var newNumRangesPerScanRequestMemoryUsage int64
 	for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) {
 		// Truncate the request span to the current range.
 		singleRangeSpan, err := rs.Intersect(ri.Token().Desc())
@@ -459,6 +478,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 			return err
 		}
 		var subRequestIdx []int32
+		var subRequestIdxOverhead int64
 		if !s.hints.SingleRowLookup {
 			for i, pos := range positions {
 				if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan {
@@ -470,6 +490,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 						s.mu.Lock()
 						if cap(s.mu.numRangesPerScanRequest) < len(reqs) {
 							s.mu.numRangesPerScanRequest = make([]int32, len(reqs))
+							newNumRangesPerScanRequestMemoryUsage = int64(cap(s.mu.numRangesPerScanRequest)) * int32Size
 						} else {
 							// We can reuse numRangesPerScanRequest allocated on
 							// the previous call to Enqueue after we zero it
@@ -483,6 +504,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 					if s.mode == InOrder {
 						if subRequestIdx == nil {
 							subRequestIdx = make([]int32, len(singleRangeReqs))
+							subRequestIdxOverhead = int32SliceOverhead + int32Size*int64(cap(subRequestIdx))
 						}
 						subRequestIdx[i] = s.mu.numRangesPerScanRequest[pos]
 					}
@@ -496,12 +518,15 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 		//if !s.hints.UniqueRequests {
 		//}

+		overheadAccountedFor := requestUnionSliceOverhead + requestUnionOverhead*int64(cap(singleRangeReqs)) + // reqs
+			intSliceOverhead + intSize*int64(cap(positions)) + // positions
+			subRequestIdxOverhead // subRequestIdx
 		r := singleRangeBatch{
 			reqs:              singleRangeReqs,
 			positions:         positions,
 			subRequestIdx:     subRequestIdx,
 			reqsReservedBytes: requestsMemUsage(singleRangeReqs),
-			overheadAccountedFor: requestUnionOverhead * int64(cap(singleRangeReqs)),
+			overheadAccountedFor: overheadAccountedFor,
 		}
 		totalReqsMemUsage += r.reqsReservedBytes + r.overheadAccountedFor
@@ -550,11 +575,16 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
 		streamerLocked = false
 	}

+	toConsume := totalReqsMemUsage
+	if newNumRangesPerScanRequestMemoryUsage != 0 && newNumRangesPerScanRequestMemoryUsage != s.numRangesPerScanRequestAccountedFor {
+		toConsume += newNumRangesPerScanRequestMemoryUsage - s.numRangesPerScanRequestAccountedFor
+		s.numRangesPerScanRequestAccountedFor = newNumRangesPerScanRequestMemoryUsage
+	}
 	// We allow the budget to go into debt iff a single request was enqueued.
 	// This is needed to support the case of arbitrarily large keys - the caller
 	// is expected to produce requests with such cases one at a time.
 	allowDebt := len(reqs) == 1
-	if err = s.budget.consume(ctx, totalReqsMemUsage, allowDebt); err != nil {
+	if err = s.budget.consume(ctx, toConsume, allowDebt); err != nil {
 		return err
 	}
@@ -1092,7 +1122,7 @@ func (w *workerCoordinator) performRequestAsync(
 			reqOveraccounted := req.reqsReservedBytes - resumeReqsMemUsage
 			if resumeReqsMemUsage == 0 {
 				// There will be no resume request, so we will lose the
-				// reference to the req.reqs slice and can release its memory
+				// reference to the slices in req and can release its memory
 				// reservation.
 				reqOveraccounted += req.overheadAccountedFor
 			}
@@ -1156,7 +1186,7 @@ func (w *workerCoordinator) performRequestAsync(
 			// Finally, process the results and add the ResumeSpans to be
 			// processed as well.
 			if err := processSingleRangeResults(
-				w.s, req, br, memoryFootprintBytes, resumeReqsMemUsage,
+				ctx, w.s, req, br, memoryFootprintBytes, resumeReqsMemUsage,
 				numIncompleteGets, numIncompleteScans, numGetResults, numScanResults,
 			); err != nil {
 				w.s.results.setError(err)
@@ -1238,6 +1268,7 @@ func calculateFootprint(
 // It also assumes that the budget has already been reconciled with the
 // reservations for Results that will be created.
 func processSingleRangeResults(
+	ctx context.Context,
 	s *Streamer,
 	req singleRangeBatch,
 	br *roachpb.BatchResponse,
@@ -1249,7 +1280,7 @@ func processSingleRangeResults(
 	numIncompleteRequests := numIncompleteGets + numIncompleteScans
 	var resumeReq singleRangeBatch
 	// We have to allocate the new Get and Scan requests, but we can reuse the
-	// reqs and the positions slices.
+	// reqs, the positions, and the subRequestIdx slices.
 	resumeReq.reqs = req.reqs[:numIncompleteRequests]
 	resumeReq.positions = req.positions[:0]
 	resumeReq.subRequestIdx = req.subRequestIdx[:0]
@@ -1293,6 +1324,15 @@ func processSingleRangeResults(
 		}()
 	}
 	if numGetResults > 0 || numScanResults > 0 {
+		// We will add some Results into the results buffer, and the
+		// doneAddingLocked() call below requires that the budget's mutex is
+		// held. It also must be acquired before the streamer's mutex is locked,
+		// so we have to do this right away.
+		// TODO(yuzefovich): check whether the lock contention on this mutex is
+		// noticeable and possibly refactor the code so that the budget's mutex
+		// is only acquired for the duration of doneAddingLocked().
+		s.budget.mu.Lock()
+		defer s.budget.mu.Unlock()
 		// We will create some Result objects, so we at least will need to
 		// update the average response size estimate, guarded by the streamer's
 		// lock.
@@ -1317,7 +1357,7 @@ func processSingleRangeResults(
 		// the Streamer's one.
 		s.results.Lock()
 		defer s.results.Unlock()
-		defer s.results.doneAddingLocked()
+		defer s.results.doneAddingLocked(ctx)
 	}
 	for i, resp := range br.Responses {
 		position := req.positions[i]
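To make the new size constants in streamer.go concrete: the footprint of a slice, excluding whatever its elements point to, is approximated as the slice header plus cap() elements of the backing array, summed per slice. Below is a small self-contained sketch under that assumption; the element type here is a hypothetical stand-in, not the real roachpb.RequestUnion.

package main

import (
	"fmt"
	"unsafe"
)

// requestUnion is a hypothetical stand-in for roachpb.RequestUnion; the real
// type is larger, but the accounting formula is the same.
type requestUnion struct{ value unsafe.Pointer }

const (
	// Size of the slice header itself (pointer + len + cap; 24 bytes on 64-bit).
	requestUnionSliceOverhead = int64(unsafe.Sizeof([]requestUnion{}))
	// Size of one element of the backing array.
	requestUnionOverhead = int64(unsafe.Sizeof(requestUnion{}))
	intSliceOverhead     = int64(unsafe.Sizeof([]int{}))
	intSize              = int64(unsafe.Sizeof(int(0)))
)

// sliceOverhead mirrors the Enqueue-side formula: header plus cap (not len)
// elements for each slice being accounted for.
func sliceOverhead(reqs []requestUnion, positions []int) int64 {
	return requestUnionSliceOverhead + requestUnionOverhead*int64(cap(reqs)) +
		intSliceOverhead + intSize*int64(cap(positions))
}

func main() {
	reqs := make([]requestUnion, 3, 8)
	positions := make([]int, 3, 8)
	// On a 64-bit platform this prints 24 + 8*8 + 24 + 8*8 = 176.
	fmt.Println("accounted overhead:", sliceOverhead(reqs, positions), "bytes")
}

Because the estimate depends only on cap(), it matches what the patch reserves for overheadAccountedFor at Enqueue time and stays valid while the same backing arrays are reused for resume requests.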