Skip to content

Commit

Permalink
kvstreamer: account for the overhead of GetResponse and ScanResponse
Browse files Browse the repository at this point in the history
The Streamer is careful to account for the requests (both the footprint
and the overhead) as well as to estimate the footprint of the responses.
However, it currently doesn't account for the overhead of the GetResponse
(currently 64 bytes) and ScanResponse (120 bytes) structs. We recently
saw a case where this overhead was the largest user of RAM which
contributed to the pod OOMing. This commit fixes this accounting oversight
in the following manner:
- prior to issuing the BatchRequest, we estimate the overhead of
a response to each request in the batch. Notably, the BatchResponse will
contain a ResponseUnion object as well as the GetResponse or ScanResponse
object for each request
- once the BatchResponse is received, we reconcile the budget to track
the precise memory usage of the responses (ignoring the ResponseUnion
since we don't keep a reference to it). We already tracked the
"footprint" and now we also include the "overhead" with both being
released to the budget on `Result.Release` call.

We track this "responses overhead" usage separately from the target bytes
usage (the "footprint") since the KV server doesn't include the overhead
when determining how to handle `TargetBytes` limit, and we must behave
in the same manner.

Release note: None
  • Loading branch information
yuzefovich committed Feb 22, 2023
1 parent c58fdaf commit 377f06e
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 61 deletions.
6 changes: 4 additions & 2 deletions pkg/kv/kvclient/kvstreamer/avg_response_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func (e *avgResponseEstimator) init(sv *settings.Values) {
e.avgResponseSizeMultiple = streamerAvgResponseSizeMultiple.Get(sv)
}

// getAvgResponseSize returns the current estimate of a footprint of a single
// response.
func (e *avgResponseEstimator) getAvgResponseSize() int64 {
if e.numResponses == 0 {
return initialAvgResponseSize
Expand Down Expand Up @@ -78,8 +80,8 @@ func (e *avgResponseEstimator) getAvgResponseSize() int64 {
}

// update updates the actual information of the estimator based on numResponses
// responses that took up responseBytes bytes and correspond to a single
// BatchResponse.
// responses that took up responseBytes bytes in footprint and correspond to a
// single BatchResponse.
func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) {
e.responseBytes += float64(responseBytes)
e.numResponses += float64(numResponses)
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type singleRangeBatch struct {
// subRequestIdx is only allocated in InOrder mode when
// Hints.SingleRowLookup is false and some Scan requests were enqueued.
subRequestIdx []int32
// numGetsInReqs tracks the number of Get requests in reqs.
numGetsInReqs int64
// reqsReservedBytes tracks the memory reservation against the budget for
// the memory usage of reqs, excluding the overhead.
reqsReservedBytes int64
Expand All @@ -83,9 +85,6 @@ type singleRangeBatch struct {
// well as the positions and the subRequestIdx slices. Since we reuse these
// slices for the resume requests, this can be released only when the
// BatchResponse doesn't have any resume spans.
//
// RequestUnion.Size() ignores the overhead of RequestUnion object, so we
// need to account for it separately.
overheadAccountedFor int64
// minTargetBytes, if positive, indicates the minimum TargetBytes limit that
// this singleRangeBatch should be sent with in order for the response to
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvclient/kvstreamer/results_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ type resultsBufferBase struct {
hasResults chan struct{}
// overheadAccountedFor tracks how much overhead space for the Results in
// this results buffer has been consumed from the budget. Note that this
// does not include the footprint of Get and Scan responses.
// does not include the memory usage of Get and Scan responses (i.e. neither
// the footprint nor the overhead of a response is tracked by
// overheadAccountedFor).
overheadAccountedFor int64
err error
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvclient/kvstreamer/size.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
requestUnionOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion{}))
requestOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion_Get{}) +
unsafe.Sizeof(kvpb.GetRequest{}))
responseUnionOverhead = int64(unsafe.Sizeof(kvpb.ResponseUnion_Get{}))
getResponseOverhead = int64(unsafe.Sizeof(kvpb.GetResponse{}))
scanResponseOverhead = int64(unsafe.Sizeof(kvpb.ScanResponse{}))
)

var zeroInt32Slice []int32
Expand All @@ -36,6 +39,10 @@ func init() {
if requestOverhead != scanRequestOverhead {
panic("GetRequest and ScanRequest have different overheads")
}
scanResponseUnionOverhead := int64(unsafe.Sizeof(kvpb.ResponseUnion_Get{}))
if responseUnionOverhead != scanResponseUnionOverhead {
panic("ResponseUnion_Get and ResponseUnion_Scan have different overheads")
}
zeroInt32Slice = make([]int32, 1<<10)
}

Expand Down
126 changes: 88 additions & 38 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,9 +562,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
}
var subRequestIdx []int32
var subRequestIdxOverhead int64
if !s.hints.SingleRowLookup {
for i, pos := range positions {
if _, isScan := reqs[pos].GetInner().(*kvpb.ScanRequest); isScan {
var numScansInReqs int64
for i, pos := range positions {
if _, isScan := reqs[pos].GetInner().(*kvpb.ScanRequest); isScan {
numScansInReqs++
if !s.hints.SingleRowLookup {
if firstScanRequest {
// We have some ScanRequests, and each might touch
// multiple ranges, so we have to set up
Expand Down Expand Up @@ -597,13 +599,15 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr
}
}

numGetsInReqs := int64(len(singleRangeReqs)) - numScansInReqs
overheadAccountedFor := requestUnionSliceOverhead + requestUnionOverhead*int64(cap(singleRangeReqs)) + // reqs
intSliceOverhead + intSize*int64(cap(positions)) + // positions
subRequestIdxOverhead // subRequestIdx
r := singleRangeBatch{
reqs: singleRangeReqs,
positions: positions,
subRequestIdx: subRequestIdx,
numGetsInReqs: numGetsInReqs,
reqsReservedBytes: requestsMemUsage(singleRangeReqs),
overheadAccountedFor: overheadAccountedFor,
}
Expand Down Expand Up @@ -1001,14 +1005,15 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
for !w.s.requestsToServe.emptyLocked() && maxNumRequestsToIssue > 0 && !budgetIsExhausted {
singleRangeReqs := w.s.requestsToServe.nextLocked()
availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used()
// minAcceptableBudget is the minimum TargetBytes limit with which it
// makes sense to issue this request (if we issue the request with
// smaller limit, then it's very likely to come back with an empty
// response).
minAcceptableBudget := singleRangeReqs.minTargetBytes
if minAcceptableBudget == 0 {
minAcceptableBudget = avgResponseSize
minTargetBytes := singleRangeReqs.minTargetBytes
if minTargetBytes == 0 {
minTargetBytes = avgResponseSize
}
minResponsesOverhead := getResponseOverhead + responseUnionOverhead
if _, isScan := singleRangeReqs.reqs[0].GetInner().(*kvpb.ScanRequest); isScan {
minResponsesOverhead = scanResponseOverhead + responseUnionOverhead
}
minAcceptableBudget := minTargetBytes + minResponsesOverhead
if availableBudget < minAcceptableBudget {
if !headOfLine {
// We don't have enough budget available to serve this request,
Expand All @@ -1031,25 +1036,45 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
// how much memory the response will need, and we reserve this
// estimation up front.
//
// Note that TargetBytes will be a strict limit on the response size
// (except in a degenerate case for head-of-the-line request that will
// get a very large single row in response which will exceed this
// limit).
// Note that TargetBytes will be a strict limit on the footprint of the
// responses (except in a degenerate case for head-of-the-line request
// that will get a very large single row in response which will exceed
// this limit).
targetBytes := int64(len(singleRangeReqs.reqs)) * avgResponseSize
// TargetBytes limit only accounts for the footprint of the responses,
// ignoring the overhead of GetResponse and ScanResponse structs. Thus,
// we need to consume that overhead from the budget while keeping it
// separately from the targetBytes parameter.
//
// Regardless of how many non-empty responses we'll receive,
// the BatchResponse will get a corresponding GetResponse or
// ScanResponse struct for each of the requests. Furthermore, the
// BatchResponse will get an extra ResponseUnion struct for each
// response.
responsesOverhead := getResponseOverhead*singleRangeReqs.numGetsInReqs +
scanResponseOverhead*(int64(len(singleRangeReqs.reqs))-singleRangeReqs.numGetsInReqs) +
int64(len(singleRangeReqs.reqs))*responseUnionOverhead
// Make sure that targetBytes is sufficient to receive non-empty
// response. Our estimate might be an under-estimate when responses vary
// significantly in size.
if targetBytes < singleRangeReqs.minTargetBytes {
targetBytes = singleRangeReqs.minTargetBytes
}
if targetBytes > availableBudget {
// The estimate tells us that we don't have enough budget to receive
// the full response; however, in order to utilize the available
// budget fully, we can still issue this request with the truncated
// TargetBytes value hoping to receive a partial response.
targetBytes = availableBudget
if targetBytes+responsesOverhead > availableBudget {
if targetBytes > availableBudget {
// The estimate tells us that we don't have enough budget to
// receive the full response; however, in order to utilize the
// available budget fully, we can still issue this request with
// the truncated TargetBytes value hoping to receive a partial
// response.
targetBytes = availableBudget
responsesOverhead = 0
} else {
responsesOverhead = availableBudget - targetBytes
}
}
if err := w.s.budget.consumeLocked(ctx, targetBytes, headOfLine /* allowDebt */); err != nil {
toConsume := targetBytes + responsesOverhead
if err := w.s.budget.consumeLocked(ctx, toConsume, headOfLine /* allowDebt */); err != nil {
// This error cannot be because of the budget going into debt. If
// headOfLine is true, then we're allowing debt; otherwise, we have
// truncated targetBytes above to not exceed availableBudget, and
Expand Down Expand Up @@ -1084,7 +1109,7 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing(
// any more responses at the moment.
return err
}
w.performRequestAsync(ctx, singleRangeReqs, targetBytes, headOfLine)
w.performRequestAsync(ctx, singleRangeReqs, targetBytes, responsesOverhead, headOfLine)
w.s.requestsToServe.removeNextLocked()
maxNumRequestsToIssue--
headOfLine = false
Expand Down Expand Up @@ -1139,21 +1164,27 @@ const AsyncRequestOp = "streamer-lookup-async"
// w.asyncSem to spin up a new goroutine for this request.
//
// targetBytes specifies the memory budget that this single-range batch should
// be issued with. targetBytes bytes have already been consumed from the budget,
// and this amount of memory is owned by the goroutine that is spun up to
// perform the request. Once the response is received, performRequestAsync
// reconciles the budget so that the actual footprint of the response is
// consumed. Each Result produced based on that response will track a part of
// the memory reservation (according to the Result's footprint) that will be
// returned back to the budget once Result.Release() is called.
// be issued with. responsesOverhead specifies the estimate for the overhead of
// the responses to the requests in this single-range batch. targetBytes and
// responsesOverhead bytes have already been consumed from the budget, and this
// amount of memory is owned by the goroutine that is spun up to perform the
// request. Once the response is received, performRequestAsync reconciles the
// budget so that the actual memory usage of the response (both the footprint
// and the overhead) is consumed. Each Result produced based on that response
// will track a part of the memory reservation (according to the Result's
// footprint and overhead) that will be returned back to the budget once
// Result.Release() is called.
//
// headOfLine indicates whether this request is the current head of the line and
// there are no unreleased Results. Head-of-the-line requests are treated
// specially in a sense that they are allowed to put the budget into debt. The
// caller is responsible for ensuring that there is at most one asynchronous
// request with headOfLine=true at all times.
func (w *workerCoordinator) performRequestAsync(
ctx context.Context, req singleRangeBatch, targetBytes int64, headOfLine bool,
ctx context.Context,
req singleRangeBatch,
targetBytes int64,
responsesOverhead int64,
headOfLine bool,
) {
w.s.waitGroup.Add(1)
w.s.adjustNumRequestsInFlight(1 /* delta */)
Expand Down Expand Up @@ -1220,7 +1251,7 @@ func (w *workerCoordinator) performRequestAsync(

// Now adjust the budget based on the actual memory footprint of
// non-empty responses as well as resume spans, if any.
respOverestimate := targetBytes - fp.memoryFootprintBytes
respOverestimate := targetBytes + responsesOverhead - fp.memoryFootprintBytes - fp.responsesOverhead
reqOveraccounted := req.reqsReservedBytes - fp.resumeReqsMemUsage
if fp.resumeReqsMemUsage == 0 {
// There will be no resume request, so we will lose the
Expand All @@ -1245,6 +1276,9 @@ func (w *workerCoordinator) performRequestAsync(
// but not enough for that large row).
toConsume := -overaccountedTotal
if err = w.s.budget.consume(ctx, toConsume, headOfLine /* allowDebt */); err != nil {
// TODO(yuzefovich): rather than dropping the response
// altogether, consider blocking to wait for the budget to
// open up, up to some limit.
atomic.AddInt64(&w.s.atomics.droppedBatchResponses, 1)
w.s.budget.release(ctx, targetBytes)
if !headOfLine {
Expand Down Expand Up @@ -1301,9 +1335,20 @@ func (w *workerCoordinator) performRequestAsync(
// response to a singleRangeBatch.
type singleRangeBatchResponseFootprint struct {
// memoryFootprintBytes tracks the total memory footprint of non-empty
// responses. This will be equal to the sum of memory tokens created for all
// Results.
// responses (excluding the overhead of the GetResponse and ScanResponse
// structs).
//
// In combination with responsesOverhead it will be equal to the sum of
// memory tokens created for all Results.
memoryFootprintBytes int64
// responsesOverhead tracks the overhead of the GetResponse and ScanResponse
// structs. Note that this doesn't need to track the overhead of
// ResponseUnion structs because we store GetResponses and ScanResponses
// directly in Result.
//
// In combination with memoryFootprintBytes it will be equal to the sum of
// memory tokens created for all Results.
responsesOverhead int64
// resumeReqsMemUsage tracks the memory usage of the requests for the
// ResumeSpans.
resumeReqsMemUsage int64
Expand Down Expand Up @@ -1343,6 +1388,7 @@ func calculateFootprint(
} else {
// This Get was completed.
fp.memoryFootprintBytes += getResponseSize(get)
fp.responsesOverhead += getResponseOverhead
fp.numGetResults++
}
case *kvpb.ScanRequest:
Expand All @@ -1361,6 +1407,7 @@ func calculateFootprint(
fp.memoryFootprintBytes += scanResponseSize(scan)
}
if len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil {
fp.responsesOverhead += scanResponseOverhead
fp.numScanResults++
}
if scan.ResumeSpan != nil {
Expand Down Expand Up @@ -1473,7 +1520,7 @@ func processSingleRangeResults(
subRequestDone: true,
}
result.memoryTok.streamer = s
result.memoryTok.toRelease = getResponseSize(get)
result.memoryTok.toRelease = getResponseSize(get) + getResponseOverhead
memoryTokensBytes += result.memoryTok.toRelease
if buildutil.CrdbTestBuild {
if fp.numGetResults == 0 {
Expand Down Expand Up @@ -1503,7 +1550,7 @@ func processSingleRangeResults(
subRequestDone: scan.ResumeSpan == nil,
}
result.memoryTok.streamer = s
result.memoryTok.toRelease = scanResponseSize(scan)
result.memoryTok.toRelease = scanResponseSize(scan) + scanResponseOverhead
memoryTokensBytes += result.memoryTok.toRelease
result.ScanResp = scan
if s.hints.SingleRowLookup {
Expand Down Expand Up @@ -1535,10 +1582,12 @@ func processSingleRangeResults(
}

if buildutil.CrdbTestBuild {
if fp.memoryFootprintBytes != memoryTokensBytes {
if fp.memoryFootprintBytes+fp.responsesOverhead != memoryTokensBytes {
panic(errors.AssertionFailedf(
"different calculation of memory footprint\ncalculateFootprint: %d bytes\n"+
"processSingleRangeResults: %d bytes", fp.memoryFootprintBytes, memoryTokensBytes,
"different calculation of memory footprint\n"+
"calculateFootprint: memoryFootprintBytes = %d bytes, responsesOverhead = %d bytes\n"+
"processSingleRangeResults: %d bytes",
fp.memoryFootprintBytes, fp.responsesOverhead, memoryTokensBytes,
))
}
}
Expand All @@ -1560,6 +1609,7 @@ func buildResumeSingleRangeBatch(
resumeReq.reqs = req.reqs[:numIncompleteRequests]
resumeReq.positions = req.positions[:0]
resumeReq.subRequestIdx = req.subRequestIdx[:0]
resumeReq.numGetsInReqs = int64(fp.numIncompleteGets)
// We've already reconciled the budget with the actual reservation for the
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage
Expand Down
Loading

0 comments on commit 377f06e

Please sign in to comment.