diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go
index e416191487a8..fda9b48bec1e 100644
--- a/pkg/kv/kvclient/kvstreamer/requests_provider.go
+++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -79,8 +79,16 @@ type singleRangeBatch struct {
 	// Hints.SingleRowLookup is false and some Scan requests were enqueued.
 	subRequestIdx []int32
 	// reqsReservedBytes tracks the memory reservation against the budget for
-	// the memory usage of reqs.
+	// the memory usage of reqs, excluding the overhead.
 	reqsReservedBytes int64
+	// overheadAccountedFor tracks the memory reservation against the budget for
+	// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects).
+	// Since we reuse the same reqs slice for resume requests, this can be
+	// released only when the BatchResponse doesn't have any resume spans.
+	//
+	// RequestUnion.Size() ignores the overhead of the RequestUnion object, so
+	// we need to account for it separately.
+	overheadAccountedFor int64
 	// minTargetBytes, if positive, indicates the minimum TargetBytes limit that
 	// this singleRangeBatch should be sent with in order for the response to
 	// not be empty. Note that TargetBytes of at least minTargetBytes is
diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go
index 6418c7e3964b..3a7dd8ddb366 100644
--- a/pkg/kv/kvclient/kvstreamer/streamer.go
+++ b/pkg/kv/kvclient/kvstreamer/streamer.go
@@ -501,12 +501,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (retErr error) {
 			//}
 			r := singleRangeBatch{
-				reqs:              singleRangeReqs,
-				positions:         positions,
-				subRequestIdx:     subRequestIdx,
-				reqsReservedBytes: requestsMemUsage(singleRangeReqs),
+				reqs:                 singleRangeReqs,
+				positions:            positions,
+				subRequestIdx:        subRequestIdx,
+				reqsReservedBytes:    requestsMemUsage(singleRangeReqs),
+				overheadAccountedFor: requestUnionOverhead * int64(cap(singleRangeReqs)),
 			}
-			totalReqsMemUsage += r.reqsReservedBytes
+			totalReqsMemUsage += r.reqsReservedBytes + r.overheadAccountedFor
 
 			if s.mode == OutOfOrder {
 				// Sort all single-range requests to be in the key order.
@@ -1090,6 +1091,12 @@ func (w *workerCoordinator) performRequestAsync(
 			// non-empty responses as well as resume spans, if any.
 			respOverestimate := targetBytes - memoryFootprintBytes
 			reqOveraccounted := req.reqsReservedBytes - resumeReqsMemUsage
+			if resumeReqsMemUsage == 0 {
+				// There will be no resume request, so we will lose the
+				// reference to the req.reqs slice and can release its memory
+				// reservation.
+				reqOveraccounted += req.overheadAccountedFor
+			}
 			overaccountedTotal := respOverestimate + reqOveraccounted
 			if overaccountedTotal >= 0 {
 				w.s.budget.release(ctx, overaccountedTotal)
@@ -1213,9 +1220,6 @@ func calculateFootprint(
 			}
 		}
 	}
-	// This addendum is the first step of requestsMemUsage() and we've already
-	// added the size of each resume request above.
-	resumeReqsMemUsage += requestUnionOverhead * int64(numIncompleteGets+numIncompleteScans)
 	return memoryFootprintBytes, resumeReqsMemUsage, numIncompleteGets, numIncompleteScans
 }
 
@@ -1223,7 +1227,7 @@
 // in the BatchResponse. The ResumeSpans, if found, are added into a new
 // singleRangeBatch request that is added to be picked up by the mainLoop of the
 // worker coordinator. This method assumes that req is no longer needed by the
-// caller, so req.positions is reused for the ResumeSpans.
+// caller, so the slices from req are reused for the ResumeSpans.
 //
 // It also assumes that the budget has already been reconciled with the
 // reservations for Results that will be created.
@@ -1236,14 +1240,15 @@ func (w *workerCoordinator) processSingleRangeResults(
 ) error {
 	numIncompleteRequests := numIncompleteGets + numIncompleteScans
 	var resumeReq singleRangeBatch
-	// We have to allocate the new slice for requests, but we can reuse the
-	// positions slice.
-	resumeReq.reqs = make([]roachpb.RequestUnion, numIncompleteRequests)
+	// We have to allocate the new Get and Scan requests, but we can reuse the
+	// reqs and the positions slices.
+	resumeReq.reqs = req.reqs[:numIncompleteRequests]
 	resumeReq.positions = req.positions[:0]
 	resumeReq.subRequestIdx = req.subRequestIdx[:0]
 	// We've already reconciled the budget with the actual reservation for the
 	// requests with the ResumeSpans.
 	resumeReq.reqsReservedBytes = resumeReqsMemUsage
+	resumeReq.overheadAccountedFor = req.overheadAccountedFor
 	gets := make([]struct {
 		req   roachpb.GetRequest
 		union roachpb.RequestUnion_Get
@@ -1415,7 +1420,14 @@ func (w *workerCoordinator) processSingleRangeResults(
 
 	// If we have any incomplete requests, add them back into the work
 	// pool.
-	if len(resumeReq.reqs) > 0 {
+	if numIncompleteRequests > 0 {
+		// Make sure to nil out old requests that we didn't include in the
+		// resume request. We don't have to do this if there aren't any
+		// incomplete requests since req and resumeReq will be garbage collected
+		// on their own.
+		for i := numIncompleteRequests; i < len(req.reqs); i++ {
+			req.reqs[i] = roachpb.RequestUnion{}
+		}
 		w.s.requestsToServe.add(resumeReq)
 	}
 
@@ -1504,12 +1516,7 @@ func init() {
 
 const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))
 
-func requestsMemUsage(reqs []roachpb.RequestUnion) int64 {
-	// RequestUnion.Size() ignores the overhead of RequestUnion object, so we'll
-	// account for it separately first.
-	memUsage := requestUnionOverhead * int64(cap(reqs))
-	// No need to account for elements past len(reqs) because those must be
-	// unset and we have already accounted for RequestUnion object above.
+func requestsMemUsage(reqs []roachpb.RequestUnion) (memUsage int64) {
 	for _, r := range reqs {
 		memUsage += int64(r.Size())
 	}
diff --git a/pkg/sql/colfetcher/index_join.go b/pkg/sql/colfetcher/index_join.go
index 0b94d2b5b3ef..22c9c2426ab2 100644
--- a/pkg/sql/colfetcher/index_join.go
+++ b/pkg/sql/colfetcher/index_join.go
@@ -512,6 +512,7 @@ func NewColIndexJoin(
 			rowcontainer.NewKVStreamerResultDiskBuffer(
 				flowCtx.Cfg.TempStorage, diskMonitor,
 			),
+			kvFetcherMemAcc,
 		)
 	} else {
 		kvFetcher = row.NewKVFetcher(
diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go
index 818c32afcefd..da161806f8f7 100644
--- a/pkg/sql/row/kv_batch_fetcher.go
+++ b/pkg/sql/row/kv_batch_fetcher.go
@@ -13,6 +13,7 @@ package row
 import (
 	"context"
 	"time"
+	"unsafe"
 
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
@@ -148,6 +149,7 @@ type txnKVFetcher struct {
 	// least once.
 	alreadyFetched bool
 	batchIdx       int
+	reqsScratch    []roachpb.RequestUnion
 
 	responses        []roachpb.ResponseUnion
 	remainingBatches [][]byte
@@ -156,11 +158,12 @@
 	getResponseScratch [1]roachpb.KeyValue
 
 	acc *mon.BoundAccount
-	// spansAccountedFor and batchResponseAccountedFor track the number of bytes
-	// that we've already registered with acc in regards to spans and the batch
-	// response, respectively.
+	// spansAccountedFor, batchResponseAccountedFor, and reqsScratchAccountedFor
+	// track the number of bytes that we've already registered with acc in
+	// regards to spans, the batch response, and reqsScratch, respectively.
 	spansAccountedFor         int64
 	batchResponseAccountedFor int64
+	reqsScratchAccountedFor   int64
 
 	// If set, we will use the production value for kvBatchSize.
 	forceProductionKVBatchSize bool
@@ -395,7 +398,7 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
 	ba.Header.TargetBytes = int64(f.batchBytesLimit)
 	ba.Header.MaxSpanRequestKeys = int64(f.getBatchKeyLimit())
 	ba.AdmissionHeader = f.requestAdmissionHeader
-	ba.Requests = spansToRequests(f.spans.Spans, f.reverse, f.lockStrength)
+	ba.Requests = spansToRequests(f.spans.Spans, f.reverse, f.lockStrength, f.reqsScratch)
 
 	if log.ExpensiveLogEnabled(ctx, 2) {
 		log.VEventf(ctx, 2, "Scan %s", f.spans)
@@ -472,6 +475,21 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
 	f.batchIdx++
 	f.scratchSpans.reset()
 	f.alreadyFetched = true
+	// Keep the reference to the requests slice in order to reuse it in the
+	// future, after making sure to nil out the requests so that we lose the
+	// references to the underlying Get and Scan requests, which could keep
+	// large byte slices alive.
+	f.reqsScratch = ba.Requests
+	for i := range f.reqsScratch {
+		f.reqsScratch[i] = roachpb.RequestUnion{}
+	}
+	if monitoring {
+		reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch))
+		if err := f.acc.Resize(ctx, f.reqsScratchAccountedFor, reqsScratchMemUsage); err != nil {
+			return err
+		}
+		f.reqsScratchAccountedFor = reqsScratchMemUsage
+	}
 
 	// TODO(radu): We should fetch the next chunk in the background instead of waiting for the next
 	// call to fetch(). We can use a pool of workers to issue the KV ops which will also limit the
@@ -623,7 +641,9 @@ func (f *txnKVFetcher) reset(ctx context.Context) {
 	f.remainingBatches = nil
 	f.spans = identifiableSpans{}
 	f.scratchSpans = identifiableSpans{}
-	// Release only the allocations made by this fetcher.
+	// Release only the allocations made by this fetcher. Note that we're still
+	// keeping the reference to reqsScratch, so we don't release the allocation
+	// for it.
 	f.acc.Shrink(ctx, f.batchResponseAccountedFor+f.spansAccountedFor)
 	f.batchResponseAccountedFor, f.spansAccountedFor = 0, 0
 }
@@ -633,14 +653,24 @@ func (f *txnKVFetcher) close(ctx context.Context) {
 	f.reset(ctx)
 }
 
+const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))
+
 // spansToRequests converts the provided spans to the corresponding requests. If
 // a span doesn't have the EndKey set, then a Get request is used for it;
 // otherwise, a Scan (or ReverseScan if reverse is true) request is used with
 // BATCH_RESPONSE format.
+//
+// The provided reqsScratch is reused if it has enough capacity for all spans;
+// if not, a new slice is allocated.
 func spansToRequests(
-	spans roachpb.Spans, reverse bool, keyLocking lock.Strength,
+	spans roachpb.Spans, reverse bool, keyLocking lock.Strength, reqsScratch []roachpb.RequestUnion,
 ) []roachpb.RequestUnion {
-	reqs := make([]roachpb.RequestUnion, len(spans))
+	var reqs []roachpb.RequestUnion
+	if cap(reqsScratch) >= len(spans) {
+		reqs = reqsScratch[:len(spans)]
+	} else {
+		reqs = make([]roachpb.RequestUnion, len(spans))
+	}
 	// Detect the number of gets vs scans, so we can batch allocate all of the
 	// requests precisely.
 	nGets := 0
diff --git a/pkg/sql/row/kv_batch_streamer.go b/pkg/sql/row/kv_batch_streamer.go
index 20b948ba2888..d3451972d0fd 100644
--- a/pkg/sql/row/kv_batch_streamer.go
+++ b/pkg/sql/row/kv_batch_streamer.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/errors"
 )
 
@@ -45,8 +46,11 @@ type txnKVStreamer struct {
 	streamer   *kvstreamer.Streamer
 	keyLocking lock.Strength
 
-	spans   roachpb.Spans
-	spanIDs []int
+	spans       roachpb.Spans
+	spanIDs     []int
+	reqsScratch []roachpb.RequestUnion
+
+	acc *mon.BoundAccount
 
 	// getResponseScratch is reused to return the result of Get requests.
 	getResponseScratch [1]roachpb.KeyValue
@@ -63,11 +67,12 @@ var _ KVBatchFetcher = &txnKVStreamer{}
 
 // newTxnKVStreamer creates a new txnKVStreamer.
 func newTxnKVStreamer(
-	streamer *kvstreamer.Streamer, lockStrength descpb.ScanLockingStrength,
+	streamer *kvstreamer.Streamer, lockStrength descpb.ScanLockingStrength, acc *mon.BoundAccount,
 ) KVBatchFetcher {
 	return &txnKVStreamer{
 		streamer:   streamer,
 		keyLocking: getKeyLockingStrength(lockStrength),
+		acc:        acc,
 	}
 }
 
@@ -86,13 +91,22 @@ func (f *txnKVStreamer) SetupNextFetch(
 	if log.ExpensiveLogEnabled(ctx, 2) {
 		log.VEventf(ctx, 2, "Scan %s", spans)
 	}
-	reqs := spansToRequests(spans, false /* reverse */, f.keyLocking)
+	reqs := spansToRequests(spans, false /* reverse */, f.keyLocking, f.reqsScratch)
 	if err := f.streamer.Enqueue(ctx, reqs); err != nil {
 		return err
 	}
 	f.spans = spans
 	f.spanIDs = spanIDs
-	return nil
+	// Keep the reference to the requests slice in order to reuse it in the
+	// future, after making sure to nil out the requests so that we lose the
+	// references to the underlying Get and Scan requests, which could keep
+	// large byte slices alive.
+	f.reqsScratch = reqs
+	for i := range f.reqsScratch {
+		f.reqsScratch[i] = roachpb.RequestUnion{}
+	}
+	reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch))
+	return f.acc.ResizeTo(ctx, reqsScratchMemUsage)
 }
 
 func (f *txnKVStreamer) getSpanID(resultPosition int) int {
diff --git a/pkg/sql/row/kv_fetcher.go b/pkg/sql/row/kv_fetcher.go
index 6df5df898518..2f988a87c9c9 100644
--- a/pkg/sql/row/kv_fetcher.go
+++ b/pkg/sql/row/kv_fetcher.go
@@ -134,6 +134,7 @@ func NewStreamingKVFetcher(
 	singleRowLookup bool,
 	maxKeysPerRow int,
 	diskBuffer kvstreamer.ResultDiskBuffer,
+	kvFetcherMemAcc *mon.BoundAccount,
 ) *KVFetcher {
 	streamer := kvstreamer.NewStreamer(
 		distSender,
@@ -157,7 +158,7 @@ func NewStreamingKVFetcher(
 		maxKeysPerRow,
 		diskBuffer,
 	)
-	return newKVFetcher(newTxnKVStreamer(streamer, lockStrength))
+	return newKVFetcher(newTxnKVStreamer(streamer, lockStrength, kvFetcherMemAcc))
 }
 
 func newKVFetcher(batchFetcher KVBatchFetcher) *KVFetcher {
diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go
index 65c56c2d1b6e..4d88690ac0bc 100644
--- a/pkg/sql/rowexec/joinreader.go
+++ b/pkg/sql/rowexec/joinreader.go
@@ -129,6 +129,7 @@ type joinReader struct {
 		unlimitedMemMonitor *mon.BytesMonitor
 		budgetAcc           mon.BoundAccount
 		diskMonitor         *mon.BytesMonitor
+		txnKVStreamerMemAcc mon.BoundAccount
 	}
 
 	input execinfra.RowSource
@@ -472,6 +473,7 @@ func newJoinReader(
 	)
 	jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
 	jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
+	jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
 
 	var diskBuffer kvstreamer.ResultDiskBuffer
 	if jr.maintainOrdering {
@@ -496,6 +498,7 @@ func newJoinReader(
 			singleRowLookup,
 			int(spec.FetchSpec.MaxKeysPerRow),
 			diskBuffer,
+			&jr.streamerInfo.txnKVStreamerMemAcc,
 		)
 	} else {
 		// When not using the Streamer API, we want to limit the batch size hint
@@ -1090,6 +1093,7 @@ func (jr *joinReader) close() {
 	}
 	if jr.usesStreamer {
 		jr.streamerInfo.budgetAcc.Close(jr.Ctx)
+		jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx)
 		jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx)
 		if jr.streamerInfo.diskMonitor != nil {
 			jr.streamerInfo.diskMonitor.Stop(jr.Ctx)
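
A note on the `overheadAccountedFor` bookkeeping in `requests_provider.go`: the protobuf-generated `Size()` method measures the marshalled payload and does not include the in-memory footprint of the `RequestUnion` struct itself, so the element overhead of the `reqs` slice has to be reserved separately, and against `cap()` rather than `len()`, since the whole backing array stays alive while the slice is reused. Below is a minimal, self-contained sketch of that split; `requestUnion` and `getRequest` are toy stand-ins for the real generated types:

```go
package main

import (
	"fmt"
	"unsafe"
)

// getRequest stands in for a protobuf-generated request; Size plays the role
// of the generated Size() method, which measures payload, not struct overhead.
type getRequest struct {
	key []byte
}

func (g *getRequest) Size() int { return len(g.key) }

// requestUnion is a toy stand-in for roachpb.RequestUnion.
type requestUnion struct {
	get *getRequest
}

func (r *requestUnion) Size() int {
	if r.get != nil {
		return r.get.Size()
	}
	return 0
}

// requestUnionOverhead mirrors the constant in the patch: the fixed in-memory
// size of one slice element, invisible to Size().
const requestUnionOverhead = int64(unsafe.Sizeof(requestUnion{}))

// requestsMemUsage mirrors the patched helper: payload only, no overhead.
func requestsMemUsage(reqs []requestUnion) (memUsage int64) {
	for i := range reqs {
		memUsage += int64(reqs[i].Size())
	}
	return memUsage
}

func main() {
	reqs := make([]requestUnion, 2, 4)
	reqs[0] = requestUnion{get: &getRequest{key: []byte("key")}}

	// The two components are tracked separately because the payload
	// reservation is reconciled per response, while the element overhead is
	// tied to the lifetime of the reused reqs slice (hence cap, not len).
	payload := requestsMemUsage(reqs)
	overhead := requestUnionOverhead * int64(cap(reqs))
	fmt.Println(payload, overhead) // e.g. "3 32" on 64-bit platforms
}
```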
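The `resumeReq.reqs = req.reqs[:numIncompleteRequests]` change in `processSingleRangeResults` leans on the fact that the resume requests end up occupying the prefix of the old slice, so only the tail beyond that prefix has to be zeroed to unpin the completed requests. A simplified sketch of the pattern, where `request` is a stand-in for `roachpb.RequestUnion` and the incomplete requests are assumed to already sit in the prefix:

```go
package main

import "fmt"

// request is a stand-in for roachpb.RequestUnion; its byte slice represents
// the references we must not keep alive once a request has completed.
type request struct {
	span []byte
}

// makeResume reuses the backing array of reqs for the numIncomplete resume
// requests and nils out the tail so the completed requests' allocations can
// be garbage collected even though the array itself lives on.
func makeResume(reqs []request, numIncomplete int) []request {
	resume := reqs[:numIncomplete]
	for i := numIncomplete; i < len(reqs); i++ {
		reqs[i] = request{}
	}
	return resume
}

func main() {
	reqs := []request{{span: []byte("a")}, {span: []byte("b")}, {span: []byte("c")}}
	resume := makeResume(reqs, 1)
	fmt.Println(len(resume), cap(resume)) // 1 3: same backing array, shorter prefix
	fmt.Println(reqs[2].span == nil)      // true: the tail no longer pins "c"
}
```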
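The `txnKVFetcher` and `txnKVStreamer` changes apply the same idea across fetches: keep the last requests slice as scratch, zero its elements right after the batch has been handed off, and let the next `spansToRequests` call reuse the capacity. A runnable sketch with simplified `span`/`request` types standing in for `roachpb.Span` and `roachpb.RequestUnion`:

```go
package main

import "fmt"

// span and request are simplified stand-ins for roachpb.Span and
// roachpb.RequestUnion.
type span struct{ key, endKey []byte }
type request struct{ key, endKey []byte }

// spansToRequests mirrors the patched signature: the scratch slice is reused
// when its capacity suffices; otherwise a fresh slice is allocated.
func spansToRequests(spans []span, scratch []request) []request {
	var reqs []request
	if cap(scratch) >= len(spans) {
		reqs = scratch[:len(spans)]
	} else {
		reqs = make([]request, len(spans))
	}
	for i, sp := range spans {
		reqs[i] = request{key: sp.key, endKey: sp.endKey}
	}
	return reqs
}

type fetcher struct{ reqsScratch []request }

func (f *fetcher) fetch(spans []span) {
	reqs := spansToRequests(spans, f.reqsScratch)
	// ... the batch would be sent here ...
	// Keep the slice for the next fetch, but zero the elements so the scratch
	// doesn't pin the Get/Scan payloads of the batch we just sent.
	f.reqsScratch = reqs
	for i := range f.reqsScratch {
		f.reqsScratch[i] = request{}
	}
}

func main() {
	f := &fetcher{}
	f.fetch([]span{{key: []byte("a")}, {key: []byte("b")}})
	f.fetch([]span{{key: []byte("c")}}) // reuses the previous backing array
	fmt.Println(cap(f.reqsScratch))     // 2
}
```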
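Finally, the two fetchers account for the scratch slice differently: `txnKVFetcher` calls `acc.Resize` with the previously accounted-for size because its account is shared with the span and batch-response reservations, while `txnKVStreamer` can call `acc.ResizeTo` because the dedicated `txnKVStreamerMemAcc` created in `joinreader.go` tracks nothing but the scratch slice. A toy model of the distinction (this is only the shape of the calls, not the real `mon.BoundAccount` API, which is context-aware and backed by a monitor hierarchy):

```go
package main

import (
	"errors"
	"fmt"
)

// account is a toy model of the two accounting methods used in the patch.
type account struct {
	used  int64
	limit int64
}

func (a *account) grow(delta int64) error {
	if a.used+delta > a.limit {
		return errors.New("budget exceeded")
	}
	a.used += delta
	return nil
}

// Resize adjusts by the delta between the old and new reservation; callers
// sharing the account with other reservations must remember oldSz themselves,
// which is what reqsScratchAccountedFor does in txnKVFetcher.
func (a *account) Resize(oldSz, newSz int64) error {
	return a.grow(newSz - oldSz)
}

// ResizeTo brings the whole account to newSz; this is only safe when the
// account tracks a single allocation, as the dedicated txnKVStreamerMemAcc
// does.
func (a *account) ResizeTo(newSz int64) error {
	return a.Resize(a.used, newSz)
}

func main() {
	shared := &account{limit: 1024}
	_ = shared.grow(100)        // some other reservation on the same account
	_ = shared.Resize(0, 128)   // first scratch reservation
	_ = shared.Resize(128, 256) // the scratch slice grew
	fmt.Println(shared.used)    // 356

	dedicated := &account{limit: 1024}
	_ = dedicated.ResizeTo(128)
	_ = dedicated.ResizeTo(64)
	fmt.Println(dedicated.used) // 64
}
```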