From 40d28a575696ba12d24dd4cba6becb75a4491fa9 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 21 Feb 2023 14:00:52 -0800 Subject: [PATCH] kvstreamer: account for the overhead of GetResponse and ScanResponse The Streamer is careful to account for the requests (both the footprint and the overhead) as well as to estimate the footprint of the responses. However, it currently doesn't account for the overhead of the GetResponse (currently 64 bytes) and ScanResponse (120 bytes) structs. We recently saw a case where this overhead was the largest user of RAM which contributed to the pod OOMing. This commit fixes this accounting oversight in the following manner: - prior to issuing the BatchRequest, we estimate the overhead of a response to each request in the batch. Notably, the BatchResponse will contain a RequestUnion object as well as the GetResponse or ScanResponse object for each request - once the BatchResponse is received, we reconcile the budget to track the precise memory usage of the responses (ignoring the RequestUnion since we don't keep a reference to it). We already tracked the "footprint" and now we also include the "overhead" with both being released to the budget on `Result.Release` call. We track this "responses overhead" usage separately from the target bytes usage (the "footprint") since the KV server doesn't include the overhead when determining how to handle `TargetBytes` limit, and we must behave in the same manner. It's worth noting that the overhead of the response structs is proportional to the number of requests included in the BatchRequest since every request will get a corresponding (possibly empty) response. Release note: None --- .../kvstreamer/avg_response_estimator.go | 6 +- .../kvclient/kvstreamer/requests_provider.go | 5 +- pkg/kv/kvclient/kvstreamer/results_buffer.go | 4 +- pkg/kv/kvclient/kvstreamer/size.go | 7 + pkg/kv/kvclient/kvstreamer/streamer.go | 141 ++++++++++++----- pkg/kv/kvclient/kvstreamer/streamer_test.go | 145 ++++++++++++++++-- pkg/sql/row/kv_batch_fetcher.go | 2 + 7 files changed, 249 insertions(+), 61 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go index 60597f53547f..b0204b8d5861 100644 --- a/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go +++ b/pkg/kv/kvclient/kvstreamer/avg_response_estimator.go @@ -42,6 +42,8 @@ func (e *avgResponseEstimator) init(sv *settings.Values) { e.avgResponseSizeMultiple = streamerAvgResponseSizeMultiple.Get(sv) } +// getAvgResponseSize returns the current estimate of a footprint of a single +// response. func (e *avgResponseEstimator) getAvgResponseSize() int64 { if e.numResponses == 0 { return initialAvgResponseSize @@ -78,8 +80,8 @@ func (e *avgResponseEstimator) getAvgResponseSize() int64 { } // update updates the actual information of the estimator based on numResponses -// responses that took up responseBytes bytes and correspond to a single -// BatchResponse. +// responses that took up responseBytes bytes in footprint and correspond to a +// single BatchResponse. func (e *avgResponseEstimator) update(responseBytes int64, numResponses int64) { e.responseBytes += float64(responseBytes) e.numResponses += float64(numResponses) diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go index e0a4ea462f24..b9f1bd1e3411 100644 --- a/pkg/kv/kvclient/kvstreamer/requests_provider.go +++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go @@ -75,6 +75,8 @@ type singleRangeBatch struct { // subRequestIdx is only allocated in InOrder mode when // Hints.SingleRowLookup is false and some Scan requests were enqueued. subRequestIdx []int32 + // numGetsInReqs tracks the number of Get requests in reqs. + numGetsInReqs int64 // reqsReservedBytes tracks the memory reservation against the budget for // the memory usage of reqs, excluding the overhead. reqsReservedBytes int64 @@ -83,9 +85,6 @@ type singleRangeBatch struct { // well as the positions and the subRequestIdx slices. Since we reuse these // slices for the resume requests, this can be released only when the // BatchResponse doesn't have any resume spans. - // - // RequestUnion.Size() ignores the overhead of RequestUnion object, so we - // need to account for it separately. overheadAccountedFor int64 // minTargetBytes, if positive, indicates the minimum TargetBytes limit that // this singleRangeBatch should be sent with in order for the response to diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go index 7ab3906b03f3..5629568dcdd9 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go @@ -159,7 +159,9 @@ type resultsBufferBase struct { hasResults chan struct{} // overheadAccountedFor tracks how much overhead space for the Results in // this results buffer has been consumed from the budget. Note that this - // does not include the footprint of Get and Scan responses. + // does not include the memory usage of Get and Scan responses (i.e. neither + // the footprint nor the overhead of a response is tracked by + // overheadAccountedFor). overheadAccountedFor int64 err error } diff --git a/pkg/kv/kvclient/kvstreamer/size.go b/pkg/kv/kvclient/kvstreamer/size.go index d08ff04f6ad7..8c50b3340cb3 100644 --- a/pkg/kv/kvclient/kvstreamer/size.go +++ b/pkg/kv/kvclient/kvstreamer/size.go @@ -26,6 +26,9 @@ const ( requestUnionOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion{})) requestOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion_Get{}) + unsafe.Sizeof(kvpb.GetRequest{})) + responseUnionOverhead = int64(unsafe.Sizeof(kvpb.ResponseUnion_Get{})) + getResponseOverhead = int64(unsafe.Sizeof(kvpb.GetResponse{})) + scanResponseOverhead = int64(unsafe.Sizeof(kvpb.ScanResponse{})) ) var zeroInt32Slice []int32 @@ -36,6 +39,10 @@ func init() { if requestOverhead != scanRequestOverhead { panic("GetRequest and ScanRequest have different overheads") } + scanResponseUnionOverhead := int64(unsafe.Sizeof(kvpb.ResponseUnion_Scan{})) + if responseUnionOverhead != scanResponseUnionOverhead { + panic("ResponseUnion_Get and ResponseUnion_Scan have different overheads") + } zeroInt32Slice = make([]int32, 1<<10) } diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 051c7695e034..8b648df12f5b 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -562,9 +562,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr } var subRequestIdx []int32 var subRequestIdxOverhead int64 - if !s.hints.SingleRowLookup { - for i, pos := range positions { - if _, isScan := reqs[pos].GetInner().(*kvpb.ScanRequest); isScan { + var numScansInReqs int64 + for i, pos := range positions { + if _, isScan := reqs[pos].GetInner().(*kvpb.ScanRequest); isScan { + numScansInReqs++ + if !s.hints.SingleRowLookup { if firstScanRequest { // We have some ScanRequests, and each might touch // multiple ranges, so we have to set up @@ -597,6 +599,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr } } + numGetsInReqs := int64(len(singleRangeReqs)) - numScansInReqs overheadAccountedFor := requestUnionSliceOverhead + requestUnionOverhead*int64(cap(singleRangeReqs)) + // reqs intSliceOverhead + intSize*int64(cap(positions)) + // positions subRequestIdxOverhead // subRequestIdx @@ -604,6 +607,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []kvpb.RequestUnion) (retEr reqs: singleRangeReqs, positions: positions, subRequestIdx: subRequestIdx, + numGetsInReqs: numGetsInReqs, reqsReservedBytes: requestsMemUsage(singleRangeReqs), overheadAccountedFor: overheadAccountedFor, } @@ -1001,14 +1005,32 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( for !w.s.requestsToServe.emptyLocked() && maxNumRequestsToIssue > 0 && !budgetIsExhausted { singleRangeReqs := w.s.requestsToServe.nextLocked() availableBudget := w.s.budget.limitBytes - w.s.budget.mu.acc.Used() - // minAcceptableBudget is the minimum TargetBytes limit with which it - // makes sense to issue this request (if we issue the request with - // smaller limit, then it's very likely to come back with an empty - // response). - minAcceptableBudget := singleRangeReqs.minTargetBytes - if minAcceptableBudget == 0 { - minAcceptableBudget = avgResponseSize + // minTargetBytes is the minimum TargetBytes limit with which it makes + // sense to issue this single-range BatchRequest (if we issue the + // request with smaller limit, then it's very likely to result only in + // empty responses). + minTargetBytes := singleRangeReqs.minTargetBytes + if minTargetBytes == 0 { + minTargetBytes = avgResponseSize } + // TargetBytes limit only accounts for the footprint of the responses, + // ignoring the overhead of GetResponse and ScanResponse structs. Thus, + // we need to account for that overhead separately from the TargetBytes + // limit. + // + // Regardless of the fact how many non-empty responses we'll receive, + // the BatchResponse will get a corresponding GetResponse or + // ScanResponse struct for each of the requests. Furthermore, the + // BatchResponse will get an extra ResponseUnion struct for each + // response. + responsesOverhead := getResponseOverhead*singleRangeReqs.numGetsInReqs + + scanResponseOverhead*(int64(len(singleRangeReqs.reqs))-singleRangeReqs.numGetsInReqs) + + int64(len(singleRangeReqs.reqs))*responseUnionOverhead + // minAcceptableBudget is an estimate on the lower bound of how much + // memory budget we must have available in order to issue this + // single-range BatchRequest so that we won't discard the corresponding + // BatchResponse. + minAcceptableBudget := minTargetBytes + responsesOverhead if availableBudget < minAcceptableBudget { if !headOfLine { // We don't have enough budget available to serve this request, @@ -1031,10 +1053,10 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( // how much memory the response will need, and we reserve this // estimation up front. // - // Note that TargetBytes will be a strict limit on the response size - // (except in a degenerate case for head-of-the-line request that will - // get a very large single row in response which will exceed this - // limit). + // Note that TargetBytes will be a strict limit on the footprint of the + // responses (except in a degenerate case for head-of-the-line request + // that will get a very large single row in response which will exceed + // this limit). targetBytes := int64(len(singleRangeReqs.reqs)) * avgResponseSize // Make sure that targetBytes is sufficient to receive non-empty // response. Our estimate might be an under-estimate when responses vary @@ -1042,14 +1064,32 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( if targetBytes < singleRangeReqs.minTargetBytes { targetBytes = singleRangeReqs.minTargetBytes } - if targetBytes > availableBudget { - // The estimate tells us that we don't have enough budget to receive - // the full response; however, in order to utilize the available - // budget fully, we can still issue this request with the truncated - // TargetBytes value hoping to receive a partial response. - targetBytes = availableBudget + if targetBytes+responsesOverhead > availableBudget { + // We don't have enough budget to account for both the TargetBytes + // limit and the overhead of the responses. We give higher + // precedence to the former (choosing the performance over the + // stability), so we always truncate the responses' overhead and + // might reduce the TargetBytes limit. This is ok since our + // estimates are on the best effort basis, and we'll do precise + // accounting when we receive the BatchResponse. + // TODO(yuzefovich): consider not including all of the requests from + // singleRangeReqs.reqs into the BatchRequest in cases when we're + // low on the available budget. + if targetBytes > availableBudget { + // The estimate tells us that we don't have enough budget to + // receive non-empty responses for all requests in the + // BatchRequest; however, in order to utilize the available + // budget fully, we can still issue this BatchRequest with the + // truncated TargetBytes value hoping to receive non-empty + // results at least for some requests. + targetBytes = availableBudget + responsesOverhead = 0 + } else { + responsesOverhead = availableBudget - targetBytes + } } - if err := w.s.budget.consumeLocked(ctx, targetBytes, headOfLine /* allowDebt */); err != nil { + toConsume := targetBytes + responsesOverhead + if err := w.s.budget.consumeLocked(ctx, toConsume, headOfLine /* allowDebt */); err != nil { // This error cannot be because of the budget going into debt. If // headOfLine is true, then we're allowing debt; otherwise, we have // truncated targetBytes above to not exceed availableBudget, and @@ -1084,7 +1124,7 @@ func (w *workerCoordinator) issueRequestsForAsyncProcessing( // any more responses at the moment. return err } - w.performRequestAsync(ctx, singleRangeReqs, targetBytes, headOfLine) + w.performRequestAsync(ctx, singleRangeReqs, targetBytes, responsesOverhead, headOfLine) w.s.requestsToServe.removeNextLocked() maxNumRequestsToIssue-- headOfLine = false @@ -1139,13 +1179,15 @@ const AsyncRequestOp = "streamer-lookup-async" // w.asyncSem to spin up a new goroutine for this request. // // targetBytes specifies the memory budget that this single-range batch should -// be issued with. targetBytes bytes have already been consumed from the budget, -// and this amount of memory is owned by the goroutine that is spun up to -// perform the request. Once the response is received, performRequestAsync -// reconciles the budget so that the actual footprint of the response is -// consumed. Each Result produced based on that response will track a part of -// the memory reservation (according to the Result's footprint) that will be -// returned back to the budget once Result.Release() is called. +// be issued with. responsesOverhead specifies the estimate for the overhead of +// the responses to the requests in this single-range batch. targetBytes and +// responsesOverhead bytes have already been consumed from the budget, and this +// amount of memory is owned by the goroutine that is spun up to perform the +// request. Once the response is received, performRequestAsync reconciles the +// budget so that the actual footprint of the response is consumed. Each Result +// produced based on that response will track a part of the memory reservation +// (according to the Result's footprint) that will be returned back to the +// budget once Result.Release() is called. // // headOfLine indicates whether this request is the current head of the line and // there are no unreleased Results. Head-of-the-line requests are treated @@ -1153,7 +1195,11 @@ const AsyncRequestOp = "streamer-lookup-async" // caller is responsible for ensuring that there is at most one asynchronous // request with headOfLine=true at all times. func (w *workerCoordinator) performRequestAsync( - ctx context.Context, req singleRangeBatch, targetBytes int64, headOfLine bool, + ctx context.Context, + req singleRangeBatch, + targetBytes int64, + responsesOverhead int64, + headOfLine bool, ) { w.s.waitGroup.Add(1) w.s.adjustNumRequestsInFlight(1 /* delta */) @@ -1220,7 +1266,7 @@ func (w *workerCoordinator) performRequestAsync( // Now adjust the budget based on the actual memory footprint of // non-empty responses as well as resume spans, if any. - respOverestimate := targetBytes - fp.memoryFootprintBytes + respOverestimate := targetBytes + responsesOverhead - fp.memoryFootprintBytes - fp.responsesOverhead reqOveraccounted := req.reqsReservedBytes - fp.resumeReqsMemUsage if fp.resumeReqsMemUsage == 0 { // There will be no resume request, so we will lose the @@ -1245,6 +1291,9 @@ func (w *workerCoordinator) performRequestAsync( // but not enough for that large row). toConsume := -overaccountedTotal if err = w.s.budget.consume(ctx, toConsume, headOfLine /* allowDebt */); err != nil { + // TODO(yuzefovich): rather than dropping the response + // altogether, consider blocking to wait for the budget to + // open up, up to some limit. atomic.AddInt64(&w.s.atomics.droppedBatchResponses, 1) w.s.budget.release(ctx, targetBytes) if !headOfLine { @@ -1301,9 +1350,20 @@ func (w *workerCoordinator) performRequestAsync( // response to a singleRangeBatch. type singleRangeBatchResponseFootprint struct { // memoryFootprintBytes tracks the total memory footprint of non-empty - // responses. This will be equal to the sum of memory tokens created for all - // Results. + // responses (excluding the overhead of the GetResponse and ScanResponse + // structs). + // + // In combination with responsesOverhead it will be equal to the sum of + // memory tokens created for all Results. memoryFootprintBytes int64 + // responsesOverhead tracks the overhead of the GetResponse and ScanResponse + // structs. Note that this doesn't need to track the overhead of + // ResponseUnion structs because we store GetResponses and ScanResponses + // directly in Result. + // + // In combination with memoryFootprintBytes it will be equal to the sum of + // memory tokens created for all Results. + responsesOverhead int64 // resumeReqsMemUsage tracks the memory usage of the requests for the // ResumeSpans. resumeReqsMemUsage int64 @@ -1343,6 +1403,7 @@ func calculateFootprint( } else { // This Get was completed. fp.memoryFootprintBytes += getResponseSize(get) + fp.responsesOverhead += getResponseOverhead fp.numGetResults++ } case *kvpb.ScanRequest: @@ -1361,6 +1422,7 @@ func calculateFootprint( fp.memoryFootprintBytes += scanResponseSize(scan) } if len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil { + fp.responsesOverhead += scanResponseOverhead fp.numScanResults++ } if scan.ResumeSpan != nil { @@ -1473,7 +1535,7 @@ func processSingleRangeResults( subRequestDone: true, } result.memoryTok.streamer = s - result.memoryTok.toRelease = getResponseSize(get) + result.memoryTok.toRelease = getResponseSize(get) + getResponseOverhead memoryTokensBytes += result.memoryTok.toRelease if buildutil.CrdbTestBuild { if fp.numGetResults == 0 { @@ -1503,7 +1565,7 @@ func processSingleRangeResults( subRequestDone: scan.ResumeSpan == nil, } result.memoryTok.streamer = s - result.memoryTok.toRelease = scanResponseSize(scan) + result.memoryTok.toRelease = scanResponseSize(scan) + scanResponseOverhead memoryTokensBytes += result.memoryTok.toRelease result.ScanResp = scan if s.hints.SingleRowLookup { @@ -1535,10 +1597,12 @@ func processSingleRangeResults( } if buildutil.CrdbTestBuild { - if fp.memoryFootprintBytes != memoryTokensBytes { + if fp.memoryFootprintBytes+fp.responsesOverhead != memoryTokensBytes { panic(errors.AssertionFailedf( - "different calculation of memory footprint\ncalculateFootprint: %d bytes\n"+ - "processSingleRangeResults: %d bytes", fp.memoryFootprintBytes, memoryTokensBytes, + "different calculation of memory footprint\n"+ + "calculateFootprint: memoryFootprintBytes = %d bytes, responsesOverhead = %d bytes\n"+ + "processSingleRangeResults: %d bytes", + fp.memoryFootprintBytes, fp.responsesOverhead, memoryTokensBytes, )) } } @@ -1560,6 +1624,7 @@ func buildResumeSingleRangeBatch( resumeReq.reqs = req.reqs[:numIncompleteRequests] resumeReq.positions = req.positions[:0] resumeReq.subRequestIdx = req.subRequestIdx[:0] + resumeReq.numGetsInReqs = int64(fp.numIncompleteGets) // We've already reconciled the budget with the actual reservation for the // requests with the ResumeSpans. resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index e2f1de7e1f3c..28e2d6713f7b 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -365,6 +365,22 @@ func TestStreamerWideRows(t *testing.T) { } } +func makeScanRequest(tableID byte, start, end int) kvpb.RequestUnion { + var res kvpb.RequestUnion + var scan kvpb.ScanRequest + var union kvpb.RequestUnion_Scan + makeKey := func(pk int) []byte { + // These numbers essentially make a key like '/t/primary/pk'. + return []byte{tableID + 136, 137, byte(136 + pk)} + } + scan.Key = makeKey(start) + scan.EndKey = makeKey(end) + scan.ScanFormat = kvpb.BATCH_RESPONSE + union.Scan = &scan + res.Value = &union + return res +} + // TestStreamerEmptyScans verifies that the Streamer behaves correctly when // Scan requests return empty responses. func TestStreamerEmptyScans(t *testing.T) { @@ -398,21 +414,6 @@ func TestStreamerEmptyScans(t *testing.T) { _, err = db.Exec("SELECT count(*) from t") require.NoError(t, err) - makeScanRequest := func(start, end int) kvpb.RequestUnion { - var res kvpb.RequestUnion - var scan kvpb.ScanRequest - var union kvpb.RequestUnion_Scan - makeKey := func(pk int) []byte { - // These numbers essentially make a key like '/t/primary/pk'. - return []byte{tableID + 136, 137, byte(136 + pk)} - } - scan.Key = makeKey(start) - scan.EndKey = makeKey(end) - union.Scan = &scan - res.Value = &union - return res - } - getStreamer := func() *Streamer { s := getStreamer(ctx, s, math.MaxInt64, nil /* acc */) // There are two column families in the table. @@ -426,7 +427,7 @@ func TestStreamerEmptyScans(t *testing.T) { // Scan the row with pk=0. reqs := make([]kvpb.RequestUnion, 1) - reqs[0] = makeScanRequest(0, 1) + reqs[0] = makeScanRequest(tableID, 0, 1) require.NoError(t, streamer.Enqueue(ctx, reqs)) results, err := streamer.GetResults(ctx) require.NoError(t, err) @@ -440,7 +441,7 @@ func TestStreamerEmptyScans(t *testing.T) { // Scan the rows with pk in range [1, 4). reqs := make([]kvpb.RequestUnion, 1) - reqs[0] = makeScanRequest(1, 4) + reqs[0] = makeScanRequest(tableID, 1, 4) require.NoError(t, streamer.Enqueue(ctx, reqs)) // We expect an empty response for each range. var numResults int @@ -523,3 +524,113 @@ func TestStreamerMultiRangeScan(t *testing.T) { expected += "}" require.Equal(t, expected, result) } + +// TestStreamerMemoryAccounting performs sanity checking on the memory +// accounting done by the streamer. +func TestStreamerMemoryAccounting(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + // Create a table (for which we know the encoding of valid keys) with a + // single row. + _, err := db.Exec("CREATE TABLE t (pk PRIMARY KEY, k) AS VALUES (0, 0)") + require.NoError(t, err) + + const tableID = 104 + // Sanity check that the table 't' has the expected TableID. + assertTableID(t, db, "t" /* tableName */, tableID) + + makeGetRequest := func(key int) kvpb.RequestUnion { + var res kvpb.RequestUnion + var get kvpb.GetRequest + var union kvpb.RequestUnion_Get + makeKey := func(pk int) []byte { + // These numbers essentially make a key like '/t/primary/key/0'. + return []byte{tableID + 136, 137, byte(136 + pk), 136} + } + get.Key = makeKey(key) + union.Get = &get + res.Value = &union + return res + } + + monitor := mon.NewMonitor( + "streamer", /* name */ + mon.MemoryResource, + nil, /* curCount */ + nil, /* maxHist */ + -1, /* increment */ + math.MaxInt64, /* noteworthy */ + cluster.MakeTestingClusterSettings(), + ) + monitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) + defer monitor.Stop(ctx) + acc := monitor.MakeBoundAccount() + defer acc.Close(ctx) + + getStreamer := func(singleRowLookup bool) *Streamer { + require.Zero(t, acc.Used()) + s := getStreamer(ctx, s, math.MaxInt64, &acc) + s.Init(OutOfOrder, Hints{UniqueRequests: true, SingleRowLookup: singleRowLookup}, 1 /* maxKeysPerRow */, nil /* diskBuffer */) + return s + } + + t.Run("get", func(t *testing.T) { + acc.Clear(ctx) + // SingleRowLookup hint only influences the accounting when at least + // one Scan request is present. + streamer := getStreamer(false /* singleRowLookup */) + defer streamer.Close(ctx) + + // Get the row with pk=0. + reqs := make([]kvpb.RequestUnion, 1) + reqs[0] = makeGetRequest(0) + require.NoError(t, streamer.Enqueue(ctx, reqs)) + results, err := streamer.GetResults(ctx) + require.NoError(t, err) + require.Equal(t, 1, len(results)) + // 7 is the number of bytes in GetResponse.Value.RawBytes. + var expectedMemToken = getResponseOverhead + 7 + require.Equal(t, expectedMemToken, results[0].memoryTok.toRelease) + var expectedUsed = expectedMemToken + resultSize + require.Equal(t, expectedUsed, acc.Used()) + }) + + for _, singleRowLookup := range []bool{false, true} { + t.Run(fmt.Sprintf("scan/single_row_lookup=%t", singleRowLookup), func(t *testing.T) { + acc.Clear(ctx) + streamer := getStreamer(singleRowLookup) + defer streamer.Close(ctx) + + // Scan the row with pk=0. + reqs := make([]kvpb.RequestUnion, 1) + reqs[0] = makeScanRequest(tableID, 0, 1) + require.NoError(t, streamer.Enqueue(ctx, reqs)) + results, err := streamer.GetResults(ctx) + require.NoError(t, err) + require.Equal(t, 1, len(results)) + // 29 is usually the number of bytes in + // ScanResponse.BatchResponse[0]. We choose to hard-code this number + // rather than consult NumBytes field directly as an additional + // sanity-check. + expectedMemToken := scanResponseOverhead + 29 + if results[0].ScanResp.NumBytes == 33 { + // For some reason, sometimes it's not 29, but 33, and we do + // allow for this. + expectedMemToken += 4 + } + require.Equal(t, expectedMemToken, results[0].memoryTok.toRelease) + expectedUsed := expectedMemToken + resultSize + if !singleRowLookup { + // This is streamer.numRangesPerScanRequestAccountedFor which is + // only non-zero when SingleRowLookup hint is false. + expectedUsed += 4 + } + require.Equal(t, expectedUsed, acc.Used()) + }) + } +} diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 53e527ee523b..4efa029ce4c2 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -466,6 +466,8 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error { } else { f.responses = nil } + // TODO(yuzefovich): BatchResponse.Size ignores the overhead of the + // GetResponse and ScanResponse structs. We should include it here. returnedBytes := int64(br.Size()) if monitoring && (returnedBytes > int64(f.batchBytesLimit) || returnedBytes > f.batchResponseAccountedFor) { // Resize up to the actual amount of bytes we got back from the fetch,