Merge #45323
45323: row: set TargetBytes for kvfetcher r=nvanbenschoten a=tbg

This finishes up the work in #44925 and completes the TargetBytes functionality. It is then used in the kvfetcher, which henceforth aims to return no more than ~1MB per request. Additional commits restore the hotspotsplits roachtest and fix it. With the relevant commit from this PR reverted, the test failed nine out of ten times; with all commits, it passed ten out of ten.

The one question I have is whether TargetBytes should be set to a value higher than 1MB to avoid a performance regression (where should I look?).

Fixes #33660.

Co-authored-by: Tobias Schottdorf <[email protected]>
craig[bot] and tbg committed Feb 28, 2020
2 parents cc6d152 + de24cbd commit dcf8c44
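
For readers unfamiliar with the mechanism, the sketch below models the pagination contract that TargetBytes establishes, using hypothetical stand-in types rather than CockroachDB's real roachpb/client API: the server fills a page until the byte budget would be exceeded (always returning at least one row), hands back a resume key, and the caller re-issues the request from that key until no resume key remains.

```go
// Minimal, self-contained sketch of byte-budgeted pagination.
// All types here are hypothetical stand-ins, not the real API.
package main

import "fmt"

type kv struct {
	key   string
	value []byte
}

type page struct {
	rows   []kv
	resume string // empty means "no more data"
}

// scanPage models a server honoring a byte budget: it returns rows starting
// at `from` until adding another row would exceed targetBytes, always
// returning at least one row (overshooting rather than returning nothing).
func scanPage(data []kv, from string, targetBytes int) page {
	var out page
	var used int
	for _, row := range data {
		if row.key < from {
			continue
		}
		if len(out.rows) > 0 && used+len(row.value) > targetBytes {
			out.resume = row.key // more data remains; resume here next time
			break
		}
		out.rows = append(out.rows, row)
		used += len(row.value)
	}
	return out
}

func main() {
	data := []kv{{"a", make([]byte, 700)}, {"b", make([]byte, 700)}, {"c", make([]byte, 700)}}
	from := "a"
	for pageNum := 1; ; pageNum++ {
		p := scanPage(data, from, 1<<10) // ~1 KiB budget per page
		fmt.Printf("page %d: %d row(s), resume=%q\n", pageNum, len(p.rows), p.resume)
		if p.resume == "" {
			break // no resume key: the scan is complete
		}
		from = p.resume
	}
}
```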
Showing 9 changed files with 185 additions and 33 deletions.
6 changes: 2 additions & 4 deletions pkg/cmd/roachtest/hotspotsplits.go
@@ -42,9 +42,7 @@ func registerHotSpotSplits(r *testRegistry) {
m.Go(func() error {
t.l.Printf("starting load generator\n")

// TODO(rytaft): reset this to 1 << 19 (512 KB) once we can dynamically
// size kv batches.
const blockSize = 1 << 18 // 256 KB
const blockSize = 1 << 19 // 512 KB
return c.RunE(ctx, appNode, fmt.Sprintf(
"./workload run kv --read-percent=0 --tolerate-errors --concurrency=%d "+
"--min-block-bytes=%d --max-block-bytes=%d --duration=%s {pgurl:1-3}",
@@ -53,7 +51,7 @@ func registerHotSpotSplits(r *testRegistry) {

m.Go(func() error {
t.Status(fmt.Sprintf("starting checks for range sizes"))
const sizeLimit = 3 * (1 << 26) // 192 MB
const sizeLimit = 3 * (1 << 29) // 3*512 MB (512 mb is default size)

db := c.Conn(ctx, 1)
defer db.Close()
21 changes: 16 additions & 5 deletions pkg/kv/dist_sender.go
@@ -558,7 +558,7 @@ func (ds *DistSender) initAndVerifyBatch(
return roachpb.NewErrorf("empty batch")
}

if ba.MaxSpanRequestKeys != 0 {
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
// Verify that the batch contains only specific range requests or the
// EndTxnRequest. Verify that a batch with a ReverseScan only contains
// ReverseScan range requests.
@@ -672,10 +672,11 @@ func (ds *DistSender) Send(
splitET = true
}
parts := splitBatchAndCheckForRefreshSpans(ba, splitET)
if len(parts) > 1 && ba.MaxSpanRequestKeys != 0 {
if len(parts) > 1 && (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) {
// We already verified above that the batch contains only scan requests of the same type.
// Such a batch should never need splitting.
panic("batch with MaxSpanRequestKeys needs splitting")
log.Fatalf(ctx, "batch with MaxSpanRequestKeys=%d, TargetBytes=%d needs splitting",
log.Safe(ba.MaxSpanRequestKeys), log.Safe(ba.TargetBytes))
}

errIdxOffset := 0
@@ -1152,7 +1153,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}
}()

canParallelize := ba.Header.MaxSpanRequestKeys == 0
canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0
if ba.IsSingleCheckConsistencyRequest() {
// Don't parallelize full checksum requests as they have to touch the
// entirety of each replica of each range they touch.
@@ -1213,12 +1214,14 @@ func (ds *DistSender) divideAndSendBatchToRanges(
ba.UpdateTxn(resp.reply.Txn)
}

mightStopEarly := ba.MaxSpanRequestKeys > 0
mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0
// Check whether we've received enough responses to exit query loop.
if mightStopEarly {
var replyResults int64
var replyBytes int64
for _, r := range resp.reply.Responses {
replyResults += r.GetInner().Header().NumKeys
replyBytes += r.GetInner().Header().NumBytes
}
// Update MaxSpanRequestKeys, if applicable. Note that ba might be
// passed recursively to further divideAndSendBatchToRanges() calls.
@@ -1235,6 +1238,14 @@
return
}
}
if ba.TargetBytes > 0 {
ba.TargetBytes -= replyBytes
if ba.TargetBytes <= 0 {
couldHaveSkippedResponses = true
resumeReason = roachpb.RESUME_KEY_LIMIT
return
}
}
}
}

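The heart of the dist_sender.go change is a running byte budget that each per-range reply is charged against; once it is exhausted, the remaining ranges are skipped and a resume point is reported. Below is a minimal sketch of that stop-early pattern, with hypothetical types in place of the real roachpb structs.

```go
// Hedged model of the DistSender's limit accounting: a zero limit means
// "unlimited", matching the header convention in the diff above.
package main

import "fmt"

type rangeReply struct {
	numKeys  int64
	numBytes int64
}

// sendToRanges walks per-range replies in order and stops as soon as either
// limit (keys or bytes) is exhausted, reporting how many ranges it consumed
// and whether later ranges were skipped.
func sendToRanges(replies []rangeReply, maxKeys, targetBytes int64) (consumed int, stoppedEarly bool) {
	for i, r := range replies {
		consumed = i + 1
		if maxKeys > 0 {
			maxKeys -= r.numKeys
			if maxKeys <= 0 {
				return consumed, i < len(replies)-1
			}
		}
		if targetBytes > 0 {
			targetBytes -= r.numBytes
			if targetBytes <= 0 {
				return consumed, i < len(replies)-1
			}
		}
	}
	return consumed, false
}

func main() {
	replies := []rangeReply{
		{numKeys: 10, numBytes: 4 << 10},
		{numKeys: 10, numBytes: 4 << 10},
		{numKeys: 10, numBytes: 4 << 10},
	}
	n, early := sendToRanges(replies, 0 /* no key limit */, 6<<10 /* ~6 KiB budget */)
	fmt.Println(n, early) // 2 true: the byte budget runs out after the second range
}
```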
1 change: 1 addition & 0 deletions pkg/roachpb/api.go
@@ -232,6 +232,7 @@ func (rh *ResponseHeader) combine(otherRH ResponseHeader) error {
rh.ResumeSpan = otherRH.ResumeSpan
rh.ResumeReason = otherRH.ResumeReason
rh.NumKeys += otherRH.NumKeys
rh.NumBytes += otherRH.NumBytes
rh.RangeInfos = append(rh.RangeInfos, otherRH.RangeInfos...)
return nil
}
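The api.go hunk adds only one line, but it is what lets the sender account for bytes at all: when per-range partial responses are merged, both counters have to accumulate. A toy version of that bookkeeping, using a hypothetical struct that mirrors just the two counters involved:

```go
package main

import "fmt"

type responseHeader struct {
	numKeys  int64
	numBytes int64
}

// combine folds other into rh, summing both counters so the sender can
// charge them against MaxSpanRequestKeys and TargetBytes respectively.
func (rh *responseHeader) combine(other responseHeader) {
	rh.numKeys += other.numKeys
	rh.numBytes += other.numBytes
}

func main() {
	var total responseHeader
	for _, part := range []responseHeader{{3, 300}, {2, 150}} {
		total.combine(part)
	}
	fmt.Printf("%+v\n", total) // {numKeys:5 numBytes:450}
}
```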
107 changes: 85 additions & 22 deletions pkg/server/server_test.go
@@ -51,6 +51,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var nodeTestBaseContext = testutils.NewNodeTestBaseContext()
@@ -415,9 +416,10 @@ func TestMultiRangeScanDeleteRange(t *testing.T) {
}
}

// TestMultiRangeScanWithMaxResults tests that commands which access multiple
// ranges with MaxResults parameter are carried out properly.
func TestMultiRangeScanWithMaxResults(t *testing.T) {
// TestMultiRangeScanWithPagination tests that specifying MaxSpanRequestKeys
// and/or TargetBytes to break up result sets works properly, even across
// ranges.
func TestMultiRangeScanWithPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
testCases := []struct {
splitKeys []roachpb.Key
@@ -430,7 +432,7 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) {
roachpb.Key("r"), roachpb.Key("w"), roachpb.Key("y")}},
}

for i, tc := range testCases {
for _, tc := range testCases {
t.Run("", func(t *testing.T) {
ctx := context.Background()
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
@@ -451,26 +453,87 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) {
}
}

// Try every possible ScanRequest startKey.
for start := 0; start < len(tc.keys); start++ {
// Try every possible maxResults, from 1 to beyond the size of key array.
for maxResults := 1; maxResults <= len(tc.keys)-start+1; maxResults++ {
scan := roachpb.NewScan(tc.keys[start], tc.keys[len(tc.keys)-1].Next())
reply, err := client.SendWrappedWith(
ctx, tds, roachpb.Header{MaxSpanRequestKeys: int64(maxResults)}, scan,
)
if err != nil {
t.Fatal(err)
}
rows := reply.(*roachpb.ScanResponse).Rows
if start+maxResults <= len(tc.keys) && len(rows) != maxResults {
t.Errorf("%d: start=%s: expected %d rows, but got %d", i, tc.keys[start], maxResults, len(rows))
} else if start+maxResults == len(tc.keys)+1 && len(rows) != maxResults-1 {
t.Errorf("%d: expected %d rows, but got %d", i, maxResults-1, len(rows))
}
}
// The maximum TargetBytes to use in this test. We use the bytes in
// all kvs in this test case as a ceiling. Nothing interesting
// happens above this.
var maxTargetBytes int64
{
resp, pErr := client.SendWrapped(ctx, tds, roachpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next()))
require.Nil(t, pErr)
maxTargetBytes = resp.Header().NumBytes
}

testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) {
// Iterate through MaxSpanRequestKeys=1..n and TargetBytes=1..m
// and (where n and m are chosen to reveal the full result set
// in one page). At each(*) combination, paginate both the
// forward and reverse scan and make sure we get the right
// result.
//
// (*) we don't increase the limits when there's only one page,
// but short circuit to something more interesting instead.
msrq := int64(1)
for targetBytes := int64(1); ; targetBytes++ {
var numPages int
t.Run(fmt.Sprintf("targetBytes=%d,maxSpanRequestKeys=%d", targetBytes, msrq), func(t *testing.T) {
req := func(span roachpb.Span) roachpb.Request {
if reverse {
return roachpb.NewReverseScan(span.Key, span.EndKey)
}
return roachpb.NewScan(span.Key, span.EndKey)
}
// Paginate.
resumeSpan := &roachpb.Span{Key: tc.keys[0], EndKey: tc.keys[len(tc.keys)-1].Next()}
var keys []roachpb.Key
for {
numPages++
scan := req(*resumeSpan)
var ba roachpb.BatchRequest
ba.Add(scan)
ba.Header.TargetBytes = targetBytes
ba.Header.MaxSpanRequestKeys = msrq
br, pErr := tds.Send(ctx, ba)
require.Nil(t, pErr)
var rows []roachpb.KeyValue
if reverse {
rows = br.Responses[0].GetReverseScan().Rows
} else {
rows = br.Responses[0].GetScan().Rows
}
for _, kv := range rows {
keys = append(keys, kv.Key)
}
resumeSpan = br.Responses[0].GetInner().Header().ResumeSpan
t.Logf("page #%d: scan %v -> keys (after) %v resume %v", numPages, scan.Header().Span(), keys, resumeSpan)
if resumeSpan == nil {
// Done with this pagination.
break
}
}
if reverse {
for i, n := 0, len(keys); i < n-i-1; i++ {
keys[i], keys[n-i-1] = keys[n-i-1], keys[i]
}
}
require.Equal(t, tc.keys, keys)
if targetBytes == 1 || msrq < int64(len(tc.keys)) {
// Definitely more than one page in this case.
require.Less(t, 1, numPages)
}
if targetBytes >= maxTargetBytes && msrq >= int64(len(tc.keys)) {
// Definitely one page if limits are larger than result set.
require.Equal(t, 1, numPages)
}
})
if targetBytes >= maxTargetBytes || numPages == 1 {
if msrq >= int64(len(tc.keys)) {
return
}
targetBytes = 0
msrq++
}
}
})
})
}
}
10 changes: 10 additions & 0 deletions pkg/sql/row/kv_batch_fetcher.go
@@ -231,6 +231,16 @@ func makeKVBatchFetcherWithSendFunc(
func (f *txnKVFetcher) fetch(ctx context.Context) error {
var ba roachpb.BatchRequest
ba.Header.MaxSpanRequestKeys = f.getBatchSize()
if ba.Header.MaxSpanRequestKeys > 0 {
// If this kvfetcher limits the number of rows returned, also use
// target bytes to guard against the case in which the average row
// is very large.
// If no limit is set, the assumption is that SQL *knows* that there
// is only a "small" amount of data to be read, and wants to preserve
// concurrency for this request inside of DistSender, which setting
// TargetBytes would interfere with.
ba.Header.TargetBytes = 10 * (1 << 20)
}
ba.Header.ReturnRangeInfo = f.returnRangeInfo
ba.Requests = make([]roachpb.RequestUnion, len(f.spans))
if f.reverse {
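The kv_batch_fetcher.go comment above captures the trade-off: a byte budget is only attached when a row limit is already in play, because any limit forces sequential, range-by-range execution in the DistSender, and unlimited requests should stay parallelizable. A hedged sketch of that decision, with a hypothetical header struct rather than the real roachpb.Header:

```go
package main

import "fmt"

type batchHeader struct {
	maxSpanRequestKeys int64
	targetBytes        int64
}

const defaultTargetBytes = 10 << 20 // 10 MiB, the value the diff uses

// makeHeader attaches a byte budget only when a row limit is present.
func makeHeader(batchSize int64) batchHeader {
	h := batchHeader{maxSpanRequestKeys: batchSize}
	if h.maxSpanRequestKeys > 0 {
		// Row limit present: also cap bytes so very large rows can't blow
		// up a single batch.
		h.targetBytes = defaultTargetBytes
	}
	// No row limit: leave targetBytes at zero (unlimited) so the request
	// can still be parallelized across ranges.
	return h
}

func main() {
	fmt.Printf("%+v\n", makeHeader(10000)) // both limits set
	fmt.Printf("%+v\n", makeHeader(0))     // both unlimited
}
```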
2 changes: 1 addition & 1 deletion pkg/storage/engine/mvcc.go
@@ -2317,7 +2317,7 @@ func mvccScanToBytes(
if err := opts.validate(); err != nil {
return MVCCScanResult{}, err
}
if opts.MaxKeys < 0 {
if opts.MaxKeys < 0 || opts.TargetBytes < 0 {
resumeSpan := &roachpb.Span{Key: key, EndKey: endKey}
return MVCCScanResult{ResumeSpan: resumeSpan}, nil
}
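In the MVCC layer, a negative limit is the in-batch signal that an earlier request already consumed the budget, so the scan returns no data and only a resume span covering its whole key range. A small illustrative sketch with hypothetical types, not the real MVCCScanOptions:

```go
package main

import "fmt"

type span struct{ key, endKey string }

type scanResult struct {
	kvs        []string
	resumeSpan *span
}

// mvccScan skips the scan entirely when either limit is negative, telling
// the caller to resume over the full requested span on the next page.
func mvccScan(key, endKey string, maxKeys, targetBytes int64) scanResult {
	if maxKeys < 0 || targetBytes < 0 {
		// Budget already exhausted by a previous request in the same batch.
		return scanResult{resumeSpan: &span{key: key, endKey: endKey}}
	}
	// ... a real scan would happen here ...
	return scanResult{kvs: []string{key}}
}

func main() {
	r := mvccScan("a", "z", -1, 0)
	fmt.Println(len(r.kvs), *r.resumeSpan) // 0 {a z}: nothing scanned, resume over the full span
}
```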
10 changes: 10 additions & 0 deletions pkg/storage/engine/testdata/mvcc_histories/target_bytes
@@ -69,6 +69,16 @@ scan: "c" -> /BYTES/ghijkllkjihg @0.000000123,45
scan: "a" -> /BYTES/abcdef @0.000000123,45
scan: 108 bytes (target 10000000)

# Scans with target size -1 return no results.
run ok
with ts=300,0 k=a end=z targetbytes=-1
scan
scan reverse=true
----
scan: resume span ["a","z")
scan: "a"-"z" -> <no data>
scan: resume span ["a","z")
scan: "a"-"z" -> <no data>

run ok
# Target size one byte returns one result (overshooting instead of returning nothing).
11 changes: 11 additions & 0 deletions pkg/storage/replica_evaluate.go
@@ -331,6 +331,17 @@ func evaluateBatch(
baHeader.MaxSpanRequestKeys = -1
}
}
// Same as for MaxSpanRequestKeys above, keep track of the limit and
// make sure to fall through to -1 instead of hitting zero (which
// means no limit).
if baHeader.TargetBytes > 0 {
retBytes := reply.Header().NumBytes
if baHeader.TargetBytes > retBytes {
baHeader.TargetBytes -= retBytes
} else {
baHeader.TargetBytes = -1
}
}

// If transactional, we use ba.Txn for each individual command and
// accumulate updates to it. Once accumulated, we then remove the Txn
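replica_evaluate.go threads the remaining budget from one request in the batch to the next, and deliberately pins it to -1 instead of letting it reach 0, since 0 is the "no limit" sentinel. A sketch of just that accounting rule, with made-up numbers rather than the real evaluation loop:

```go
package main

import "fmt"

// nextBudget returns the TargetBytes value to hand to the next request in
// the batch, given the current budget and the bytes the last reply produced.
// 0 means unlimited; -1 means the budget is already exhausted.
func nextBudget(targetBytes, replyBytes int64) int64 {
	if targetBytes <= 0 {
		return targetBytes // pass the sentinel through unchanged
	}
	if targetBytes > replyBytes {
		return targetBytes - replyBytes
	}
	return -1 // exhausted; later requests must return only resume spans
}

func main() {
	budget := int64(1000)
	for _, reply := range []int64{300, 900, 50} {
		budget = nextBudget(budget, reply)
		fmt.Println(budget) // 700, then -1, then still -1
	}
}
```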
50 changes: 49 additions & 1 deletion pkg/storage/replica_evaluate_test.go
@@ -30,6 +30,9 @@ func TestEvaluateBatch(t *testing.T) {
defer leaktest.AfterTest(t)()

tcs := []testCase{
//
// Test suite for MaxSpanRequestKeys.
//
{
// We should never evaluate empty batches, but here's what would happen
// if we did.
@@ -196,7 +199,52 @@ func TestEvaluateBatch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
},
}}
},
//
// Test suite for TargetBytes.
//
{
// Two scans and a target bytes limit that saturates during the
// first.
name: "scans with TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(scanArgsString("a", "c"))
d.ba.Add(getArgsString("e"))
d.ba.Add(scanArgsString("c", "e"))
d.ba.TargetBytes = 1
// Also set a nontrivial MaxSpanRequestKeys, just to make sure
// there's no weird interaction (like it overriding TargetBytes).
// The stricter one ought to win.
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, []string{"e"}, nil)
verifyResumeSpans(t, r, "b-c", "", "c-e")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
},
}, {
// Ditto in reverse.
name: "reverse scans with TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(revScanArgsString("c", "e"))
d.ba.Add(getArgsString("e"))
d.ba.Add(revScanArgsString("a", "c"))
d.ba.TargetBytes = 1
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d"}, []string{"e"}, nil)
verifyResumeSpans(t, r, "c-c\x00", "", "a-c")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
