From 9e11347a053601609ff7edbcbc1bf8f1103f619b Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 24 Feb 2020 15:08:59 +0100 Subject: [PATCH 1/4] kv: finish up impl of TargetBytes scan limit This PR finishes up the work initiated in #44925 to allow (forward and reverse) scans to specify a TargetBytes hint which (mod overshooting by one row) restricts the size of the responses. The plan is to use it in kvfetcher, however this is left as a separate commit - here we focus on testing the functionality only. Release note: None --- pkg/kv/dist_sender.go | 21 +++- pkg/roachpb/api.go | 1 + pkg/server/server_test.go | 107 ++++++++++++++---- pkg/storage/engine/mvcc.go | 2 +- .../testdata/mvcc_histories/target_bytes | 10 ++ pkg/storage/replica_evaluate.go | 11 ++ pkg/storage/replica_evaluate_test.go | 50 +++++++- 7 files changed, 173 insertions(+), 29 deletions(-) diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index fdd9024e9a49..2a80b1e931b4 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -558,7 +558,7 @@ func (ds *DistSender) initAndVerifyBatch( return roachpb.NewErrorf("empty batch") } - if ba.MaxSpanRequestKeys != 0 { + if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 { // Verify that the batch contains only specific range requests or the // EndTxnRequest. Verify that a batch with a ReverseScan only contains // ReverseScan range requests. @@ -672,10 +672,11 @@ func (ds *DistSender) Send( splitET = true } parts := splitBatchAndCheckForRefreshSpans(ba, splitET) - if len(parts) > 1 && ba.MaxSpanRequestKeys != 0 { + if len(parts) > 1 && (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) { // We already verified above that the batch contains only scan requests of the same type. // Such a batch should never need splitting. - panic("batch with MaxSpanRequestKeys needs splitting") + log.Fatalf(ctx, "batch with MaxSpanRequestKeys=%d, TargetBytes=%d needs splitting", + log.Safe(ba.MaxSpanRequestKeys), log.Safe(ba.TargetBytes)) } errIdxOffset := 0 @@ -1152,7 +1153,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( } }() - canParallelize := ba.Header.MaxSpanRequestKeys == 0 + canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0 if ba.IsSingleCheckConsistencyRequest() { // Don't parallelize full checksum requests as they have to touch the // entirety of each replica of each range they touch. @@ -1213,12 +1214,14 @@ func (ds *DistSender) divideAndSendBatchToRanges( ba.UpdateTxn(resp.reply.Txn) } - mightStopEarly := ba.MaxSpanRequestKeys > 0 + mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 // Check whether we've received enough responses to exit query loop. if mightStopEarly { var replyResults int64 + var replyBytes int64 for _, r := range resp.reply.Responses { replyResults += r.GetInner().Header().NumKeys + replyBytes += r.GetInner().Header().NumBytes } // Update MaxSpanRequestKeys, if applicable. Note that ba might be // passed recursively to further divideAndSendBatchToRanges() calls. @@ -1235,6 +1238,14 @@ func (ds *DistSender) divideAndSendBatchToRanges( return } } + if ba.TargetBytes > 0 { + ba.TargetBytes -= replyBytes + if ba.TargetBytes <= 0 { + couldHaveSkippedResponses = true + resumeReason = roachpb.RESUME_KEY_LIMIT + return + } + } } } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index edb0885f2ee1..8d0c55d4c4fe 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -232,6 +232,7 @@ func (rh *ResponseHeader) combine(otherRH ResponseHeader) error { rh.ResumeSpan = otherRH.ResumeSpan rh.ResumeReason = otherRH.ResumeReason rh.NumKeys += otherRH.NumKeys + rh.NumBytes += otherRH.NumBytes rh.RangeInfos = append(rh.RangeInfos, otherRH.RangeInfos...) return nil } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 2312d9240332..01ee7f037cd6 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -51,6 +51,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var nodeTestBaseContext = testutils.NewNodeTestBaseContext() @@ -415,9 +416,10 @@ func TestMultiRangeScanDeleteRange(t *testing.T) { } } -// TestMultiRangeScanWithMaxResults tests that commands which access multiple -// ranges with MaxResults parameter are carried out properly. -func TestMultiRangeScanWithMaxResults(t *testing.T) { +// TestMultiRangeScanWithPagination tests that specifying MaxSpanResultKeys +// and/or TargetBytes to break up result sets works properly, even across +// ranges. +func TestMultiRangeScanWithPagination(t *testing.T) { defer leaktest.AfterTest(t)() testCases := []struct { splitKeys []roachpb.Key @@ -430,7 +432,7 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) { roachpb.Key("r"), roachpb.Key("w"), roachpb.Key("y")}}, } - for i, tc := range testCases { + for _, tc := range testCases { t.Run("", func(t *testing.T) { ctx := context.Background() s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) @@ -451,26 +453,87 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) { } } - // Try every possible ScanRequest startKey. - for start := 0; start < len(tc.keys); start++ { - // Try every possible maxResults, from 1 to beyond the size of key array. - for maxResults := 1; maxResults <= len(tc.keys)-start+1; maxResults++ { - scan := roachpb.NewScan(tc.keys[start], tc.keys[len(tc.keys)-1].Next()) - reply, err := client.SendWrappedWith( - ctx, tds, roachpb.Header{MaxSpanRequestKeys: int64(maxResults)}, scan, - ) - if err != nil { - t.Fatal(err) - } - rows := reply.(*roachpb.ScanResponse).Rows - if start+maxResults <= len(tc.keys) && len(rows) != maxResults { - t.Errorf("%d: start=%s: expected %d rows, but got %d", i, tc.keys[start], maxResults, len(rows)) - } else if start+maxResults == len(tc.keys)+1 && len(rows) != maxResults-1 { - t.Errorf("%d: expected %d rows, but got %d", i, maxResults-1, len(rows)) - } - } + // The maximum TargetBytes to use in this test. We use the bytes in + // all kvs in this test case as a ceiling. Nothing interesting + // happens above this. + var maxTargetBytes int64 + { + resp, pErr := client.SendWrapped(ctx, tds, roachpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next())) + require.Nil(t, pErr) + maxTargetBytes = resp.Header().NumBytes } + testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) { + // Iterate through MaxSpanRequestKeys=1..n and TargetBytes=1..m + // and (where n and m are chosen to reveal the full result set + // in one page). At each(*) combination, paginate both the + // forward and reverse scan and make sure we get the right + // result. + // + // (*) we don't increase the limits when there's only one page, + // but short circuit to something more interesting instead. + msrq := int64(1) + for targetBytes := int64(1); ; targetBytes++ { + var numPages int + t.Run(fmt.Sprintf("targetBytes=%d,maxSpanRequestKeys=%d", targetBytes, msrq), func(t *testing.T) { + req := func(span roachpb.Span) roachpb.Request { + if reverse { + return roachpb.NewReverseScan(span.Key, span.EndKey) + } + return roachpb.NewScan(span.Key, span.EndKey) + } + // Paginate. + resumeSpan := &roachpb.Span{Key: tc.keys[0], EndKey: tc.keys[len(tc.keys)-1].Next()} + var keys []roachpb.Key + for { + numPages++ + scan := req(*resumeSpan) + var ba roachpb.BatchRequest + ba.Add(scan) + ba.Header.TargetBytes = targetBytes + ba.Header.MaxSpanRequestKeys = msrq + br, pErr := tds.Send(ctx, ba) + require.Nil(t, pErr) + var rows []roachpb.KeyValue + if reverse { + rows = br.Responses[0].GetReverseScan().Rows + } else { + rows = br.Responses[0].GetScan().Rows + } + for _, kv := range rows { + keys = append(keys, kv.Key) + } + resumeSpan = br.Responses[0].GetInner().Header().ResumeSpan + t.Logf("page #%d: scan %v -> keys (after) %v resume %v", scan.Header().Span(), numPages, keys, resumeSpan) + if resumeSpan == nil { + // Done with this pagination. + break + } + } + if reverse { + for i, n := 0, len(keys); i < n-i-1; i++ { + keys[i], keys[n-i-1] = keys[n-i-1], keys[i] + } + } + require.Equal(t, tc.keys, keys) + if targetBytes == 1 || msrq < int64(len(tc.keys)) { + // Definitely more than one page in this case. + require.Less(t, 1, numPages) + } + if targetBytes >= maxTargetBytes && msrq >= int64(len(tc.keys)) { + // Definitely one page if limits are larger than result set. + require.Equal(t, 1, numPages) + } + }) + if targetBytes >= maxTargetBytes || numPages == 1 { + if msrq >= int64(len(tc.keys)) { + return + } + targetBytes = 0 + msrq++ + } + } + }) }) } } diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 3beda21be4f2..f9133dde5160 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -2317,7 +2317,7 @@ func mvccScanToBytes( if err := opts.validate(); err != nil { return MVCCScanResult{}, err } - if opts.MaxKeys < 0 { + if opts.MaxKeys < 0 || opts.TargetBytes < 0 { resumeSpan := &roachpb.Span{Key: key, EndKey: endKey} return MVCCScanResult{ResumeSpan: resumeSpan}, nil } diff --git a/pkg/storage/engine/testdata/mvcc_histories/target_bytes b/pkg/storage/engine/testdata/mvcc_histories/target_bytes index 148da0306a47..e2048338aa9e 100644 --- a/pkg/storage/engine/testdata/mvcc_histories/target_bytes +++ b/pkg/storage/engine/testdata/mvcc_histories/target_bytes @@ -69,6 +69,16 @@ scan: "c" -> /BYTES/ghijkllkjihg @0.000000123,45 scan: "a" -> /BYTES/abcdef @0.000000123,45 scan: 108 bytes (target 10000000) +# Scans with target size -1 return no results. +run ok +with ts=300,0 k=a end=z targetbytes=-1 + scan + scan reverse=true +---- +scan: resume span ["a","z") +scan: "a"-"z" -> +scan: resume span ["a","z") +scan: "a"-"z" -> run ok # Target size one byte returns one result (overshooting instead of returning nothing). diff --git a/pkg/storage/replica_evaluate.go b/pkg/storage/replica_evaluate.go index 61a0d3ea0861..b9567b7fa315 100644 --- a/pkg/storage/replica_evaluate.go +++ b/pkg/storage/replica_evaluate.go @@ -331,6 +331,17 @@ func evaluateBatch( baHeader.MaxSpanRequestKeys = -1 } } + // Same as for MaxSpanRequestKeys above, keep track of the limit and + // make sure to fall through to -1 instead of hitting zero (which + // means no limit). + if baHeader.TargetBytes > 0 { + retBytes := reply.Header().NumBytes + if baHeader.TargetBytes > retBytes { + baHeader.TargetBytes -= retBytes + } else { + baHeader.TargetBytes = -1 + } + } // If transactional, we use ba.Txn for each individual command and // accumulate updates to it. Once accumulated, we then remove the Txn diff --git a/pkg/storage/replica_evaluate_test.go b/pkg/storage/replica_evaluate_test.go index d41d43ba7e4f..3c6b896b8406 100644 --- a/pkg/storage/replica_evaluate_test.go +++ b/pkg/storage/replica_evaluate_test.go @@ -30,6 +30,9 @@ func TestEvaluateBatch(t *testing.T) { defer leaktest.AfterTest(t)() tcs := []testCase{ + // + // Test suite for MaxRequestSpans. + // { // We should never evaluate empty batches, but here's what would happen // if we did. @@ -196,7 +199,52 @@ func TestEvaluateBatch(t *testing.T) { require.NoError(t, err) require.Equal(t, "value-e", string(b)) }, - }} + }, + // + // Test suite for TargetBytes. + // + { + // Two scans and a target bytes limit that saturates during the + // first. + name: "scans with TargetBytes=1", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + d.ba.Add(scanArgsString("a", "c")) + d.ba.Add(getArgsString("e")) + d.ba.Add(scanArgsString("c", "e")) + d.ba.TargetBytes = 1 + // Also set a nontrivial MaxSpanRequestKeys, just to make sure + // there's no weird interaction (like it overriding TargetBytes). + // The stricter one ought to win. + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"a"}, []string{"e"}, nil) + verifyResumeSpans(t, r, "b-c", "", "c-e") + b, err := r.br.Responses[1].GetGet().Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "value-e", string(b)) + }, + }, { + // Ditto in reverse. + name: "reverse scans with TargetBytes=1", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + d.ba.Add(revScanArgsString("c", "e")) + d.ba.Add(getArgsString("e")) + d.ba.Add(revScanArgsString("a", "c")) + d.ba.TargetBytes = 1 + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"d"}, []string{"e"}, nil) + verifyResumeSpans(t, r, "c-c\x00", "", "a-c") + b, err := r.br.Responses[1].GetGet().Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "value-e", string(b)) + }, + }, + } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { From ef8ec0f40d573a5bcfd1c0d51a8e82726ac2dfa2 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 24 Feb 2020 15:20:01 +0100 Subject: [PATCH 2/4] row: set TargetBytes for kvfetcher Fixes #33660. Release note (bug fix): Significantly reduce the amount of memory allocated while scanning tables with a large average row size. --- pkg/sql/row/kv_batch_fetcher.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 42a9e1de7bfd..6348e0fa3c4f 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -231,6 +231,16 @@ func makeKVBatchFetcherWithSendFunc( func (f *txnKVFetcher) fetch(ctx context.Context) error { var ba roachpb.BatchRequest ba.Header.MaxSpanRequestKeys = f.getBatchSize() + if ba.Header.MaxSpanRequestKeys > 0 { + // If this kvfetcher limits the number of rows returned, also use + // target bytes to guard against the case in which the average row + // is very large. + // If no limit is set, the assumption is that SQL *knows* that there + // is only a "small" amount of data to be read, and wants to preserve + // concurrency for this request inside of DistSender, which setting + // TargetBytes would interfere with. + ba.Header.TargetBytes = 10 * (1 << 20) + } ba.Header.ReturnRangeInfo = f.returnRangeInfo ba.Requests = make([]roachpb.RequestUnion, len(f.spans)) if f.reverse { From 8c1ea4c871c0bc4b88c2bc971fba83976684d507 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 24 Feb 2020 15:29:14 +0100 Subject: [PATCH 3/4] roachtest: reset hotspotsplits to 512kb writes We had lowered this because the lack of kvfetcher-level result size limiting caused OOMs (the lower limit still caused OOMs, though more rarely). Now kvfetcher does bound the size of its result sets, so this should just work. Release note: None --- pkg/cmd/roachtest/hotspotsplits.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/cmd/roachtest/hotspotsplits.go b/pkg/cmd/roachtest/hotspotsplits.go index f3008125cb05..0a5be474548b 100644 --- a/pkg/cmd/roachtest/hotspotsplits.go +++ b/pkg/cmd/roachtest/hotspotsplits.go @@ -42,9 +42,7 @@ func registerHotSpotSplits(r *testRegistry) { m.Go(func() error { t.l.Printf("starting load generator\n") - // TODO(rytaft): reset this to 1 << 19 (512 KB) once we can dynamically - // size kv batches. - const blockSize = 1 << 18 // 256 KB + const blockSize = 1 << 19 // 512 KB return c.RunE(ctx, appNode, fmt.Sprintf( "./workload run kv --read-percent=0 --tolerate-errors --concurrency=%d "+ "--min-block-bytes=%d --max-block-bytes=%d --duration=%s {pgurl:1-3}", From de24cbdb80a22364c48986e1fe42ce69c42782a7 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 24 Feb 2020 15:47:18 +0100 Subject: [PATCH 4/4] roachtest: adapt hotspotsplits to new default range size 660b3e7e0d05d31ab1cf6ec001f6f5121bd20fa6 bumped the default range size by a factor of 8. Do the same in this test. Addresses one failure mode of #33660. Release note: None --- pkg/cmd/roachtest/hotspotsplits.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/roachtest/hotspotsplits.go b/pkg/cmd/roachtest/hotspotsplits.go index 0a5be474548b..61d3212fb887 100644 --- a/pkg/cmd/roachtest/hotspotsplits.go +++ b/pkg/cmd/roachtest/hotspotsplits.go @@ -51,7 +51,7 @@ func registerHotSpotSplits(r *testRegistry) { m.Go(func() error { t.Status(fmt.Sprintf("starting checks for range sizes")) - const sizeLimit = 3 * (1 << 26) // 192 MB + const sizeLimit = 3 * (1 << 29) // 3*512 MB (512 mb is default size) db := c.Conn(ctx, 1) defer db.Close()