kvstreamer: perform more memory accounting
This commit performs memory accounting for more of the internal state of
the streamer. In particular, it adds accounting for the overhead of
`Result`s in the results buffer (previously, we would only account for
the size of the Get or the Scan response but not for the `Result` struct
itself). It also adds accounting for all slices that are `O(N)` in size
where `N` is the number of requests.

Release note: None
yuzefovich committed Jun 24, 2022
1 parent eac62e9 commit 78308b0
Showing 4 changed files with 96 additions and 19 deletions.
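
As a rough, standalone illustration of the accounting pattern this commit extends (not code from this repository; the slice and the numbers below are made up), the footprint of an `O(N)` slice is its header plus its capacity times the element size:

```go
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	// Fixed size of the slice header itself (pointer, len, cap).
	const int32SliceOverhead = int64(unsafe.Sizeof([]int32{}))
	// Fixed size of a single element.
	const int32Size = int64(unsafe.Sizeof(int32(0)))

	// A hypothetical positions-style slice with room for 100 requests.
	positions := make([]int32, 0, 100)

	// The backing array is sized by capacity, not length, so the capacity is
	// what gets charged against the budget.
	footprint := int32SliceOverhead + int32Size*int64(cap(positions))
	fmt.Println(footprint) // 424 on 64-bit platforms (24 + 4*100)
}
```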
12 changes: 7 additions & 5 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
@@ -24,8 +24,6 @@ import (
// singleRangeBatch contains parts of the originally enqueued requests that have
// been truncated to be within a single range. All requests within the
// singleRangeBatch will be issued as a single BatchRequest.
// TODO(yuzefovich): perform memory accounting for slices other than reqs in
// singleRangeBatch.
type singleRangeBatch struct {
reqs []roachpb.RequestUnion
// reqsKeys stores the start key of the corresponding request in reqs. It is
@@ -81,9 +79,10 @@ type singleRangeBatch struct {
// the memory usage of reqs, excluding the overhead.
reqsReservedBytes int64
// overheadAccountedFor tracks the memory reservation against the budget for
// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects).
// Since we reuse the same reqs slice for resume requests, this can be
// released only when the BatchResponse doesn't have any resume spans.
// the overhead of the reqs slice (i.e. of roachpb.RequestUnion objects) as
// well as the positions and the subRequestIdx slices. Since we reuse these
// slices for the resume requests, this can be released only when the
// BatchResponse doesn't have any resume spans.
//
// RequestUnion.Size() ignores the overhead of RequestUnion object, so we
// need to account for it separately.
@@ -212,6 +211,9 @@ type requestsProviderBase struct {
hasWork *sync.Cond
// requests contains all single-range sub-requests that have yet to be
// served.
// TODO(yuzefovich): this memory is not accounted for. However, the number
// of singleRangeBatch objects in flight is limited by the number of ranges
// of a single table, so it doesn't seem urgent to fix the accounting here.
requests []singleRangeBatch
// done is set to true once the Streamer is Close()'d.
done bool
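
The comment above notes that `RequestUnion.Size()` ignores the overhead of the `RequestUnion` object itself, which is why that overhead is tracked separately. A minimal sketch of that distinction, using a made-up `request` type rather than the real proto-generated one:

```go
package main

import (
	"fmt"
	"unsafe"
)

// request stands in for roachpb.RequestUnion: like a proto-generated type, its
// Size() reports only the marshaled payload, not the in-memory struct.
type request struct {
	key, endKey []byte
}

func (r request) Size() int { return len(r.key) + len(r.endKey) }

func main() {
	const requestOverhead = int64(unsafe.Sizeof(request{}))

	reqs := make([]request, 2, 4)
	reqs[0] = request{key: []byte("a"), endKey: []byte("b")}
	reqs[1] = request{key: []byte("c")}

	// What Size() sees: the keys themselves.
	var payload int64
	for _, r := range reqs {
		payload += int64(r.Size())
	}
	// What Size() misses: the fixed per-element overhead of the backing array,
	// which must be accounted for separately (cf. overheadAccountedFor).
	overhead := requestOverhead * int64(cap(reqs))
	fmt.Println(payload, overhead) // 3 192 on 64-bit platforms
}
```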
45 changes: 39 additions & 6 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -13,6 +13,7 @@ package kvstreamer
import (
"context"
"fmt"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
@@ -84,7 +85,9 @@ type resultsBuffer interface {
// added all Results it could, and the resultsBuffer checks whether any
// Results are available to be returned to the client. If there is a
// goroutine blocked in wait(), the goroutine is woken up.
doneAddingLocked()
//
// It is assumed that the budget's mutex is already being held.
doneAddingLocked(context.Context)

///////////////////////////////////////////////////////////////////////////
// //
@@ -143,7 +146,11 @@ type resultsBufferBase struct {
// hasResults is used in wait() to block until there are some results to be
// picked up.
hasResults chan struct{}
err error
// overheadAccountedFor tracks how much overhead space for the Results in
// this results buffer has been consumed from the budget. Note that this
// does not include the footprint of Get and Scan responses.
overheadAccountedFor int64
err error
}

func newResultsBufferBase(budget *budget) *resultsBufferBase {
@@ -179,6 +186,23 @@ func (b *resultsBufferBase) checkIfCompleteLocked(r Result) {
}
}

func (b *resultsBufferBase) accountForOverheadLocked(ctx context.Context, overheadMemUsage int64) {
b.budget.mu.AssertHeld()
b.Mutex.AssertHeld()
if overheadMemUsage > b.overheadAccountedFor {
// We're allowing the budget to go into debt here since the results
// buffer doesn't have a way to push back on the Results. It would also
// be unfortunate to discard these Results - instead, we rely on the
// worker coordinator to make sure the budget gets out of debt.
if err := b.budget.consumeLocked(ctx, overheadMemUsage-b.overheadAccountedFor, true /* allowDebt */); err != nil {
b.setErrorLocked(err)
}
} else {
b.budget.releaseLocked(ctx, b.overheadAccountedFor-overheadMemUsage)
}
b.overheadAccountedFor = overheadMemUsage
}

// signal non-blockingly sends on hasResults channel.
func (b *resultsBufferBase) signal() {
select {
@@ -267,15 +291,20 @@ func (b *outOfOrderResultsBuffer) addLocked(r Result) {
b.numUnreleasedResults++
}

func (b *outOfOrderResultsBuffer) doneAddingLocked() {
b.Mutex.AssertHeld()
const resultSize = int64(unsafe.Sizeof(Result{}))

func (b *outOfOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
b.accountForOverheadLocked(ctx, int64(cap(b.results))*resultSize)
b.signal()
}

func (b *outOfOrderResultsBuffer) get(context.Context) ([]Result, bool, error) {
b.Lock()
defer b.Unlock()
results := b.results
// Note that although we're losing the reference to the Results slice, we
// still keep the overhead of the slice accounted for with the budget. This
// is done as a way of "amortizing" the reservation.
b.results = nil
allComplete := b.numCompleteResponses == b.numExpectedResponses
return results, allComplete, b.err
Expand Down Expand Up @@ -470,8 +499,12 @@ func (b *inOrderResultsBuffer) addLocked(r Result) {
b.addCounter++
}

func (b *inOrderResultsBuffer) doneAddingLocked() {
b.Mutex.AssertHeld()
const inOrderBufferedResultSize = int64(unsafe.Sizeof(inOrderBufferedResult{}))

func (b *inOrderResultsBuffer) doneAddingLocked(ctx context.Context) {
overhead := int64(cap(b.buffered))*inOrderBufferedResultSize + // b.buffered
int64(cap(b.resultScratch))*resultSize // b.resultScratch
b.accountForOverheadLocked(ctx, overhead)
if len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx {
if debug {
fmt.Println("found head-of-the-line")
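
A toy version of the adjust-or-release pattern used by `accountForOverheadLocked` above (the budget type, the mutexes, and the debt handling are simplified away; only the arithmetic is kept):

```go
package main

import "fmt"

// toyBudget stands in for the streamer's budget; it only tracks consumption.
type toyBudget struct{ used int64 }

func (b *toyBudget) consume(n int64) { b.used += n }
func (b *toyBudget) release(n int64) { b.used -= n }

// accountForOverhead reconciles the previously reserved overhead with the
// current one: grow the reservation when the overhead increased, otherwise
// return the difference to the budget.
func accountForOverhead(b *toyBudget, accountedFor *int64, overhead int64) {
	if overhead > *accountedFor {
		b.consume(overhead - *accountedFor)
	} else {
		b.release(*accountedFor - overhead)
	}
	*accountedFor = overhead
}

func main() {
	var b toyBudget
	var accountedFor int64
	accountForOverhead(&b, &accountedFor, 4096) // reservation grows to 4096
	accountForOverhead(&b, &accountedFor, 1024) // 3072 bytes released back
	fmt.Println(b.used, accountedFor)           // 1024 1024
}
```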
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -113,14 +113,16 @@ func TestInOrderResultsBuffer(t *testing.T) {
break
}

budget.mu.Lock()
b.Lock()
numToAdd := rng.Intn(len(addOrder)) + 1
for i := 0; i < numToAdd; i++ {
b.addLocked(results[addOrder[0]])
addOrder = addOrder[1:]
}
b.doneAddingLocked()
b.doneAddingLocked(ctx)
b.Unlock()
budget.mu.Unlock()

// With 50% probability, try spilling some of the buffered results
// to disk.
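
The test now mirrors the locking contract that `doneAddingLocked(ctx)` relies on: the budget's mutex is acquired before the buffer's, and both stay held across the `addLocked`/`doneAddingLocked` calls. A stripped-down sketch of that ordering with toy types:

```go
package main

import (
	"fmt"
	"sync"
)

// toyBudget and toyBuffer keep only the mutexes; the point is the order in
// which they are acquired, matching the budget-then-buffer discipline above.
type toyBudget struct{ mu sync.Mutex }

type toyBuffer struct {
	sync.Mutex
	pending int
}

func (b *toyBuffer) addLocked()        { b.pending++ }
func (b *toyBuffer) doneAddingLocked() { fmt.Println("results available:", b.pending) }

func main() {
	var budget toyBudget
	var buf toyBuffer

	budget.mu.Lock() // budget's mutex first...
	buf.Lock()       // ...then the results buffer's mutex.
	buf.addLocked()
	buf.addLocked()
	buf.doneAddingLocked()
	buf.Unlock()
	budget.mu.Unlock()
}
```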
54 changes: 47 additions & 7 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -234,6 +234,14 @@ type Streamer struct {
// been returned by GetResults() yet.
results resultsBuffer

// numRangesPerScanRequestAccountedFor tracks how much space has been
// consumed from the budget in order to account for the
// numRangesPerScanRequest slice.
//
// It is only accessed from the Streamer's user goroutine, so it doesn't
// need mutex protection.
numRangesPerScanRequestAccountedFor int64

mu struct {
// If the budget's mutex also needs to be locked, the budget's mutex
// must be acquired first. If the results' mutex needs to be locked,
@@ -252,7 +260,6 @@ type Streamer struct {
//
// It is allocated lazily if Hints.SingleRowLookup is false when the
// first ScanRequest is encountered in Enqueue.
// TODO(yuzefovich): perform memory accounting for this.
numRangesPerScanRequest []int32

// numRequestsInFlight tracks the number of single-range batches that
@@ -364,6 +371,14 @@ func (s *Streamer) Init(
s.maxKeysPerRow = int32(maxKeysPerRow)
}

const (
requestUnionSliceOverhead = int64(unsafe.Sizeof([]roachpb.RequestUnion{}))
intSliceOverhead = int64(unsafe.Sizeof([]int{}))
intSize = int64(unsafe.Sizeof(int(0)))
int32SliceOverhead = int64(unsafe.Sizeof([]int32{}))
int32Size = int64(unsafe.Sizeof(int32(0)))
)

// Enqueue dispatches multiple requests for execution. Results are delivered
// through the GetResults call.
//
@@ -431,6 +446,9 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
// requestsProvider right away. This is needed in order for the worker
// coordinator to not pick up any work until we account for
// totalReqsMemUsage.
// TODO(yuzefovich): this memory is not accounted for. However, the number
// of singleRangeBatch objects in flight is limited by the number of ranges
// of a single table, so it doesn't seem urgent to fix the accounting here.
var requestsToServe []singleRangeBatch
seekKey := rs.Key
const scanDir = kvcoord.Ascending
@@ -447,6 +465,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
}
}()
var reqsKeysScratch []roachpb.Key
var newNumRangesPerScanRequestMemoryUsage int64
for ; ri.Valid(); ri.Seek(ctx, seekKey, scanDir) {
// Truncate the request span to the current range.
singleRangeSpan, err := rs.Intersect(ri.Token().Desc())
@@ -459,6 +478,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
return err
}
var subRequestIdx []int32
var subRequestIdxOverhead int64
if !s.hints.SingleRowLookup {
for i, pos := range positions {
if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan {
@@ -470,6 +490,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
s.mu.Lock()
if cap(s.mu.numRangesPerScanRequest) < len(reqs) {
s.mu.numRangesPerScanRequest = make([]int32, len(reqs))
newNumRangesPerScanRequestMemoryUsage = int64(cap(s.mu.numRangesPerScanRequest)) * int32Size
} else {
// We can reuse numRangesPerScanRequest allocated on
// the previous call to Enqueue after we zero it
@@ -483,6 +504,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
if s.mode == InOrder {
if subRequestIdx == nil {
subRequestIdx = make([]int32, len(singleRangeReqs))
subRequestIdxOverhead = int32SliceOverhead + int32Size*int64(cap(subRequestIdx))
}
subRequestIdx[i] = s.mu.numRangesPerScanRequest[pos]
}
@@ -496,12 +518,15 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
//if !s.hints.UniqueRequests {
//}

overheadAccountedFor := requestUnionSliceOverhead + requestUnionOverhead*int64(cap(singleRangeReqs)) + // reqs
intSliceOverhead + intSize*int64(cap(positions)) + // positions
subRequestIdxOverhead // subRequestIdx
r := singleRangeBatch{
reqs: singleRangeReqs,
positions: positions,
subRequestIdx: subRequestIdx,
reqsReservedBytes: requestsMemUsage(singleRangeReqs),
overheadAccountedFor: requestUnionOverhead * int64(cap(singleRangeReqs)),
overheadAccountedFor: overheadAccountedFor,
}
totalReqsMemUsage += r.reqsReservedBytes + r.overheadAccountedFor

@@ -550,11 +575,16 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
streamerLocked = false
}

toConsume := totalReqsMemUsage
if newNumRangesPerScanRequestMemoryUsage != 0 && newNumRangesPerScanRequestMemoryUsage != s.numRangesPerScanRequestAccountedFor {
toConsume += newNumRangesPerScanRequestMemoryUsage - s.numRangesPerScanRequestAccountedFor
s.numRangesPerScanRequestAccountedFor = newNumRangesPerScanRequestMemoryUsage
}
// We allow the budget to go into debt iff a single request was enqueued.
// This is needed to support the case of arbitrarily large keys - the caller
// is expected to produce requests with such cases one at a time.
allowDebt := len(reqs) == 1
if err = s.budget.consume(ctx, totalReqsMemUsage, allowDebt); err != nil {
if err = s.budget.consume(ctx, toConsume, allowDebt); err != nil {
return err
}

@@ -1092,7 +1122,7 @@ func (w *workerCoordinator) performRequestAsync(
reqOveraccounted := req.reqsReservedBytes - resumeReqsMemUsage
if resumeReqsMemUsage == 0 {
// There will be no resume request, so we will lose the
// reference to the req.reqs slice and can release its memory
// reference to the slices in req and can release the corresponding
// memory reservation.
reqOveraccounted += req.overheadAccountedFor
}
@@ -1156,7 +1186,7 @@ func (w *workerCoordinator) performRequestAsync(
// Finally, process the results and add the ResumeSpans to be
// processed as well.
if err := processSingleRangeResults(
w.s, req, br, memoryFootprintBytes, resumeReqsMemUsage,
ctx, w.s, req, br, memoryFootprintBytes, resumeReqsMemUsage,
numIncompleteGets, numIncompleteScans, numGetResults, numScanResults,
); err != nil {
w.s.results.setError(err)
@@ -1238,6 +1268,7 @@ func calculateFootprint(
// It also assumes that the budget has already been reconciled with the
// reservations for Results that will be created.
func processSingleRangeResults(
ctx context.Context,
s *Streamer,
req singleRangeBatch,
br *roachpb.BatchResponse,
Expand All @@ -1249,7 +1280,7 @@ func processSingleRangeResults(
numIncompleteRequests := numIncompleteGets + numIncompleteScans
var resumeReq singleRangeBatch
// We have to allocate the new Get and Scan requests, but we can reuse the
// reqs and the positions slices.
// reqs, the positions, and the subRequestIdx slices.
resumeReq.reqs = req.reqs[:numIncompleteRequests]
resumeReq.positions = req.positions[:0]
resumeReq.subRequestIdx = req.subRequestIdx[:0]
@@ -1293,6 +1324,15 @@ func processSingleRangeResults(
}()
}
if numGetResults > 0 || numScanResults > 0 {
// We will add some Results into the results buffer, and the
// doneAddingLocked() call below requires that the budget's mutex is
// held. It also must be acquired before the streamer's mutex is locked,
// so we have to do this right away.
// TODO(yuzefovich): check whether the lock contention on this mutex is
// noticeable and possibly refactor the code so that the budget's mutex
// is only acquired for the duration of doneAddingLocked().
s.budget.mu.Lock()
defer s.budget.mu.Unlock()
// We will create some Result objects, so we at least will need to
// update the average response size estimate, guarded by the streamer's
// lock.
Expand All @@ -1317,7 +1357,7 @@ func processSingleRangeResults(
// the Streamer's one.
s.results.Lock()
defer s.results.Unlock()
defer s.results.doneAddingLocked()
defer s.results.doneAddingLocked(ctx)
}
for i, resp := range br.Responses {
position := req.positions[i]
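
Enqueue above charges the budget for `numRangesPerScanRequest` only when the slice is re-allocated with a larger capacity; when the previous allocation is reused, nothing new is consumed. A simplified, self-contained sketch of that delta accounting (the names and the helper here are invented for illustration):

```go
package main

import (
	"fmt"
	"unsafe"
)

const int32Size = int64(unsafe.Sizeof(int32(0)))

// tracker mimics the pairing of numRangesPerScanRequest with
// numRangesPerScanRequestAccountedFor: the budget is charged only for the
// delta when the slice grows.
type tracker struct {
	slice        []int32
	accountedFor int64 // bytes currently reserved for the slice
	budgetUsed   int64 // stand-in for the budget
}

func (t *tracker) ensureCapacity(n int) {
	if cap(t.slice) < n {
		t.slice = make([]int32, n)
	} else {
		// Reuse the allocation from the previous call, zeroing it out.
		t.slice = t.slice[:n]
		for i := range t.slice {
			t.slice[i] = 0
		}
	}
	newUsage := int64(cap(t.slice)) * int32Size
	if newUsage != t.accountedFor {
		t.budgetUsed += newUsage - t.accountedFor // only the delta is consumed
		t.accountedFor = newUsage
	}
}

func main() {
	var t tracker
	t.ensureCapacity(10) // charges 40 bytes
	t.ensureCapacity(8)  // reused, nothing charged
	t.ensureCapacity(32) // re-allocated, charges the 88-byte delta
	fmt.Println(t.budgetUsed, t.accountedFor) // 128 128
}
```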
