Skip to content

Commit

Permalink
kvstreamer: remove some redundant function calls
Browse files Browse the repository at this point in the history
This commit refactors the logic of processing the BatchResponse in order
to avoid getting the "inner" request for each response - it is
sufficient for us to do a type switch on the response itself.

I did run the microbenchmarks, and they showed no difference, but
I believe this change should be beneficial.

Release note: None
  • Loading branch information
yuzefovich committed Jul 6, 2022
1 parent 41228d1 commit 746696a
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
29 changes: 16 additions & 13 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ type Streamer struct {
hints Hints
maxKeysPerRow int32
budget *budget
keyLocking lock.Strength

streamerStatistics

Expand Down Expand Up @@ -354,6 +355,7 @@ func NewStreamer(
limitBytes int64,
acc *mon.BoundAccount,
batchRequestsIssued *int64,
keyLocking lock.Strength,
) *Streamer {
if txn.Type() != kv.LeafTxn {
panic(errors.AssertionFailedf("RootTxn is given to the Streamer"))
Expand All @@ -362,6 +364,7 @@ func NewStreamer(
distSender: distSender,
stopper: stopper,
budget: newBudget(acc, limitBytes),
keyLocking: keyLocking,
}
if batchRequestsIssued == nil {
batchRequestsIssued = new(int64)
Expand Down Expand Up @@ -1428,9 +1431,9 @@ func processSingleRangeResults(
subRequestIdx = req.subRequestIdx[i]
}
reply := resp.GetInner()
switch req.reqs[i].GetInner().(type) {
case *roachpb.GetRequest:
get := reply.(*roachpb.GetResponse)
switch response := reply.(type) {
case *roachpb.GetResponse:
get := response
if get.ResumeSpan != nil {
// This Get wasn't completed.
continue
Expand All @@ -1454,8 +1457,8 @@ func processSingleRangeResults(
}
s.results.addLocked(result)

case *roachpb.ScanRequest:
scan := reply.(*roachpb.ScanResponse)
case *roachpb.ScanResponse:
scan := response
if len(scan.BatchResponses) == 0 && scan.ResumeSpan != nil {
// Only the first part of the conditional is true whenever we
// received an empty response for the Scan request (i.e. there
Expand Down Expand Up @@ -1546,9 +1549,9 @@ func buildResumeSingleRangeBatch(
for i, resp := range br.Responses {
position := req.positions[i]
reply := resp.GetInner()
switch origRequest := req.reqs[i].GetInner().(type) {
case *roachpb.GetRequest:
get := reply.(*roachpb.GetResponse)
switch response := reply.(type) {
case *roachpb.GetResponse:
get := response
if get.ResumeSpan == nil {
emptyResponse = false
continue
Expand All @@ -1557,10 +1560,10 @@ func buildResumeSingleRangeBatch(
// can just reuse the original request since it hasn't been
// modified which is also asserted below).
if buildutil.CrdbTestBuild {
if !get.ResumeSpan.Equal(origRequest.Span()) {
if origSpan := req.reqs[i].GetInner().Header().Span(); !get.ResumeSpan.Equal(origSpan) {
panic(errors.AssertionFailedf(
"unexpectedly the ResumeSpan %s on the GetResponse is different from the original span %s",
get.ResumeSpan, origRequest.Span(),
get.ResumeSpan, origSpan,
))
}
}
Expand All @@ -1574,8 +1577,8 @@ func buildResumeSingleRangeBatch(
}
resumeReqIdx++

case *roachpb.ScanRequest:
scan := reply.(*roachpb.ScanResponse)
case *roachpb.ScanResponse:
scan := response
if scan.ResumeSpan == nil {
emptyResponse = false
continue
Expand All @@ -1586,7 +1589,7 @@ func buildResumeSingleRangeBatch(
scans = scans[1:]
newScan.req.SetSpan(*scan.ResumeSpan)
newScan.req.ScanFormat = roachpb.BATCH_RESPONSE
newScan.req.KeyLocking = origRequest.KeyLocking
newScan.req.KeyLocking = s.keyLocking
newScan.union.Scan = &newScan.req
resumeReq.reqs[resumeReqIdx].Value = &newScan.union
resumeReq.positions = append(resumeReq.positions, position)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvclient/kvstreamer/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func getStreamer(
limitBytes,
acc,
nil, /* batchRequestsIssued */
lock.None,
)
}

Expand Down Expand Up @@ -98,6 +99,7 @@ func TestStreamerLimitations(t *testing.T) {
math.MaxInt64, /* limitBytes */
nil, /* acc */
nil, /* batchRequestsIssued */
lock.None,
)
})
})
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/row/kv_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func NewStreamingKVFetcher(
streamerBudgetLimit,
streamerBudgetAcc,
&batchRequestsIssued,
getKeyLockingStrength(lockStrength),
)
mode := kvstreamer.OutOfOrder
if maintainOrdering {
Expand Down

0 comments on commit 746696a

Please sign in to comment.