Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: reuse the slice of RequestUnion objects between fetches #82384

Merged
merged 1 commit into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion pkg/kv/kvclient/kvstreamer/requests_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,16 @@ type singleRangeBatch struct {
// Hints.SingleRowLookup is false and some Scan requests were enqueued.
subRequestIdx []int32
// reqsReservedBytes tracks the memory reservation against the budget for
// the memory usage of reqs.
// the memory usage of reqs, excluding the overhead.
reqsReservedBytes int64
// overheadAccountedFor tracks the memory reservation against the budget for
// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects).
// Since we reuse the same reqs slice for resume requests, this can be
// released only when the BatchResponse doesn't have any resume spans.
//
// RequestUnion.Size() ignores the overhead of RequestUnion object, so we
// need to account for it separately.
overheadAccountedFor int64
// minTargetBytes, if positive, indicates the minimum TargetBytes limit that
// this singleRangeBatch should be sent with in order for the response to
// not be empty. Note that TargetBytes of at least minTargetBytes is
Expand Down
45 changes: 26 additions & 19 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,12 +501,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
//}

r := singleRangeBatch{
reqs: singleRangeReqs,
positions: positions,
subRequestIdx: subRequestIdx,
reqsReservedBytes: requestsMemUsage(singleRangeReqs),
reqs: singleRangeReqs,
positions: positions,
subRequestIdx: subRequestIdx,
reqsReservedBytes: requestsMemUsage(singleRangeReqs),
overheadAccountedFor: requestUnionOverhead * int64(cap(singleRangeReqs)),
}
totalReqsMemUsage += r.reqsReservedBytes
totalReqsMemUsage += r.reqsReservedBytes + r.overheadAccountedFor

if s.mode == OutOfOrder {
// Sort all single-range requests to be in the key order.
Expand Down Expand Up @@ -1090,6 +1091,12 @@ func (w *workerCoordinator) performRequestAsync(
// non-empty responses as well as resume spans, if any.
respOverestimate := targetBytes - memoryFootprintBytes
reqOveraccounted := req.reqsReservedBytes - resumeReqsMemUsage
if resumeReqsMemUsage == 0 {
// There will be no resume request, so we will lose the
// reference to the req.reqs slice and can release its memory
// reservation.
reqOveraccounted += req.overheadAccountedFor
}
overaccountedTotal := respOverestimate + reqOveraccounted
if overaccountedTotal >= 0 {
w.s.budget.release(ctx, overaccountedTotal)
Expand Down Expand Up @@ -1213,17 +1220,14 @@ func calculateFootprint(
}
}
}
// This addendum is the first step of requestsMemUsage() and we've already
// added the size of each resume request above.
resumeReqsMemUsage += requestUnionOverhead * int64(numIncompleteGets+numIncompleteScans)
return memoryFootprintBytes, resumeReqsMemUsage, numIncompleteGets, numIncompleteScans
}

// processSingleRangeResults creates a Result for each non-empty response found
// in the BatchResponse. The ResumeSpans, if found, are added into a new
// singleRangeBatch request that is added to be picked up by the mainLoop of the
// worker coordinator. This method assumes that req is no longer needed by the
// caller, so req.positions is reused for the ResumeSpans.
// caller, so the slices from req are reused for the ResumeSpans.
//
// It also assumes that the budget has already been reconciled with the
// reservations for Results that will be created.
Expand All @@ -1236,14 +1240,15 @@ func (w *workerCoordinator) processSingleRangeResults(
) error {
numIncompleteRequests := numIncompleteGets + numIncompleteScans
var resumeReq singleRangeBatch
// We have to allocate the new slice for requests, but we can reuse the
// positions slice.
resumeReq.reqs = make([]roachpb.RequestUnion, numIncompleteRequests)
// We have to allocate the new Get and Scan requests, but we can reuse the
// reqs and the positions slices.
resumeReq.reqs = req.reqs[:numIncompleteRequests]
resumeReq.positions = req.positions[:0]
resumeReq.subRequestIdx = req.subRequestIdx[:0]
// We've already reconciled the budget with the actual reservation for the
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = resumeReqsMemUsage
resumeReq.overheadAccountedFor = req.overheadAccountedFor
gets := make([]struct {
req roachpb.GetRequest
union roachpb.RequestUnion_Get
Expand Down Expand Up @@ -1415,7 +1420,14 @@ func (w *workerCoordinator) processSingleRangeResults(

// If we have any incomplete requests, add them back into the work
// pool.
if len(resumeReq.reqs) > 0 {
if numIncompleteRequests > 0 {
// Make sure to nil out old requests that we didn't include into the
// resume request. We don't have to do this if there aren't any
// incomplete requests since req and resumeReq will be garbage collected
// on their own.
for i := numIncompleteRequests; i < len(req.reqs); i++ {
req.reqs[i] = roachpb.RequestUnion{}
}
w.s.requestsToServe.add(resumeReq)
}

Expand Down Expand Up @@ -1504,12 +1516,7 @@ func init() {

const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))

func requestsMemUsage(reqs []roachpb.RequestUnion) int64 {
// RequestUnion.Size() ignores the overhead of RequestUnion object, so we'll
// account for it separately first.
memUsage := requestUnionOverhead * int64(cap(reqs))
// No need to account for elements past len(reqs) because those must be
// unset and we have already accounted for RequestUnion object above.
func requestsMemUsage(reqs []roachpb.RequestUnion) (memUsage int64) {
for _, r := range reqs {
memUsage += int64(r.Size())
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/index_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,7 @@ func NewColIndexJoin(
rowcontainer.NewKVStreamerResultDiskBuffer(
flowCtx.Cfg.TempStorage, diskMonitor,
),
kvFetcherMemAcc,
)
} else {
kvFetcher = row.NewKVFetcher(
Expand Down
44 changes: 37 additions & 7 deletions pkg/sql/row/kv_batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package row
import (
"context"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
Expand Down Expand Up @@ -148,6 +149,7 @@ type txnKVFetcher struct {
// least once.
alreadyFetched bool
batchIdx int
reqsScratch []roachpb.RequestUnion

responses []roachpb.ResponseUnion
remainingBatches [][]byte
Expand All @@ -156,11 +158,12 @@ type txnKVFetcher struct {
getResponseScratch [1]roachpb.KeyValue

acc *mon.BoundAccount
// spansAccountedFor and batchResponseAccountedFor track the number of bytes
// that we've already registered with acc in regards to spans and the batch
// response, respectively.
// spansAccountedFor, batchResponseAccountedFor, and reqsScratchAccountedFor
// track the number of bytes that we've already registered with acc in
// regards to spans, the batch response, and reqsScratch, respectively.
spansAccountedFor int64
batchResponseAccountedFor int64
reqsScratchAccountedFor int64

// If set, we will use the production value for kvBatchSize.
forceProductionKVBatchSize bool
Expand Down Expand Up @@ -395,7 +398,7 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
ba.Header.TargetBytes = int64(f.batchBytesLimit)
ba.Header.MaxSpanRequestKeys = int64(f.getBatchKeyLimit())
ba.AdmissionHeader = f.requestAdmissionHeader
ba.Requests = spansToRequests(f.spans.Spans, f.reverse, f.lockStrength)
ba.Requests = spansToRequests(f.spans.Spans, f.reverse, f.lockStrength, f.reqsScratch)

if log.ExpensiveLogEnabled(ctx, 2) {
log.VEventf(ctx, 2, "Scan %s", f.spans)
Expand Down Expand Up @@ -472,6 +475,21 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
f.batchIdx++
f.scratchSpans.reset()
f.alreadyFetched = true
// Keep the reference to the requests slice in order to reuse in the future
// after making sure to nil out the requests in order to lose references to
// the underlying Get and Scan requests which could keep large byte slices
// alive.
f.reqsScratch = ba.Requests
for i := range f.reqsScratch {
f.reqsScratch[i] = roachpb.RequestUnion{}
}
if monitoring {
reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch))
if err := f.acc.Resize(ctx, f.reqsScratchAccountedFor, reqsScratchMemUsage); err != nil {
return err
}
f.reqsScratchAccountedFor = reqsScratchMemUsage
}

// TODO(radu): We should fetch the next chunk in the background instead of waiting for the next
// call to fetch(). We can use a pool of workers to issue the KV ops which will also limit the
Expand Down Expand Up @@ -623,7 +641,9 @@ func (f *txnKVFetcher) reset(ctx context.Context) {
f.remainingBatches = nil
f.spans = identifiableSpans{}
f.scratchSpans = identifiableSpans{}
// Release only the allocations made by this fetcher.
// Release only the allocations made by this fetcher. Note that we're still
// keeping the reference to reqsScratch, so we don't release the allocation
// for it.
f.acc.Shrink(ctx, f.batchResponseAccountedFor+f.spansAccountedFor)
f.batchResponseAccountedFor, f.spansAccountedFor = 0, 0
}
Expand All @@ -633,14 +653,24 @@ func (f *txnKVFetcher) close(ctx context.Context) {
f.reset(ctx)
}

const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))

// spansToRequests converts the provided spans to the corresponding requests. If
// a span doesn't have the EndKey set, then a Get request is used for it;
// otherwise, a Scan (or ReverseScan if reverse is true) request is used with
// BATCH_RESPONSE format.
//
// The provided reqsScratch is reused if it has enough capacity for all spans,
// if not, a new slice is allocated.
func spansToRequests(
spans roachpb.Spans, reverse bool, keyLocking lock.Strength,
spans roachpb.Spans, reverse bool, keyLocking lock.Strength, reqsScratch []roachpb.RequestUnion,
) []roachpb.RequestUnion {
reqs := make([]roachpb.RequestUnion, len(spans))
var reqs []roachpb.RequestUnion
if cap(reqsScratch) >= len(spans) {
reqs = reqsScratch[:len(spans)]
} else {
reqs = make([]roachpb.RequestUnion, len(spans))
}
// Detect the number of gets vs scans, so we can batch allocate all of the
// requests precisely.
nGets := 0
Expand Down
24 changes: 19 additions & 5 deletions pkg/sql/row/kv_batch_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

Expand All @@ -45,8 +46,11 @@ type txnKVStreamer struct {
streamer *kvstreamer.Streamer
keyLocking lock.Strength

spans roachpb.Spans
spanIDs []int
spans roachpb.Spans
spanIDs []int
reqsScratch []roachpb.RequestUnion

acc *mon.BoundAccount

// getResponseScratch is reused to return the result of Get requests.
getResponseScratch [1]roachpb.KeyValue
Expand All @@ -63,11 +67,12 @@ var _ KVBatchFetcher = &txnKVStreamer{}

// newTxnKVStreamer creates a new txnKVStreamer.
func newTxnKVStreamer(
streamer *kvstreamer.Streamer, lockStrength descpb.ScanLockingStrength,
streamer *kvstreamer.Streamer, lockStrength descpb.ScanLockingStrength, acc *mon.BoundAccount,
) KVBatchFetcher {
return &txnKVStreamer{
streamer: streamer,
keyLocking: getKeyLockingStrength(lockStrength),
acc: acc,
}
}

Expand All @@ -86,13 +91,22 @@ func (f *txnKVStreamer) SetupNextFetch(
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEventf(ctx, 2, "Scan %s", spans)
}
reqs := spansToRequests(spans, false /* reverse */, f.keyLocking)
reqs := spansToRequests(spans, false /* reverse */, f.keyLocking, f.reqsScratch)
if err := f.streamer.Enqueue(ctx, reqs); err != nil {
return err
}
f.spans = spans
f.spanIDs = spanIDs
return nil
// Keep the reference to the requests slice in order to reuse in the future
// after making sure to nil out the requests in order to lose references to
// the underlying Get and Scan requests which could keep large byte slices
// alive.
f.reqsScratch = reqs
for i := range f.reqsScratch {
f.reqsScratch[i] = roachpb.RequestUnion{}
}
reqsScratchMemUsage := requestUnionOverhead * int64(cap(f.reqsScratch))
return f.acc.ResizeTo(ctx, reqsScratchMemUsage)
}

func (f *txnKVStreamer) getSpanID(resultPosition int) int {
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/row/kv_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func NewStreamingKVFetcher(
singleRowLookup bool,
maxKeysPerRow int,
diskBuffer kvstreamer.ResultDiskBuffer,
kvFetcherMemAcc *mon.BoundAccount,
) *KVFetcher {
streamer := kvstreamer.NewStreamer(
distSender,
Expand All @@ -157,7 +158,7 @@ func NewStreamingKVFetcher(
maxKeysPerRow,
diskBuffer,
)
return newKVFetcher(newTxnKVStreamer(streamer, lockStrength))
return newKVFetcher(newTxnKVStreamer(streamer, lockStrength, kvFetcherMemAcc))
}

func newKVFetcher(batchFetcher KVBatchFetcher) *KVFetcher {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ type joinReader struct {
unlimitedMemMonitor *mon.BytesMonitor
budgetAcc mon.BoundAccount
diskMonitor *mon.BytesMonitor
txnKVStreamerMemAcc mon.BoundAccount
}

input execinfra.RowSource
Expand Down Expand Up @@ -472,6 +473,7 @@ func newJoinReader(
)
jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()

var diskBuffer kvstreamer.ResultDiskBuffer
if jr.maintainOrdering {
Expand All @@ -496,6 +498,7 @@ func newJoinReader(
singleRowLookup,
int(spec.FetchSpec.MaxKeysPerRow),
diskBuffer,
&jr.streamerInfo.txnKVStreamerMemAcc,
)
} else {
// When not using the Streamer API, we want to limit the batch size hint
Expand Down Expand Up @@ -1090,6 +1093,7 @@ func (jr *joinReader) close() {
}
if jr.usesStreamer {
jr.streamerInfo.budgetAcc.Close(jr.Ctx)
jr.streamerInfo.txnKVStreamerMemAcc.Close(jr.Ctx)
jr.streamerInfo.unlimitedMemMonitor.Stop(jr.Ctx)
if jr.streamerInfo.diskMonitor != nil {
jr.streamerInfo.diskMonitor.Stop(jr.Ctx)
Expand Down