Skip to content

Commit

Permalink
kvstreamer: reduce allocations in the InOrder results buffer
Browse files Browse the repository at this point in the history
This commit improves the InOrder results buffer by reusing the same
scratch space for returning the results on `get()` calls as well as by
reducing the size of `inOrderBufferedResult` struct from the 80 bytes
size class to the 64 bytes size class.

It also cleans up a couple of comments and clarifies `GetResults` method
a bit.
```
name                                                  old time/op    new time/op    delta
LookupJoinEqColsAreKeyOrdering/Cockroach-24             8.47ms ± 3%    8.54ms ± 2%    ~     (p=0.182 n=9+10)
LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24    10.2ms ± 2%    10.1ms ± 2%    ~     (p=0.280 n=10+10)
LookupJoinOrdering/Cockroach-24                         10.4ms ± 3%    10.3ms ± 4%    ~     (p=0.393 n=10+10)
LookupJoinOrdering/MultinodeCockroach-24                13.8ms ± 5%    13.7ms ± 5%    ~     (p=0.739 n=10+10)

name                                                  old alloc/op   new alloc/op   delta
LookupJoinEqColsAreKeyOrdering/Cockroach-24             1.59MB ± 2%    1.49MB ± 1%  -6.20%  (p=0.000 n=10+9)
LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24    2.36MB ± 2%    2.27MB ± 2%  -3.49%  (p=0.000 n=10+10)
LookupJoinOrdering/Cockroach-24                         1.51MB ± 1%    1.42MB ± 1%  -6.27%  (p=0.000 n=9+10)
LookupJoinOrdering/MultinodeCockroach-24                2.37MB ± 6%    2.26MB ± 2%  -4.77%  (p=0.000 n=10+9)

name                                                  old allocs/op  new allocs/op  delta
LookupJoinEqColsAreKeyOrdering/Cockroach-24              10.4k ± 1%     10.3k ± 1%    ~     (p=0.055 n=8+9)
LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24     15.5k ± 0%     15.5k ± 1%  -0.34%  (p=0.037 n=10+10)
LookupJoinOrdering/Cockroach-24                          12.4k ± 1%     12.3k ± 1%  -0.98%  (p=0.000 n=9+10)
LookupJoinOrdering/MultinodeCockroach-24                 18.5k ± 1%     18.5k ± 2%    ~     (p=0.743 n=8+9)
```

Release note: None
  • Loading branch information
yuzefovich committed Jun 24, 2022
1 parent 051459a commit eac62e9
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
12 changes: 9 additions & 3 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type resultsBuffer interface {
// get returns all the Results that the buffer can send to the client at the
// moment. The boolean indicates whether all expected Results have been
// returned. Must be called without holding the budget's mutex.
//
// Calling get() invalidates the results returned on the previous call.
get(context.Context) (_ []Result, allComplete bool, _ error)

// wait blocks until there is at least one Result available to be returned
Expand Down Expand Up @@ -348,9 +350,12 @@ type inOrderResultsBuffer struct {

diskBuffer ResultDiskBuffer

// resultScratch is a scratch space reused by get() calls.
resultScratch []Result

// addCounter tracks the number of times add() has been called. See
// inOrderBufferedResult.addEpoch for why this is needed.
addCounter int
addCounter int32

// singleRowLookup is the value of Hints.SingleRowLookup. Only used for
// debug messages.
Expand Down Expand Up @@ -482,7 +487,7 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error)
defer b.budget.mu.Unlock()
b.Lock()
defer b.Unlock()
var res []Result
res := b.resultScratch[:0]
if debug {
fmt.Printf("attempting to get results, current headOfLinePosition = %d\n", b.headOfLinePosition)
}
Expand Down Expand Up @@ -541,6 +546,7 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error)
// All requests are complete IFF we have received the complete responses for
// all requests and there no buffered Results.
allComplete := b.numCompleteResponses == b.numExpectedResponses && len(b.buffered) == 0
b.resultScratch = res
return res, allComplete, b.err
}

Expand Down Expand Up @@ -661,7 +667,7 @@ type inOrderBufferedResult struct {
// is 0 whereas the second response is added during "epoch" 1 - thus, we
// can correctly return 'a' before 'b' although the priority and
// subRequestIdx of two Results are the same.
addEpoch int
addEpoch int32
// If onDisk is true, then the serialized Result is stored on disk in the
// ResultDiskBuffer, identified by diskResultID.
onDisk bool
Expand Down
13 changes: 5 additions & 8 deletions pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,6 @@ type Result struct {
ScanResp *roachpb.ScanResponse
// Position tracks the ordinal among all originally enqueued requests that
// this result satisfies. See singleRangeBatch.positions for more details.
//
// If Streamer.Enqueue() was called with nil enqueueKeys argument, then
// EnqueueKeysSatisfied will exactly contain Position; if non-nil
// enqueueKeys argument was passed, then Position is used as an ordinal to
// lookup into enqueueKeys to populate EnqueueKeysSatisfied.
// TODO(yuzefovich): this might need to be []int when non-unique requests
// are supported.
Position int
Expand Down Expand Up @@ -573,9 +568,11 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re

// GetResults blocks until at least one result is available. If the operation
// mode is OutOfOrder, any result will do, and the caller is expected to examine
// Result.EnqueueKeysSatisfied to understand which request the result
// corresponds to. For InOrder, only head-of-line results will do. Zero-length
// result slice is returned once all enqueued requests have been responded to.
// Result.Position to understand which request the result corresponds to. For
// InOrder, only head-of-line results will do. Zero-length result slice is
// returned once all enqueued requests have been responded to.
//
// Calling GetResults() invalidates the results returned on the previous call.
func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) {
for {
results, allComplete, err := s.results.get(ctx)
Expand Down

0 comments on commit eac62e9

Please sign in to comment.