Skip to content

Commit

Permalink
kvstreamer: fully support InOrder mode
Browse files Browse the repository at this point in the history
This commit extends the Streamer library to support Scan requests that
can span multiple ranges in the InOrder mode which allows us to use the
Streamer for the lookup joins when ordering needs to be maintained. As
with the lookup joins w/o ordering, this is a "poor man's" support which
relies on the de-duplication of requests in the join reader's span
generators as well as doesn't remove the disk-backed row container that
buffers all looked up rows in the join reader ordering strategy. Those
caveats will be addressed in the follow-up commits.

The contribution of this commit is such that the in-order results buffer
can now correctly return the results when a single original Scan request
touches multiple ranges as well as when each sub-request (against
a single range) can get an arbitrary number of partial responses.

Previously, we only had one axis for ordering the results - `Position`
values which identify the original request that a particular Result is
a response to. This was sufficient for index joins (as well as lookup
joins when `SingleRowLookup` hint is `true`); however, when a Scan
request can touch multiple ranges, that single axis is no longer
sufficient since the results buffer could order two Results for a single
Scan request arbitrarily. We go around this limitation by introducing
a second dimension for ordering - "sub-request index" which is the
ordinal of a particular single-range request within the multi-range Scan
request.

Consider the following example: original Scan request is `Scan(b, f)`,
and we have three ranges: `[a, c)`, `[c, e)`, `[e, g)`. In
`Streamer.Enqueue`, the original Scan is broken down into three
single-range Scan requests:
```
  singleRangeReq[0]:
    reqs          = [Scan(b, c)]
    positions     = [0]
    subRequestIdx = [0]
  singleRangeReq[1]:
    reqs          = [Scan(c, e)]
    positions     = [0]
    subRequestIdx = [1]
  singleRangeReq[2]:
    reqs          = [Scan(e, f)]
    positions     = [0]
    subRequestIdx = [2]
```
Note that `positions` values are the same (indicating that each
single-range request is a part of the same original multi-range request),
but values of `subRequestIdx` are different - they will allow us to order
the responses to these single-range requests (which might come back in
any order) correctly when returning the results. This information is
plumbed into the requests as well as the results.

There is yet another complication though - what if a single-range Scan
request results in multiple partial responses? To make sure that these
partial results are ordered correctly, we need yet another dimension,
but at least that dimension can be fully hidden inside of the in-order
results buffer. This is possible due to the fact that partial response
for the same single-range Scan request will be added into the buffer at
different times, so we'll assign the results "add epochs".

Consider the following example: we have the original Scan request as
`Scan(a, c)` which goes to a single range `[a, c)` containing keys `a`
and `b`. Imagine that the Scan response can only contain a single key,
so we first get a partial `ScanResponse('a')` with `ResumeSpan(b, c)`,
and then we get a partial `ScanResponse('b')` with an empty `ResumeSpan`.
The first response will be added to the buffer when during the first
`add` call, so its "epoch" is 0 whereas the second response is added
during "epoch" 1 - thus, we can correctly return `a` before `b`
although the `Position` and sub-request values of two `Result`s are the
same.

Release note: None
  • Loading branch information
yuzefovich committed May 23, 2022
1 parent 38086c8 commit 5349fe4
Show file tree
Hide file tree
Showing 8 changed files with 347 additions and 88 deletions.
11 changes: 10 additions & 1 deletion pkg/kv/kvclient/kvstreamer/large_keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func TestLargeKeys(t *testing.T) {
name: "lookup join, no ordering",
query: "SELECT * FROM bar INNER LOOKUP JOIN foo ON lookup_blob = pk_blob",
},
{
name: "lookup join, with ordering",
query: "SELECT max(length(blob)) FROM bar INNER LOOKUP JOIN foo ON lookup_blob = pk_blob GROUP BY pk_blob",
},
}

