diff --git a/pkg/kv/kvclient/kvstreamer/large_keys_test.go b/pkg/kv/kvclient/kvstreamer/large_keys_test.go index 998c86ce8db3..adf01a2aa555 100644 --- a/pkg/kv/kvclient/kvstreamer/large_keys_test.go +++ b/pkg/kv/kvclient/kvstreamer/large_keys_test.go @@ -52,6 +52,10 @@ func TestLargeKeys(t *testing.T) { name: "lookup join, no ordering", query: "SELECT * FROM bar INNER LOOKUP JOIN foo ON lookup_blob = pk_blob", }, + { + name: "lookup join, with ordering", + query: "SELECT max(length(blob)) FROM bar INNER LOOKUP JOIN foo ON lookup_blob = pk_blob GROUP BY pk_blob", + }, } rng, _ := randutil.NewTestRand() @@ -132,7 +136,7 @@ func TestLargeKeys(t *testing.T) { INDEX (attribute)%s );`, familiesSuffix)) require.NoError(t, err) - _, err = db.Exec("CREATE TABLE bar (lookup_blob STRING)") + _, err = db.Exec("CREATE TABLE bar (lookup_blob STRING PRIMARY KEY)") require.NoError(t, err) // Insert some number of rows. @@ -193,6 +197,11 @@ func TestLargeKeys(t *testing.T) { func(t *testing.T) { _, err = db.Exec(tc.query) if err != nil { + // Make sure to discard the trace of the + // query that resulted in an error. If + // we don't do this, then the next test + // case will hang. + <-recCh t.Fatal(err) } // Now examine the trace and count the async diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider.go b/pkg/kv/kvclient/kvstreamer/requests_provider.go index a4f1c977192f..e24fe07e3d6e 100644 --- a/pkg/kv/kvclient/kvstreamer/requests_provider.go +++ b/pkg/kv/kvclient/kvstreamer/requests_provider.go @@ -17,6 +17,7 @@ import ( "sync" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" ) @@ -24,15 +25,53 @@ import ( // singleRangeBatch contains parts of the originally enqueued requests that have // been truncated to be within a single range. All requests within the // singleRangeBatch will be issued as a single BatchRequest. 
+// TODO(yuzefovich): perform memory accounting for slices other than reqs in +// singleRangeBatch. type singleRangeBatch struct { reqs []roachpb.RequestUnion // positions is a 1-to-1 mapping with reqs to indicate which ordinal among // the originally enqueued requests a particular reqs[i] corresponds to. In // other words, if reqs[i] is (or a part of) enqueuedReqs[j], then // positions[i] = j. + // + // In the InOrder mode, positions[0] is treated as the priority of this + // singleRangeBatch where the smaller the value is, the sooner the Result + // will be needed, so batches with the smallest priority value have the + // highest "urgency". We look specifically at the 0th position because, by + // construction, values in positions slice are increasing. // TODO(yuzefovich): this might need to be [][]int when non-unique requests // are supported. positions []int + // subRequestIdx, if non-nil, is a 1-to-1 mapping with positions which + // indicates the ordinal of the corresponding reqs[i] among all sub-requests + // that comprise a single originally enqueued Scan request. This ordinal + // allows us to maintain the order of these sub-requests, each going to a + // different range. If reqs[i] is a Get request, then subRequestIdx[i] is 0. + // + // Consider the following example: original Scan request is Scan(b, f), and + // we have three ranges: [a, c), [c, e), [e, g). 
In Streamer.Enqueue, the + // original Scan is broken down into three single-range Scan requests: + // singleRangeReq[0]: + // reqs = [Scan(b, c)] + // positions = [0] + // subRequestIdx = [0] + // singleRangeReq[1]: + // reqs = [Scan(c, e)] + // positions = [0] + // subRequestIdx = [1] + // singleRangeReq[2]: + // reqs = [Scan(e, f)] + // positions = [0] + // subRequestIdx = [2] + // Note that positions values are the same (indicating that each + // single-range request is a part of the same original multi-range request), + // but values of subRequestIdx are different - they will allow us to order + // the responses to these single-range requests (which might come back in + // any order) correctly. + // + // subRequestIdx is only allocated in InOrder mode when + // Hints.SingleRowLookup is false and some Scan requests were enqueued. + subRequestIdx []int // reqsReservedBytes tracks the memory reservation against the budget for // the memory usage of reqs. reqsReservedBytes int64 @@ -41,14 +80,6 @@ type singleRangeBatch struct { // not be empty. Note that TargetBytes of at least minTargetBytes is // necessary but might not be sufficient for the response to be non-empty. minTargetBytes int64 - // priority is the smallest number in positions. It is the priority of this - // singleRangeBatch where the smaller the value is, the sooner the Result - // will be needed, so batches with the smallest priority value has the - // highest "urgency". - // TODO(yuzefovich): once lookup joins are supported, we'll need a way to - // order singleRangeBatches that contain parts of a single ScanRequest - // spanning multiple ranges. 
- priority int } var _ sort.Interface = &singleRangeBatch{} @@ -60,6 +91,9 @@ func (r *singleRangeBatch) Len() int { func (r *singleRangeBatch) Swap(i, j int) { r.reqs[i], r.reqs[j] = r.reqs[j], r.reqs[i] r.positions[i], r.positions[j] = r.positions[j], r.positions[i] + if r.subRequestIdx != nil { + r.subRequestIdx[i], r.subRequestIdx[j] = r.subRequestIdx[j], r.subRequestIdx[i] + } } // Less returns true if r.reqs[i]'s key comes before r.reqs[j]'s key. @@ -69,6 +103,24 @@ func (r *singleRangeBatch) Less(i, j int) bool { return r.reqs[i].GetInner().Header().Key.Compare(r.reqs[j].GetInner().Header().Key) < 0 } +// priority returns the priority value of this batch. +// +// It is invalid to call this method on a batch with no requests. +func (r singleRangeBatch) priority() int { + return r.positions[0] +} + +// subPriority returns the "sub-priority" value of this batch that should be +// compared when two batches have the same priority value. +// +// It is invalid to call this method on a batch with no requests. 
+func (r singleRangeBatch) subPriority() int { + if r.subRequestIdx == nil { + return 0 + } + return r.subRequestIdx[0] +} + func reqsToString(reqs []singleRangeBatch) string { result := "requests for positions " for i, r := range reqs { @@ -245,7 +297,19 @@ func (p *inOrderRequestsProvider) Len() int { } func (p *inOrderRequestsProvider) Less(i, j int) bool { - return p.requests[i].priority < p.requests[j].priority + rI, rJ := p.requests[i], p.requests[j] + if buildutil.CrdbTestBuild { + if rI.priority() == rJ.priority() { + subI, subJ := rI.subRequestIdx, rJ.subRequestIdx + if (subI != nil && subJ == nil) || (subI == nil && subJ != nil) { + panic(errors.AssertionFailedf( + "unexpectedly only one subRequestIdx is non-nil when priorities are the same", + )) + } + } + } + return rI.priority() < rJ.priority() || + (rI.priority() == rJ.priority() && rI.subPriority() < rJ.subPriority()) } func (p *inOrderRequestsProvider) Swap(i, j int) { diff --git a/pkg/kv/kvclient/kvstreamer/requests_provider_test.go b/pkg/kv/kvclient/kvstreamer/requests_provider_test.go index 4bcdcd558989..3b8e9b75a060 100644 --- a/pkg/kv/kvclient/kvstreamer/requests_provider_test.go +++ b/pkg/kv/kvclient/kvstreamer/requests_provider_test.go @@ -33,8 +33,8 @@ func TestInOrderRequestsProvider(t *testing.T) { requests := make([]singleRangeBatch, rng.Intn(maxNumRequests)+1) priorities := make([]int, len(requests)) for i := range requests { - requests[i].priority = rng.Intn(maxNumRequests) - priorities[i] = requests[i].priority + requests[i].positions = []int{rng.Intn(maxNumRequests)} + priorities[i] = requests[i].priority() } sort.Ints(priorities) @@ -47,17 +47,17 @@ func TestInOrderRequestsProvider(t *testing.T) { first := p.firstLocked() p.removeFirstLocked() p.Unlock() - require.Equal(t, priorities[0], first.priority) + require.Equal(t, priorities[0], first.priority()) priorities = priorities[1:] // With 50% probability simulate that a resume request with random // priority is added. 
if rng.Float64() < 0.5 { - // Note that in reality the priority of the resume request cannot + // Note that in reality the position of the resume request cannot // have lower value than of the original request, but it's ok for // the test. - first.priority = rng.Intn(maxNumRequests) + first.positions[0] = rng.Intn(maxNumRequests) p.add(first) - priorities = append(priorities, first.priority) + priorities = append(priorities, first.priority()) sort.Ints(priorities) } } diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer.go b/pkg/kv/kvclient/kvstreamer/results_buffer.go index 1113417d1eda..c4f368d30045 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer.go @@ -207,13 +207,16 @@ func (b *resultsBufferBase) error() error { return b.err } -func resultsToString(results []Result) string { +func resultsToString(results []Result, printSubRequestIdx bool) string { result := "results for positions " for i, r := range results { if i > 0 { result += ", " } result += fmt.Sprintf("%d", r.Position) + if printSubRequestIdx { + result += fmt.Sprintf(" (%d)", r.subRequestIdx) + } } return result } @@ -312,23 +315,41 @@ type inOrderResultsBuffer struct { // headOfLinePosition is the Position value of the next Result to be // returned. headOfLinePosition int + // headOfLineSubRequestIdx is the sub-request ordinal of the next Result to + // be returned. This value only matters when the original Scan request spans + // multiple ranges - in such a scenario, multiple Results are created. For + // Get requests and for single-range Scan requests this will always stay at + // zero. + headOfLineSubRequestIdx int // buffered contains all buffered Results, regardless of whether they are // stored in-memory or on disk. buffered []inOrderBufferedResult diskBuffer ResultDiskBuffer + + // addCounter tracks the number of times add() has been called. See + // inOrderBufferedResult.addEpoch for why this is needed. 
+ addCounter int + + // singleRowLookup is the value of Hints.SingleRowLookup. Only used for + // debug messages. + // TODO(yuzefovich): remove this when debug messages are removed. + singleRowLookup bool } var _ resultsBuffer = &inOrderResultsBuffer{} var _ heap.Interface = &inOrderResultsBuffer{} -func newInOrderResultsBuffer(budget *budget, diskBuffer ResultDiskBuffer) resultsBuffer { +func newInOrderResultsBuffer( + budget *budget, diskBuffer ResultDiskBuffer, singleRowLookup bool, +) resultsBuffer { if diskBuffer == nil { panic(errors.AssertionFailedf("diskBuffer is nil")) } return &inOrderResultsBuffer{ resultsBufferBase: newResultsBufferBase(budget), diskBuffer: diskBuffer, + singleRowLookup: singleRowLookup, } } @@ -337,7 +358,12 @@ func (b *inOrderResultsBuffer) Len() int { } func (b *inOrderResultsBuffer) Less(i, j int) bool { - return b.buffered[i].Position < b.buffered[j].Position + posI, posJ := b.buffered[i].Position, b.buffered[j].Position + subI, subJ := b.buffered[i].subRequestIdx, b.buffered[j].subRequestIdx + epochI, epochJ := b.buffered[i].addEpoch, b.buffered[j].addEpoch + return posI < posJ || // results for different requests + (posI == posJ && subI < subJ) || // results for the same Scan request, but for different ranges + (posI == posJ && subI == subJ && epochI < epochJ) // results for the same Scan request, for the same range } func (b *inOrderResultsBuffer) Swap(i, j int) { @@ -362,6 +388,8 @@ func (b *inOrderResultsBuffer) init(ctx context.Context, numExpectedResponses in return err } b.headOfLinePosition = 0 + b.headOfLineSubRequestIdx = 0 + b.addCounter = 0 if err := b.diskBuffer.Reset(ctx); err != nil { b.setErrorLocked(err) return err @@ -378,12 +406,16 @@ func (b *inOrderResultsBuffer) add(results []Result) { foundHeadOfLine := false for _, r := range results { if debug { - fmt.Printf("adding a result for position %d of size %d\n", r.Position, r.memoryTok.toRelease) + subRequestIdx := "" + if !b.singleRowLookup { + subRequestIdx = 
fmt.Sprintf(" (%d)", r.subRequestIdx) + } + fmt.Printf("adding a result for position %d%s of size %d\n", r.Position, subRequestIdx, r.memoryTok.toRelease) } // All the Results have already been registered with the budget, so // we're keeping them in-memory. - heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false}) - if r.Position == b.headOfLinePosition { + heap.Push(b, inOrderBufferedResult{Result: r, onDisk: false, addEpoch: b.addCounter}) + if r.Position == b.headOfLinePosition && r.subRequestIdx == b.headOfLineSubRequestIdx { foundHeadOfLine = true } } @@ -393,12 +425,9 @@ func (b *inOrderResultsBuffer) add(results []Result) { } b.signal() } + b.addCounter++ } -// TODO(yuzefovich): this doesn't work for Scan responses spanning multiple -// Results, fix it once lookup joins are supported. In particular, it can -// reorder responses for a single ScanRequest since all of them have the same -// Position value. func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) { // Whenever a result is picked up from disk, we need to make the memory // reservation for it, so we acquire the budget's mutex. @@ -410,7 +439,7 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) if debug { fmt.Printf("attempting to get results, current headOfLinePosition = %d\n", b.headOfLinePosition) } - for len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition { + for len(b.buffered) > 0 && b.buffered[0].Position == b.headOfLinePosition && b.buffered[0].subRequestIdx == b.headOfLineSubRequestIdx { result, toConsume, err := b.buffered[0].get(ctx, b) if err != nil { b.setErrorLocked(err) @@ -441,10 +470,16 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) } res = append(res, result) heap.Remove(b, 0) + if result.subRequestDone { + // Only advance the sub-request index if we're done with all parts + // of the sub-request. 
+ b.headOfLineSubRequestIdx++ + } if result.GetResp != nil || result.ScanResp.Complete { // If the current Result is complete, then we need to advance the // head-of-the-line position. b.headOfLinePosition++ + b.headOfLineSubRequestIdx = 0 } } // Now all the Results in res are no longer "buffered" and become @@ -452,7 +487,8 @@ func (b *inOrderResultsBuffer) get(ctx context.Context) ([]Result, bool, error) b.numUnreleasedResults += len(res) if debug { if len(res) > 0 { - fmt.Printf("returning %s to the client, headOfLinePosition is now %d\n", resultsToString(res), b.headOfLinePosition) + printSubRequestIdx := !b.singleRowLookup + fmt.Printf("returning %s to the client, headOfLinePosition is now %d\n", resultsToString(res, printSubRequestIdx), b.headOfLinePosition) } } // All requests are complete IFF we have received the complete responses for @@ -472,7 +508,14 @@ func (b *inOrderResultsBuffer) stringLocked() string { if b.buffered[i].onDisk { onDiskInfo = " (on disk)" } - result += fmt.Sprintf("[%d]%s: size %d", b.buffered[i].Position, onDiskInfo, b.buffered[i].memoryTok.toRelease) + var subRequestIdx string + if !b.singleRowLookup { + subRequestIdx = fmt.Sprintf(" (%d)", b.buffered[i].subRequestIdx) + } + result += fmt.Sprintf( + "[%d%s]%s: size %d", b.buffered[i].Position, subRequestIdx, + onDiskInfo, b.buffered[i].memoryTok.toRelease, + ) } return result } @@ -516,8 +559,8 @@ func (b *inOrderResultsBuffer) spill( if r := &b.buffered[idx]; !r.onDisk && r.Position > spillingPriority { if debug { fmt.Printf( - "spilling a result for position %d which will free up %d bytes\n", - r.Position, r.memoryTok.toRelease, + "spilling a result for position %d (%d) which will free up %d bytes\n", + r.Position, r.subRequestIdx, r.memoryTok.toRelease, ) } diskResultID, err := b.diskBuffer.Serialize(ctx, &b.buffered[idx].Result) @@ -553,8 +596,24 @@ func (b *inOrderResultsBuffer) close(ctx context.Context) { // of where it is stored (in-memory or on disk). 
type inOrderBufferedResult struct { // If onDisk is true, then only Result.ScanResp.Complete, Result.memoryTok, - // and Result.Position are set. + // Result.Position, Result.subRequestIdx, and Result.subRequestDone are set. Result + // addEpoch indicates the value of addCounter variable when this result was + // added to the buffer. This "epoch" allows us to order correctly two + // partial Results that came for the same original Scan request from the + // same range when one of the Results was the "ResumeSpan" Result to + // another. + // + // Consider the following example: we have the original Scan request as + // Scan(a, c) which goes to a single range [a, c) containing keys 'a' and + // 'b'. Imagine that the Scan response can only contain a single key, so we + // first get a partial ScanResponse('a') with ResumeSpan(b, c), and then we + // get a partial ScanResponse('b') with an empty ResumeSpan. The first + // response will be added to the buffer when addCounter is 0, so its "epoch" + // is 0 whereas the second response is added during "epoch" 1 - thus, we + // can correctly return 'a' before 'b' although the priority and + // subRequestIdx of two Results are the same. + addEpoch int // If onDisk is true, then the serialized Result is stored on disk in the // ResultDiskBuffer, identified by diskResultID. 
onDisk bool @@ -566,7 +625,13 @@ type inOrderBufferedResult struct { func (r *inOrderBufferedResult) spill(diskResultID int) { isScanComplete := r.ScanResp.Complete *r = inOrderBufferedResult{ - Result: Result{memoryTok: r.memoryTok, Position: r.Position}, + Result: Result{ + memoryTok: r.memoryTok, + Position: r.Position, + subRequestIdx: r.subRequestIdx, + subRequestDone: r.subRequestDone, + }, + addEpoch: r.addEpoch, onDisk: true, diskResultID: diskResultID, } diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go index 528a29dd97a2..efef68515f77 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go @@ -29,7 +29,8 @@ import ( ) // TestInOrderResultsBuffer verifies that the inOrderResultsBuffer returns the -// results in the correct order (with increasing 'Position' values). +// results in the correct order (with increasing 'Position' values, or with +// increasing 'subRequestIdx' values when 'Position' values are equal). // Additionally, it randomly asks the buffer to spill to disk. func TestInOrderResultsBuffer(t *testing.T) { defer leaktest.AfterTest(t)() @@ -59,7 +60,7 @@ func TestInOrderResultsBuffer(t *testing.T) { budget := newBudget(nil /* acc */, math.MaxInt /* limitBytes */) diskBuffer := TestResultDiskBufferConstructor(tempEngine, diskMonitor) - b := newInOrderResultsBuffer(budget, diskBuffer) + b := newInOrderResultsBuffer(budget, diskBuffer, false /* singleRowLookup */) defer b.close(ctx) for run := 0; run < 10; run++ { @@ -68,33 +69,42 @@ func TestInOrderResultsBuffer(t *testing.T) { // Generate a set of results. var results []Result - var addOrder []int for i := 0; i < numExpectedResponses; i++ { // Randomly choose between Get and Scan responses. 
if rng.Float64() < 0.5 { get := makeResultWithGetResp(rng, rng.Float64() < 0.1 /* empty */) + get.memoryTok.toRelease = rng.Int63n(100) get.Position = i results = append(results, get) } else { - scan := makeResultWithScanResp(rng) - // TODO(yuzefovich): once lookup joins are supported, make Scan - // responses spanning multiple ranges for a single original Scan - // request. - scan.ScanResp.Complete = true - scan.Position = i - results = append(results, scan) + // Randomize the number of ranges the original Scan request + // touches. + numRanges := 1 + if rng.Float64() < 0.5 { + numRanges = rng.Intn(10) + 1 + } + for j := 0; j < numRanges; j++ { + scan := makeResultWithScanResp(rng) + scan.ScanResp.Complete = j+1 == numRanges + scan.memoryTok.toRelease = rng.Int63n(100) + scan.Position = i + scan.subRequestIdx = j + scan.subRequestDone = true + results = append(results, scan) + } } - results[len(results)-1].memoryTok.toRelease = rng.Int63n(100) - addOrder = append(addOrder, i) } // Randomize the order in which the results are added into the buffer. 
+ addOrder := make([]int, len(results)) + for i := range addOrder { + addOrder[i] = i + } rng.Shuffle(len(addOrder), func(i, j int) { addOrder[i], addOrder[j] = addOrder[j], addOrder[i] }) var received []Result - var addOrderIdx int for { r, allComplete, err := b.get(ctx) require.NoError(t, err) @@ -103,11 +113,11 @@ func TestInOrderResultsBuffer(t *testing.T) { break } - numToAdd := rng.Intn(len(results)-addOrderIdx) + 1 + numToAdd := rng.Intn(len(addOrder)) + 1 toAdd := make([]Result, numToAdd) for i := range toAdd { - toAdd[i] = results[addOrder[addOrderIdx]] - addOrderIdx++ + toAdd[i] = results[addOrder[0]] + addOrder = addOrder[1:] } b.add(toAdd) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 38ecd080ca8b..71a05b4e1d14 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -118,6 +118,20 @@ type Result struct { // TODO(yuzefovich): this might need to be []int when non-unique requests // are supported. Position int + // subRequestIdx allows us to order two Results that come for the same + // original Scan request but from different ranges. It is non-zero only in + // InOrder mode when Hints.SingleRowLookup is false, in all other cases it + // will remain zero. See singleRangeBatch.subRequestIdx for more details. + subRequestIdx int + // subRequestDone is true if the current Result is the last one for the + // corresponding sub-request. For all Get requests and for Scan requests + // contained within a single range, it is always true since those can only + // have a single sub-request. + // + // Note that for correctness, it is only necessary that this value is set + // properly if this Result is a Scan response and Hints.SingleRowLookup is + // false. + subRequestDone bool } // Hints provides different hints to the Streamer for optimization purposes. 
@@ -241,12 +255,18 @@ type Streamer struct { avgResponseEstimator avgResponseEstimator - // numRangesLeftPerScanRequest tracks how many ranges a particular - // originally enqueued ScanRequest touches, but scanning of those ranges - // isn't complete. It is allocated lazily if Hints.SingleRowLookup is - // false when the first ScanRequest is encountered in Enqueue. + // In OutOfOrder mode, numRangesPerScanRequest tracks how many + // ranges a particular originally enqueued ScanRequest touches, but + // scanning of those ranges isn't complete. + // + // In InOrder mode, it tracks how many ranges a particular originally + // enqueued ScanRequest touches. In other words, it contains how many + // "sub-requests" the original Scan request was broken down into. + // + // It is allocated lazily if Hints.SingleRowLookup is false when the + // first ScanRequest is encountered in Enqueue. // TODO(yuzefovich): perform memory accounting for this. - numRangesLeftPerScanRequest []int + numRangesPerScanRequest []int // numRequestsInFlight tracks the number of single-range batches that // are currently being served asynchronously (i.e. those that have @@ -348,7 +368,7 @@ func (s *Streamer) Init( s.results = newOutOfOrderResultsBuffer(s.budget) } else { s.requestsToServe = newInOrderRequestsProvider() - s.results = newInOrderResultsBuffer(s.budget, diskBuffer) + s.results = newInOrderResultsBuffer(s.budget, diskBuffer, hints.SingleRowLookup) } if !hints.UniqueRequests { panic(errors.AssertionFailedf("only unique requests are currently supported")) @@ -466,28 +486,35 @@ func (s *Streamer) Enqueue( if err != nil { return err } + var subRequestIdx []int if !s.hints.SingleRowLookup { - for _, pos := range positions { + for i, pos := range positions { if _, isScan := reqs[pos].GetInner().(*roachpb.ScanRequest); isScan { if firstScanRequest { // We have some ScanRequests, and each might touch // multiple ranges, so we have to set up - // numRangesLeftPerScanRequest. 
+ // numRangesPerScanRequest. streamerLocked = true s.mu.Lock() - if cap(s.mu.numRangesLeftPerScanRequest) < len(reqs) { - s.mu.numRangesLeftPerScanRequest = make([]int, len(reqs)) + if cap(s.mu.numRangesPerScanRequest) < len(reqs) { + s.mu.numRangesPerScanRequest = make([]int, len(reqs)) } else { - // We can reuse numRangesLeftPerScanRequest - // allocated on the previous call to Enqueue after - // we zero it out. - s.mu.numRangesLeftPerScanRequest = s.mu.numRangesLeftPerScanRequest[:len(reqs)] - for n := 0; n < len(s.mu.numRangesLeftPerScanRequest); { - n += copy(s.mu.numRangesLeftPerScanRequest[n:], zeroIntSlice) + // We can reuse numRangesPerScanRequest allocated on + // the previous call to Enqueue after we zero it + // out. + s.mu.numRangesPerScanRequest = s.mu.numRangesPerScanRequest[:len(reqs)] + for n := 0; n < len(s.mu.numRangesPerScanRequest); { + n += copy(s.mu.numRangesPerScanRequest[n:], zeroIntSlice) } } } - s.mu.numRangesLeftPerScanRequest[pos]++ + if s.mode == InOrder { + if subRequestIdx == nil { + subRequestIdx = make([]int, len(singleRangeReqs)) + } + subRequestIdx[i] = s.mu.numRangesPerScanRequest[pos] + } + s.mu.numRangesPerScanRequest[pos]++ firstScanRequest = false } } @@ -500,8 +527,8 @@ func (s *Streamer) Enqueue( r := singleRangeBatch{ reqs: singleRangeReqs, positions: positions, + subRequestIdx: subRequestIdx, reqsReservedBytes: requestsMemUsage(singleRangeReqs), - priority: positions[0], } totalReqsMemUsage += r.reqsReservedBytes @@ -571,7 +598,8 @@ func (s *Streamer) GetResults(ctx context.Context) ([]Result, error) { if len(results) > 0 || allComplete || err != nil { if debug { if len(results) > 0 { - fmt.Printf("returning %s to the client\n", resultsToString(results)) + printSubRequestIdx := s.mode == InOrder && !s.hints.SingleRowLookup + fmt.Printf("returning %s to the client\n", resultsToString(results, printSubRequestIdx)) } else { suffix := "all requests have been responded to" if !allComplete { @@ -685,7 +713,7 @@ func (w 
*workerCoordinator) mainLoop(ctx context.Context) { // The first request has the highest urgency among all current // requests to serve, so we use its priority to spill everything // with less urgency when necessary to free up the budget. - spillingPriority = w.s.requestsToServe.firstLocked().priority + spillingPriority = w.s.requestsToServe.firstLocked().priority() } w.s.requestsToServe.Unlock() @@ -1229,12 +1257,11 @@ func (w *workerCoordinator) processSingleRangeResults( // We have to allocate the new slice for requests, but we can reuse the // positions slice. resumeReq.reqs = make([]roachpb.RequestUnion, numIncompleteRequests) - // numIncompleteRequests will never exceed the number of requests in req. - resumeReq.positions = req.positions[:numIncompleteRequests] + resumeReq.positions = req.positions[:0] + resumeReq.subRequestIdx = req.subRequestIdx[:0] // We've already reconciled the budget with the actual reservation for the // requests with the ResumeSpans. resumeReq.reqsReservedBytes = resumeReqsMemUsage - resumeReq.priority = math.MaxInt gets := make([]struct { req roachpb.GetRequest union roachpb.RequestUnion_Get @@ -1257,6 +1284,10 @@ func (w *workerCoordinator) processSingleRangeResults( if w.s.enqueueKeys != nil { enqueueKey = w.s.enqueueKeys[position] } + var subRequestIdx int + if req.subRequestIdx != nil { + subRequestIdx = req.subRequestIdx[i] + } reply := resp.GetInner() switch origRequest := req.reqs[i].GetInner().(type) { case *roachpb.GetRequest: @@ -1271,13 +1302,13 @@ func (w *workerCoordinator) processSingleRangeResults( newGet.req.KeyLocking = origRequest.KeyLocking newGet.union.Get = &newGet.req resumeReq.reqs[resumeReqIdx].Value = &newGet.union - resumeReq.positions[resumeReqIdx] = req.positions[i] + resumeReq.positions = append(resumeReq.positions, position) + if req.subRequestIdx != nil { + resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, subRequestIdx) + } if resumeReq.minTargetBytes == 0 { resumeReq.minTargetBytes = 
get.ResumeNextBytes } - if position < resumeReq.priority { - resumeReq.priority = position - } resumeReqIdx++ } else { // This Get was completed. @@ -1292,6 +1323,8 @@ func (w *workerCoordinator) processSingleRangeResults( // unique. EnqueueKeysSatisfied: []int{enqueueKey}, Position: position, + subRequestIdx: subRequestIdx, + subRequestDone: true, } result.memoryTok.streamer = w.s result.memoryTok.toRelease = getResponseSize(get) @@ -1325,6 +1358,8 @@ func (w *workerCoordinator) processSingleRangeResults( // are unique. EnqueueKeysSatisfied: []int{enqueueKey}, Position: position, + subRequestIdx: subRequestIdx, + subRequestDone: scan.ResumeSpan == nil, } result.memoryTok.streamer = w.s result.memoryTok.toRelease = scanResponseSize(scan) @@ -1349,13 +1384,13 @@ func (w *workerCoordinator) processSingleRangeResults( newScan.req.KeyLocking = origRequest.KeyLocking newScan.union.Scan = &newScan.req resumeReq.reqs[resumeReqIdx].Value = &newScan.union - resumeReq.positions[resumeReqIdx] = req.positions[i] + resumeReq.positions = append(resumeReq.positions, position) + if req.subRequestIdx != nil { + resumeReq.subRequestIdx = append(resumeReq.subRequestIdx, subRequestIdx) + } if resumeReq.minTargetBytes == 0 { resumeReq.minTargetBytes = scan.ResumeNextBytes } - if position < resumeReq.priority { - resumeReq.priority = position - } resumeReqIdx++ if w.s.hints.SingleRowLookup { @@ -1450,10 +1485,21 @@ func (w *workerCoordinator) finalizeSingleRangeResults( if results[i].ScanResp.ScanResponse != nil { if results[i].ScanResp.ResumeSpan == nil { // The scan within the range is complete. - w.s.mu.numRangesLeftPerScanRequest[results[i].Position]-- - if w.s.mu.numRangesLeftPerScanRequest[results[i].Position] == 0 { - // The scan across all ranges is now complete too. 
- results[i].ScanResp.Complete = true + if w.s.mode == OutOfOrder { + w.s.mu.numRangesPerScanRequest[results[i].Position]-- + if w.s.mu.numRangesPerScanRequest[results[i].Position] == 0 { + // The scan across all ranges is now complete too. + results[i].ScanResp.Complete = true + } + } else { + // In InOrder mode, the scan is marked as complete when + // the last sub-request is satisfied. Note that it is ok + // if the previous sub-requests haven't been satisfied + // yet - the inOrderResultsBuffer will not emit this + // Result until the previous sub-requests are responded + // to. + numSubRequests := w.s.mu.numRangesPerScanRequest[results[i].Position] + results[i].ScanResp.Complete = results[i].subRequestIdx+1 == numSubRequests } } else { // Unset the ResumeSpan on the result in order to not @@ -1472,7 +1518,8 @@ func (w *workerCoordinator) finalizeSingleRangeResults( // partial one. Think more about this. w.s.mu.avgResponseEstimator.update(actualMemoryReservation, int64(len(results))) if debug { - fmt.Printf("created %s with total size %d\n", resultsToString(results), actualMemoryReservation) + printSubRequestIdx := w.s.mode == InOrder && !w.s.hints.SingleRowLookup + fmt.Printf("created %s with total size %d\n", resultsToString(results, printSubRequestIdx), actualMemoryReservation) } w.s.results.add(results) } diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index f550418ccc9c..c8dfab68773d 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" "github.com/dustin/go-humanize" "github.com/stretchr/testify/require" @@ -462,5 +463,67 @@ func TestStreamerEmptyScans(t *testing.T) { }) } -// 
TODO(yuzefovich): once lookup joins are supported, add a test for InOrder -// mode where Scan requests span multiple ranges. +// TestStreamerMultiRangeScan verifies that the Streamer correctly handles scan +// requests that span multiple ranges. +func TestStreamerMultiRangeScan(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + rng, _ := randutil.NewTestRand() + numRows := rng.Intn(100) + 2 + + // We set up two tables such that we'll use the value from the smaller one + // to perform a lookup into a secondary index in the larger table. + _, err := db.Exec("CREATE TABLE small (n PRIMARY KEY) AS SELECT 1") + require.NoError(t, err) + _, err = db.Exec("CREATE TABLE large (k INT PRIMARY KEY, n INT, s STRING, INDEX (n, k) STORING (s))") + require.NoError(t, err) + _, err = db.Exec(fmt.Sprintf("INSERT INTO large SELECT i, 1, repeat('a', i) FROM generate_series(1, %d) AS i", numRows)) + require.NoError(t, err) + + // Split the range for the secondary index into multiple ranges. + numRanges := 2 + if numRows > 2 { + numRanges = rng.Intn(numRows-2) + 2 + } + kValues := make([]int, numRows) + for i := range kValues { + kValues[i] = i + 1 + } + rng.Shuffle(numRows, func(i, j int) { + kValues[i], kValues[j] = kValues[j], kValues[i] + }) + splitKValues := kValues[:numRanges] + for _, kValue := range splitKValues { + _, err = db.Exec(fmt.Sprintf("ALTER INDEX large_n_k_idx SPLIT AT VALUES (1, %d)", kValue)) + require.NoError(t, err) + } + + // Populate the range cache. + _, err = db.Exec("SELECT * FROM large@large_n_k_idx") + require.NoError(t, err) + + // The crux of the test - run a query that performs a lookup join when + // ordering needs to be maintained and then confirm that the results of the + // parallel lookups are served in the right order. 
+ r := db.QueryRow("SELECT array_agg(s) FROM small INNER LOOKUP JOIN large ON small.n = large.n GROUP BY small.n ORDER BY small.n") + var result string + err = r.Scan(&result) + require.NoError(t, err) + // The expected result is of the form: {a,aa,aaa,...}. + expected := "{" + for i := 1; i <= numRows; i++ { + if i > 1 { + expected += "," + } + for j := 0; j < i; j++ { + expected += "a" + } + } + expected += "}" + require.Equal(t, expected, result) +} diff --git a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go index dbcdd976dd62..118ccc8ce5bc 100644 --- a/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go +++ b/pkg/sql/rowcontainer/kvstreamer_result_disk_buffer.go @@ -147,7 +147,8 @@ func (b *kvStreamerResultDiskBuffer) Close(ctx context.Context) { // kvstreamer.Result that is spilled to disk. // // It contains all the information except for 'ScanResp.Complete', 'memoryTok', -// and 'Position' fields which are kept in-memory (because they are allocated in +// 'Position', 'subRequestIdx', and 'subRequestDone' fields which are kept +// in-memory (because they are allocated in // kvstreamer.inOrderBufferedResult.Result anyway). var inOrderResultsBufferSpillTypeSchema = []*types.T{ types.Bool, // isGet