Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
70571: kvserver: add `RESUME_BYTE_LIMIT` and `RESUME_INTENT_LIMIT` r=nvanbenschoten,yuzefovich a=erikgrinaker

This is essentially the two first commits from cockroachdb#68370, with some minor non-code tweaks. Splitting it off to work on cockroachdb#70564 in concert with cockroachdb#68370.

### roachpb: move `ResumeReason` enum out of `ResponseHeader`

The `ResumeReason` enum constants won't necessarily be used only in the
context of a response header, e.g. we want to return it from `MVCCScan`
as well. This patch therefore moves the enum to the `roachpb` root.

Release note: None

### kvserver: add `RESUME_BYTE_LIMIT` and `RESUME_INTENT_LIMIT`

The `roachpb.ResumeReason` enum only contained the single reason
`RESUME_KEY_LIMIT` (in addition to the `RESUME_UNKNOWN` zero value).
This was outdated since iteration can be halted by byte limits and
intent limits as well.

This patch adds the new resume reasons `RESUME_BYTE_LIMIT` and
`RESUME_INTENT_LIMIT`, adds `MVCCScanResult.ResumeReason` to plumb it
through to the scan iteration, and updates relevant code.

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Oct 1, 2021
2 parents 5f53feb + 239a222 commit 177ed2b
Show file tree
Hide file tree
Showing 18 changed files with 817 additions and 791 deletions.
12 changes: 4 additions & 8 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,10 @@ func (b *Batch) fillResults(ctx context.Context) {
}
}
// Fill up the resume span.
if result.Err == nil && reply != nil && reply.Header().ResumeSpan != nil {
result.ResumeSpan = reply.Header().ResumeSpan
result.ResumeReason = reply.Header().ResumeReason
// The ResumeReason might be missing when talking to a 1.1 node; assume
// it's the key limit (which was the only reason why 1.1 would return a
// resume span). This can be removed in 2.1.
if result.ResumeReason == roachpb.RESUME_UNKNOWN {
result.ResumeReason = roachpb.RESUME_KEY_LIMIT
if result.Err == nil && reply != nil {
if h := reply.Header(); h.ResumeSpan != nil {
result.ResumeSpan = h.ResumeSpan
result.ResumeReason = h.ResumeReason
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type Result struct {
ResumeSpan *roachpb.Span
// When ResumeSpan is populated, this specifies the reason why the operation
// wasn't completed and needs to be resumed.
ResumeReason roachpb.ResponseHeader_ResumeReason
ResumeReason roachpb.ResumeReason
}

// ResumeSpanAsValue returns the resume span as a value if one is set,
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// If couldHaveSkippedResponses is set, resumeReason indicates the reason why
// the ResumeSpan is necessary. This reason is common to all individual
// responses that carry a ResumeSpan.
var resumeReason roachpb.ResponseHeader_ResumeReason
var resumeReason roachpb.ResumeReason
defer func() {
if r := recover(); r != nil {
// If we're in the middle of a panic, don't wait on responseChs.
Expand Down Expand Up @@ -1370,7 +1370,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
ba.TargetBytes -= replyBytes
if ba.TargetBytes <= 0 {
couldHaveSkippedResponses = true
resumeReason = roachpb.RESUME_KEY_LIMIT
resumeReason = roachpb.RESUME_BYTE_LIMIT
return
}
}
Expand Down Expand Up @@ -1694,7 +1694,7 @@ func fillSkippedResponses(
ba roachpb.BatchRequest,
br *roachpb.BatchResponse,
nextKey roachpb.RKey,
resumeReason roachpb.ResponseHeader_ResumeReason,
resumeReason roachpb.ResumeReason,
) {
// Some requests might have no response at all if we used a batch-wide
// limit; simply create trivial responses for those. Note that any type
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func evalExport(
targetSize = curSizeOfExportedSSTs
}
reply.NumBytes = targetSize
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
// NB: This condition means that we will allow another SST to be created
// even if we have less room in our TargetBytes than the target size of
// the next SST. In the worst case this could lead to us exceeding our
Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,12 @@ INTO
expect(t, res7, 2, 100, 2, 100)
latestRespHeader := roachpb.ResponseHeader{
ResumeSpan: nil,
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
allRespHeader := roachpb.ResponseHeader{
ResumeSpan: nil,
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
expectResponseHeader(t, res7, latestRespHeader, allRespHeader)
Expand All @@ -341,15 +341,15 @@ INTO
Key: []byte("/Table/53/1/2"),
EndKey: []byte("/Max"),
},
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
allRespHeader = roachpb.ResponseHeader{
ResumeSpan: &roachpb.Span{
Key: []byte("/Table/53/1/2"),
EndKey: []byte("/Max"),
},
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
expectResponseHeader(t, res7, latestRespHeader, allRespHeader)
Expand All @@ -365,15 +365,15 @@ INTO
Key: []byte("/Table/53/1/3/0"),
EndKey: []byte("/Max"),
},
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
allRespHeader = roachpb.ResponseHeader{
ResumeSpan: &roachpb.Span{
Key: []byte("/Table/53/1/3/0"),
EndKey: []byte("/Max"),
},
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
expectResponseHeader(t, res7, latestRespHeader, allRespHeader)
Expand All @@ -388,15 +388,15 @@ INTO
Key: []byte("/Table/53/1/100/0"),
EndKey: []byte("/Max"),
},
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
allRespHeader = roachpb.ResponseHeader{
ResumeSpan: &roachpb.Span{
Key: []byte("/Table/53/1/100/0"),
EndKey: []byte("/Max"),
},
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: maxResponseSSTBytes,
}
expectResponseHeader(t, res7, latestRespHeader, allRespHeader)
Expand All @@ -410,12 +410,12 @@ INTO
expect(t, res7, 100, 100, 100, 100)
latestRespHeader = roachpb.ResponseHeader{
ResumeSpan: nil,
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: 100 * kvByteSize,
}
allRespHeader = roachpb.ResponseHeader{
ResumeSpan: nil,
ResumeReason: 2,
ResumeReason: roachpb.RESUME_BYTE_LIMIT,
NumBytes: 100 * kvByteSize,
}
expectResponseHeader(t, res7, latestRespHeader, allRespHeader)
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func Get(
// This mirrors the logic in MVCCScan, though the logic in MVCCScan is
// slightly lower in the stack.
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
if h.MaxSpanRequestKeys < 0 {
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
} else if h.TargetBytes < 0 {
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
}
return result.Result{}, nil
}
var val *roachpb.Value
Expand Down
108 changes: 53 additions & 55 deletions pkg/kv/kvserver/batcheval/cmd_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,28 @@ package batcheval

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestGetResumeSpan tests that a GetRequest with a target bytes or max span
// request keys is properly handled by returning no result with a resume span.
// request keys is properly handled.
func TestGetResumeSpan(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
resp := &roachpb.GetResponse{}
resp := roachpb.GetResponse{}
key := roachpb.Key([]byte{'a'})
value := roachpb.MakeValueFromString("woohoo")

db := storage.NewDefaultInMemForTesting()
defer db.Close()

// This has a size of 11 bytes.
_, err := Put(ctx, db, CommandArgs{
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{TargetBytes: -1},
Expand All @@ -41,60 +43,56 @@ func TestGetResumeSpan(t *testing.T) {
},
Value: value,
},
}, resp)
assert.NoError(t, err)
}, &resp)
require.NoError(t, err)

// Case 1: Check that a negative TargetBytes causes a resume span.
_, err = Get(ctx, db, CommandArgs{
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{TargetBytes: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
},
}, resp)
assert.NoError(t, err)

assert.NotNil(t, resp.ResumeSpan)
assert.Equal(t, key, resp.ResumeSpan.Key)
assert.Nil(t, resp.ResumeSpan.EndKey)
assert.Nil(t, resp.Value)

resp = &roachpb.GetResponse{}
// Case 2: Check that a negative MaxSpanRequestKeys causes a resume span.
_, err = Get(ctx, db, CommandArgs{
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{MaxSpanRequestKeys: -1},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
},
}, resp)
assert.NoError(t, err)
testCases := []struct {
maxKeys int64
targetBytes int64
expectResume bool
expectReason roachpb.ResumeReason
}{
{maxKeys: -1, expectResume: true, expectReason: roachpb.RESUME_KEY_LIMIT},
{maxKeys: 0, expectResume: false},
{maxKeys: 1, expectResume: false},

assert.NotNil(t, resp.ResumeSpan)
assert.Equal(t, key, resp.ResumeSpan.Key)
assert.Nil(t, resp.ResumeSpan.EndKey)
assert.Nil(t, resp.Value)
{targetBytes: -1, expectResume: true, expectReason: roachpb.RESUME_BYTE_LIMIT},
{targetBytes: 0, expectResume: false},
{targetBytes: 1, expectResume: false},
{targetBytes: 11, expectResume: false},
{targetBytes: 12, expectResume: false},

resp = &roachpb.GetResponse{}
// Case 3: Check that a positive limit causes a normal return.
_, err = Get(ctx, db, CommandArgs{
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{MaxSpanRequestKeys: 10, TargetBytes: 100},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
},
}, resp)
assert.NoError(t, err)
{maxKeys: -1, targetBytes: -1, expectResume: true, expectReason: roachpb.RESUME_KEY_LIMIT},
{maxKeys: 10, targetBytes: 100, expectResume: false},
}
for _, tc := range testCases {
name := fmt.Sprintf("maxKeys=%d targetBytes=%d", tc.maxKeys, tc.targetBytes)
t.Run(name, func(t *testing.T) {
resp := roachpb.GetResponse{}
_, err := Get(ctx, db, CommandArgs{
EvalCtx: (&MockEvalCtx{}).EvalContext(),
Header: roachpb.Header{
MaxSpanRequestKeys: tc.maxKeys,
TargetBytes: tc.targetBytes,
},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
},
}, &resp)
require.NoError(t, err)

assert.Nil(t, resp.ResumeSpan)
assert.NotNil(t, resp.Value)
assert.Equal(t, resp.Value.RawBytes, value.RawBytes)
assert.Equal(t, 1, int(resp.NumKeys))
assert.Equal(t, len(resp.Value.RawBytes), int(resp.NumBytes))
if tc.expectResume {
require.NotNil(t, resp.ResumeSpan)
require.Equal(t, &roachpb.Span{Key: key}, resp.ResumeSpan)
require.Equal(t, tc.expectReason, resp.ResumeReason)
require.Nil(t, resp.Value)
} else {
require.Nil(t, resp.ResumeSpan)
require.NotNil(t, resp.Value)
require.Equal(t, resp.Value.RawBytes, value.RawBytes)
require.EqualValues(t, 1, resp.NumKeys)
require.Len(t, resp.Value.RawBytes, int(resp.NumBytes))
}
})
}
}
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func ResolveIntentRange(
if resumeSpan != nil {
update.EndKey = resumeSpan.Key
reply.ResumeSpan = resumeSpan
// The given MaxSpanRequestKeys really specifies the number of intents
// resolved, not the number of keys scanned. We could return
// RESUME_INTENT_LIMIT here, but since the given limit is a key limit we
// return RESUME_KEY_LIMIT for symmetry.
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func ReverseScan(

if scanRes.ResumeSpan != nil {
reply.ResumeSpan = scanRes.ResumeSpan
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
reply.ResumeReason = scanRes.ResumeReason
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func Scan(

if scanRes.ResumeSpan != nil {
reply.ResumeSpan = scanRes.ResumeSpan
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
reply.ResumeReason = scanRes.ResumeReason
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
Expand Down
Loading

0 comments on commit 177ed2b

Please sign in to comment.