Skip to content

Commit

Permalink
Merge #68370 #71739
Browse files Browse the repository at this point in the history
68370: kvserver: avoid exceeding `TargetBytes` r=nvanbenschoten,yuzefovich a=erikgrinaker

**kvserver: avoid exceeding `TargetBytes`**

Previously, `Header.TargetBytes` would always overshoot the target, i.e.
a scan would terminate only after the result set hit or exceeded
`TargetBytes`.

This patch changes the behavior to never overshoot, except for when the
first result exceeds `TargetBytes` (to make sure clients can always make
progress). However, this is not backwards-compatible with the 21.2
`DistSender` logic, which relies on responses overshooting `TargetBytes`
when enforcing limits for split batches. It therefore adds a
corresponding `TargetBytesAvoidExcess` version gate that only enables
this for 22.1, and `MVCCScanOptions.TargetBytesAvoidExcess` to vary the
scan behavior based on it.

Release note: None

**kvserver: add `TargetBytesAllowEmpty` option**

The `Header.TargetBytes` request limit will be overshot if the first
result exceeds it, to ensure the client can make progress. However, an
upcoming [parallel get/scan streaming library][1] needs to be able to
disable this behavior to avoid exceeding memory budgets when dispatching
parallel requests.

This patch adds a field `TargetBytesAllowEmpty` which will allow an
empty response if the first result exceeds `TargetBytes`. This only has
an effect for `Get` and `Scan` requests, and only on 22.1 clusters since
21.2 `DistSender` logic relies on the limit always being overshot. It
is not supported for `Export` requests where it will return an error.

[1]: https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20210617_index_lookups_memory_limits.md

Resolves #68050.

Release note: None

**kvserver: add `ResponseHeader.ResumeNextBytes`**

This patch adds the field `ResponseHeader.ResumeNextBytes` which may be
populated with the size of the next result in the `ResumeSpan` if
`ResumeReason` is `RESUME_BYTE_LIMIT` (i.e. if `TargetBytes` was
exceeded). This will be needed by an upcoming [parallel get/scan
streaming library][1] to better manage its memory budget.

The field is best-effort, and may be omitted in some cases where the
result size exactly equals `TargetBytes` and we would have to do
additional work to obtain `ResumeNextBytes` (e.g. at the end of a range,
where we would have to send an additional RPC to the next range).

[1]: https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20210617_index_lookups_memory_limits.md

Release note: None

**kvclient: rename replyResults to replyKeys**

Release note: None

71739: server: fix panic on invalid session cookie. r=azhng a=dhartunian

A recent change (#70792) added some code to deal with duplicate cookies
in the HTTP header on DB Console requests to account for situations
where someone may have multiple `session` cookies in addition to the
CRDB one. That change failed to properly check for a dangling `err`
value outside of a loop before proceeding.

The tests have been modified to exercise the code that resulted in a
panic. The bad cookie header is now also appended to unauthenticated
requests in the test to manufacture a situation where only invalid
cookies are in the header.

Resolves #71728

Release note: None

Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: David Hartunian <[email protected]>
  • Loading branch information
3 people committed Oct 20, 2021
3 parents 6d207a7 + d754b4d + 65fa563 commit 300e4b0
Show file tree
Hide file tree
Showing 29 changed files with 1,449 additions and 770 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 22.1-100 set the active cluster version in the format '<major>.<minor>'
version version 22.1-102 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-100</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>22.1-102</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
10 changes: 10 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,12 @@ const (
// Start22_1 demarcates work towards CockroachDB v22.1.
Start22_1

// TargetBytesAvoidExcess prevents exceeding BatchRequest.Header.TargetBytes
// except when there is a single value in the response. 21.2 DistSender logic
// requires the limit to always be overshot in order to properly enforce
// limits when splitting requests.
TargetBytesAvoidExcess

// *************************************************
// Step (1): Add new versions here.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -505,6 +511,10 @@ var versionsSingleton = keyedVersions{
Key: Start22_1,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 100},
},
{
Key: TargetBytesAvoidExcess,
Version: roachpb.Version{Major: 22, Minor: 1, Internal: 102},
},

