sql: reuse the slice of RequestUnion objects between fetches
This commit teaches the `KVBatchFetcher`s to reuse the same slice of
`RequestUnion` objects when converting spans into the BatchRequest. Due
to current limitations, we cannot reuse each request deeply (i.e. we
cannot reuse GetRequest or ScanRequest), but I believe it is safe to
reuse the slice of interfaces. Both the non-streamer and the streamer
code paths have been adjusted. Previously, this slice would be allocated
and discarded after the BatchRequest completed (or, in the streamer's
case, when `Streamer.Enqueue` returned), so we didn't perform memory
accounting for it; now that we hold on to the slice, we do. Additionally,
in order not to keep the Get and Scan requests alive, we deeply zero out
the slice once it is safe to do so.

Additionally, a similar optimization is applied to how resume requests
are populated by the Streamer.

Release note: None
yuzefovich committed Jun 10, 2022
1 parent c070f2c commit 8a0150c
Showing 20 changed files with 230 additions and 57 deletions.
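
Before the per-file diffs, here is a minimal, self-contained sketch of the pattern the commit message describes: the slice of union wrappers is reused across fetches and deeply zeroed once a batch is done, while the wrapped requests themselves are still allocated fresh each time. The requestUnion, span, and helper names below are simplified stand-ins for illustration, not the actual roachpb/KVBatchFetcher API.

package main

import "fmt"

// requestUnion is a simplified stand-in for roachpb.RequestUnion: a small
// wrapper around a heavier per-request object.
type requestUnion struct {
	req interface{} // e.g. a *GetRequest or a *ScanRequest
}

type span struct {
	key, endKey string
}

// spansToRequests converts spans into requests, reusing scratch's backing
// array when it is large enough. Only the slice of wrappers is reused; the
// wrapped requests are still allocated fresh for every fetch.
func spansToRequests(spans []span, scratch []requestUnion) []requestUnion {
	var reqs []requestUnion
	if cap(scratch) >= len(spans) {
		reqs = scratch[:len(spans)]
	} else {
		reqs = make([]requestUnion, len(spans))
	}
	for i := range spans {
		reqs[i].req = &spans[i]
	}
	return reqs
}

// resetForReuse deeply zeroes the slice so that holding on to it does not
// keep the individual requests alive, then returns an empty slice that still
// owns the backing array.
func resetForReuse(reqs []requestUnion) []requestUnion {
	for i := range reqs {
		reqs[i] = requestUnion{}
	}
	return reqs[:0]
}

func main() {
	var scratch []requestUnion
	for _, batch := range [][]span{
		{{key: "a", endKey: "b"}},
		{{key: "c", endKey: "d"}, {key: "e", endKey: "f"}},
	} {
		reqs := spansToRequests(batch, scratch)
		fmt.Printf("sending %d requests (cap %d)\n", len(reqs), cap(reqs))
		scratch = resetForReuse(reqs) // keep the backing array for the next fetch
	}
}
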
10 changes: 9 additions & 1 deletion pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -73,8 +73,16 @@ type singleRangeBatch struct {
// Hints.SingleRowLookup is false and some Scan requests were enqueued.
subRequestIdx []int32
// reqsReservedBytes tracks the memory reservation against the budget for
// the memory usage of reqs.
// the memory usage of reqs, excluding the overhead.
reqsReservedBytes int64
// overheadAccountedFor tracks the memory reservation against the budget for
// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects).
// Since we reuse the same reqs slice for resume requests, this can be
// released only when the BatchResponse doesn't have any resume spans.
//
// RequestUnion.Size() ignores the overhead of RequestUnion object, so we
// need to account for it separately.
overheadAccountedFor int64
// minTargetBytes, if positive, indicates the minimum TargetBytes limit that
// this singleRangeBatch should be sent with in order for the response to
// not be empty. Note that TargetBytes of at least minTargetBytes is
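
For context on the two fields in the hunk above: reqsReservedBytes covers what each request's Size() method reports, while overheadAccountedFor covers the fixed per-element footprint of the slice itself, which Size() never includes. A rough sketch of that split, using a stand-in type rather than the real roachpb.RequestUnion:

package main

import (
	"fmt"
	"unsafe"
)

// requestUnion is a stand-in for roachpb.RequestUnion.
type requestUnion struct {
	payload [32]byte
}

// size mimics RequestUnion.Size(): it reports the marshalled size of the
// wrapped request and ignores the union struct's own footprint.
func size(requestUnion) int64 { return 16 }

func main() {
	reqs := make([]requestUnion, 3, 8)

	var reqsReservedBytes int64
	for _, r := range reqs {
		reqsReservedBytes += size(r)
	}
	// The overhead is cap(reqs) union structs, independent of how many
	// elements are currently in use. Since the slice is reused for resume
	// requests, this part is released only once there are no resume spans.
	overheadAccountedFor := int64(unsafe.Sizeof(requestUnion{})) * int64(cap(reqs))

	fmt.Println(reqsReservedBytes, overheadAccountedFor) // 48 256
}
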
46 changes: 26 additions & 20 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -17,7 +17,6 @@ import (
"runtime"
"sort"
"sync"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
@@ -500,12 +499,13 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
//}

r := singleRangeBatch{
reqs: singleRangeReqs,
positions: positions,
subRequestIdx: subRequestIdx,
reqsReservedBytes: requestsMemUsage(singleRangeReqs),
reqs: singleRangeReqs,
positions: positions,
subRequestIdx: subRequestIdx,
reqsReservedBytes: requestsMemUsage(singleRangeReqs),
overheadAccountedFor: roachpb.RequestUnionSize * int64(cap(singleRangeReqs)),
}
totalReqsMemUsage += r.reqsReservedBytes
totalReqsMemUsage += r.reqsReservedBytes + r.overheadAccountedFor

if s.mode == OutOfOrder {
// Sort all single-range requests to be in the key order.
@@ -1083,6 +1083,12 @@ func (w *workerCoordinator) performRequestAsync(
// non-empty responses as well as resume spans, if any.
respOverestimate := targetBytes - memoryFootprintBytes
reqOveraccounted := req.reqsReservedBytes - resumeReqsMemUsage
if resumeReqsMemUsage == 0 {
// There will be no resume request, so we will lose the
// reference to the req.reqs slice and can release its memory
// reservation.
reqOveraccounted += req.overheadAccountedFor
}
overaccountedTotal := respOverestimate + reqOveraccounted
if overaccountedTotal >= 0 {
w.s.budget.release(ctx, overaccountedTotal)
@@ -1206,17 +1212,14 @@ func calculateFootprint(
}
}
}
// This addendum is the first step of requestsMemUsage() and we've already
// added the size of each resume request above.
resumeReqsMemUsage += requestUnionOverhead * int64(numIncompleteGets+numIncompleteScans)
return memoryFootprintBytes, resumeReqsMemUsage, numIncompleteGets, numIncompleteScans
}

// processSingleRangeResults creates a Result for each non-empty response found
// in the BatchResponse. The ResumeSpans, if found, are added into a new
// singleRangeBatch request that is added to be picked up by the mainLoop of the
// worker coordinator. This method assumes that req is no longer needed by the
// caller, so req.positions is reused for the ResumeSpans.
// caller, so the slices from req are reused for the ResumeSpans.
//
// It also assumes that the budget has already been reconciled with the
// reservations for Results that will be created.
@@ -1229,14 +1232,15 @@ func (w *workerCoordinator) processSingleRangeResults(
) error {
numIncompleteRequests := numIncompleteGets + numIncompleteScans
var resumeReq singleRangeBatch
// We have to allocate the new slice for requests, but we can reuse the
// positions slice.
resumeReq.reqs = make([]roachpb.RequestUnion, numIncompleteRequests)
// We have to allocate the new Get and Scan requests, but we can reuse the
// reqs and the positions slices.
resumeReq.reqs = req.reqs[:numIncompleteRequests]
resumeReq.positions = req.positions[:0]
resumeReq.subRequestIdx = req.subRequestIdx[:0]
// We've already reconciled the budget with the actual reservation for the
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = resumeReqsMemUsage
resumeReq.overheadAccountedFor = req.overheadAccountedFor
gets := make([]struct {
req roachpb.GetRequest
union roachpb.RequestUnion_Get
@@ -1408,7 +1412,14 @@ func (w *workerCoordinator) processSingleRangeResults(

// If we have any incomplete requests, add them back into the work
// pool.
if len(resumeReq.reqs) > 0 {
if numIncompleteRequests > 0 {
// Make sure to nil out old requests that we didn't include into the
// resume request. We don't have to do this if there aren't any
// incomplete requests since req and resumeReq will be garbage collected
// on their own.
for i := numIncompleteRequests; i < len(req.reqs); i++ {
req.reqs[i] = roachpb.RequestUnion{}
}
w.s.requestsToServe.add(resumeReq)
}

@@ -1495,12 +1506,7 @@ func init() {
zeroInt32Slice = make([]int32, 1<<10)
}

const requestUnionOverhead = int64(unsafe.Sizeof(roachpb.RequestUnion{}))

func requestsMemUsage(reqs []roachpb.RequestUnion) int64 {
// RequestUnion.Size() ignores the overhead of RequestUnion object, so we'll
// account for it separately first.
memUsage := requestUnionOverhead * int64(cap(reqs))
func requestsMemUsage(reqs []roachpb.RequestUnion) (memUsage int64) {
// No need to account for elements past len(reqs) because those must be
// unset and we have already accounted for RequestUnion object above.
for _, r := range reqs {
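
To summarize the resume path in processSingleRangeResults above: the prefix of the original reqs slice becomes the resume batch, and the tail is zeroed so completed Get and Scan requests can be garbage collected. A small sketch of that step with simplified types (not the streamer's real structs):

package main

import "fmt"

// requestUnion is a simplified stand-in for roachpb.RequestUnion.
type requestUnion struct {
	req interface{}
}

// buildResumeReqs keeps the first numIncomplete elements for the resume batch
// and zeroes the remainder so the shared backing array does not pin the
// already-completed requests.
func buildResumeReqs(reqs []requestUnion, numIncomplete int) []requestUnion {
	for i := numIncomplete; i < len(reqs); i++ {
		reqs[i] = requestUnion{}
	}
	return reqs[:numIncomplete]
}

func main() {
	reqs := make([]requestUnion, 4)
	for i := range reqs {
		reqs[i].req = fmt.Sprintf("scan %d", i)
	}
	// Suppose only the first two requests came back with resume spans.
	resume := buildResumeReqs(reqs, 2)
	fmt.Println(len(resume), cap(resume), reqs[3].req == nil) // 2 4 true
}
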
4 changes: 4 additions & 0 deletions pkg/roachpb/api.go
@@ -12,6 +12,7 @@ package roachpb

import (
"fmt"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -1800,3 +1801,6 @@ const (
// with the SpecificTenantOverrides precedence..
AllTenantsOverrides
)

// RequestUnionSize is the size of the RequestUnion object.
const RequestUnionSize = int64(unsafe.Sizeof(RequestUnion{}))
2 changes: 2 additions & 0 deletions pkg/sql/backfill/backfill.go
@@ -311,6 +311,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
chunkSize, traceKV, false, /* forceProductionKVBatchSize */
true, /* expectMultipleCalls */
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
return roachpb.Key{}, err
@@ -811,6 +812,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
ctx, txn, []roachpb.Span{sp}, nil, /* spanIDs */
rowinfra.GetDefaultBatchBytesLimit(false /* forceProductionValue */),
initBufferSize, traceKV, false, /* forceProductionKVBatchSize */
false, /* expectMultipleCalls */
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
return nil, nil, 0, err
52 changes: 46 additions & 6 deletions pkg/sql/colfetcher/cfetcher.go
@@ -283,6 +283,13 @@ type cFetcher struct {
// keys.
scratch []byte

// reqsScratch is a scratch space that is reused between StartScan() and
// StartScanStreaming() calls when expectMultipleCalls is true. Its memory
// usage is tracked in reqsScratchAccountedFor and is registered with
// kvFetcherMemAcc.
reqsScratch []roachpb.RequestUnion
reqsScratchAccountedFor int64

accountingHelper colmem.SetAccountingHelper

// kvFetcherMemAcc is a memory account that will be used by the underlying
@@ -494,6 +501,13 @@ func (cf *cFetcher) setFetcher(f *row.KVFetcher, limitHint rowinfra.RowLimit) {
cf.machine.state[1] = stateInitFetch
}

func (cf *cFetcher) keepReqsScratch(ctx context.Context, reqsScratch []roachpb.RequestUnion) error {
cf.reqsScratch = reqsScratch
oldAccountedFor := cf.reqsScratchAccountedFor
cf.reqsScratchAccountedFor = roachpb.RequestUnionSize * int64(cap(reqsScratch))
return cf.kvFetcherMemAcc.Resize(ctx, oldAccountedFor, cf.reqsScratchAccountedFor)
}

// StartScan initializes and starts the key-value scan. Can be used multiple
// times.
//
@@ -502,6 +516,10 @@ func (cf *cFetcher) setFetcher(f *row.KVFetcher, limitHint rowinfra.RowLimit) {
// spans slice after the fetcher has been closed (which happens when the fetcher
// emits the first zero batch), and if the caller does, it becomes responsible
// for the memory accounting.
//
// expectMultipleCalls, if true, indicates the caller is likely to call this
// method many times on the same cFetcher. This allows the cFetcher to reuse
// some of the allocations.
func (cf *cFetcher) StartScan(
ctx context.Context,
txn *kv.Txn,
Expand All @@ -511,6 +529,7 @@ func (cf *cFetcher) StartScan(
batchBytesLimit rowinfra.BytesLimit,
limitHint rowinfra.RowLimit,
forceProductionKVBatchSize bool,
expectMultipleCalls bool,
) error {
if len(spans) == 0 {
return errors.AssertionFailedf("no spans")
@@ -544,11 +563,12 @@ func (cf *cFetcher) StartScan(
firstBatchLimit = rowinfra.KeyLimit(int(limitHint) * int(cf.table.spec.MaxKeysPerRow))
}

f, err := row.NewKVFetcher(
f, reqsScratch, err := row.NewKVFetcher(
ctx,
txn,
spans,
nil, /* spanIDs */
cf.reqsScratch,
bsHeader,
cf.reverse,
batchBytesLimit,
@@ -563,6 +583,11 @@ func (cf *cFetcher) StartScan(
return err
}
cf.setFetcher(f, limitHint)
if expectMultipleCalls {
if err = cf.keepReqsScratch(ctx, reqsScratch); err != nil {
return err
}
}
return nil
}

@@ -574,18 +599,30 @@ func (cf *cFetcher) StartScan(
// spans slice after the fetcher has been closed (which happens when the fetcher
// emits the first zero batch), and if the caller does, it becomes responsible
// for the memory accounting.
//
// expectMultipleCalls, if true, indicates the caller is likely to call this
// method many times on the same cFetcher. This allows the cFetcher to reuse
// some of the allocations.
func (cf *cFetcher) StartScanStreaming(
ctx context.Context,
streamer *kvstreamer.Streamer,
spans roachpb.Spans,
limitHint rowinfra.RowLimit,
expectMultipleCalls bool,
) error {
kvBatchFetcher, err := row.NewTxnKVStreamer(ctx, streamer, spans, nil /* spanIDs */, cf.lockStrength)
kvBatchFetcher, reqsScratch, err := row.NewTxnKVStreamer(
ctx, streamer, spans, nil /* spanIDs */, cf.lockStrength, cf.reqsScratch,
)
if err != nil {
return err
}
f := row.NewKVStreamingFetcher(kvBatchFetcher)
cf.setFetcher(f, limitHint)
if expectMultipleCalls {
if err = cf.keepReqsScratch(ctx, reqsScratch); err != nil {
return err
}
}
return nil
}

@@ -1375,9 +1412,12 @@ func (cf *cFetcher) Release() {
}

func (cf *cFetcher) Close(ctx context.Context) {
if cf != nil && cf.fetcher != nil {
cf.bytesRead += cf.fetcher.GetBytesRead()
cf.fetcher.Close(ctx)
cf.fetcher = nil
if cf != nil {
cf.reqsScratch = nil
if cf.fetcher != nil {
cf.bytesRead += cf.fetcher.GetBytesRead()
cf.fetcher.Close(ctx)
cf.fetcher = nil
}
}
}
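
The keepReqsScratch helper added to the cFetcher above retains the scratch slice between scans and keeps its memory account in sync with the slice's capacity. A hedged sketch of that resize-on-keep pattern, with a hypothetical memAccount in place of the real mon.BoundAccount:

package main

import (
	"context"
	"fmt"
)

// requestUnionSize is a hypothetical fixed per-element footprint, standing in
// for roachpb.RequestUnionSize.
const requestUnionSize = int64(48)

// memAccount is a hypothetical stand-in for the fetcher's memory account.
type memAccount struct {
	used int64
}

// resize swaps an old reservation for a new one, mirroring how a bound
// account is adjusted when the retained scratch grows or shrinks.
func (a *memAccount) resize(_ context.Context, oldSz, newSz int64) error {
	a.used += newSz - oldSz
	return nil
}

type fetcher struct {
	acc                     *memAccount
	reqsScratch             []int // stand-in for []roachpb.RequestUnion
	reqsScratchAccountedFor int64
}

// keepReqsScratch retains the scratch slice for the next scan and re-registers
// its capacity with the memory account.
func (f *fetcher) keepReqsScratch(ctx context.Context, scratch []int) error {
	f.reqsScratch = scratch
	old := f.reqsScratchAccountedFor
	f.reqsScratchAccountedFor = requestUnionSize * int64(cap(scratch))
	return f.acc.resize(ctx, old, f.reqsScratchAccountedFor)
}

func main() {
	f := &fetcher{acc: &memAccount{}}
	_ = f.keepReqsScratch(context.Background(), make([]int, 0, 8))
	fmt.Println(f.acc.used) // 384
}

On Close, the cFetcher simply drops the reference (cf.reqsScratch = nil in the diff above) so the retained slice can be garbage collected.
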
1 change: 1 addition & 0 deletions pkg/sql/colfetcher/colbatch_scan.go
@@ -99,6 +99,7 @@ func (s *ColBatchScan) Init(ctx context.Context) {
s.batchBytesLimit,
s.limitHint,
s.flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
false, /* expectMultipleCalls */
); err != nil {
colexecerror.InternalError(err)
}
2 changes: 2 additions & 0 deletions pkg/sql/colfetcher/index_join.go
@@ -250,6 +250,7 @@ func (s *ColIndexJoin) Next() coldata.Batch {
s.streamerInfo.Streamer,
spans,
rowinfra.NoRowLimit,
true, /* expectMultipleCalls */
)
} else {
err = s.cf.StartScan(
@@ -261,6 +262,7 @@ func (s *ColIndexJoin) Next() coldata.Batch {
rowinfra.NoBytesLimit,
rowinfra.NoRowLimit,
s.flowCtx.EvalCtx.TestingKnobs.ForceProductionValues,
true, /* expectMultipleCalls */
)
}
if err != nil {
4 changes: 3 additions & 1 deletion pkg/sql/delete_preserving_index_test.go
@@ -786,7 +786,9 @@ func fetchIndex(
))

require.NoError(t, fetcher.StartScan(
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit,
0, true, false, /* forceProductionBatchSize */
false, /* expectMultipleCalls */
))
var rows []tree.Datums
for {
4 changes: 3 additions & 1 deletion pkg/sql/indexbackfiller_test.go
@@ -416,7 +416,9 @@ INSERT INTO foo VALUES (1), (10), (100);
))

require.NoError(t, fetcher.StartScan(
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit, 0, true, false, /* forceProductionBatchSize */
ctx, txn, spans, nil /* spanIDs */, rowinfra.NoBytesLimit,
0, true, false, /* forceProductionBatchSize */
false, /* expectMultipleCalls */
))
var rows []tree.Datums
for {