From 5349fe483f3601872fbd55a89d75162f88aac28d Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 28 Apr 2022 21:20:59 -0700 Subject: [PATCH] kvstreamer: fully support InOrder mode This commit extends the Streamer library to support Scan requests that can span multiple ranges in the InOrder mode which allows us to use the Streamer for the lookup joins when ordering needs to be maintained. As with the lookup joins w/o ordering, this is a "poor man's" support which relies on the de-duplication of requests in the join reader's span generators and which doesn't remove the disk-backed row container that buffers all looked up rows in the join reader ordering strategy. Those caveats will be addressed in the follow-up commits. The contribution of this commit is such that the in-order results buffer can now correctly return the results when a single original Scan request touches multiple ranges as well as when each sub-request (against a single range) can get an arbitrary number of partial responses. Previously, we only had one axis for ordering the results - `Position` values which identify the original request that a particular Result is a response to. This was sufficient for index joins (as well as lookup joins when `SingleRowLookup` hint is `true`); however, when a Scan request can touch multiple ranges, that single axis is no longer sufficient since the results buffer could order two Results for a single Scan request arbitrarily. We work around this limitation by introducing a second dimension for ordering - "sub-request index" which is the ordinal of a particular single-range request within the multi-range Scan request. Consider the following example: original Scan request is `Scan(b, f)`, and we have three ranges: `[a, c)`, `[c, e)`, `[e, g)`. 
In `Streamer.Enqueue`, the original Scan is broken down into three single-range Scan requests: ``` singleRangeReq[0]: reqs = [Scan(b, c)] positions = [0] subRequestIdx = [0] singleRangeReq[1]: reqs = [Scan(c, e)] positions = [0] subRequestIdx = [1] singleRangeReq[2]: reqs = [Scan(e, f)] positions = [0] subRequestIdx = [2] ``` Note that `positions` values are the same (indicating that each single-range request is a part of the same original multi-range request), but values of `subRequestIdx` are different - they will allow us to order the responses to these single-range requests (which might come back in any order) correctly when returning the results. This information is plumbed into the requests as well as the results. There is yet another complication though - what if a single-range Scan request results in multiple partial responses? To make sure that these partial results are ordered correctly, we need yet another dimension, but at least that dimension can be fully hidden inside of the in-order results buffer. This is possible due to the fact that partial responses for the same single-range Scan request will be added into the buffer at different times, so we'll assign the results "add epochs". Consider the following example: we have the original Scan request as `Scan(a, c)` which goes to a single range `[a, c)` containing keys `a` and `b`. Imagine that the Scan response can only contain a single key, so we first get a partial `ScanResponse('a')` with `ResumeSpan(b, c)`, and then we get a partial `ScanResponse('b')` with an empty `ResumeSpan`. The first response will be added to the buffer during the first `add` call, so its "epoch" is 0 whereas the second response is added during "epoch" 1 - thus, we can correctly return `a` before `b` although the `Position` and sub-request values of two `Result`s are the same. 
Release note: None --- pkg/kv/kvclient/kvstreamer/large_keys_test.go | 11 +- .../kvclient/kvstreamer/requests_provider.go | 82 ++++++++++-- .../kvstreamer/requests_provider_test.go | 12 +- pkg/kv/kvclient/kvstreamer/results_buffer.go | 99 ++++++++++++--- .../kvstreamer/results_buffer_test.go | 42 ++++--- pkg/kv/kvclient/kvstreamer/streamer.go | 119 ++++++++++++------ pkg/kv/kvclient/kvstreamer/streamer_test.go | 67 +++++++++- .../kvstreamer_result_disk_buffer.go | 3 +- 8 files changed, 347 insertions(+), 88 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/large_keys_test.go b/pkg/kv/kvclient/kvstreamer/large_keys_test.go index 998c86ce8db3..adf01a2aa555 100644 --- a/pkg/kv/kvclient/kvstreamer/large_keys_test.go +++ b/pkg/kv/kvclient/kvstreamer/large_keys_test.go @@ -52,6 +52,10 @@ func TestLargeKeys(t *testing.T) { name: "lookup join, no ordering", query: "SELECT * FROM bar INNER LOOKUP JOIN foo ON lookup_blob = pk_blob", }, + { + name: "lookup join, with ordering", + query: "SELECT max(length(blob)) FROM bar INNER LOOKUP JOIN foo ON lookup_blob = pk_blob GROUP BY pk_blob", + }, } rng, _ := randutil.NewTestRand() @@ -132,7 +136,7 @@ func TestLargeKeys(t *testing.T) { INDEX (attribute)%s );`, familiesSuffix)) require.NoError(t, err) - _, err = db.Exec("CREATE TABLE bar (lookup_blob STRING)") + _, err = db.Exec("CREATE TABLE bar (lookup_blob STRING PRIMARY KEY)") require.NoError(t, err) // Insert some number of rows. @@ -193,6 +197,11 @@ func TestLargeKeys(t *testing.T) { func(t *testing.T) { _, err = db.Exec(tc.query) if err != nil { + // Make sure to discard the trace of the + // query that resulted in an error. If + // we don't do this, then the next test + // case will hang. 
+ <-recCh t.Fatal(err) } // Now examine the trace and count the async diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go index a4f1c977192f..e24fe07e3d6e 100644 --- a/pkg/kv/kvclient/kvstreamer/requests_provider.go +++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go @@ -17,6 +17,7 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -24,15 +25,53 @@ import ( // singleRangeBatch contains parts of the originally enqueued requests that have // been truncated to be within a single range. All requests within the // singleRangeBatch will be issued as a single BatchRequest. +// TODO(yuzefovich): perform memory accounting for slices other than reqs in +// singleRangeBatch. type singleRangeBatch struct { reqs []roachpb.RequestUnion // positions is a 1-to-1 mapping with reqs to indicate which ordinal among // the originally enqueued requests a particular reqs[i] corresponds to. In // other words, if reqs[i] is (or a part of) enqueuedReqs[j], then // positions[i] = j. + // + // In the InOrder mode, positions[0] is treated as the priority of this + // singleRangeBatch where the smaller the value is, the sooner the Result + // will be needed, so batches with the smallest priority value have the + // highest "urgency". We look specifically at the 0th position because, by + // construction, values in positions slice are increasing. // TODO(yuzefovich): this might need to be [][]int when non-unique requests // are supported. positions []int + // subRequestIdx, if non-nil, is a 1-to-1 mapping with positions which + // indicates the ordinal of the corresponding reqs[i] among all sub-requests + // that comprise a single originally enqueued Scan request. This ordinal + // allows us to maintain the order of these sub-requests, each going to a + // different range. 
If reqs[i] is a Get request, then subRequestIdx[i] is 0. + // + // Consider the following example: original Scan request is Scan(b, f), and + // we have three ranges: [a, c), [c, e), [e, g). In Streamer.Enqueue, the + // original Scan is broken down into three single-range Scan requests: + // singleRangeReq[0]: + // reqs = [Scan(b, c)] + // positions = [0] + // subRequestIdx = [0] + // singleRangeReq[1]: + // reqs = [Scan(c, e)] + // positions = [0] + // subRequestIdx = [1] + // singleRangeReq[2]: + // reqs = [Scan(e, f)] + // positions = [0] + // subRequestIdx = [2] + // Note that positions values are the same (indicating that each + // single-range request is a part of the same original multi-range request), + // but values of subRequestIdx are different - they will allow us to order + // the responses to these single-range requests (which might come back in + // any order) correctly. + // + // subRequestIdx is only allocated in InOrder mode when + // Hints.SingleRowLookup is false and some Scan requests were enqueued. + subRequestIdx []int // reqsReservedBytes tracks the memory reservation against the budget for // the memory usage of reqs. reqsReservedBytes int64 @@ -41,14 +80,6 @@ type singleRangeBatch struct { // not be empty. Note that TargetBytes of at least minTargetBytes is // necessary but might not be sufficient for the response to be non-empty. minTargetBytes int64 - // priority is the smallest number in positions. It is the priority of this - // singleRangeBatch where the smaller the value is, the sooner the Result - // will be needed, so batches with the smallest priority value has the - // highest "urgency". - // TODO(yuzefovich): once lookup joins are supported, we'll need a way to - // order singleRangeBatches that contain parts of a single ScanRequest - // spanning multiple ranges. 
- priority int } var _ sort.Interface = &singleRangeBatch{} @@ -60,6 +91,9 @@ func (r *singleRangeBatch) Len() int { func (r *singleRangeBatch) Swap(i, j int) { r.reqs[i], r.reqs[j] = r.reqs[j], r.reqs[i] r.positions[i], r.positions[j] = r.positions[j], r.positions[i] + if r.subRequestIdx != nil { + r.subRequestIdx[i], r.subRequestIdx[j] = r.subRequestIdx[j], r.subRequestIdx[i] + } } // Less returns true if r.reqs[i]'s key comes before r.reqs[j]'s key. @@ -69,6 +103,24 @@ func (r *singleRangeBatch) Less(i, j int) bool { return r.reqs[i].GetInner().Header().Key.Compare(r.reqs[j].GetInner().Header().Key) < 0 } +// priority returns the priority value of this batch. +// +// It is invalid to call this method on a batch with no requests. +func (r singleRangeBatch) priority() int { + return r.positions[0] +} + +// subPriority returns the "sub-priority" value of this batch that should be +// compared when two batches have the same priority value. +// +// It is invalid to call this method on a batch with no requests. 
+func (r singleRangeBatch) subPriority() int { + if r.subRequestIdx == nil { + return 0 + } + return r.subRequestIdx[0] +} + func reqsToString(reqs []singleRangeBatch) string { result := "requests for positions " for i, r := range reqs { @@ -245,7 +297,19 @@ func (p *inOrderRequestsProvider) Len() int { } func (p *inOrderRequestsProvider) Less(i, j int) bool { - return p.requests[i].priority < p.requests[j].priority + rI, rJ := p.requests[i], p.requests[j] + if buildutil.CrdbTestBuild { + if rI.priority() == rJ.priority() { + subI, subJ := rI.subRequestIdx, rJ.subRequestIdx + if (subI != nil && subJ == nil) || (subI == nil && subJ != nil) { + panic(errors.AssertionFailedf( + "unexpectedly only one subRequestIdx is non-nil when priorities are the same", + )) + } + } + } + return rI.priority() < rJ.priority() || + (rI.priority() == rJ.priority() && rI.subPriority() < rJ.subPriority()) } func (p *inOrderRequestsProvider) Swap(i, j int) { diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider_test.go b/pkg/kv/kvclient/kvstreamer/requests_provider_test.go index 4bcdcd558989..3b8e9b75a060 100644 --- a/pkg/kv/kvclient/kvstreamer/requests_provider_test.go +++ b/pkg/kv/kvclient/kvstreamer/requests_provider_test.go @@ -33,8 +33,8 @@ func TestInOrderRequestsProvider(t *testing.T) { requests := make([]singleRangeBatch, rng.Intn(maxNumRequests)+1) priorities := make([]int, len(requests)) for i := range requests { - requests[i].priority = rng.Intn(maxNumRequests) - priorities[i] = requests[i].priority + requests[i].positions = []int{rng.Intn(maxNumRequests)} + priorities[i] = requests[i].priority() } sort.Ints(priorities) @@ -47,17 +47,17 @@ func TestInOrderRequestsProvider(t *testing.T) { first := p.firstLocked() p.removeFirstLocked() p.Unlock() - require.Equal(t, priorities[0], first.priority) + require.Equal(t, priorities[0], first.priority()) priorities = priorities[1:] // With 50% probability simulate that a resume request with random // priority is added. 
if rng.Float64() < 0.5 { - // Note that in reality the priority of the resume request cannot + // Note that in reality the position of the resume request cannot // have lower value than of the original request, but it's ok for // the test. - first.priority = rng.Intn(maxNumRequests) + first.positions[0] = rng.Intn(maxNumRequests) p.add(first) - priorities = append(priorities, first.priority) + priorities = append(priorities, first.priority()) sort.Ints(priorities) } } diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go index 1113417d1eda..c4f368d30045 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go @@ -207,13 +207,16 @@ func (b *resultsBufferBase) error() error { return b.err } -func resultsToString(results []Result) string { +func resultsToString(results []Result, printSubRequestIdx bool) string { result := "results for positions " for i, r := range results { if i > 0 { result += ", " } result += fmt.Sprintf("%d", r.Position) + if printSubRequestIdx { + result += fmt.Sprintf(" (%d)", r.subRequestIdx) + } } return result } @@ -312,23 +315,41 @@ type inOrderResultsBuffer struct { // headOfLinePosition is the Position value of the next Result to be // returned. headOfLinePosition int + // headOfLineSubRequestIdx is the sub-request ordinal of the next Result to + // be returned. This value only matters when the original Scan request spans + // multiple ranges - in such a scenario, multiple Results are created. For + // Get requests and for single-range Scan requests this will always stay at + // zero. + headOfLineSubRequestIdx int // buffered contains all buffered Results, regardless of whether they are // stored in-memory or on disk. buffered []inOrderBufferedResult diskBuffer ResultDiskBuffer + + // addCounter tracks the number of times add() has been called. See + // inOrderBufferedResult.addEpoch for why this is needed. 
+ addCounter int + + // singleRowLookup is the value of Hints.SingleRowLookup. Only used for + // debug messages. + // TODO(yuzefovich): remove this when debug messages are removed. + singleRowLookup bool } var _ resultsBuffer = &inOrderResultsBuffer{} var _ heap.Interface = &inOrderResultsBuffer{} -func newInOrderResultsBuffer(budget *budget, diskBuffer ResultDiskBuffer) resultsBuffer { +func newInOrderResultsBuffer( + budget *budget, diskBuffer ResultDiskBuffer, singleRowLookup bool, +) resultsBuffer { if diskBuffer == nil { panic(errors.AssertionFailedf("diskBuffer is nil")) } return &inOrderResultsBuffer{ resultsBufferBase: newResultsBufferBase(budget), diskBuffer: diskBuffer, + singleRowLookup: singleRowLookup, } } @@ -337,7 +358,12 @@ func (b *inOrderResultsBuffer) Len() int { } func (b *inOrderResultsBuffer) Less(i, j int) bool { - return b.buffered[i].Position < b.buffered[j].Position + posI, posJ := b.buffered[i].Position, b.buffered[j].Position + subI, subJ := b.buffered[i].subRequestIdx, b.buffered[j].subRequestIdx + epochI, epochJ := b.buffered[i].addEpoch, b.buffered[j].addEpoch + return posI < posJ || // results for different requests + (posI == posJ && subI < subJ) || // results for the same Scan request, but for different ranges + (posI == posJ && subI == subJ && epochI < epochJ) // results for the same Scan request, for the same range } func (b *inOrderResultsBuffer) Swap(i, j int) { @@ -362,6 +388,8 @@ func (b *inOrderResultsBuffer) init(ctx context.Context, numExpectedResponses in return err } b.headOfLinePosition = 0 + b.headOfLineSubRequestIdx = 0 + b.addCounter = 0 if err := b.diskBuffer.Reset(ctx); err != nil { b.setErrorLocked(err) return err @@ -378,12 +406,16 @@ func (b *inOrderResultsBuffer) add(results []Result) { foundHeadOfLine := false for _, r := range results { if debug { - fmt.Printf("adding a result for position %d of size %d\n", r.Position, r.memoryTok.toRelease) + subRequestIdx := "" + if !b.singleRowLookup { + subRequestIdx = 
fmt.Sprintf(" (%d)", r.subRequestIdx) + } + fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease) } // All the Results have already been registered with the budget, so // we're keeping them in-memory. - heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false}) - if r.Position == b.headOfLinePosition { + heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter}) + if r.Position == b.headOfLinePosition && r.subRequestIdx == b.headOfLineSubRequestIdx { foundHeadOfLine = true } } @@ -393,12 +425,9 @@ func (b *inOrderResultsBuffer) add(results []Result) { } b.signal() } + b.addCounter++ } -// TODO(yuzefovich): this doesn't work for Scan responses spanning multiple -// Results, fix it once lookup joins are supported. In particular, it can -// reorder responses for a single ScanRequest since all of them have the same -// Position value. func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) { // Whenever a result is picked up from disk, we need to make the memory // reservation for it, so we acquire the budget's mutex. @@ -410,7 +439,7 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) if debug { fmt.Printf("attempting to get results, current headOfLinePosition = %d\n", b.headOfLinePosition) } - for len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition { + for len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx { result, toConsume, err := b.buffered[0].get(ctx, b) if err != nil { b.setErrorLocked(err) @@ -441,10 +470,16 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) } res = append(res, result) heap.Remove(b, 0) + if result.subRequestDone { + // Only advance the sub-request index if we're done with all parts + // of the sub-request. 
+ b.headOfLineSubRequestIdx++ + } if result.GetResp != nil || result.ScanResp.Complete { // If the current Result is complete, then we need to advance the // head-of-the-line position. b.headOfLinePosition++ + b.headOfLineSubRequestIdx = 0 } } // Now all the Results in res are no longer "buffered" and become @@ -452,7 +487,8 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) b.numUnreleasedResults += len(res) if debug { if len(res) > 0 { - fmt.Printf("returning %s to the client, headOfLinePosition is now %d\n", resultsToString(res), b.headOfLinePosition) + printSubRequestIdx := !b.singleRowLookup + fmt.Printf("returning %s to the client, headOfLinePosition is now %d\n", resultsToString(res, printSubRequestIdx), b.headOfLinePosition) } } // All requests are complete IFF we have received the complete responses for @@ -472,7 +508,14 @@ func (b *inOrderResultsBuffer) stringLocked() string { if b.buffered[i].onDisk { onDiskInfo = " (on disk)" } - result += fmt.Sprintf("[%d]%s: size %d", b.buffered[i].Position, onDiskInfo, b.buffered[i].memoryTok.toRelease) + var subRequestIdx string + if !b.singleRowLookup { + subRequestIdx = fmt.Sprintf(" (%d)", b.buffered[i].subRequestIdx) + } + result += fmt.Sprintf( + "[%d%s]%s: size %d", b.buffered[i].Position, subRequestIdx, + onDiskInfo, b.buffered[i].memoryTok.toRelease, + ) } return result } @@ -516,8 +559,8 @@ func (b *inOrderResultsBuffer) spill( if r := &b.buffered[idx]; !r.onDisk && r.Position > spillingPriority { if debug { fmt.Printf( - "spilling a result for position %d which will free up %d bytes\n", - r.Position, r.memoryTok.toRelease, + "spilling a result for position %d (%d) which will free up %d bytes\n", + r.Position, r.subRequestIdx, r.memoryTok.toRelease, ) } diskResultID, err := b.diskBuffer.Serialize(ctx, &b.buffered[idx].Result) @@ -553,8 +596,24 @@ func (b *inOrderResultsBuffer) close(ctx context.Context) { // of where it is stored (in-memory or on disk). 
type inOrderBufferedResult struct { // If onDisk is true, then only Result.ScanResp.Complete, Result.memoryTok, - // and Result.Position are set. + // Result.Position, Result.subRequestIdx, and Result.subRequestDone are set. Result + // addEpoch indicates the value of addCounter variable when this result was + // added to the buffer. This "epoch" allows us to order correctly two + // partial Results that came for the same original Scan request from the + // same range when one of the Results was the "ResumeSpan" Result to + // another. + // + // Consider the following example: we have the original Scan request as + // Scan(a, c) which goes to a single range [a, c) containing keys 'a' and + // 'b'. Imagine that the Scan response can only contain a single key, so we + // first get a partial ScanResponse('a') with ResumeSpan(b, c), and then we + // get a partial ScanResponse('b') with an empty ResumeSpan. The first + // response will be added to the buffer when addCounter is 0, so its "epoch" + // is 0 whereas the second response is added during "epoch" 1 - thus, we + // can correctly return 'a' before 'b' although the priority and + // subRequestIdx of two Results are the same. + addEpoch int // If onDisk is true, then the serialized Result is stored on disk in the // ResultDiskBuffer, identified by diskResultID. 
onDisk bool @@ -566,7 +625,13 @@ type inOrderBufferedResult struct { func (r *inOrderBufferedResult) spill(diskResultID int) { isScanComplete := r.ScanResp.Complete *r = inOrderBufferedResult{ - Result: Result{memoryTok: r.memoryTok, Position: r.Position}, + Result: Result{ + memoryTok: r.memoryTok, + Position: r.Position, + subRequestIdx: r.subRequestIdx, + subRequestDone: r.subRequestDone, + }, + addEpoch: r.addEpoch, onDisk: true, diskResultID: diskResultID, } diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go index 528a29dd97a2..efef68515f77 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go @@ -29,7 +29,8 @@ import ( ) // TestInOrderResultsBuffer verifies that the inOrderResultsBuffer returns the -// results in the correct order (with increasing 'Position' values). +// results in the correct order (with increasing 'Position' values, or with +// increasing 'subRequestIdx' values when 'Position' values are equal). // Additionally, it randomly asks the buffer to spill to disk. func TestInOrderResultsBuffer(t *testing.T) { defer leaktest.AfterTest(t)() @@ -59,7 +60,7 @@ func TestInOrderResultsBuffer(t *testing.T) { budget := newBudget(nil /* acc */, math.MaxInt /* limitBytes */) diskBuffer := TestResultDiskBufferConstructor(tempEngine, diskMonitor) - b := newInOrderResultsBuffer(budget, diskBuffer) + b := newInOrderResultsBuffer(budget, diskBuffer, false /* singleRowLookup */) defer b.close(ctx) for run := 0; run < 10; run++ { @@ -68,33 +69,42 @@ func TestInOrderResultsBuffer(t *testing.T) { // Generate a set of results. var results []Result - var addOrder []int for i := 0; i < numExpectedResponses; i++ { // Randomly choose between Get and Scan responses. 
if rng.Float64() < 0.5 { get := makeResultWithGetResp(rng, rng.Float64() < 0.1 /* empty */) + get.memoryTok.toRelease = rng.Int63n(100) get.Position = i results = append(results, get) } else { - scan := makeResultWithScanResp(rng) - // TODO(yuzefovich): once lookup joins are supported, make Scan - // responses spanning multiple ranges for a single original Scan - // request. - scan.ScanResp.Complete = true - scan.Position = i - results = append(results, scan) + // Randomize the number of ranges the original Scan request + // touches. + numRanges := 1 + if rng.Float64() < 0.5 { + numRanges = rng.Intn(10) + 1 + } + for j := 0; j < numRanges; j++ { + scan := makeResultWithScanResp(rng) + scan.ScanResp.Complete = j+1 == numRanges + scan.memoryTok.toRelease = rng.Int63n(100) + scan.Position = i + scan.subRequestIdx = j + scan.subRequestDone = true + results = append(results, scan) + } } - results[len(results)-1].memoryTok.toRelease = rng.Int63n(100) - addOrder = append(addOrder, i) } // Randomize the order in which the results are added into the buffer. 
+ addOrder := make([]int, len(results)) + for i := range addOrder { + addOrder[i] = i + } rng.Shuffle(len(addOrder), func(i, j int) { addOrder[i], addOrder[j] = addOrder[j], addOrder[i] }) var received []Result - var addOrderIdx int for { r, allComplete, err := b.get(ctx) require.NoError(t, err) @@ -103,11 +113,11 @@ func TestInOrderResultsBuffer(t *testing.T) { break } - numToAdd := rng.Intn(len(results)-addOrderIdx) + 1 + numToAdd := rng.Intn(len(addOrder)) + 1 toAdd := make([]Result, numToAdd) for i := range toAdd { - toAdd[i] = results[addOrder[addOrderIdx]] - addOrderIdx++ + toAdd[i] = results[addOrder[0]] + addOrder = addOrder[1:] } b.add(toAdd) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 38ecd080ca8b..71a05b4e1d14 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -118,6 +118,20 @@ type Result struct { // TODO(yuzefovich): this might need to be []int when non-unique requests // are supported. Position int + // subRequestIdx allows us to order two Results that come for the same + // original Scan request but from different ranges. It is non-zero only in + // InOrder mode when Hints.SingleRowLookup is false, in all other cases it + // will remain zero. See singleRangeBatch.subRequestIdx for more details. + subRequestIdx int + // subRequestDone is true if the current Result is the last one for the + // corresponding sub-request. For all Get requests and for Scan requests + // contained within a single range, it is always true since those can only + // have a single sub-request. + // + // Note that for correctness, it is only necessary that this value is set + // properly if this Result is a Scan response and Hints.SingleRowLookup is + // false. + subRequestDone bool } // Hints provides different hints to the Streamer for optimization purposes. 
@@ -241,12 +255,18 @@ type Streamer struct { avgResponseEstimator avgResponseEstimator - // numRangesLeftPerScanRequest tracks how many ranges a particular - // originally enqueued ScanRequest touches, but scanning of those ranges - // isn't complete. It is allocated lazily if Hints.SingleRowLookup is - // false when the first ScanRequest is encountered in Enqueue. + // In OutOfOrder mode, numRangesPerScanRequest tracks how many + // ranges a particular originally enqueued ScanRequest touches, but + // scanning of those ranges isn't complete. + // + // In InOrder mode, it tracks how many ranges a particular originally + // enqueued ScanRequest touches. In other words, it contains how many + // "sub-requests" the original Scan request was broken down into. + // + // It is allocated lazily if Hints.SingleRowLookup is false when the + // first ScanRequest is encountered in Enqueue. // TODO(yuzefovich): perform memory accounting for this. - numRangesLeftPerScanRequest []int + numRangesPerScanRequest []int // numRequestsInFlight tracks the number of single-range batches that // are currently being served asynchronously (i.e. those that have @@ -348,7 +368,7 @@ func (s *Streamer) Init( s.results = newOutOfOrderResultsBuffer(s.budget) } else { s.requestsToServe = newInOrderRequestsProvider() - s.results = newInOrderResultsBuffer(s.budget, diskBuffer) + s.results = newInOrderResultsBuffer(s.budget, diskBuffer, hints.SingleRowLookup) } if !hints.UniqueRequests { panic(errors.AssertionFailedf("only unique requests are currently supported")) @@ -466,28 +486,35 @@ func (s *Streamer) Enqueue( if err != nil { return err } + var subRequestIdx []int if !s.hints.SingleRowLookup { - for _, pos := range positions { + for i, pos := range positions { if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan { if firstScanRequest { // We have some ScanRequests, and each might touch // multiple ranges, so we have to set up - // numRangesLeftPerScanRequest. 
+ // numRangesPerScanRequest. streamerLocked = true s.mu.Lock() - if cap(s.mu.numRangesLeftPerScanRequest) < len(reqs) { - s.mu.numRangesLeftPerScanRequest = make([]int, len(reqs)) + if cap(s.mu.numRangesPerScanRequest) < len(reqs) { + s.mu.numRangesPerScanRequest = make([]int, len(reqs)) } else { - // We can reuse numRangesLeftPerScanRequest - // allocated on the previous call to Enqueue after - // we zero it out. - s.mu.numRangesLeftPerScanRequest = s.mu.numRangesLeftPerScanRequest[:len(reqs)] - for n := 0; n < len(s.mu.numRangesLeftPerScanRequest); { - n += copy(s.mu.numRangesLeftPerScanRequest[n:], zeroIntSlice) + // We can reuse numRangesPerScanRequest allocated on + // the previous call to Enqueue after we zero it + // out. + s.mu.numRangesPerScanRequest = s.mu.numRangesPerScanRequest[:len(reqs)] + for n := 0; n < len(s.mu.numRangesPerScanRequest); { + n += copy(s.mu.numRangesPerScanRequest[n:], zeroIntSlice) } } } - s.mu.numRangesLeftPerScanRequest[pos]++ + if s.mode == InOrder { + if subRequestIdx == nil { + subRequestIdx = make([]int, len(singleRangeReqs)) + } + subRequestIdx[i] = s.mu.numRangesPerScanRequest[pos] + } + s.mu.numRangesPerScanRequest[pos]++ firstScanRequest = false } } @@ -500,8 +527,8 @@ func (s *Streamer) Enqueue( r := singleRangeBatch{ reqs: singleRangeReqs, positions: positions, + subRequestIdx: subRequestIdx, reqsReservedBytes: requestsMemUsage(singleRangeReqs), - priority: positions[0], } totalReqsMemUsage += r.reqsReservedBytes @@ -571,7 +598,8 @@ func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) { if len(results) > 0 || allComplete || err != nil { if debug { if len(results) > 0 { - fmt.Printf("returning %s to the client\n", resultsToString(results)) + printSubRequestIdx := s.mode == InOrder && !s.hints.SingleRowLookup + fmt.Printf("returning %s to the client\n", resultsToString(results, printSubRequestIdx)) } else { suffix := "all requests have been responded to" if !allComplete { @@ -685,7 +713,7 @@ func (w 
*workerCoordinator) mainLoop(ctx context.Context) { // The first request has the highest urgency among all current // requests to serve, so we use its priority to spill everything // with less urgency when necessary to free up the budget. - spillingPriority = w.s.requestsToServe.firstLocked().priority + spillingPriority = w.s.requestsToServe.firstLocked().priority() } w.s.requestsToServe.Unlock() @@ -1229,12 +1257,11 @@ func (w *workerCoordinator) processSingleRangeResults( // We have to allocate the new slice for requests, but we can reuse the // positions slice. resumeReq.reqs = make([]roachpb.RequestUnion, numIncompleteRequests) - // numIncompleteRequests will never exceed the number of requests in req. - resumeReq.positions = req.positions[:numIncompleteRequests] + resumeReq.positions = req.positions[:0] + resumeReq.subRequestIdx = req.subRequestIdx[:0] // We've already reconciled the budget with the actual reservation for the // requests with the ResumeSpans. resumeReq.reqsReservedBytes = resumeReqsMemUsage - resumeReq.priority = math.MaxInt gets := make([]struct { req roachpb.GetRequest union roachpb.RequestUnion_Get @@ -1257,6 +1284,10 @@ func (w *workerCoordinator) processSingleRangeResults( if w.s.enqueueKeys != nil { enqueueKey = w.s.enqueueKeys[position] } + var subRequestIdx int + if req.subRequestIdx != nil { + subRequestIdx = req.subRequestIdx[i] + } reply := resp.GetInner() switch origRequest := req.reqs[i].GetInner().(type) { case *roachpb.GetRequest: @@ -1271,13 +1302,13 @@ func (w *workerCoordinator) processSingleRangeResults( newGet.req.KeyLocking = origRequest.KeyLocking newGet.union.Get = &newGet.req resumeReq.reqs[resumeReqIdx].Value = &newGet.union - resumeReq.positions[resumeReqIdx] = req.positions[i] + resumeReq.positions = append(resumeReq.positions, position) + if req.subRequestIdx != nil { + resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, subRequestIdx) + } if resumeReq.minTargetBytes == 0 { resumeReq.minTargetBytes = 
get.ResumeNextBytes } - if position < resumeReq.priority { - resumeReq.priority = position - } resumeReqIdx++ } else { // This Get was completed. @@ -1292,6 +1323,8 @@ func (w *workerCoordinator) processSingleRangeResults( // unique. EnqueueKeysSatisfied: []int{enqueueKey}, Position: position, + subRequestIdx: subRequestIdx, + subRequestDone: true, } result.memoryTok.streamer = w.s result.memoryTok.toRelease = getResponseSize(get) @@ -1325,6 +1358,8 @@ func (w *workerCoordinator) processSingleRangeResults( // are unique. EnqueueKeysSatisfied: []int{enqueueKey}, Position: position, + subRequestIdx: subRequestIdx, + subRequestDone: scan.ResumeSpan == nil, } result.memoryTok.streamer = w.s result.memoryTok.toRelease = scanResponseSize(scan) @@ -1349,13 +1384,13 @@ func (w *workerCoordinator) processSingleRangeResults( newScan.req.KeyLocking = origRequest.KeyLocking newScan.union.Scan = &newScan.req resumeReq.reqs[resumeReqIdx].Value = &newScan.union - resumeReq.positions[resumeReqIdx] = req.positions[i] + resumeReq.positions = append(resumeReq.positions, position) + if req.subRequestIdx != nil { + resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, subRequestIdx) + } if resumeReq.minTargetBytes == 0 { resumeReq.minTargetBytes = scan.ResumeNextBytes } - if position < resumeReq.priority { - resumeReq.priority = position - } resumeReqIdx++ if w.s.hints.SingleRowLookup { @@ -1450,10 +1485,21 @@ func (w *workerCoordinator) finalizeSingleRangeResults( if results[i].ScanResp.ScanResponse != nil { if results[i].ScanResp.ResumeSpan == nil { // The scan within the range is complete. - w.s.mu.numRangesLeftPerScanRequest[results[i].Position]-- - if w.s.mu.numRangesLeftPerScanRequest[results[i].Position] == 0 { - // The scan across all ranges is now complete too. 
- results[i].ScanResp.Complete = true + if w.s.mode == OutOfOrder { + w.s.mu.numRangesPerScanRequest[results[i].Position]-- + if w.s.mu.numRangesPerScanRequest[results[i].Position] == 0 { + // The scan across all ranges is now complete too. + results[i].ScanResp.Complete = true + } + } else { + // In InOrder mode, the scan is marked as complete when + // the last sub-request is satisfied. Note that it is ok + // if the previous sub-requests haven't been satisfied + // yet - the inOrderResultsBuffer will not emit this + // Result until the previous sub-requests are responded + // to. + numSubRequests := w.s.mu.numRangesPerScanRequest[results[i].Position] + results[i].ScanResp.Complete = results[i].subRequestIdx+1 == numSubRequests } } else { // Unset the ResumeSpan on the result in order to not @@ -1472,7 +1518,8 @@ func (w *workerCoordinator) finalizeSingleRangeResults( // partial one. Think more about this. w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) if debug { - fmt.Printf("created %s with total size %d\n", resultsToString(results), actualMemoryReservation) + printSubRequestIdx := w.s.mode == InOrder && !w.s.hints.SingleRowLookup + fmt.Printf("created %s with total size %d\n", resultsToString(results, printSubRequestIdx), actualMemoryReservation) } w.s.results.add(results) } diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index f550418ccc9c..c8dfab68773d 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" @@ -462,5 +463,67 @@ func TestStreamerEmptyScans(t *testing.T) { }) } -// 
TODO(yuzefovich): once lookup joins are supported, add a test for InOrder -// mode where Scan requests span multiple ranges. +// TestStreamerMultiRangeScan verifies that the Streamer correctly handles scan +// requests that span multiple ranges. +func TestStreamerMultiRangeScan(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + rng, _ := randutil.NewTestRand() + numRows := rng.Intn(100) + 2 + + // We set up two tables such that we'll use the value from the smaller one + // to look up into a secondary index in the larger table. + _, err := db.Exec("CREATE TABLE small (n PRIMARY KEY) AS SELECT 1") + require.NoError(t, err) + _, err = db.Exec("CREATE TABLE large (k INT PRIMARY KEY, n INT, s STRING, INDEX (n, k) STORING (s))") + require.NoError(t, err) + _, err = db.Exec(fmt.Sprintf("INSERT INTO large SELECT i, 1, repeat('a', i) FROM generate_series(1, %d) AS i", numRows)) + require.NoError(t, err) + + // Split the range for the secondary index into multiple ranges. + numRanges := 2 + if numRows > 2 { + numRanges = rng.Intn(numRows-2) + 2 + } + kValues := make([]int, numRows) + for i := range kValues { + kValues[i] = i + 1 + } + rng.Shuffle(numRows, func(i, j int) { + kValues[i], kValues[j] = kValues[j], kValues[i] + }) + splitKValues := kValues[:numRanges] + for _, kValue := range splitKValues { + _, err = db.Exec(fmt.Sprintf("ALTER INDEX large_n_k_idx SPLIT AT VALUES (1, %d)", kValue)) + require.NoError(t, err) + } + + // Populate the range cache. + _, err = db.Exec("SELECT * FROM large@large_n_k_idx") + require.NoError(t, err) + + // The crux of the test - run a query that performs a lookup join when + // ordering needs to be maintained and then confirm that the results of the + // parallel lookups are served in the right order. 
+ r := db.QueryRow("SELECT array_agg(s) FROM small INNER LOOKUP JOIN large ON small.n = large.n GROUP BY small.n ORDER BY small.n") + var result string + err = r.Scan(&result) + require.NoError(t, err) + // The expected result is of the form: {a,aa,aaa,...}. + expected := "{" + for i := 1; i <= numRows; i++ { + if i > 1 { + expected += "," + } + for j := 0; j < i; j++ { + expected += "a" + } + } + expected += "}" + require.Equal(t, expected, result) +} diff --git a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go index dbcdd976dd62..118ccc8ce5bc 100644 --- a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go +++ b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go @@ -147,7 +147,8 @@ func (b *kvStreamerResultDiskBuffer) Close(ctx context.Context) { // kvstreamer.Result that is spilled to disk. // // It contains all the information except for 'ScanResp.Complete', 'memoryTok', -// and 'Position' fields which are kept in-memory (because they are allocated in +// 'Position', 'subRequestIdx', and 'subRequestDone' fields which are kept +// in-memory (because they are allocated in // kvstreamer.inOrderBufferedResult.Result anyway). var inOrderResultsBufferSpillTypeSchema = []*types.T{ types.Bool, // isGet