Skip to content

Commit

Permalink
kvstreamer: reuse incomplete Get requests on resume batches
Browse files Browse the repository at this point in the history
Previously, for all incomplete requests in a batch we'd allocate new Get
and Scan requests (since - due to a known issue #75452 - at the moment
the lifecycle of the requests is not clearly defined, so we're not
allowed to modify them). However, we can reuse the Get requests since
they won't be ever modified (i.e. they are either complete or
incomplete, and, unlike for Scan requests, the start key won't ever be
shifted), so this commit takes advantage of this observation.

Release note: None
  • Loading branch information
yuzefovich committed Jul 6, 2022
1 parent 8b6e3c3 commit 21f2390
Showing 1 changed file with 14 additions and 17 deletions.
31 changes: 14 additions & 17 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,7 +1360,7 @@ func processSingleRangeResponse(
) {
processSingleRangeResults(ctx, s, req, br, fp)
if fp.hasIncomplete() {
resumeReq := buildResumeSingeRangeBatch(s, req, br, fp)
resumeReq := buildResumeSingleRangeBatch(s, req, br, fp)
s.requestsToServe.add(resumeReq)
}
}
Expand Down Expand Up @@ -1521,7 +1521,7 @@ func processSingleRangeResults(
//
// Note that it should only be called if the response has any incomplete
// requests.
func buildResumeSingeRangeBatch(
func buildResumeSingleRangeBatch(
s *Streamer,
req singleRangeBatch,
br *roachpb.BatchResponse,
Expand All @@ -1537,13 +1537,6 @@ func buildResumeSingeRangeBatch(
// requests with the ResumeSpans.
resumeReq.reqsReservedBytes = fp.resumeReqsMemUsage
resumeReq.overheadAccountedFor = req.overheadAccountedFor
// TODO(yuzefovich): for incomplete Get requests, the ResumeSpan should be
// exactly the same as the original span, so we might be able to reuse the
// original Get requests.
gets := make([]struct {
req roachpb.GetRequest
union roachpb.RequestUnion_Get
}, fp.numIncompleteGets)
scans := make([]struct {
req roachpb.ScanRequest
union roachpb.RequestUnion_Scan
Expand All @@ -1560,14 +1553,18 @@ func buildResumeSingeRangeBatch(
emptyResponse = false
continue
}
// This Get wasn't completed - create a new request according to the
// ResumeSpan and include it into the batch.
newGet := gets[0]
gets = gets[1:]
newGet.req.SetSpan(*get.ResumeSpan)
newGet.req.KeyLocking = origRequest.KeyLocking
newGet.union.Get = &newGet.req
resumeReq.reqs[resumeReqIdx].Value = &newGet.union
// This Get wasn't completed - include it into the batch again (we
// can just reuse the original request since it hasn't been
// modified which is also asserted below).
if buildutil.CrdbTestBuild {
if !get.ResumeSpan.Equal(origRequest.Span()) {
panic(errors.AssertionFailedf(
"unexpectedly the ResumeSpan %s on the GetResponse is different from the original span %s",
get.ResumeSpan, origRequest.Span(),
))
}
}
resumeReq.reqs[resumeReqIdx] = req.reqs[i]
resumeReq.positions = append(resumeReq.positions, position)
if req.subRequestIdx != nil {
resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, req.subRequestIdx[i])
Expand Down

0 comments on commit 21f2390

Please sign in to comment.