// *************************************************
// Step (2): Add new versions here.
Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/kv/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ func (b *Batch) fillResults(ctx context.Context) {
if h := reply.Header(); h.ResumeSpan != nil {
result.ResumeSpan = h.ResumeSpan
result.ResumeReason = h.ResumeReason
result.ResumeNextBytes = h.ResumeNextBytes
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ type Result struct {
// When ResumeSpan is populated, this specifies the reason why the operation
// wasn't completed and needs to be resumed.
ResumeReason roachpb.ResumeReason
// ResumeNextBytes is the size of the next result when ResumeSpan is populated.
ResumeNextBytes int64
}

// ResumeSpanAsValue returns the resume span as a value if one is set,
Expand Down
24 changes: 15 additions & 9 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1345,20 +1345,26 @@ func (ds *DistSender) divideAndSendBatchToRanges(
mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0
// Check whether we've received enough responses to exit query loop.
if mightStopEarly {
var replyResults int64
var replyKeys int64
var replyBytes int64
for _, r := range resp.reply.Responses {
replyResults += r.GetInner().Header().NumKeys
replyBytes += r.GetInner().Header().NumBytes
h := r.GetInner().Header()
replyKeys += h.NumKeys
replyBytes += h.NumBytes
if h.ResumeSpan != nil {
couldHaveSkippedResponses = true
resumeReason = h.ResumeReason
return
}
}
// Update MaxSpanRequestKeys, if applicable. Note that ba might be
// passed recursively to further divideAndSendBatchToRanges() calls.
// Update MaxSpanRequestKeys and TargetBytes, if applicable, since ba
// might be passed recursively to further divideAndSendBatchToRanges()
// calls.
if ba.MaxSpanRequestKeys > 0 {
if replyResults > ba.MaxSpanRequestKeys {
log.Fatalf(ctx, "received %d results, limit was %d",
replyResults, ba.MaxSpanRequestKeys)
if replyKeys > ba.MaxSpanRequestKeys {
log.Fatalf(ctx, "received %d results, limit was %d", replyKeys, ba.MaxSpanRequestKeys)
}
ba.MaxSpanRequestKeys -= replyResults
ba.MaxSpanRequestKeys -= replyKeys
// Exiting; any missing responses will be filled in via defer().
if ba.MaxSpanRequestKeys == 0 {
couldHaveSkippedResponses = true
Expand Down
22 changes: 16 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,13 @@ func checkResumeSpanScanResults(
i, spans[i], res.ResumeReason)
}
if rowLen == 0 {
if resumeKey != spans[i][0] {
t.Fatalf("scan %d: expected resume %s, got: %s",
i, spans[i][0], resumeKey)
if resumeKey < spans[i][0] {
t.Fatalf("scan %d: expected resume %s to be at or above scan start %s",
i, resumeKey, spans[i][0])
}
if resumeKey >= spans[i][1] {
t.Fatalf("scan %d: expected resume %s to be below scan end %s",
i, resumeKey, spans[i][1])
}
} else {
lastRes := expResults[i][rowLen-1]
Expand Down Expand Up @@ -277,9 +281,13 @@ func checkResumeSpanReverseScanResults(
i, spans[i], res.ResumeReason)
}
if rowLen == 0 {
if resumeKey != spans[i][1] {
t.Fatalf("scan %d (%s) expected resume %s, got: %s",
i, spans[i], spans[i][1], resumeKey)
if resumeKey <= spans[i][0] {
t.Fatalf("scan %d: expected resume %s to be at or above scan start %s",
i, resumeKey, spans[i][0])
}
if resumeKey > spans[i][1] {
t.Fatalf("scan %d: expected resume %s to be at or below scan end %s",
i, resumeKey, spans[i][1])
}
} else {
lastRes := expResults[i][rowLen-1]
Expand All @@ -305,6 +313,7 @@ func checkScanResults(
expSatisfied map[int]struct{},
opt checkOptions,
) {
t.Helper()
checkSpanResults(t, spans, results, expResults, expSatisfied, opt)
checkResumeSpanScanResults(t, spans, results, expResults, expSatisfied, opt)
}
Expand All @@ -318,6 +327,7 @@ func checkReverseScanResults(
expSatisfied map[int]struct{},
opt checkOptions,
) {
t.Helper()
checkSpanResults(t, spans, results, expResults, expSatisfied, opt)
checkResumeSpanReverseScanResults(t, spans, results, expResults, expSatisfied, opt)
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func Get(
}
return result.Result{}, nil
}

var val *roachpb.Value
var intent *roachpb.Intent
var err error
Expand All @@ -62,8 +63,17 @@ func Get(
return result.Result{}, err
}
if val != nil {
// NB: This calculation is different from Scan, since Scan responses include
// the key/value pair while Get only includes the value.
numBytes := int64(len(val.RawBytes))
if h.TargetBytes > 0 && h.TargetBytesAllowEmpty && numBytes > h.TargetBytes {
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
reply.ResumeNextBytes = numBytes
return result.Result{}, nil
}
reply.NumKeys = 1
reply.NumBytes = int64(len(val.RawBytes))
reply.NumBytes = numBytes
}
var intents []roachpb.Intent
if intent != nil {
Expand Down
45 changes: 34 additions & 11 deletions pkg/kv/kvserver/batcheval/cmd_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -47,33 +49,53 @@ func TestGetResumeSpan(t *testing.T) {
require.NoError(t, err)

testCases := []struct {
maxKeys int64
targetBytes int64
expectResume bool
expectReason roachpb.ResumeReason
maxKeys int64
targetBytes int64
allowEmpty bool
avoidExcess bool
expectResume bool
expectReason roachpb.ResumeReason
expectNextBytes int64
}{
{maxKeys: -1, expectResume: true, expectReason: roachpb.RESUME_KEY_LIMIT},
{maxKeys: -1, expectResume: true, expectReason: roachpb.RESUME_KEY_LIMIT, expectNextBytes: 0},
{maxKeys: 0, expectResume: false},
{maxKeys: 1, expectResume: false},
{maxKeys: 1, allowEmpty: true, expectResume: false},

{targetBytes: -1, expectResume: true, expectReason: roachpb.RESUME_BYTE_LIMIT},
{targetBytes: -1, expectResume: true, expectReason: roachpb.RESUME_BYTE_LIMIT, expectNextBytes: 0},
{targetBytes: 0, expectResume: false},
{targetBytes: 1, expectResume: false},
{targetBytes: 11, expectResume: false},
{targetBytes: 12, expectResume: false},
// allowEmpty takes precedence over avoidExcess at the RPC level, since
// callers have no control over avoidExcess.
{targetBytes: 1, allowEmpty: true, avoidExcess: false, expectResume: true, expectReason: roachpb.RESUME_BYTE_LIMIT, expectNextBytes: 11},
{targetBytes: 11, allowEmpty: true, expectResume: false},
{targetBytes: 12, allowEmpty: true, expectResume: false},
{targetBytes: 1, allowEmpty: true, avoidExcess: true, expectResume: true, expectReason: roachpb.RESUME_BYTE_LIMIT, expectNextBytes: 11},
{targetBytes: 11, allowEmpty: true, avoidExcess: true, expectResume: false},
{targetBytes: 12, allowEmpty: true, avoidExcess: true, expectResume: false},

{maxKeys: -1, targetBytes: -1, expectResume: true, expectReason: roachpb.RESUME_KEY_LIMIT},
{maxKeys: -1, targetBytes: -1, expectResume: true, expectReason: roachpb.RESUME_KEY_LIMIT, expectNextBytes: 0},
{maxKeys: 10, targetBytes: 100, expectResume: false},
}
for _, tc := range testCases {
name := fmt.Sprintf("maxKeys=%d targetBytes=%d", tc.maxKeys, tc.targetBytes)
name := fmt.Sprintf("maxKeys=%d targetBytes=%d allowEmpty=%t avoidExcess=%t",
tc.maxKeys, tc.targetBytes, tc.allowEmpty, tc.avoidExcess)
t.Run(name, func(t *testing.T) {
version := clusterversion.TestingBinaryVersion
if !tc.avoidExcess {
version = clusterversion.ByKey(clusterversion.TargetBytesAvoidExcess - 1)
}
settings := cluster.MakeTestingClusterSettingsWithVersions(version, clusterversion.TestingBinaryMinSupportedVersion, true)

resp := roachpb.GetResponse{}
_, err := Get(ctx, db, CommandArgs{
EvalCtx: (&MockEvalCtx{}).EvalContext(),
EvalCtx: (&MockEvalCtx{ClusterSettings: settings}).EvalContext(),
Header: roachpb.Header{
MaxSpanRequestKeys: tc.maxKeys,
TargetBytes: tc.targetBytes,
MaxSpanRequestKeys: tc.maxKeys,
TargetBytes: tc.targetBytes,
TargetBytesAllowEmpty: tc.allowEmpty,
},
Args: &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{Key: key},
Expand All @@ -85,6 +107,7 @@ func TestGetResumeSpan(t *testing.T) {
require.NotNil(t, resp.ResumeSpan)
require.Equal(t, &roachpb.Span{Key: key}, resp.ResumeSpan)
require.Equal(t, tc.expectReason, resp.ResumeReason)
require.Equal(t, tc.expectNextBytes, resp.ResumeNextBytes)
require.Nil(t, resp.Value)
} else {
require.Nil(t, resp.ResumeSpan)
Expand Down
22 changes: 14 additions & 8 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -39,15 +40,19 @@ func ReverseScan(
var scanRes storage.MVCCScanResult
var err error

avoidExcess := cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx,
clusterversion.TargetBytesAvoidExcess)
opts := storage.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
TargetBytesAvoidExcess: h.TargetBytesAllowEmpty || avoidExcess, // AllowEmpty takes precedence
TargetBytesAllowEmpty: h.TargetBytesAllowEmpty,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
}

switch args.ScanFormat {
Expand Down Expand Up @@ -75,6 +80,7 @@ func ReverseScan(
if scanRes.ResumeSpan != nil {
reply.ResumeSpan = scanRes.ResumeSpan
reply.ResumeReason = scanRes.ResumeReason
reply.ResumeNextBytes = scanRes.ResumeNextBytes
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
Expand Down
24 changes: 15 additions & 9 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -39,16 +40,20 @@ func Scan(
var scanRes storage.MVCCScanResult
var err error

avoidExcess := cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx,
clusterversion.TargetBytesAvoidExcess)
opts := storage.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
LocalUncertaintyLimit: cArgs.LocalUncertaintyLimit,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
Txn: h.Txn,
LocalUncertaintyLimit: cArgs.LocalUncertaintyLimit,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
TargetBytesAvoidExcess: h.TargetBytesAllowEmpty || avoidExcess, // AllowEmpty takes precedence
TargetBytesAllowEmpty: h.TargetBytesAllowEmpty,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
}

switch args.ScanFormat {
Expand Down Expand Up @@ -76,6 +81,7 @@ func Scan(
if scanRes.ResumeSpan != nil {
reply.ResumeSpan = scanRes.ResumeSpan
reply.ResumeReason = scanRes.ResumeReason
reply.ResumeNextBytes = scanRes.ResumeNextBytes
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
Expand Down
Loading

0 comments on commit 300e4b0

Please sign in to comment.