Skip to content

Commit

Permalink
distsender: fix ResumeSpan for Gets
Browse files Browse the repository at this point in the history
The distsender is not fully implementing the ResumeSpan contract for
Get; the promise is that a nil span is set when the operation has
successfully completed.

A Get that reaches a kvserver but that is not executed has the
ResumeSpan set on the server side. But if the requests span multiple
ranges, a subset of the requests will not make it to any kvserver. Any
Gets that weren't executed and don't have a ResumeSpan will not be
executed again (it looks to the upper layer as if the Get was executed
and found no key). The end result is that scans can miss rows.

This change fills in the missing case and improves the
TestMultiRangeScanWithPagination test to allow running mixed
operations.

Fixes #74736.

Release note (bug fix): In particular cases, some queries that involve
a scan which returns many results and which includes lookups for
individual keys were not returning all results from the table.
  • Loading branch information
RaduBerinde committed Jan 25, 2022
1 parent 72c5471 commit a913c71
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 64 deletions.
120 changes: 82 additions & 38 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1736,56 +1736,100 @@ func fillSkippedResponses(
scratchBA.Requests[0].MustSetInner(req)
br.Responses[i] = scratchBA.CreateReply().Responses[0]
}
// Set the ResumeSpan for future batch requests.

// Set or correct the ResumeSpan as necessary.
isReverse := ba.IsReverse()
for i, resp := range br.Responses {
req := ba.Requests[i].GetInner()
if !roachpb.IsRange(req) {
continue
}
hdr := resp.GetInner().Header()
origSpan := req.Header().Span()
maybeSetResumeSpan(req, &hdr, nextKey, isReverse)
if hdr.ResumeSpan != nil {
hdr.ResumeReason = resumeReason
}
br.Responses[i].GetInner().SetHeader(hdr)
}
}

