kvstreamer: remove temporary allocations for []Result
Previously, the worker goroutine would accumulate all `Result`s it
could create from a KV response into a slice, and then pass the slice
to the results buffer. At that point, the slice would be discarded,
since the results buffer would copy all `Result`s into its internal
state.

This commit refactors both the streamer and the results buffer to
avoid this temporary allocation of `[]Result`. The main idea is that
`Result`s are now passed into the results buffer one by one. The worker
goroutine now acquires the results buffer's mutex, processes the KV
responses one at a time, and adds each `Result` to the results buffer
as soon as it is created. However, to prevent the results buffer from
eagerly returning a single `Result` on a `GetResults` call, the
streamer's user goroutine is not woken up until the worker goroutine
calls the newly introduced `doneAddingLocked` method.
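
To illustrate, here is a minimal, self-contained sketch of the
add-then-signal contract described above. The types and the signaling
mechanism are simplified stand-ins, not the actual kvstreamer
implementation (the real buffer signals via a condition variable and
tracks head-of-line ordering):

```go
package main

import (
	"fmt"
	"sync"
)

// Result is a stand-in for kvstreamer.Result.
type Result struct{ Position int }

// resultsBuffer mimics the new contract: Results are added one at a time
// while the buffer's mutex is held, and the reader is signaled only once,
// when the worker calls doneAddingLocked.
type resultsBuffer struct {
	sync.Mutex
	results []Result
	ready   chan struct{} // simplified signal; stand-in for a sync.Cond
}

// addLocked adds a single Result. The caller must hold the mutex. Note
// that it does not signal the reader, so a batch of calls cannot wake the
// reader up partway through.
func (b *resultsBuffer) addLocked(r Result) {
	b.results = append(b.results, r)
}

// doneAddingLocked signals the reader exactly once, after the worker has
// added everything it could from one KV response.
func (b *resultsBuffer) doneAddingLocked() {
	select {
	case b.ready <- struct{}{}:
	default: // a wakeup is already pending
	}
}

func main() {
	b := &resultsBuffer{ready: make(chan struct{}, 1)}
	go func() { // worker goroutine
		b.Lock()
		for pos := 0; pos < 3; pos++ {
			b.addLocked(Result{Position: pos}) // no per-Result wakeup
		}
		b.doneAddingLocked() // single wakeup for the whole batch
		b.Unlock()
	}()

	<-b.ready // the user goroutine wakes up only after the full batch is in
	b.Lock()
	fmt.Println("results available:", len(b.results))
	b.Unlock()
}
```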

Some care is needed to prevent deadlocks between all of the mutexes.
Since we now finalize the results one at a time, we might need to hold
the streamer's mutex (so that we can set `scanComplete` correctly), and
that mutex must be acquired before the results buffer's.
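
A condensed sketch of the resulting lock-ordering discipline (the mutex
names are illustrative; in the actual commit this sequence lives in
`processSingleRangeResults`):

```go
package main

import "sync"

// The three mutexes involved, in their required acquisition order. Any
// code path that needs more than one of them must lock them in this fixed
// order, which rules out lock-order-inversion deadlocks.
var (
	budgetMu   sync.Mutex // the budget's mutex: always first
	streamerMu sync.Mutex // the streamer's mutex: second (guards scanComplete)
	resultsMu  sync.Mutex // the results buffer's mutex: always last
)

func processResponse() {
	budgetMu.Lock()
	defer budgetMu.Unlock()

	streamerMu.Lock()
	// ... update the average response size estimator and set scanComplete
	// on Results via numRangesPerScanRequest ...
	streamerMu.Unlock()

	resultsMu.Lock()
	defer resultsMu.Unlock()
	// ... addLocked(result) for every Result, then doneAddingLocked() ...
}

func main() {
	processResponse()
}
```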

This change shows a modest improvement on the microbenchmarks, but it
matters much more for analytical, TPC-H-like queries, where this
`[]Result` slice is one of the largest sources of garbage.

```
name                                                    old time/op    new time/op    delta
IndexJoin/Cockroach-24                                    5.98ms ± 1%    5.95ms ± 1%    ~     (p=0.079 n=9+10)
IndexJoin/MultinodeCockroach-24                           7.55ms ± 1%    7.59ms ± 1%  +0.47%  (p=0.015 n=8+9)
IndexJoinColumnFamilies/Cockroach-24                      8.68ms ± 3%    8.56ms ± 2%    ~     (p=0.133 n=9+10)
IndexJoinColumnFamilies/MultinodeCockroach-24             11.8ms ± 5%    11.7ms ± 3%    ~     (p=0.315 n=10+10)
LookupJoinEqColsAreKeyNoOrdering/Cockroach-24             6.67ms ± 1%    6.69ms ± 1%    ~     (p=0.315 n=10+9)
LookupJoinEqColsAreKeyNoOrdering/MultinodeCockroach-24    7.87ms ± 1%    7.92ms ± 1%  +0.73%  (p=0.015 n=10+10)
LookupJoinEqColsAreKeyOrdering/Cockroach-24               9.30ms ± 2%    9.31ms ± 4%    ~     (p=0.796 n=10+10)
LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24      10.9ms ± 4%    10.9ms ± 2%    ~     (p=0.971 n=10+10)
LookupJoinNoOrdering/Cockroach-24                         8.99ms ± 1%    9.03ms ± 4%    ~     (p=0.549 n=9+10)
LookupJoinNoOrdering/MultinodeCockroach-24                12.1ms ± 4%    11.9ms ± 6%    ~     (p=0.143 n=10+10)
LookupJoinOrdering/Cockroach-24                           10.9ms ± 3%    10.8ms ± 3%    ~     (p=0.243 n=10+9)
LookupJoinOrdering/MultinodeCockroach-24                  14.2ms ± 5%    13.9ms ± 3%    ~     (p=0.113 n=10+9)

name                                                    old alloc/op   new alloc/op   delta
IndexJoin/Cockroach-24                                    1.36MB ± 1%    1.31MB ± 0%  -3.61%  (p=0.000 n=10+9)
IndexJoin/MultinodeCockroach-24                           2.07MB ± 2%    2.04MB ± 3%    ~     (p=0.063 n=10+10)
IndexJoinColumnFamilies/Cockroach-24                      1.43MB ± 1%    1.38MB ± 0%  -3.56%  (p=0.000 n=9+9)
IndexJoinColumnFamilies/MultinodeCockroach-24             2.27MB ± 1%    2.22MB ± 2%  -2.09%  (p=0.000 n=8+10)
LookupJoinEqColsAreKeyNoOrdering/Cockroach-24             1.71MB ± 0%    1.67MB ± 0%  -2.70%  (p=0.000 n=9+10)
LookupJoinEqColsAreKeyNoOrdering/MultinodeCockroach-24    2.43MB ± 5%    2.35MB ± 1%  -3.31%  (p=0.000 n=10+10)
LookupJoinEqColsAreKeyOrdering/Cockroach-24               1.72MB ± 1%    1.62MB ± 1%  -6.20%  (p=0.000 n=10+10)
LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24      2.39MB ± 2%    2.30MB ± 3%  -3.53%  (p=0.000 n=10+10)
LookupJoinNoOrdering/Cockroach-24                         1.79MB ± 1%    1.74MB ± 1%  -2.80%  (p=0.000 n=10+9)
LookupJoinNoOrdering/MultinodeCockroach-24                2.35MB ± 3%    2.32MB ± 2%    ~     (p=0.079 n=10+9)
LookupJoinOrdering/Cockroach-24                           1.63MB ± 1%    1.53MB ± 1%  -5.77%  (p=0.000 n=10+10)
LookupJoinOrdering/MultinodeCockroach-24                  2.30MB ± 4%    2.23MB ± 2%  -3.41%  (p=0.002 n=9+8)

name                                                    old allocs/op  new allocs/op  delta
IndexJoin/Cockroach-24                                     7.15k ± 1%     7.16k ± 1%    ~     (p=0.888 n=10+9)
IndexJoin/MultinodeCockroach-24                            11.9k ± 2%     11.9k ± 2%    ~     (p=0.968 n=10+9)
IndexJoinColumnFamilies/Cockroach-24                       11.9k ± 0%     11.9k ± 0%    ~     (p=0.075 n=9+10)
IndexJoinColumnFamilies/MultinodeCockroach-24              17.6k ± 1%     17.5k ± 1%    ~     (p=0.566 n=10+10)
LookupJoinEqColsAreKeyNoOrdering/Cockroach-24              9.86k ± 1%     9.88k ± 1%    ~     (p=0.150 n=9+10)
LookupJoinEqColsAreKeyNoOrdering/MultinodeCockroach-24     14.1k ± 0%     14.1k ± 1%    ~     (p=0.055 n=8+10)
LookupJoinEqColsAreKeyOrdering/Cockroach-24                12.6k ± 1%     12.5k ± 1%  -0.77%  (p=0.005 n=10+10)
LookupJoinEqColsAreKeyOrdering/MultinodeCockroach-24       17.2k ± 1%     17.0k ± 0%  -0.88%  (p=0.000 n=10+8)
LookupJoinNoOrdering/Cockroach-24                          12.3k ± 1%     12.3k ± 1%    ~     (p=0.929 n=10+10)
LookupJoinNoOrdering/MultinodeCockroach-24                 16.8k ± 1%     16.8k ± 1%    ~     (p=0.968 n=9+10)
LookupJoinOrdering/Cockroach-24                            14.5k ± 1%     14.5k ± 1%    ~     (p=0.271 n=10+10)
LookupJoinOrdering/MultinodeCockroach-24                   19.4k ± 1%     19.3k ± 1%    ~     (p=0.056 n=9+8)
```

Release note: None
yuzefovich committed Jun 28, 2022
1 parent 9cd494b commit ded11a4
Showing 3 changed files with 139 additions and 128 deletions.
85 changes: 50 additions & 35 deletions pkg/kv/kvclient/kvstreamer/results_buffer.go
@@ -64,10 +64,26 @@ type resultsBuffer interface {
// //
///////////////////////////////////////////////////////////////////////////

// add adds the provided Results into the buffer. If any Results are
// available to be returned to the client and there is a goroutine blocked
// in wait(), the goroutine is woken up.
add([]Result)
// Lock and Unlock expose methods on the mutex of the resultsBuffer. If the
// Streamer's mutex needs to be locked, then the Streamer's mutex must be
// acquired first.
Lock()
Unlock()

// addLocked adds the provided Result into the buffer. Note that if the
// Result is available to be returned to the client and there is a goroutine
// blocked in wait(), the goroutine is **not** woken up - doneAddingLocked()
// has to be called.
//
// The combination of multiple addLocked() calls followed by a single
// doneAddingLocked() call allows us to simulate adding many Results at
// once, without having to allocate a slice for that.
addLocked(Result)
// doneAddingLocked notifies the resultsBuffer that the worker goroutine
// added all Results it could, and the resultsBuffer checks whether any
// Results are available to be returned to the client. If there is a
// goroutine blocked in wait(), the goroutine is woken up.
doneAddingLocked()

///////////////////////////////////////////////////////////////////////////
// //
@@ -155,11 +171,10 @@ func (b *resultsBufferBase) initLocked(isEmpty bool, numExpectedResponses int) e
return nil
}

func (b *resultsBufferBase) findCompleteResponses(results []Result) {
for i := range results {
if results[i].GetResp != nil || results[i].scanComplete {
b.numCompleteResponses++
}
func (b *resultsBufferBase) checkIfCompleteLocked(r Result) {
b.Mutex.AssertHeld()
if r.GetResp != nil || r.scanComplete {
b.numCompleteResponses++
}
}

@@ -244,12 +259,15 @@ func (b *outOfOrderResultsBuffer) init(_ context.Context, numExpectedResponses i
return nil
}

func (b *outOfOrderResultsBuffer) add(results []Result) {
b.Lock()
defer b.Unlock()
b.results = append(b.results, results...)
b.findCompleteResponses(results)
b.numUnreleasedResults += len(results)
func (b *outOfOrderResultsBuffer) addLocked(r Result) {
b.Mutex.AssertHeld()
b.results = append(b.results, r)
b.checkIfCompleteLocked(r)
b.numUnreleasedResults++
}

func (b *outOfOrderResultsBuffer) doneAddingLocked() {
b.Mutex.AssertHeld()
b.signal()
}

@@ -397,35 +415,32 @@ func (b *inOrderResultsBuffer) init(ctx context.Context, numExpectedResponses in
return nil
}

func (b *inOrderResultsBuffer) add(results []Result) {
b.Lock()
defer b.Unlock()
func (b *inOrderResultsBuffer) addLocked(r Result) {
b.Mutex.AssertHeld()
// Note that we don't increase b.numUnreleasedResults because all these
// results are "buffered".
b.findCompleteResponses(results)
foundHeadOfLine := false
for _, r := range results {
if debug {
subRequestIdx := ""
if !b.singleRowLookup {
subRequestIdx = fmt.Sprintf(" (%d)", r.subRequestIdx)
}
fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease)
}
// All the Results have already been registered with the budget, so
// we're keeping them in-memory.
heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter})
if r.Position == b.headOfLinePosition && r.subRequestIdx == b.headOfLineSubRequestIdx {
foundHeadOfLine = true
b.checkIfCompleteLocked(r)
if debug {
subRequestIdx := ""
if !b.singleRowLookup {
subRequestIdx = fmt.Sprintf(" (%d)", r.subRequestIdx)
}
fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease)
}
if foundHeadOfLine {
// All the Results have already been registered with the budget, so we're
// keeping them in-memory.
heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter})
b.addCounter++
}

func (b *inOrderResultsBuffer) doneAddingLocked() {
b.Mutex.AssertHeld()
if len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx {
if debug {
fmt.Println("found head-of-the-line")
}
b.signal()
}
b.addCounter++
}

func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) {
9 changes: 5 additions & 4 deletions pkg/kv/kvclient/kvstreamer/results_buffer_test.go
@@ -113,13 +113,14 @@ func TestInOrderResultsBuffer(t *testing.T) {
break
}

b.Lock()
numToAdd := rng.Intn(len(addOrder)) + 1
toAdd := make([]Result, numToAdd)
for i := range toAdd {
toAdd[i] = results[addOrder[0]]
for i := 0; i < numToAdd; i++ {
b.addLocked(results[addOrder[0]])
addOrder = addOrder[1:]
}
b.add(toAdd)
b.doneAddingLocked()
b.Unlock()

// With 50% probability, try spilling some of the buffered results
// to disk.
173 changes: 84 additions & 89 deletions pkg/kv/kvclient/kvstreamer/streamer.go
@@ -241,7 +241,8 @@ type Streamer struct {

mu struct {
// If the budget's mutex also needs to be locked, the budget's mutex
// must be acquired first.
// must be acquired first. If the results' mutex needs to be locked,
// then this mutex must be acquired first.
syncutil.Mutex

avgResponseEstimator avgResponseEstimator
@@ -1178,10 +1179,17 @@ type singleRangeBatchResponseFootprint struct {
memoryFootprintBytes int64
// resumeReqsMemUsage tracks the memory usage of the requests for the
// ResumeSpans.
resumeReqsMemUsage int64
// numGetResults and numScanResults indicate how many Result objects will
// need to be created for Get and Scan responses, respectively.
numGetResults, numScanResults int
numIncompleteGets, numIncompleteScans int
}

func (fp singleRangeBatchResponseFootprint) hasResults() bool {
return fp.numGetResults > 0 || fp.numScanResults > 0
}

func (fp singleRangeBatchResponseFootprint) hasIncomplete() bool {
return fp.numIncompleteGets > 0 || fp.numIncompleteScans > 0
}
@@ -1218,6 +1226,7 @@ func calculateFootprint(
} else {
// This Get was completed.
fp.memoryFootprintBytes += getResponseSize(get)
fp.numGetResults++
}
case *roachpb.ScanRequest:
scan := reply.(*roachpb.ScanResponse)
@@ -1234,6 +1243,9 @@
if len(scan.BatchResponses) > 0 {
fp.memoryFootprintBytes += scanResponseSize(scan)
}
if len(scan.BatchResponses) > 0 || scan.ResumeSpan == nil {
fp.numScanResults++
}
if scan.ResumeSpan != nil {
// This Scan wasn't completed.
scanRequestScratch.SetSpan(*scan.ResumeSpan)
@@ -1276,8 +1288,46 @@
br *roachpb.BatchResponse,
fp singleRangeBatchResponseFootprint,
) {
var results []Result
var hasNonEmptyScanResponse bool
// If there are no results, this function has nothing to do.
if !fp.hasResults() {
return
}

// We will add some Results into the results buffer, and doneAddingLocked()
// call below requires that the budget's mutex is held. It also must be
// acquired before the streamer's mutex is locked, so we have to do this
// right away.
// TODO(yuzefovich): check whether the lock contention on this mutex is
// noticeable and possibly refactor the code so that the budget's mutex is
// only acquired for the duration of doneAddingLocked().
s.budget.mu.Lock()
defer s.budget.mu.Unlock()
s.mu.Lock()

// TODO(yuzefovich): some of the responses might be partial, yet the
// estimator doesn't distinguish the footprint of the full response vs
// the partial one. Think more about this.
s.mu.avgResponseEstimator.update(
fp.memoryFootprintBytes, int64(fp.numGetResults+fp.numScanResults),
)

// If we have any Scan results to create and the Scan requests can return
// multiple rows, we'll need to consult s.mu.numRangesPerScanRequest, so
// we'll defer unlocking the streamer's mutex. However, if only Get results
// or Scan results of single rows will be created, we can unlock the
// streamer's mutex right away.
if fp.numScanResults > 0 && !s.hints.SingleRowLookup {
defer s.mu.Unlock()
} else {
s.mu.Unlock()
}

// Now we can get the resultsBuffer's mutex - it must be acquired after
// the Streamer's one.
s.results.Lock()
defer s.results.Unlock()
defer s.results.doneAddingLocked()

// memoryTokensBytes accumulates all reservations that are made for all
// Results created below. The accounting for these reservations has already
// been performed, and memoryTokensBytes should be exactly equal to
@@ -1307,7 +1357,14 @@
result.memoryTok.streamer = s
result.memoryTok.toRelease = getResponseSize(get)
memoryTokensBytes += result.memoryTok.toRelease
results = append(results, result)
if buildutil.CrdbTestBuild {
if fp.numGetResults == 0 {
panic(errors.AssertionFailedf(
"unexpectedly found a non-empty GetResponse when numGetResults is zero",
))
}
}
s.results.addLocked(result)

case *roachpb.ScanRequest:
scan := reply.(*roachpb.ScanResponse)
@@ -1332,12 +1389,30 @@
memoryTokensBytes += result.memoryTok.toRelease
result.ScanResp = scan
if s.hints.SingleRowLookup {
// When SingleRowLookup is false, scanComplete field will be
// set in finalizeSingleRangeResults().
result.scanComplete = true
} else if scan.ResumeSpan == nil {
// The scan within the range is complete.
if s.mode == OutOfOrder {
s.mu.numRangesPerScanRequest[position]--
result.scanComplete = s.mu.numRangesPerScanRequest[position] == 0
} else {
// In InOrder mode, the scan is marked as complete when the
// last sub-request is satisfied. Note that it is ok if the
// previous sub-requests haven't been satisfied yet - the
// inOrderResultsBuffer will not emit this Result until the
// previous sub-requests are responded to.
numSubRequests := s.mu.numRangesPerScanRequest[position]
result.scanComplete = result.subRequestIdx+1 == numSubRequests
}
}
if buildutil.CrdbTestBuild {
if fp.numScanResults == 0 {
panic(errors.AssertionFailedf(
"unexpectedly found a ScanResponse when numScanResults is zero",
))
}
}
results = append(results, result)
hasNonEmptyScanResponse = true
s.results.addLocked(result)
}
}

@@ -1349,86 +1424,6 @@
))
}
}

if len(results) > 0 {
finalizeSingleRangeResults(
s, results, fp.memoryFootprintBytes, hasNonEmptyScanResponse,
)
}
}

// finalizeSingleRangeResults "finalizes" the results of evaluation of a
// singleRangeBatch. By "finalization" we mean setting scanComplete field of
// ScanResp to correct value for all scan responses (when Hints.SingleRowLookup
// is false), updating the estimate of an average response size, and telling the
// Streamer about these results.
//
// This method assumes that results has length greater than zero.
func finalizeSingleRangeResults(
s *Streamer, results []Result, actualMemoryReservation int64, hasNonEmptyScanResponse bool,
) {
if buildutil.CrdbTestBuild {
if len(results) == 0 {
panic(errors.AssertionFailedf("finalizeSingleRangeResults is called with no results"))
}
}
s.mu.Lock()
defer s.mu.Unlock()

// If we have non-empty scan response, it might be complete. This will be
// the case when a scan response doesn't have a resume span and there are no
// other scan requests in flight (involving other ranges) that are part of
// the same original ScanRequest.
//
// We need to do this check as well as adding the results to be returned to
// the client as an atomic operation so that scanComplete is set to true
// only on the last partial scan response.
//
// However, if we got a hint that each lookup produces a single row, then we
// know that no original ScanRequest can span multiple ranges, so
// scanComplete field has already been set correctly.
if hasNonEmptyScanResponse && !s.hints.SingleRowLookup {
for i := range results {
if results[i].ScanResp != nil {
if results[i].ScanResp.ResumeSpan == nil {
// The scan within the range is complete.
if s.mode == OutOfOrder {
s.mu.numRangesPerScanRequest[results[i].Position]--
if s.mu.numRangesPerScanRequest[results[i].Position] == 0 {
// The scan across all ranges is now complete too.
results[i].scanComplete = true
}
} else {
// In InOrder mode, the scan is marked as complete when
// the last sub-request is satisfied. Note that it is ok
// if the previous sub-requests haven't been satisfied
// yet - the inOrderResultsBuffer will not emit this
// Result until the previous sub-requests are responded
// to.
numSubRequests := s.mu.numRangesPerScanRequest[results[i].Position]
results[i].scanComplete = results[i].subRequestIdx+1 == numSubRequests
}
} else {
// Unset the ResumeSpan on the result in order to not
// confuse the user of the Streamer. Non-nil resume span was
// already included into resumeReq populated in
// performRequestAsync.
results[i].ScanResp.ResumeSpan = nil
}
}
}
}

// Update the average response size based on this batch.
// TODO(yuzefovich): some of the responses might be partial, yet the
// estimator doesn't distinguish the footprint of the full response vs the
// partial one. Think more about this.
s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results)))
if debug {
printSubRequestIdx := s.mode == InOrder && !s.hints.SingleRowLookup
fmt.Printf("created %s with total size %d\n", resultsToString(results, printSubRequestIdx), actualMemoryReservation)
}
s.results.add(results)
}

// buildResumeSingleRangeBatch consumes a BatchResponse for a singleRangeBatch