rng, _ := randutil.NewTestRand()
Expand Down Expand Up @@ -132,7 +136,7 @@ func TestLargeKeys(t *testing.T) {
INDEX (attribute)%s
);`, familiesSuffix))
require.NoError(t, err)
_, err = db.Exec("CREATE TABLE bar (lookup_blob STRING)")
_, err = db.Exec("CREATE TABLE bar (lookup_blob STRING PRIMARY KEY)")
require.NoError(t, err)

// Insert some number of rows.
Expand Down Expand Up @@ -193,6 +197,11 @@ func TestLargeKeys(t *testing.T) {
func(t *testing.T) {
_, err = db.Exec(tc.query)
if err != nil {
// Make sure to discard the trace of the
// query that resulted in an error. If
// we don't do this, then the next test
// case will hang.
<-recCh
t.Fatal(err)
}
// Now examine the trace and count the async
Expand Down
82 changes: 73 additions & 9 deletions pkg/kv/kvclient/kvstreamer/requests_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,61 @@ import (
"sync"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

// singleRangeBatch contains parts of the originally enqueued requests that have
// been truncated to be within a single range. All requests within the
// singleRangeBatch will be issued as a single BatchRequest.
// TODO(yuzefovich): perform memory accounting for slices other than reqs in
// singleRangeBatch.
type singleRangeBatch struct {
reqs []roachpb.RequestUnion
// positions is a 1-to-1 mapping with reqs to indicate which ordinal among
// the originally enqueued requests a particular reqs[i] corresponds to. In
// other words, if reqs[i] is (or a part of) enqueuedReqs[j], then
// positions[i] = j.
//
// In the InOrder mode, positions[0] is treated as the priority of this
// singleRangeBatch where the smaller the value is, the sooner the Result
// will be needed, so batches with the smallest priority value have the
// highest "urgency". We look specifically at the 0th position because, by
// construction, values in positions slice are increasing.
// TODO(yuzefovich): this might need to be [][]int when non-unique requests
// are supported.
positions []int
// subRequestIdx, if non-nil, is a 1-to-1 mapping with positions which
// indicates the ordinal of the corresponding reqs[i] among all sub-requests
// that comprise a single originally enqueued Scan request. This ordinal
// allows us to maintain the order of these sub-requests, each going to a
// different range. If reqs[i] is a Get request, then subRequestIdx[i] is 0.
//
// Consider the following example: original Scan request is Scan(b, f), and
// we have three ranges: [a, c), [c, e), [e, g). In Streamer.Enqueue, the
// original Scan is broken down into three single-range Scan requests:
// singleRangeReq[0]:
// reqs = [Scan(b, c)]
// positions = [0]
// subRequestIdx = [0]
// singleRangeReq[1]:
// reqs = [Scan(c, e)]
// positions = [0]
// subRequestIdx = [1]
// singleRangeReq[2]:
// reqs = [Scan(e, f)]
// positions = [0]
// subRequestIdx = [2]
// Note that positions values are the same (indicating that each
// single-range request is a part of the same original multi-range request),
// but values of subRequestIdx are different - they will allow us to order
// the responses to these single-range requests (which might come back in
// any order) correctly.
//
// subRequestIdx is only allocated in InOrder mode when
// Hints.SingleRowLookup is false and some Scan requests were enqueued.
subRequestIdx []int
// reqsReservedBytes tracks the memory reservation against the budget for
// the memory usage of reqs.
reqsReservedBytes int64
Expand All @@ -41,14 +80,6 @@ type singleRangeBatch struct {
// not be empty. Note that TargetBytes of at least minTargetBytes is
// necessary but might not be sufficient for the response to be non-empty.
minTargetBytes int64
// priority is the smallest number in positions. It is the priority of this
// singleRangeBatch where the smaller the value is, the sooner the Result
// will be needed, so batches with the smallest priority value has the
// highest "urgency".
// TODO(yuzefovich): once lookup joins are supported, we'll need a way to
// order singleRangeBatches that contain parts of a single ScanRequest
// spanning multiple ranges.
priority int
}

var _ sort.Interface = &singleRangeBatch{}
Expand All @@ -60,6 +91,9 @@ func (r *singleRangeBatch) Len() int {
func (r *singleRangeBatch) Swap(i, j int) {
r.reqs[i], r.reqs[j] = r.reqs[j], r.reqs[i]
r.positions[i], r.positions[j] = r.positions[j], r.positions[i]
if r.subRequestIdx != nil {
r.subRequestIdx[i], r.subRequestIdx[j] = r.subRequestIdx[j], r.subRequestIdx[i]
}
}

// Less returns true if r.reqs[i]'s key comes before r.reqs[j]'s key.
Expand All @@ -69,6 +103,24 @@ func (r *singleRangeBatch) Less(i, j int) bool {
return r.reqs[i].GetInner().Header().Key.Compare(r.reqs[j].GetInner().Header().Key) < 0
}

// priority returns the priority value of this batch.
//
// It is invalid to call this method on a batch with no requests.
func (r singleRangeBatch) priority() int {
return r.positions[0]
}

// subPriority returns the "sub-priority" value of this batch that should be
// compared when two batches have the same priority value.
//
// It is invalid to call this method on a batch with no requests.
func (r singleRangeBatch) subPriority() int {
if r.subRequestIdx == nil {
return 0
}
return r.subRequestIdx[0]
}

func reqsToString(reqs []singleRangeBatch) string {
result := "requests for positions "
for i, r := range reqs {
Expand Down Expand Up @@ -245,7 +297,19 @@ func (p *inOrderRequestsProvider) Len() int {
}

func (p *inOrderRequestsProvider) Less(i, j int) bool {
return p.requests[i].priority < p.requests[j].priority
rI, rJ := p.requests[i], p.requests[j]
if buildutil.CrdbTestBuild {
if rI.priority() == rJ.priority() {
subI, subJ := rI.subRequestIdx, rJ.subRequestIdx
if (subI != nil && subJ == nil) || (subI == nil && subJ != nil) {
panic(errors.AssertionFailedf(
"unexpectedly only one subRequestIdx is non-nil when priorities are the same",
))
}
}
}
return rI.priority() < rJ.priority() ||
(rI.priority() == rJ.priority() && rI.subPriority() < rJ.subPriority())
}

func (p *inOrderRequestsProvider) Swap(i, j int) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvclient/kvstreamer/requests_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func TestInOrderRequestsProvider(t *testing.T) {
requests := make([]singleRangeBatch, rng.Intn(maxNumRequests)+1)
priorities := make([]int, len(requests))
for i := range requests {
requests[i].priority = rng.Intn(maxNumRequests)
priorities[i] = requests[i].priority
requests[i].positions = []int{rng.Intn(maxNumRequests)}
priorities[i] = requests[i].priority()
}
sort.Ints(priorities)

Expand All @@ -47,17 +47,17 @@ func TestInOrderRequestsProvider(t *testing.T) {
first := p.firstLocked()
p.removeFirstLocked()
p.Unlock()
require.Equal(t, priorities[0], first.priority)
require.Equal(t, priorities[0], first.priority())
priorities = priorities[1:]
// With 50% probability simulate that a resume request with random
// priority is added.
if rng.Float64() < 0.5 {
// Note that in reality the priority of the resume request cannot
// Note that in reality the position of the resume request cannot
// have lower value than of the original request, but it's ok for
// the test.
first.priority = rng.Intn(maxNumRequests)
first.positions[0] = rng.Intn(maxNumRequests)
p.add(first)
priorities = append(priorities, first.priority)
priorities = append(priorities, first.priority())
sort.Ints(priorities)
}
}
Expand Down
Loading

0 comments on commit 5349fe4

Please sign in to comment.