// maybeSetResumeSpan sets or corrects the ResumeSpan in the response header, if
// necessary.
//
// nextKey is the first key that was not processed.
func maybeSetResumeSpan(
req roachpb.Request, hdr *roachpb.ResponseHeader, nextKey roachpb.RKey, isReverse bool,
) {
if _, ok := req.(*roachpb.GetRequest); ok {
// This is a Get request. There are three possibilities:
//
// 1. The request was completed. In this case we don't want a ResumeSpan.
//
// 2. The request was not completed but it was part of a request that made
// it to a kvserver (i.e. it was part of the last range we operated on).
// In this case the ResumeSpan should be set by the kvserver and we can
// leave it alone.
//
// 3. The request was not completed and was not sent to a kvserver (it was
// beyond the last range we operated on). In this case we need to set
// the ResumeSpan here.
if hdr.ResumeSpan != nil {
// Case 2.
return
}
key := req.Header().Span().Key
if isReverse {
if hdr.ResumeSpan != nil {
// The ResumeSpan.Key might be set to the StartKey of a range;
// correctly set it to the Key of the original request span.
hdr.ResumeSpan.Key = origSpan.Key
} else if roachpb.RKey(origSpan.Key).Less(nextKey) {
if !nextKey.Less(roachpb.RKey(key)) {
// key <= nextKey, so this request was not completed (case 3).
hdr.ResumeSpan = &roachpb.Span{Key: key}
}
} else {
if !roachpb.RKey(key).Less(nextKey) {
// key >= nextKey, so this request was not completed (case 3).
hdr.ResumeSpan = &roachpb.Span{Key: key}
}
}
return
}

if !roachpb.IsRange(req) {
return
}

origSpan := req.Header().Span()
if isReverse {
if hdr.ResumeSpan != nil {
// The ResumeSpan.Key might be set to the StartKey of a range;
// correctly set it to the Key of the original request span.
hdr.ResumeSpan.Key = origSpan.Key
} else if roachpb.RKey(origSpan.Key).Less(nextKey) {
// Some keys have yet to be processed.
hdr.ResumeSpan = new(roachpb.Span)
*hdr.ResumeSpan = origSpan
if nextKey.Less(roachpb.RKey(origSpan.EndKey)) {
// The original span has been partially processed.
hdr.ResumeSpan.EndKey = nextKey.AsRawKey()
}
}
} else {
if hdr.ResumeSpan != nil {
// The ResumeSpan.EndKey might be set to the EndKey of a range because
// that's what a store will set it to when the limit is reached; it
// doesn't know any better). In that case, we correct it to the EndKey
// of the original request span. Note that this doesn't touch
// ResumeSpan.Key, which is really the important part of the ResumeSpan.
hdr.ResumeSpan.EndKey = origSpan.EndKey
} else {
// The request might have been fully satisfied, in which case it doesn't
// need a ResumeSpan, or it might not have. Figure out if we're in the
// latter case.
if nextKey.Less(roachpb.RKey(origSpan.EndKey)) {
// Some keys have yet to be processed.
hdr.ResumeSpan = new(roachpb.Span)
*hdr.ResumeSpan = origSpan
if nextKey.Less(roachpb.RKey(origSpan.EndKey)) {
if roachpb.RKey(origSpan.Key).Less(nextKey) {
// The original span has been partially processed.
hdr.ResumeSpan.EndKey = nextKey.AsRawKey()
}
}
} else {
if hdr.ResumeSpan != nil {
// The ResumeSpan.EndKey might be set to the EndKey of a range because
// that's what a store will set it to when the limit is reached; it
// doesn't know any better). In that case, we correct it to the EndKey
// of the original request span. Note that this doesn't touch
// ResumeSpan.Key, which is really the important part of the ResumeSpan.
hdr.ResumeSpan.EndKey = origSpan.EndKey
} else {
// The request might have been fully satisfied, in which case it doesn't
// need a ResumeSpan, or it might not have. Figure out if we're in the
// latter case.
if nextKey.Less(roachpb.RKey(origSpan.EndKey)) {
// Some keys have yet to be processed.
hdr.ResumeSpan = new(roachpb.Span)
*hdr.ResumeSpan = origSpan
if roachpb.RKey(origSpan.Key).Less(nextKey) {
// The original span has been partially processed.
hdr.ResumeSpan.Key = nextKey.AsRawKey()
}
hdr.ResumeSpan.Key = nextKey.AsRawKey()
}
}
}
if hdr.ResumeSpan != nil {
hdr.ResumeReason = resumeReason
}
br.Responses[i].GetInner().SetHeader(hdr)
}
}

Expand Down
130 changes: 104 additions & 26 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,16 +1244,69 @@ func TestMultiRangeScanDeleteRange(t *testing.T) {
func TestMultiRangeScanWithPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

keys := func(strs ...string) []roachpb.Key {
r := make([]roachpb.Key, len(strs))
for i, s := range strs {
r[i] = roachpb.Key(s)
}
return r
}

scan := func(startKey, endKey string) roachpb.Span {
return roachpb.Span{
Key: roachpb.Key(startKey),
EndKey: roachpb.Key(endKey).Next(),
}
}

get := func(key string) roachpb.Span {
return roachpb.Span{
Key: roachpb.Key(key),
}
}

// Each testcase defines range split keys, a set of keys, and a set of
// operations (scans / gets) that together should return all the keys in
// order.
testCases := []struct {
splitKeys []roachpb.Key
keys []roachpb.Key
// An operation is either a Scan or a Get, depending if the span EndKey is
// set.
operations []roachpb.Span
}{
{[]roachpb.Key{}, []roachpb.Key{roachpb.Key("a"), roachpb.Key("j"), roachpb.Key("z")}},
{[]roachpb.Key{roachpb.Key("m")},
[]roachpb.Key{roachpb.Key("a"), roachpb.Key("z")}},
{[]roachpb.Key{roachpb.Key("h"), roachpb.Key("q")},
[]roachpb.Key{roachpb.Key("b"), roachpb.Key("f"), roachpb.Key("k"),
roachpb.Key("r"), roachpb.Key("w"), roachpb.Key("y")}},
{
splitKeys: keys(),
keys: keys("a", "j", "z"),
operations: []roachpb.Span{scan("a", "z")},
},

{
splitKeys: keys("m"),
keys: keys("a", "z"),
operations: []roachpb.Span{scan("a", "z")},
},

{
splitKeys: keys("h", "q"),
keys: keys("b", "f", "k", "r", "w", "y"),
operations: []roachpb.Span{scan("b", "y")},
},

{
splitKeys: keys("h", "q"),
keys: keys("b", "f", "k", "r", "w", "y"),
operations: []roachpb.Span{scan("b", "k"), scan("p", "z")},
},

{
// This test mixes Scans and Gets, used to make sure that the ResumeSpan
// is set correctly for Gets that were not performed (see #74736).
splitKeys: keys("c", "f"),
keys: keys("a", "b", "c", "d", "e", "f", "g", "h", "i"),
operations: []roachpb.Span{scan("a", "d"), get("d1"), get("e"), scan("f", "h"), get("i")},
},
}

for _, tc := range testCases {
Expand Down Expand Up @@ -1303,39 +1356,64 @@ func TestMultiRangeScanWithPagination(t *testing.T) {
for targetBytes := int64(1); ; targetBytes++ {
var numPages int
t.Run(fmt.Sprintf("targetBytes=%d,maxSpanRequestKeys=%d", targetBytes, msrq), func(t *testing.T) {
req := func(span roachpb.Span) roachpb.Request {
if reverse {
return roachpb.NewReverseScan(span.Key, span.EndKey, false)

// Paginate.
operations := tc.operations
if reverse {
operations = make([]roachpb.Span, len(operations))
for i := range operations {
operations[i] = tc.operations[len(operations)-i-1]
}
return roachpb.NewScan(span.Key, span.EndKey, false)
}
// Paginate.
resumeSpan := &roachpb.Span{Key: tc.keys[0], EndKey: tc.keys[len(tc.keys)-1].Next()}
var keys []roachpb.Key
for {
numPages++
scan := req(*resumeSpan)

// Build the batch.
var ba roachpb.BatchRequest
ba.Add(scan)
for _, span := range operations {
var req roachpb.Request
switch {
case span.EndKey == nil:
req = roachpb.NewGet(span.Key, false /* forUpdate */)
case reverse:
req = roachpb.NewReverseScan(span.Key, span.EndKey, false /* forUpdate */)
default:
req = roachpb.NewScan(span.Key, span.EndKey, false /* forUpdate */)
}
ba.Add(req)
}

ba.Header.TargetBytes = targetBytes
ba.Header.MaxSpanRequestKeys = msrq
ba.Header.ReturnOnRangeBoundary = returnOnRangeBoundary
br, pErr := tds.Send(ctx, ba)
require.Nil(t, pErr)
var rows []roachpb.KeyValue
if reverse {
rows = br.Responses[0].GetReverseScan().Rows
} else {
rows = br.Responses[0].GetScan().Rows
}
for _, kv := range rows {
keys = append(keys, kv.Key)
for i := range operations {
resp := br.Responses[i]
if getResp := resp.GetGet(); getResp != nil {
if getResp.Value != nil {
keys = append(keys, operations[i].Key)
}
continue
}
var rows []roachpb.KeyValue
if reverse {
rows = resp.GetReverseScan().Rows
} else {
rows = resp.GetScan().Rows
}
for _, kv := range rows {
keys = append(keys, kv.Key)
}
}
resumeSpan = br.Responses[0].GetInner().Header().ResumeSpan
if log.V(1) {
t.Logf("page #%d: scan %v -> keys (after) %v resume %v", scan.Header().Span(), numPages, keys, resumeSpan)
operations = nil
for _, resp := range br.Responses {
if resumeSpan := resp.GetInner().Header().ResumeSpan; resumeSpan != nil {
operations = append(operations, *resumeSpan)
}
}
if resumeSpan == nil {
if len(operations) == 0 {
// Done with this pagination.
break
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/row/kv_batch_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ func (f *txnKVFetcher) nextBatch(
"we're only scanning non-overlapping spans. New spans: %s",
catalogkeys.PrettySpans(nil, f.spans, 0 /* skip */))
}

// Any requests that were not fully completed will have the ResumeSpan set.
// Here we accumulate all of them.
if resumeSpan := header.ResumeSpan; resumeSpan != nil {
f.spansScratch[f.newFetchSpansIdx] = *resumeSpan
f.newFetchSpansIdx++
Expand Down

0 comments on commit a913c71

Please sign in to comment.