From 93b20e7931ae27b10dc5e8bf95405bc972fed5da Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Wed, 19 Jan 2022 10:35:59 +1100 Subject: [PATCH 1/5] sql: fix call of `FindIndexWithName` In e89093faf7225a635ddb5d4f260b20a4d8dd64e2, we added `tableDesc.FindIndexWithName(name.String())`. However, `name.String()` can return an `EncodeRestrictedSQLIdent` version of the string. This fixes the call to use `string(name)`. I couldn't produce an error with this at the moment, mostly because there is some other check preventing it from happening in the combinations I've tried. Release note: None --- pkg/sql/alter_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 02034c8d1acc..07bd58ea400d 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -1613,7 +1613,7 @@ func validateConstraintNameIsNotUsed( if name == "" { return false, nil } - idx, _ := tableDesc.FindIndexWithName(name.String()) + idx, _ := tableDesc.FindIndexWithName(string(name)) if idx == nil { return false, nil } From d47f932e9b609b9835da8c7a6dcc90178982c2bb Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Wed, 19 Jan 2022 09:57:00 +0000 Subject: [PATCH 2/5] kvserver: add `SSTTimestamp` parameter for `AddSSTable` This patch adds an `SSTTimestamp` parameter for `AddSSTable`. When set, the client promises that all MVCC timestamps in the given SST are equal to `SSTTimestamp`. When used together with `WriteAtRequestTimestamp`, this can avoid the cost of rewriting the SST timestamps if the `SSTTimestamp` already equals the request `Timestamp`. 
Release note: None --- pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_add_sstable.go | 38 ++++++++++++- .../batcheval/cmd_add_sstable_test.go | 57 ++++++++++++++----- pkg/roachpb/api.proto | 16 +++++- 4 files changed, 92 insertions(+), 20 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 093634e70587..fd98007f876a 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -140,6 +140,7 @@ go_test( "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 7340b0871c85..88c639b291a5 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -56,8 +57,15 @@ func EvalAddSSTable( // If requested, rewrite the SST's MVCC timestamps to the request timestamp. // This ensures the writes comply with the timestamp cache and closed // timestamp, i.e. by not writing to timestamps that have already been - // observed or closed. - if args.WriteAtRequestTimestamp { + // observed or closed. If the race detector is enabled, also assert that + // the provided SST only contains the expected timestamps. 
+ if util.RaceEnabled && !args.SSTTimestamp.IsEmpty() { + if err := assertSSTTimestamp(sst, args.SSTTimestamp); err != nil { + return result.Result{}, err + } + } + if args.WriteAtRequestTimestamp && + (args.SSTTimestamp.IsEmpty() || h.Timestamp != args.SSTTimestamp) { sst, err = storage.UpdateSSTTimestamps(sst, h.Timestamp) if err != nil { return result.Result{}, errors.Wrap(err, "updating SST timestamps") @@ -260,3 +268,29 @@ func EvalAddSSTable( }, }, nil } + +func assertSSTTimestamp(sst []byte, ts hlc.Timestamp) error { + iter, err := storage.NewMemSSTIterator(sst, true) + if err != nil { + return err + } + defer iter.Close() + + iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}) + for { + ok, err := iter.Valid() + if err != nil { + return err + } + if !ok { + return nil + } + + key := iter.UnsafeKey() + if key.Timestamp != ts { + return errors.AssertionFailedf("incorrect timestamp %s for SST key %s (expected %s)", + key.Timestamp, key.Key, ts) + } + iter.Next() + } +} diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 1344a30b13ab..b0f73a384f72 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -70,15 +71,17 @@ func TestEvalAddSSTable(t *testing.T) { // These are run with IngestAsWrites both disabled and enabled. 
testcases := map[string]struct { - data []mvccKV - sst []mvccKV - atReqTS int64 // WriteAtRequestTimestamp with given timestamp - noConflict bool // DisallowConflicts - noShadow bool // DisallowShadowing - noShadowBelow int64 // DisallowShadowingBelow - expect []mvccKV - expectErr interface{} // error type, substring, or true (any error) - expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats + data []mvccKV + sst []mvccKV + sstTimestamp int64 // SSTTimestamp set to given timestamp + atReqTS int64 // WriteAtRequestTimestamp with given timestamp + noConflict bool // DisallowConflicts + noShadow bool // DisallowShadowing + noShadowBelow int64 // DisallowShadowingBelow + expect []mvccKV + expectErr interface{} // error type, substring, or true (any error) + expectErrUnderRace interface{} + expectStatsEst bool // expect MVCCStats.ContainsEstimates, don't check stats }{ // Blind writes. "blind writes below existing": { @@ -610,6 +613,25 @@ func TestEvalAddSSTable(t *testing.T) { sst: []mvccKV{{"a", 7, "a8"}}, expectErr: &roachpb.WriteTooOldError{}, }, + + // SSTTimestamp + "SSTTimestamp works with WriteAtRequestTimestamp": { + atReqTS: 7, + data: []mvccKV{{"a", 6, "a6"}}, + sst: []mvccKV{{"a", 7, "a7"}}, + sstTimestamp: 7, + expect: []mvccKV{{"a", 7, "a7"}, {"a", 6, "a6"}}, + expectStatsEst: true, + }, + "SSTTimestamp doesn't rewrite with incorrect timestamp, but errors under race": { + atReqTS: 8, + data: []mvccKV{{"a", 6, "a6"}}, + sst: []mvccKV{{"a", 7, "a7"}}, + sstTimestamp: 8, + expect: []mvccKV{{"a", 7, "a7"}, {"a", 6, "a6"}}, + expectErrUnderRace: `incorrect timestamp 0.000000007,0 for SST key "a" (expected 0.000000008,0)`, + expectStatsEst: true, + }, } testutils.RunTrueAndFalse(t, "IngestAsWrites", func(t *testing.T, ingestAsWrites bool) { for name, tc := range testcases { @@ -657,20 +679,25 @@ func TestEvalAddSSTable(t *testing.T) { DisallowShadowing: tc.noShadow, DisallowShadowingBelow: hlc.Timestamp{WallTime: tc.noShadowBelow}, 
WriteAtRequestTimestamp: tc.atReqTS != 0, + SSTTimestamp: hlc.Timestamp{WallTime: tc.sstTimestamp}, IngestAsWrites: ingestAsWrites, }, }, resp) - if tc.expectErr != nil { + expectErr := tc.expectErr + if expectErr == nil && tc.expectErrUnderRace != nil && util.RaceEnabled { + expectErr = tc.expectErrUnderRace + } + if expectErr != nil { require.Error(t, err) - if b, ok := tc.expectErr.(bool); ok && b { + if b, ok := expectErr.(bool); ok && b { // any error is fine - } else if expectMsg, ok := tc.expectErr.(string); ok { + } else if expectMsg, ok := expectErr.(string); ok { require.Contains(t, err.Error(), expectMsg) - } else if expectErr, ok := tc.expectErr.(error); ok { - require.True(t, errors.HasType(err, expectErr), "expected %T, got %v", expectErr, err) + } else if e, ok := expectErr.(error); ok { + require.True(t, errors.HasType(err, e), "expected %T, got %v", e, err) } else { - require.Fail(t, "invalid expectErr", "expectErr=%v", tc.expectErr) + require.Fail(t, "invalid expectErr", "expectErr=%v", expectErr) } return } diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index eb8e377b5c02..89191599ced1 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1572,9 +1572,9 @@ message AdminVerifyProtectedTimestampResponse { } // AddSSTableRequest contains arguments to the AddSSTable method, which links an -// SST file into the Pebble log-structured merge-tree. The SST should only -// contain committed versioned values with non-zero MVCC timestamps (no intents -// or inline values) and no tombstones, but this is only fully enforced when +// SST file into the Pebble log-structured merge-tree. The SST must only contain +// committed versioned values with non-zero MVCC timestamps (no intents or +// inline values) and no tombstones, but this is only fully enforced when // WriteAtRequestTimestamp is enabled, for performance. It cannot be used in a // transaction, cannot be split across ranges, and must be alone in a batch. 
// @@ -1633,6 +1633,16 @@ message AddSSTableRequest { // Added in 22.1, so check the MVCCAddSSTable version gate before using. bool write_at_request_timestamp = 6; + // SSTTimestamp is a promise from the client that all MVCC timestamps in the + // SST equal the provided timestamp, and that there are no inline values, + // intents, or tombstones. When used together with WriteAtRequestTimestamp, + // this can avoid an SST rewrite (and the associated overhead) if the SST + // timestamp equals the request timestamp (i.e. if it was provided by the + // client and the request was not pushed due to e.g. the closed timestamp or + // contention). + util.hlc.Timestamp sst_timestamp = 9 + [(gogoproto.customname) = "SSTTimestamp", (gogoproto.nullable) = false]; + // DisallowConflicts will check for MVCC conflicts with existing keys, i.e. // scan for existing keys with a timestamp at or above the SST key and // return WriteTooOldError (possibly retrying). It also ensures MVCC From 03e59517f5e97f06b455cd2cb0186f361d0db94a Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 10 Dec 2021 20:29:11 +0000 Subject: [PATCH 3/5] roachpb: rename `TargetBytesAllowEmpty` to `AllowEmpty` The `TargetBytesAllowEmpty` parameter currently only applies to `TargetBytes` limits, but will shortly apply to `MaxSpanRequestKeys` as well (when the scan API becomes SQL row-aware and may discard the first row if it's incomplete). This patch therefore renames the parameter to `AllowEmpty`, to make it limit-agnostic. There are no behavioral changes. 
Release note: None --- pkg/kv/kvclient/kvstreamer/streamer.go | 2 +- pkg/kv/kvserver/batcheval/cmd_get.go | 2 +- pkg/kv/kvserver/batcheval/cmd_get_test.go | 6 +++--- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 4 ++-- pkg/kv/kvserver/batcheval/cmd_scan.go | 4 ++-- pkg/kv/kvserver/batcheval/cmd_scan_test.go | 6 +++--- pkg/roachpb/api.proto | 9 +++++---- pkg/storage/metamorphic/operations.go | 8 ++++---- pkg/storage/mvcc.go | 10 +++++----- pkg/storage/mvcc_history_test.go | 2 +- pkg/storage/pebble_mvcc_scanner.go | 6 +++--- 11 files changed, 30 insertions(+), 29 deletions(-) diff --git a/pkg/kv/kvclient/kvstreamer/streamer.go b/pkg/kv/kvclient/kvstreamer/streamer.go index 08aa4b00fea6..ab3894d6f915 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer.go +++ b/pkg/kv/kvclient/kvstreamer/streamer.go @@ -901,7 +901,7 @@ func (w *workerCoordinator) performRequestAsync( var ba roachpb.BatchRequest ba.Header.WaitPolicy = w.lockWaitPolicy ba.Header.TargetBytes = targetBytes - ba.Header.TargetBytesAllowEmpty = !headOfLine + ba.Header.AllowEmpty = !headOfLine // TODO(yuzefovich): consider setting MaxSpanRequestKeys whenever // applicable (#67885). ba.AdmissionHeader = w.requestAdmissionHeader diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index e87309af6210..248f60df0b11 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -66,7 +66,7 @@ func Get( // NB: This calculation is different from Scan, since Scan responses include // the key/value pair while Get only includes the value. 
numBytes := int64(len(val.RawBytes)) - if h.TargetBytes > 0 && h.TargetBytesAllowEmpty && numBytes > h.TargetBytes { + if h.TargetBytes > 0 && h.AllowEmpty && numBytes > h.TargetBytes { reply.ResumeSpan = &roachpb.Span{Key: args.Key} reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT reply.ResumeNextBytes = numBytes diff --git a/pkg/kv/kvserver/batcheval/cmd_get_test.go b/pkg/kv/kvserver/batcheval/cmd_get_test.go index 25a291aa1901..b0ddc170244d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_get_test.go @@ -93,9 +93,9 @@ func TestGetResumeSpan(t *testing.T) { _, err := Get(ctx, db, CommandArgs{ EvalCtx: (&MockEvalCtx{ClusterSettings: settings}).EvalContext(), Header: roachpb.Header{ - MaxSpanRequestKeys: tc.maxKeys, - TargetBytes: tc.targetBytes, - TargetBytesAllowEmpty: tc.allowEmpty, + MaxSpanRequestKeys: tc.maxKeys, + TargetBytes: tc.targetBytes, + AllowEmpty: tc.allowEmpty, }, Args: &roachpb.GetRequest{ RequestHeader: roachpb.RequestHeader{Key: key}, diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 4c21bdf12343..89e69e813ee2 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -48,8 +48,8 @@ func ReverseScan( MaxKeys: h.MaxSpanRequestKeys, MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetBytes: h.TargetBytes, - TargetBytesAvoidExcess: h.TargetBytesAllowEmpty || avoidExcess, // AllowEmpty takes precedence - TargetBytesAllowEmpty: h.TargetBytesAllowEmpty, + TargetBytesAvoidExcess: h.AllowEmpty || avoidExcess, // AllowEmpty takes precedence + AllowEmpty: h.AllowEmpty, FailOnMoreRecent: args.KeyLocking != lock.None, Reverse: true, MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 5036dc62435d..84965069cc0c 100644 --- 
a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -49,8 +49,8 @@ func Scan( MaxKeys: h.MaxSpanRequestKeys, MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetBytes: h.TargetBytes, - TargetBytesAvoidExcess: h.TargetBytesAllowEmpty || avoidExcess, // AllowEmpty takes precedence - TargetBytesAllowEmpty: h.TargetBytesAllowEmpty, + TargetBytesAvoidExcess: h.AllowEmpty || avoidExcess, // AllowEmpty takes precedence + AllowEmpty: h.AllowEmpty, FailOnMoreRecent: args.KeyLocking != lock.None, Reverse: false, MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), diff --git a/pkg/kv/kvserver/batcheval/cmd_scan_test.go b/pkg/kv/kvserver/batcheval/cmd_scan_test.go index 53013c9c9948..f3f03f043573 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan_test.go @@ -117,9 +117,9 @@ func testScanReverseScanInner( cArgs := CommandArgs{ Args: req, Header: roachpb.Header{ - Timestamp: ts, - TargetBytes: tb, - TargetBytesAllowEmpty: allowEmpty, + Timestamp: ts, + TargetBytes: tb, + AllowEmpty: allowEmpty, }, EvalCtx: (&MockEvalCtx{ClusterSettings: settings}).EvalContext(), } diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index eb8e377b5c02..f3c4f3e467e6 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -2251,10 +2251,11 @@ message Header { // supported requests from max_span_request_keys apply to the target_bytes // option as well. int64 target_bytes = 15; - // If true, allow returning an empty result when the first result exceeds - // target_bytes. Only effective on 22.1 clusters with TargetBytesAvoidExcess - // cluster version enabled. Only supported by Get, Scan, and ReverseScan. - bool target_bytes_allow_empty = 23; + // If true, allow returning an empty result when the first result exceeds a + // limit (e.g. TargetBytes). Only effective on 22.1 clusters with + // TargetBytesAvoidExcess version gate enabled. 
Only supported by Get, Scan, + // and ReverseScan. + bool allow_empty = 23; // If true, DistSender returns partial non-empty results when encountering a // range boundary, with an appropriate resume span and reason // RESUME_RANGE_BOUNDARY. This will disable parallelism of DistSender diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 21dc31404d11..bfe5b4f9f4c6 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -126,7 +126,7 @@ func generateMVCCScan( maxKeys := int64(m.floatGenerator.parse(args[3]) * 32) targetBytes := int64(m.floatGenerator.parse(args[4]) * (1 << 20)) targetBytesAvoidExcess := m.boolGenerator.parse(args[5]) - targetBytesAllowEmpty := m.boolGenerator.parse(args[6]) + allowEmpty := m.boolGenerator.parse(args[6]) return &mvccScanOp{ m: m, key: key.Key, @@ -138,7 +138,7 @@ func generateMVCCScan( maxKeys: maxKeys, targetBytes: targetBytes, targetBytesAvoidExcess: targetBytesAvoidExcess, - targetBytesAllowEmpty: targetBytesAllowEmpty, + allowEmpty: allowEmpty, } } @@ -368,7 +368,7 @@ type mvccScanOp struct { maxKeys int64 targetBytes int64 targetBytesAvoidExcess bool - targetBytesAllowEmpty bool + allowEmpty bool } func (m mvccScanOp) run(ctx context.Context) string { @@ -390,7 +390,7 @@ func (m mvccScanOp) run(ctx context.Context) string { MaxKeys: m.maxKeys, TargetBytes: m.targetBytes, TargetBytesAvoidExcess: m.targetBytesAvoidExcess, - TargetBytesAllowEmpty: m.targetBytesAllowEmpty, + AllowEmpty: m.allowEmpty, }) if err != nil { return fmt.Sprintf("error: %s", err) diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 2aa19c7e04e0..b0af2d43bdef 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2366,7 +2366,7 @@ func mvccScanToBytes( maxKeys: opts.MaxKeys, targetBytes: opts.TargetBytes, targetBytesAvoidExcess: opts.TargetBytesAvoidExcess, - targetBytesAllowEmpty: opts.TargetBytesAllowEmpty, + allowEmpty: opts.AllowEmpty, maxIntents: 
opts.MaxIntents, inconsistent: opts.Inconsistent, tombstones: opts.Tombstones, @@ -2482,7 +2482,7 @@ type MVCCScanOptions struct { // memory during a Scan operation. Once the target is satisfied (i.e. met or // exceeded) by the emitted KV pairs, iteration stops (with a ResumeSpan as // appropriate). In particular, at least one kv pair is returned (when one - // exists), unless TargetBytesAllowEmpty is set. + // exists), unless AllowEmpty is set. // // The number of bytes a particular kv pair accrues depends on internal data // structures, but it is guaranteed to exceed that of the bytes stored in @@ -2496,9 +2496,9 @@ type MVCCScanOptions struct { // TODO(erikgrinaker): This option exists for backwards compatibility with // 21.2 RPC clients, in 22.2 it should always be enabled. TargetBytesAvoidExcess bool - // TargetBytesAllowEmpty will return an empty result if the first kv pair - // exceeds the TargetBytes limit and TargetBytesAvoidExcess is set. - TargetBytesAllowEmpty bool + // AllowEmpty will return an empty result if the first kv pair exceeds the + // TargetBytes limit and TargetBytesAvoidExcess is set. + AllowEmpty bool // MaxIntents is a maximum number of intents collected by scanner in // consistent mode before returning WriteIntentError. // diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 4ec9b513863f..ec4825805afe 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -792,7 +792,7 @@ func cmdScan(e *evalCtx) error { opts.TargetBytesAvoidExcess = true } if e.hasArg("allowEmpty") { - opts.TargetBytesAllowEmpty = true + opts.AllowEmpty = true } res, err := MVCCScan(e.ctx, e.engine, key, endKey, ts, opts) // NB: the error is returned below. 
This ensures the test can diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index 243526d186fa..13387e2797ff 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -135,7 +135,7 @@ type pebbleMVCCScanner struct { targetBytesAvoidExcess bool // If true, return an empty result if the first result exceeds targetBytes // and targetBytesAvoidExcess is true. - targetBytesAllowEmpty bool + allowEmpty bool // Stop adding intents and abort scan once maxIntents threshold is reached. // This limit is only applicable to consistent scans since they return // intents as an error. @@ -715,8 +715,8 @@ func (p *pebbleMVCCScanner) addAndAdvance(ctx context.Context, rawKey []byte, va // to include tombstones in the results. if len(val) > 0 || p.tombstones { // Check if we should apply the targetBytes limit at all. We do this either - // if this is not the first result or if targetBytesAllowEmpty is true. - if p.targetBytes > 0 && (p.results.count > 0 || p.targetBytesAllowEmpty) { + // if this is not the first result or if allowEmpty is true. + if p.targetBytes > 0 && (p.results.count > 0 || p.allowEmpty) { size := p.results.bytes nextSize := int64(p.results.sizeOf(len(rawKey), len(val))) // Check if we actually exceeded the limit. 
From 44afbb43e1aa8add842e66ed3737808fa9aad684 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Mon, 3 Jan 2022 20:28:27 +0000 Subject: [PATCH 4/5] storage: add MVCC benchmark support for SQL row data Release note: None --- pkg/storage/BUILD.bazel | 4 ++ pkg/storage/bench_pebble_test.go | 30 +++++++++++ pkg/storage/bench_test.go | 85 ++++++++++++++++++++++++-------- 3 files changed, 98 insertions(+), 21 deletions(-) diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index 597d281530c8..b7cbc7dbfb2d 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -128,6 +128,10 @@ go_test( "//pkg/kv/kvserver/uncertainty", "//pkg/roachpb:with-mocks", "//pkg/settings/cluster", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", "//pkg/storage/enginepb", "//pkg/storage/fs", "//pkg/testutils", diff --git a/pkg/storage/bench_pebble_test.go b/pkg/storage/bench_pebble_test.go index 27f96c6e482c..544556918547 100644 --- a/pkg/storage/bench_pebble_test.go +++ b/pkg/storage/bench_pebble_test.go @@ -79,6 +79,36 @@ func BenchmarkMVCCScan_Pebble(b *testing.B) { } } +func BenchmarkMVCCScan_PebbleSQLRows(b *testing.B) { + skip.UnderShort(b) + ctx := context.Background() + for _, numRows := range []int{1, 10, 100, 1000, 10000} { + b.Run(fmt.Sprintf("rows=%d", numRows), func(b *testing.B) { + for _, numColumnFamilies := range []int{1, 3, 10} { + b.Run(fmt.Sprintf("columnFamilies=%d", numColumnFamilies), func(b *testing.B) { + for _, numVersions := range []int{1} { + b.Run(fmt.Sprintf("versions=%d", numVersions), func(b *testing.B) { + for _, valueSize := range []int{8, 64, 512} { + b.Run(fmt.Sprintf("valueSize=%d", valueSize), func(b *testing.B) { + runMVCCScan(ctx, b, setupMVCCPebble, benchScanOptions{ + benchDataOptions: benchDataOptions{ + numColumnFamilies: numColumnFamilies, + numVersions: numVersions, + valueBytes: valueSize, + }, + numRows: numRows, + reverse: false, + }) + }) + } + }) + } + }) + } + }) + } +} + 
func BenchmarkMVCCReverseScan_Pebble(b *testing.B) { skip.UnderShort(b) ctx := context.Background() diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index f21199757fcb..06a5fdce8893 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -23,6 +23,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -565,9 +569,10 @@ const overhead = 48 // Per key/value overhead (empirically determined) type engineMaker func(testing.TB, string) Engine type benchDataOptions struct { - numVersions int - numKeys int - valueBytes int + numVersions int + numKeys int + valueBytes int + numColumnFamilies int // In transactional mode, data is written by writing and later resolving // intents. In non-transactional mode, data is written directly, without @@ -678,14 +683,15 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int // skip more historical versions; later timestamps mean scans which // skip fewer. // -// The creation of the database is time consuming, especially for larger -// numbers of versions. The database is persisted between runs and stored in -// the current directory as "mvcc_scan_<versions>_<keys>_<valueBytes>" (which -// is also returned). +// The creation of the database is time consuming, especially for larger numbers +// of versions. The database is persisted between runs and stored in the current +// directory as "mvcc_scan_<versions>_<keys>_<columnFamilies>_<valueBytes>" +// (which is also returned). 
func setupMVCCData( ctx context.Context, b *testing.B, emk engineMaker, opts benchDataOptions, ) (Engine, string) { - loc := fmt.Sprintf("mvcc_data_%d_%d_%d", opts.numVersions, opts.numKeys, opts.valueBytes) + loc := fmt.Sprintf("mvcc_data_%d_%d_%d_%d", + opts.numVersions, opts.numKeys, opts.numColumnFamilies, opts.valueBytes) if opts.transactional { loc += "_txn" } @@ -693,8 +699,8 @@ func setupMVCCData( exists := true if _, err := os.Stat(loc); oserror.IsNotExist(err) { exists = false - } else if err != nil { - b.Fatal(err) + } else { + require.NoError(b, err) } eng := emk(b, loc) @@ -709,10 +715,16 @@ func setupMVCCData( // Generate the same data every time. rng := rand.New(rand.NewSource(1449168817)) - keys := make([]roachpb.Key, opts.numKeys) + keySlice := make([]roachpb.Key, opts.numKeys) var order []int + var cf uint32 for i := 0; i < opts.numKeys; i++ { - keys[i] = roachpb.Key(encoding.EncodeUvarintAscending([]byte("key-"), uint64(i))) + if opts.numColumnFamilies > 0 { + keySlice[i] = makeBenchRowKey(b, nil, i/opts.numColumnFamilies, cf) + cf = (cf + 1) % uint32(opts.numColumnFamilies) + } else { + keySlice[i] = roachpb.Key(encoding.EncodeUvarintAscending([]byte("key-"), uint64(i))) + } keyVersions := rng.Intn(opts.numVersions) + 1 for j := 0; j < keyVersions; j++ { order = append(order, i) @@ -734,7 +746,7 @@ func setupMVCCData( } writeKey := func(batch Batch, idx int) { - key := keys[idx] + key := keySlice[idx] value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, opts.valueBytes)) value.InitChecksum(key) counts[idx]++ @@ -749,7 +761,7 @@ func setupMVCCData( } resolveLastIntent := func(batch Batch, idx int) { - key := keys[idx] + key := keySlice[idx] txnMeta := txn.TxnMeta txnMeta.WriteTimestamp = hlc.Timestamp{WallTime: int64(counts[idx]) * 5} if _, err := MVCCResolveWriteIntent(ctx, batch, nil /* ms */, roachpb.LockUpdate{ @@ -795,7 +807,7 @@ func setupMVCCData( if opts.transactional { // If we were writing transactionally, we need to do one last 
round of // intent resolution. Just stuff it all into the last batch. - for idx := range keys { + for idx := range keySlice { resolveLastIntent(batch, idx) } } @@ -842,17 +854,25 @@ func runMVCCScan(ctx context.Context, b *testing.B, emk engineMaker, opts benchS iter.Close() } + var startKey, endKey roachpb.Key + startKeyBuf := append(make([]byte, 0, 1024), []byte("key-")...) + endKeyBuf := append(make([]byte, 0, 1024), []byte("key-")...) + b.SetBytes(int64(opts.numRows * opts.valueBytes)) b.ResetTimer() - startKeyBuf := append(make([]byte, 0, 64), []byte("key-")...) - endKeyBuf := append(make([]byte, 0, 64), []byte("key-")...) for i := 0; i < b.N; i++ { // Choose a random key to start scan. - keyIdx := rand.Int31n(int32(opts.numKeys - opts.numRows)) - startKey := roachpb.Key(encoding.EncodeUvarintAscending(startKeyBuf[:4], uint64(keyIdx))) - endKey := roachpb.Key(encoding.EncodeUvarintAscending(endKeyBuf[:4], uint64(keyIdx+int32(opts.numRows)-1))) - endKey = endKey.Next() + if opts.numColumnFamilies == 0 { + keyIdx := rand.Int31n(int32(opts.numKeys - opts.numRows)) + startKey = roachpb.Key(encoding.EncodeUvarintAscending(startKeyBuf[:4], uint64(keyIdx))) + endKey = roachpb.Key(encoding.EncodeUvarintAscending(endKeyBuf[:4], uint64(keyIdx+int32(opts.numRows)-1))).Next() + } else { + startID := rand.Int63n(int64((opts.numKeys - opts.numRows) / opts.numColumnFamilies)) + endID := startID + int64(opts.numRows/opts.numColumnFamilies) + 1 + startKey = makeBenchRowKey(b, startKeyBuf[:0], int(startID), 0) + endKey = makeBenchRowKey(b, endKeyBuf[:0], int(endID), 0) + } walltime := int64(5 * (rand.Int31n(int32(opts.numVersions)) + 1)) ts := hlc.Timestamp{WallTime: walltime} res, err := MVCCScan(ctx, eng, startKey, endKey, ts, MVCCScanOptions{ @@ -1576,3 +1596,26 @@ func runCheckSSTConflicts(b *testing.B, numEngineKeys, numVersions, numSstKeys i require.NoError(b, err) } } + +var benchRowColMap catalog.TableColMap +var benchRowPrefix roachpb.Key + +func init() { + 
benchRowColMap.Set(0, 0) + benchRowPrefix = keys.SystemSQLCodec.IndexPrefix(1, 1) +} + +// makeBenchRowKey makes a key for a SQL row for use in benchmarks, +// using the system tenant with table 1 index 1, and a single column. +func makeBenchRowKey(b *testing.B, buf []byte, id int, columnFamily uint32) roachpb.Key { + var err error + buf = append(buf, benchRowPrefix...) + buf, _, err = rowenc.EncodeColumns( + []descpb.ColumnID{0}, nil /* directions */, benchRowColMap, + []tree.Datum{tree.NewDInt(tree.DInt(id))}, buf) + if err != nil { + // conditionally check this, for performance + require.NoError(b, err) + } + return keys.MakeFamilyKey(buf, columnFamily) +} From 6a7678c19d142b8ed6a7ba3c222d3e18aab66c78 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 14 Dec 2021 19:56:11 +0000 Subject: [PATCH 5/5] kvserver: add `WholeRowsOfSize` request parameter for scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This patch adds a `WholeRowsOfSize` parameter to request headers, and a corresponding `ScanWholeRows` version gate, which prevents scan requests from including partial SQL rows at the end of the result. See `api.proto` for details. This implementation requires the maximum row size to be plumbed down from SQL. This is a performance optimization which allows use of a fixed-size ring buffer to keep track of the last KV byte offsets in `pebbleMVCCScanner`. Two alternative implementations were attempted: keeping track of all KV byte offsets while scanning, and decoding every key's row prefix while scanning to track the last row's starting offset. Both of these alternatives predictably had significant overhead (~10%). However, this patch comes with a 2-3% performance penalty for scans even when `WholeRowsOfSize` is disabled, due to code restructuring and additional checks. Efforts to recover this performance have stalled. 
``` name old time/op new time/op delta MVCCScan_Pebble/rows=1/versions=1/valueSize=64-24 4.25µs ± 1% 4.26µs ± 1% ~ (p=0.406 n=16+16) MVCCScan_Pebble/rows=1/versions=10/valueSize=64-24 6.10µs ± 1% 6.13µs ± 1% ~ (p=0.065 n=16+14) MVCCScan_Pebble/rows=100/versions=1/valueSize=64-24 36.4µs ± 1% 37.3µs ± 1% +2.57% (p=0.000 n=16+15) MVCCScan_Pebble/rows=100/versions=10/valueSize=64-24 122µs ± 2% 123µs ± 1% ~ (p=0.068 n=16+13) MVCCScan_Pebble/rows=10000/versions=1/valueSize=64-24 2.29ms ± 1% 2.36ms ± 1% +3.08% (p=0.000 n=16+16) MVCCScan_Pebble/rows=10000/versions=10/valueSize=64-24 9.15ms ± 1% 9.22ms ± 1% +0.75% (p=0.000 n=16+16) ``` Enabling `WholeRowsOfSize` has an additional 0-1.5% penalty for large scans, as seen below. However, notice that single-key scans with `versions=10` can have penalties as high as 7%. This is because rows may omit column families that are all `NULL`, so in order to know whether the single key is a complete row or not the scan must continue to the next key. This additional scan can be relatively expensive for single-key scans if there is a lot of MVCC garbage. However, if the last row contains the final column family when the limit is hit, this penalty can be avoided: compare how `rows=3/columnFamilies=3` and `rows=1/columnFamilies=3` change from `versions=1` to `versions=10`. 
``` name old time/op new time/op delta MVCCScan_PebbleSQLRows/rows=1/columnFamilies=1/versions=1/valueSize=64-24 4.38µs ± 1% 4.40µs ± 1% ~ (p=0.075 n=16+14) MVCCScan_PebbleSQLRows/rows=1/columnFamilies=1/versions=10/valueSize=64-24 6.22µs ± 1% 6.18µs ± 1% -0.66% (p=0.004 n=14+15) MVCCScan_PebbleSQLRows/rows=1/columnFamilies=3/versions=1/valueSize=64-24 4.39µs ± 1% 4.33µs ± 1% -1.20% (p=0.000 n=16+16) MVCCScan_PebbleSQLRows/rows=1/columnFamilies=3/versions=10/valueSize=64-24 6.16µs ± 1% 6.54µs ± 1% +6.12% (p=0.000 n=15+16) MVCCScan_PebbleSQLRows/rows=1/columnFamilies=10/versions=1/valueSize=64-24 4.38µs ± 1% 4.35µs ± 1% -0.64% (p=0.000 n=16+14) MVCCScan_PebbleSQLRows/rows=1/columnFamilies=10/versions=10/valueSize=64-24 6.08µs ± 1% 6.50µs ± 1% +6.88% (p=0.000 n=16+16) MVCCScan_PebbleSQLRows/rows=3/columnFamilies=1/versions=1/valueSize=64-24 5.35µs ± 2% 5.36µs ± 1% ~ (p=0.277 n=16+15) MVCCScan_PebbleSQLRows/rows=3/columnFamilies=1/versions=10/valueSize=64-24 9.33µs ± 1% 9.33µs ± 1% ~ (p=0.904 n=16+16) MVCCScan_PebbleSQLRows/rows=3/columnFamilies=3/versions=1/valueSize=64-24 5.34µs ± 1% 5.46µs ± 1% +2.14% (p=0.000 n=14+16) MVCCScan_PebbleSQLRows/rows=3/columnFamilies=3/versions=10/valueSize=64-24 9.32µs ± 2% 9.45µs ± 2% +1.41% (p=0.000 n=16+16) MVCCScan_PebbleSQLRows/rows=3/columnFamilies=10/versions=1/valueSize=64-24 5.32µs ± 1% 5.18µs ± 1% -2.50% (p=0.000 n=16+15) MVCCScan_PebbleSQLRows/rows=3/columnFamilies=10/versions=10/valueSize=64-24 9.22µs ± 1% 9.45µs ± 1% +2.54% (p=0.000 n=16+15) MVCCScan_PebbleSQLRows/rows=100/columnFamilies=1/versions=1/valueSize=64-24 35.9µs ± 1% 35.7µs ± 1% -0.70% (p=0.001 n=15+15) MVCCScan_PebbleSQLRows/rows=100/columnFamilies=1/versions=10/valueSize=64-24 117µs ± 1% 117µs ± 1% ~ (p=0.323 n=16+16) MVCCScan_PebbleSQLRows/rows=100/columnFamilies=3/versions=1/valueSize=64-24 36.7µs ± 1% 37.0µs ± 1% +0.82% (p=0.000 n=15+16) MVCCScan_PebbleSQLRows/rows=100/columnFamilies=3/versions=10/valueSize=64-24 118µs ± 1% 119µs ± 1% +0.89% (p=0.000 
n=16+16) MVCCScan_PebbleSQLRows/rows=100/columnFamilies=10/versions=1/valueSize=64-24 35.9µs ± 0% 36.3µs ± 2% +1.26% (p=0.000 n=13+15) MVCCScan_PebbleSQLRows/rows=100/columnFamilies=10/versions=10/valueSize=64-24 116µs ± 1% 117µs ± 1% +0.77% (p=0.000 n=15+16) MVCCScan_PebbleSQLRows/rows=10000/columnFamilies=1/versions=1/valueSize=64-24 2.41ms ± 1% 2.41ms ± 1% ~ (p=0.094 n=16+16) MVCCScan_PebbleSQLRows/rows=10000/columnFamilies=1/versions=10/valueSize=64-24 9.11ms ± 1% 9.10ms ± 1% ~ (p=0.822 n=14+16) MVCCScan_PebbleSQLRows/rows=10000/columnFamilies=3/versions=1/valueSize=64-24 2.52ms ± 1% 2.53ms ± 1% ~ (p=0.591 n=14+15) MVCCScan_PebbleSQLRows/rows=10000/columnFamilies=3/versions=10/valueSize=64-24 9.28ms ± 1% 9.33ms ± 1% +0.54% (p=0.010 n=16+16) MVCCScan_PebbleSQLRows/rows=10000/columnFamilies=10/versions=1/valueSize=64-24 2.44ms ± 1% 2.46ms ± 1% +0.96% (p=0.000 n=15+13) MVCCScan_PebbleSQLRows/rows=10000/columnFamilies=10/versions=10/valueSize=64-24 9.13ms ± 1% 9.18ms ± 1% +0.59% (p=0.006 n=15+16) ``` Release note: None --- .../settings/settings-for-tenants.txt | 2 +- docs/generated/settings/settings.html | 2 +- pkg/clusterversion/cockroach_versions.go | 7 + pkg/clusterversion/key_string.go | 5 +- pkg/keys/keys.go | 21 ++ pkg/kv/kvclient/kvcoord/dist_sender.go | 5 +- pkg/kv/kvserver/batcheval/BUILD.bazel | 4 + pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 1 + pkg/kv/kvserver/batcheval/cmd_scan.go | 1 + pkg/kv/kvserver/batcheval/cmd_scan_test.go | 79 +++++ pkg/roachpb/api.proto | 17 + pkg/storage/bench_pebble_test.go | 23 +- pkg/storage/bench_test.go | 26 +- pkg/storage/mvcc.go | 16 +- pkg/storage/mvcc_history_test.go | 47 ++- pkg/storage/pebble_mvcc_scanner.go | 311 +++++++++++++++--- pkg/storage/pebble_mvcc_scanner_test.go | 10 +- pkg/storage/testdata/mvcc_histories/max_keys | 138 ++++++++ .../testdata/mvcc_histories/target_bytes | 193 ++++++++++- 19 files changed, 826 insertions(+), 82 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt 
b/docs/generated/settings/settings-for-tenants.txt index 13f15d7ca395..107d07b5c70e 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -170,4 +170,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-42 set the active cluster version in the format '.' +version version 21.2-44 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index a670872ebe17..f7760d7f155b 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -177,6 +177,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-42set the active cluster version in the format '.' +versionversion21.2-44set the active cluster version in the format '.' 
diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 91d7647ccf7b..e3aa9a451467 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -239,6 +239,9 @@ const ( // EnableSpanConfigStore enables the use of the span configs infrastructure // in KV. EnableSpanConfigStore + // ScanWholeRows is the version at which the Header.WholeRowsOfSize parameter + // was introduced, preventing limited scans from returning partial rows. + ScanWholeRows // ************************************************* // Step (1): Add new versions here. @@ -359,6 +362,10 @@ var versionsSingleton = keyedVersions{ Key: EnableSpanConfigStore, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 42}, }, + { + Key: ScanWholeRows, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 44}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 881b8600e993..0867acdbd876 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -30,11 +30,12 @@ func _() { _ = x[EnsureSpanConfigReconciliation-19] _ = x[EnsureSpanConfigSubscription-20] _ = x[EnableSpanConfigStore-21] + _ = x[ScanWholeRows-22] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsAlterSystemProtectedTimestampAddColumnEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStore" +const _Key_name = 
"V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsAlterSystemProtectedTimestampAddColumnEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRows" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 463, 493, 521, 542} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 463, 493, 521, 542, 555} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index c0dbfa6dbcee..de6eae1dcbb3 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -13,6 +13,7 @@ package keys import ( "bytes" "fmt" + "math" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -776,6 +777,26 @@ func MakeFamilyKey(key []byte, famID uint32) []byte { return encoding.EncodeUvarintAscending(key, uint64(len(key)-size)) } +// DecodeFamilyKey returns the family ID in the given row key. Returns an error +// if the key does not contain a family ID. 
+func DecodeFamilyKey(key []byte) (uint32, error) { + n, err := GetRowPrefixLength(key) + if err != nil { + return 0, err + } + if n <= 0 || n >= len(key) { + return 0, errors.Errorf("invalid row prefix, got prefix length %d for key %s", n, key) + } + _, colFamilyID, err := encoding.DecodeUvarintAscending(key[n:]) + if err != nil { + return 0, err + } + if colFamilyID > math.MaxUint32 { + return 0, errors.Errorf("column family ID overflow, got %d", colFamilyID) + } + return uint32(colFamilyID), nil +} + // DecodeTableIDIndexID decodes a table id followed by an index id from the // provided key. The input key must already have its tenant id removed. func DecodeTableIDIndexID(key []byte) ([]byte, uint32, uint32, error) { diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index f1cb5a1fb83e..c51e77fbe2c0 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -1369,11 +1369,8 @@ func (ds *DistSender) divideAndSendBatchToRanges( // might be passed recursively to further divideAndSendBatchToRanges() // calls. 
if ba.MaxSpanRequestKeys > 0 { - if replyKeys > ba.MaxSpanRequestKeys { - log.Fatalf(ctx, "received %d results, limit was %d", replyKeys, ba.MaxSpanRequestKeys) - } ba.MaxSpanRequestKeys -= replyKeys - if ba.MaxSpanRequestKeys == 0 { + if ba.MaxSpanRequestKeys <= 0 { couldHaveSkippedResponses = true resumeReason = roachpb.RESUME_KEY_LIMIT return diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index 093634e70587..6e572dabdb9b 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -133,6 +133,10 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/settings/cluster", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/rowenc", + "//pkg/sql/sem/tree", "//pkg/storage", "//pkg/storage/enginepb", "//pkg/testutils", diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 89e69e813ee2..bd65947021a7 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -50,6 +50,7 @@ func ReverseScan( TargetBytes: h.TargetBytes, TargetBytesAvoidExcess: h.AllowEmpty || avoidExcess, // AllowEmpty takes precedence AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, FailOnMoreRecent: args.KeyLocking != lock.None, Reverse: true, MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 84965069cc0c..90a951db5187 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -51,6 +51,7 @@ func Scan( TargetBytes: h.TargetBytes, TargetBytesAvoidExcess: h.AllowEmpty || avoidExcess, // AllowEmpty takes precedence AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, FailOnMoreRecent: args.KeyLocking != lock.None, Reverse: false, MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), diff --git 
a/pkg/kv/kvserver/batcheval/cmd_scan_test.go b/pkg/kv/kvserver/batcheval/cmd_scan_test.go index f3f03f043573..0d02c88315ba 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan_test.go @@ -16,8 +16,13 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -176,3 +181,77 @@ func testScanReverseScanInner( require.Len(t, rows, expN) } } + +// TestScanReverseScanWholeRows checks that WholeRowsOfSize is wired up +// correctly. Comprehensive testing is done e.g. in TestMVCCHistories. +func TestScanReverseScanWholeRows(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + ts := hlc.Timestamp{WallTime: 1} + + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + + // Write 2 rows with 3 column families each. 
+ var rowKeys []roachpb.Key + for r := 0; r < 2; r++ { + for cf := uint32(0); cf < 3; cf++ { + key := makeRowKey(t, r, cf) + err := storage.MVCCPut(ctx, eng, nil, key, ts, roachpb.MakeValueFromString("value"), nil) + require.NoError(t, err) + rowKeys = append(rowKeys, key) + } + } + + testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) { + var req roachpb.Request + var resp roachpb.Response + if !reverse { + req = &roachpb.ScanRequest{} + resp = &roachpb.ScanResponse{} + } else { + req = &roachpb.ReverseScanRequest{} + resp = &roachpb.ReverseScanResponse{} + } + req.SetHeader(roachpb.RequestHeader{Key: rowKeys[0], EndKey: roachpb.KeyMax}) + + // Scan with limit of 5 keys. This should only return the first row (3 keys), + // since the second row would yield 6 keys total. + cArgs := CommandArgs{ + EvalCtx: (&MockEvalCtx{ClusterSettings: cluster.MakeTestingClusterSettings()}).EvalContext(), + Args: req, + Header: roachpb.Header{ + Timestamp: ts, + MaxSpanRequestKeys: 5, + WholeRowsOfSize: 3, + }, + } + + if !reverse { + _, err := Scan(ctx, eng, cArgs, resp) + require.NoError(t, err) + } else { + _, err := ReverseScan(ctx, eng, cArgs, resp) + require.NoError(t, err) + } + + require.EqualValues(t, resp.Header().NumKeys, 3) + }) +} + +// makeRowKey makes a key for a SQL row for use in tests, using the system +// tenant with table 1 index 1, and a single column. 
+func makeRowKey(t *testing.T, id int, columnFamily uint32) roachpb.Key { + var colMap catalog.TableColMap + colMap.Set(0, 0) + + var err error + key := keys.SystemSQLCodec.IndexPrefix(1, 1) + key, _, err = rowenc.EncodeColumns( + []descpb.ColumnID{0}, nil /* directions */, colMap, + []tree.Datum{tree.NewDInt(tree.DInt(id))}, key) + require.NoError(t, err) + return keys.MakeFamilyKey(key, columnFamily) +} diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index f3c4f3e467e6..4c3e143400da 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -131,6 +131,10 @@ message ResponseHeader { // TargetBytes is exactly satisfied by a result that exhausted a range scan, // or by a response from a multi-request batch, we won't do additional work // (e.g. send another RPC to the next range) only to obtain resume_next_bytes. + // + // Also note that this is unaffected by whole_rows_of_size. The client may + // care about whole rows, but we'll only return the size of the next KV pair + // (which may just be part of the row), to avoid the cost of additional IO. int64 resume_next_bytes = 9; // The number of keys operated on. @@ -2256,6 +2260,19 @@ message Header { // TargetBytesAvoidExcess version gate enabled. Only supported by Get, Scan, // and ReverseScan. bool allow_empty = 23; + // If positive, Scan and ReverseScan requests with limits (MaxSpanRequestKeys + // or TargetBytes) will not return results with partial SQL rows at the end + // (recall that SQL rows can span multiple keys). Such partial rows will be + // removed from the result, unless AllowEmpty is false and the partial row is + // the first result row, in which case additional keys will be fetched to + // complete the row. + // + // The given value specifies the maximum number of keys in a row (i.e. the + // number of column families). If any larger rows are found at the end of the + // result, an error is returned. + // + // Added in 22.1, callers must check the ScanWholeRows version gate first. 
+ int32 whole_rows_of_size = 26; // If true, DistSender returns partial non-empty results when encountering a // range boundary, with an appropriate resume span and reason // RESUME_RANGE_BOUNDARY. This will disable parallelism of DistSender diff --git a/pkg/storage/bench_pebble_test.go b/pkg/storage/bench_pebble_test.go index 544556918547..5639222f6780 100644 --- a/pkg/storage/bench_pebble_test.go +++ b/pkg/storage/bench_pebble_test.go @@ -90,15 +90,20 @@ func BenchmarkMVCCScan_PebbleSQLRows(b *testing.B) { b.Run(fmt.Sprintf("versions=%d", numVersions), func(b *testing.B) { for _, valueSize := range []int{8, 64, 512} { b.Run(fmt.Sprintf("valueSize=%d", valueSize), func(b *testing.B) { - runMVCCScan(ctx, b, setupMVCCPebble, benchScanOptions{ - benchDataOptions: benchDataOptions{ - numColumnFamilies: numColumnFamilies, - numVersions: numVersions, - valueBytes: valueSize, - }, - numRows: numRows, - reverse: false, - }) + for _, wholeRows := range []bool{false, true} { + b.Run(fmt.Sprintf("wholeRows=%t", wholeRows), func(b *testing.B) { + runMVCCScan(ctx, b, setupMVCCPebble, benchScanOptions{ + benchDataOptions: benchDataOptions{ + numColumnFamilies: numColumnFamilies, + numVersions: numVersions, + valueBytes: valueSize, + }, + numRows: numRows, + reverse: false, + wholeRows: wholeRows, + }) + }) + } }) } }) diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 06a5fdce8893..72c2c4e0071f 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -824,8 +824,9 @@ func setupMVCCData( type benchScanOptions struct { benchDataOptions - numRows int - reverse bool + numRows int + reverse bool + wholeRows bool } // runMVCCScan first creates test data (and resets the benchmarking @@ -841,6 +842,9 @@ func runMVCCScan(ctx context.Context, b *testing.B, emk engineMaker, opts benchS b.Fatal("test error: cannot call runMVCCScan with non-zero numKeys") } opts.numKeys = 100000 + if opts.wholeRows && opts.numColumnFamilies == 0 { + b.Fatal("test 
error: wholeRows requires numColumnFamilies > 0") + } eng, _ := setupMVCCData(ctx, b, emk, opts.benchDataOptions) defer eng.Close() @@ -875,15 +879,25 @@ func runMVCCScan(ctx context.Context, b *testing.B, emk engineMaker, opts benchS } walltime := int64(5 * (rand.Int31n(int32(opts.numVersions)) + 1)) ts := hlc.Timestamp{WallTime: walltime} + var wholeRowsOfSize int32 + if opts.wholeRows { + wholeRowsOfSize = int32(opts.numColumnFamilies) + } res, err := MVCCScan(ctx, eng, startKey, endKey, ts, MVCCScanOptions{ - MaxKeys: int64(opts.numRows), - Reverse: opts.reverse, + MaxKeys: int64(opts.numRows), + WholeRowsOfSize: wholeRowsOfSize, + AllowEmpty: wholeRowsOfSize != 0, + Reverse: opts.reverse, }) if err != nil { b.Fatalf("failed scan: %+v", err) } - if len(res.KVs) != opts.numRows { - b.Fatalf("failed to scan: %d != %d", len(res.KVs), opts.numRows) + expectKVs := opts.numRows + if opts.wholeRows { + expectKVs -= opts.numRows % opts.numColumnFamilies + } + if len(res.KVs) != expectKVs { + b.Fatalf("failed to scan: %d != %d", len(res.KVs), expectKVs) } } diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index b0af2d43bdef..4216ac146663 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -874,7 +874,7 @@ func mvccGet( keyBuf: mvccScanner.keyBuf, } - mvccScanner.init(opts.Txn, opts.Uncertainty) + mvccScanner.init(opts.Txn, opts.Uncertainty, 0) mvccScanner.get(ctx) // If we have a trace, emit the scan stats that we produced. 
@@ -2367,6 +2367,7 @@ func mvccScanToBytes( targetBytes: opts.TargetBytes, targetBytesAvoidExcess: opts.TargetBytesAvoidExcess, allowEmpty: opts.AllowEmpty, + wholeRows: opts.WholeRowsOfSize > 1, // single-KV rows don't need processing maxIntents: opts.MaxIntents, inconsistent: opts.Inconsistent, tombstones: opts.Tombstones, @@ -2374,7 +2375,11 @@ func mvccScanToBytes( keyBuf: mvccScanner.keyBuf, } - mvccScanner.init(opts.Txn, opts.Uncertainty) + var trackLastOffsets int + if opts.WholeRowsOfSize > 1 { + trackLastOffsets = int(opts.WholeRowsOfSize) + } + mvccScanner.init(opts.Txn, opts.Uncertainty, trackLastOffsets) var res MVCCScanResult var err error @@ -2499,6 +2504,13 @@ type MVCCScanOptions struct { // AllowEmpty will return an empty result if the first kv pair exceeds the // TargetBytes limit and TargetBytesAvoidExcess is set. AllowEmpty bool + // WholeRowsOfSize will prevent returning partial rows when limits (MaxKeys or + // TargetBytes) are set. The value indicates the max number of keys per row. + // If the last KV pair(s) belong to a partial row, they will be removed from + // the result -- except if the result only consists of a single partial row + // and AllowEmpty is false, in which case the remaining KV pairs of the row + // will be fetched and returned too. + WholeRowsOfSize int32 // MaxIntents is a maximum number of intents collected by scanner in // consistent mode before returning WriteIntentError. 
// diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index ec4825805afe..f8fa5f827f6c 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -21,6 +21,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -70,6 +74,7 @@ import ( // - `+foo` means `Key(foo).Next()` // - `-foo` means `Key(foo).PrefixEnd()` // - `%foo` means `append(LocalRangePrefix, "foo")` +// - `/foo/7` means SQL row with key foo, optional column family 7 (system tenant, table/index 1). // // Additionally, the pseudo-command `with` enables sharing // a group of arguments between multiple commands, for example: @@ -794,6 +799,9 @@ func cmdScan(e *evalCtx) error { if e.hasArg("allowEmpty") { opts.AllowEmpty = true } + if e.hasArg("wholeRows") { + opts.WholeRowsOfSize = 10 // arbitrary, must be greater than largest column family in tests + } res, err := MVCCScan(e.ctx, e.engine, key, endKey, ts, opts) // NB: the error is returned below. 
This ensures the test can // ascertain no result is populated in the intents when an error @@ -1027,15 +1035,44 @@ func (e *evalCtx) lookupTxn(txnName string) (*roachpb.Transaction, error) { } func toKey(s string) roachpb.Key { - switch { - case len(s) > 0 && s[0] == '+': + if len(s) == 0 { + return roachpb.Key(s) + } + switch s[0] { + case '+': return roachpb.Key(s[1:]).Next() - case len(s) > 0 && s[0] == '=': + case '=': return roachpb.Key(s[1:]) - case len(s) > 0 && s[0] == '-': + case '-': return roachpb.Key(s[1:]).PrefixEnd() - case len(s) > 0 && s[0] == '%': + case '%': return append(keys.LocalRangePrefix, s[1:]...) + case '/': + var pk string + var columnFamilyID uint64 + var err error + parts := strings.Split(s[1:], "/") + switch len(parts) { + case 2: + if columnFamilyID, err = strconv.ParseUint(parts[1], 10, 32); err != nil { + panic(fmt.Sprintf("invalid column family ID %s in row key %s: %s", parts[1], s, err)) + } + fallthrough + case 1: + pk = parts[0] + default: + panic(fmt.Sprintf("expected at most one / separator in row key %s", s)) + } + + var colMap catalog.TableColMap + colMap.Set(0, 0) + key := keys.SystemSQLCodec.IndexPrefix(1, 1) + key, _, err = rowenc.EncodeColumns([]descpb.ColumnID{0}, nil /* directions */, colMap, []tree.Datum{tree.NewDString(pk)}, key) + if err != nil { + panic(err) + } + key = keys.MakeFamilyKey(key, uint32(columnFamilyID)) + return key default: return roachpb.Key(s) } diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index 13387e2797ff..6fa0f3910393 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -17,6 +17,7 @@ import ( "sort" "sync" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -41,6 +42,32 @@ type pebbleResults struct { bytes int64 repr []byte bufs [][]byte + + // lastOffsets 
is a ring buffer that keeps track of byte offsets for the last + // N KV pairs. It is used to discard a partial SQL row at the end of the + // result via maybeTrimPartialLastRow() -- such rows can span multiple KV + // pairs. The length of lastOffsets is interpreted as the maximum expected SQL + // row size (i.e. number of column families). + // + // lastOffsets is initialized with a fixed length giving the N number of last + // KV pair offsets to track. lastOffsetIdx contains the index in lastOffsets + // where the next KV byte offset will be written, wrapping around to 0 when it + // reaches the end of lastOffsets. + // + // The lastOffsets values are byte offsets in p.repr and p.bufs. The latest + // lastOffset (i.e. the one at lastOffsetIdx-1) will be an offset in p.repr. + // When iterating backwards through the ring buffer and crossing a byte offset + // of 0, the next iterated byte offset in the ring buffer (at i-1) will then + // point to the previous buffer in p.bufs. + // + // Actual and default 0 values in the slice are disambiguated when iterating + // backwards through p.repr and p.bufs. If we iterate to the start of all byte + // buffers without iterating through all of lastOffsets (i.e. when there are + // fewer KV pairs than the length of lastOffsets), then we must be at the start + // of lastOffsets, and any 0 values at the end are of no interest. + lastOffsetsEnabled bool // NB: significantly faster than checking lastOffsets != nil + lastOffsets []int + lastOffsetIdx int } func (p *pebbleResults) clear() { @@ -96,6 +123,18 @@ func (p *pebbleResults) put( copy(p.repr[startIdx+kvLenSize+lenKey:], value) p.count++ p.bytes += int64(lenToAdd) + + // If we're tracking KV offsets, update the ring buffer. + if p.lastOffsetsEnabled { + p.lastOffsets[p.lastOffsetIdx] = startIdx + p.lastOffsetIdx++ + // NB: Branching is significantly faster than modulo in benchmarks, likely + // because of a high branch prediction hit rate. 
+ if p.lastOffsetIdx == len(p.lastOffsets) { + p.lastOffsetIdx = 0 + } + } + return nil } @@ -103,6 +142,107 @@ func (p *pebbleResults) sizeOf(lenKey, lenValue int) int { return kvLenSize + lenKey + lenValue } +// continuesFirstRow returns true if the given key belongs to the same SQL row +// as the first KV pair in the result (or if the result is empty). If either +// key is not a valid SQL row key, returns false. +func (p *pebbleResults) continuesFirstRow(key roachpb.Key) bool { + repr := p.repr + if len(p.bufs) > 0 { + repr = p.bufs[0] + } + if len(repr) == 0 { + return true // no rows in the result + } + + rowPrefix := getRowPrefix(key) + if rowPrefix == nil { + return false + } + return bytes.Equal(rowPrefix, getRowPrefix(extractResultKey(repr))) +} + +// lastRowHasFinalColumnFamily returns true if the last key in the result is the +// maximum column family ID of the row (i.e. when it equals len(lastOffsets)-1). +// If so, we know that the row is complete. However, the inverse is not true: +// the final column families of the row may be omitted, in which case the caller +// has to scan to the next key to find out whether the row is complete. +func (p *pebbleResults) lastRowHasFinalColumnFamily() bool { + if !p.lastOffsetsEnabled || p.count == 0 { + return false + } + + lastOffsetIdx := p.lastOffsetIdx - 1 // p.lastOffsetIdx is where next offset would be stored + if lastOffsetIdx < 0 { + lastOffsetIdx = len(p.lastOffsets) - 1 + } + lastOffset := p.lastOffsets[lastOffsetIdx] + + key := extractResultKey(p.repr[lastOffset:]) + colFamilyID, err := keys.DecodeFamilyKey(key) + if err != nil { + return false + } + return int(colFamilyID) == len(p.lastOffsets)-1 +} + +// maybeTrimPartialLastRow removes the last KV pairs from the result that are part +// of the same SQL row as the given key, returning the earliest key removed. The +// row cannot be made up of more KV pairs than given by len(lastOffsets), +// otherwise an error is returned. 
Must be called before finish(). +func (p *pebbleResults) maybeTrimPartialLastRow(nextKey roachpb.Key) (roachpb.Key, error) { + if !p.lastOffsetsEnabled || len(p.repr) == 0 { + return nil, nil + } + trimRowPrefix := getRowPrefix(nextKey) + if trimRowPrefix == nil { + return nil, nil + } + + var firstTrimmedKey roachpb.Key + + // We're iterating backwards through the p.lastOffsets ring buffer, starting + // at p.lastOffsetIdx-1 (which is where the last KV was stored). The loop + // condition simply makes sure we limit the number of iterations to the size + // of the ring buffer, to prevent wrapping around. + for i := 0; i < len(p.lastOffsets); i++ { + lastOffsetIdx := p.lastOffsetIdx - 1 // p.lastOffsetIdx is where next offset would be stored + if lastOffsetIdx < 0 { + lastOffsetIdx = len(p.lastOffsets) - 1 + } + lastOffset := p.lastOffsets[lastOffsetIdx] + + // The remainder of repr from the offset is now a single KV. + repr := p.repr[lastOffset:] + key := extractResultKey(repr) + rowPrefix := getRowPrefix(key) + + // If the prefix belongs to a different row, we're done trimming. + if !bytes.Equal(rowPrefix, trimRowPrefix) { + return firstTrimmedKey, nil + } + + // Remove this KV pair. + p.repr = p.repr[:lastOffset] + p.count-- + p.bytes -= int64(len(repr)) + firstTrimmedKey = key + + p.lastOffsetIdx = lastOffsetIdx + p.lastOffsets[lastOffsetIdx] = 0 + + if len(p.repr) == 0 { + if len(p.bufs) == 0 { + // The entire result set was trimmed, so we're done. + return firstTrimmedKey, nil + } + // Pop the last buf back into repr. + p.repr = p.bufs[len(p.bufs)-1] + p.bufs = p.bufs[:len(p.bufs)-1] + } + } + return nil, errors.Errorf("row exceeds expected max size (%d): %s", len(p.lastOffsets), nextKey) +} + func (p *pebbleResults) finish() [][]byte { if len(p.repr) > 0 { p.bufs = append(p.bufs, p.repr) @@ -111,6 +251,32 @@ func (p *pebbleResults) finish() [][]byte { return p.bufs } +// getRowPrefix decodes a SQL row prefix from the given key. 
Returns nil if the +// key is not a valid SQL row, or if the prefix is the entire key. +func getRowPrefix(key roachpb.Key) []byte { + if len(key) == 0 { + return nil + } + n, err := keys.GetRowPrefixLength(key) + if err != nil || n <= 0 || n >= len(key) { + return nil + } + return key[:n] +} + +// extractResultKey takes in a binary KV result representation, finds the raw +// key, decodes it as an MVCC key, and returns the key (without timestamp). +// Returns nil if the key could not be decoded. repr must be a valid, non-empty +// KV representation, otherwise this may panic. +func extractResultKey(repr []byte) roachpb.Key { + keyLen := binary.LittleEndian.Uint32(repr[4:8]) + key, _, ok := enginepb.SplitMVCCKey(repr[8 : 8+keyLen]) + if !ok { + return nil + } + return key +} + // Go port of mvccScanner in libroach/mvcc.h. Stores all variables relating to // one MVCCGet / MVCCScan call. type pebbleMVCCScanner struct { @@ -136,15 +302,25 @@ type pebbleMVCCScanner struct { // If true, return an empty result if the first result exceeds targetBytes // and targetBytesAvoidExcess is true. allowEmpty bool + // If set, don't return partial SQL rows (spanning multiple KV pairs) when + // hitting a limit. Partial rows at the end of the result will be trimmed. If + // allowEmpty is false, and the partial row is the first row in the result, + // the row will instead be completed by fetching additional KV pairs. + // + // Requires init() to have been called with trackLastOffsets set to the + // maximum number of KV pairs in a row. + wholeRows bool // Stop adding intents and abort scan once maxIntents threshold is reached. // This limit is only applicable to consistent scans since they return // intents as an error. // Not used in inconsistent scans. // Ignored if zero. maxIntents int64 - // resumeReason contains the reason why an iteration was ended prematurely, - // i.e. which of the above limits were exceeded. 
- resumeReason roachpb.ResumeReason + // Resume fields describe the resume span to return. resumeReason must be set + // to a non-zero value to return a resume span; the others are optional. + resumeReason roachpb.ResumeReason + resumeKey roachpb.Key // if unset, falls back to p.advanceKey() + resumeNextBytes int64 // set when targetBytes is exceeded // Transaction epoch and sequence number. txn *roachpb.Transaction txnEpoch enginepb.TxnEpoch @@ -168,7 +344,6 @@ type pebbleMVCCScanner struct { curUnsafeKey MVCCKey curRawKey []byte curValue []byte - curExcluded bool results pebbleResults intents pebble.Batch // mostRecentTS stores the largest timestamp observed that is equal to or @@ -202,9 +377,14 @@ func (p *pebbleMVCCScanner) release() { // init sets bounds on the underlying pebble iterator, and initializes other // fields not set by the calling method. -func (p *pebbleMVCCScanner) init(txn *roachpb.Transaction, ui uncertainty.Interval) { +func (p *pebbleMVCCScanner) init( + txn *roachpb.Transaction, ui uncertainty.Interval, trackLastOffsets int, +) { p.itersBeforeSeek = maxItersBeforeSeek / 2 - p.curExcluded = false + if trackLastOffsets > 0 { + p.results.lastOffsetsEnabled = true + p.results.lastOffsets = make([]int, trackLastOffsets) + } if txn != nil { p.txn = txn @@ -237,6 +417,10 @@ func (p *pebbleMVCCScanner) get(ctx context.Context) { func (p *pebbleMVCCScanner) scan( ctx context.Context, ) (*roachpb.Span, roachpb.ResumeReason, int64, error) { + if p.wholeRows && !p.results.lastOffsetsEnabled { + return nil, 0, 0, errors.AssertionFailedf("cannot use wholeRows without trackLastOffsets") + } + p.isGet = false if p.reverse { if !p.iterSeekReverse(MVCCKey{Key: p.end}) { @@ -256,32 +440,33 @@ func (p *pebbleMVCCScanner) scan( return nil, 0, 0, p.err } - if p.resumeReason != 0 && (p.curExcluded || p.advanceKey()) { + if p.resumeReason != 0 { + resumeKey := p.resumeKey + if len(resumeKey) == 0 { + if !p.advanceKey() { + return nil, 0, 0, nil // nothing to resume
+ } + resumeKey = p.curUnsafeKey.Key + } + var resumeSpan *roachpb.Span - // curKey was not added to results, so it needs to be included in the - // resume span. if p.reverse { // NB: this is equivalent to: - // append(roachpb.Key(nil), p.curKey.Key...).Next() + // append(roachpb.Key(nil), resumeKey...).Next() // but with half the allocations. - curKey := p.curUnsafeKey.Key - curKeyCopy := make(roachpb.Key, len(curKey), len(curKey)+1) - copy(curKeyCopy, curKey) + resumeKeyCopy := make(roachpb.Key, len(resumeKey), len(resumeKey)+1) + copy(resumeKeyCopy, resumeKey) resumeSpan = &roachpb.Span{ Key: p.start, - EndKey: curKeyCopy.Next(), + EndKey: resumeKeyCopy.Next(), } } else { resumeSpan = &roachpb.Span{ - Key: append(roachpb.Key(nil), p.curUnsafeKey.Key...), + Key: append(roachpb.Key(nil), resumeKey...), EndKey: p.end, } } - var resumeNextBytes int64 - if p.resumeReason == roachpb.RESUME_BYTE_LIMIT && p.curExcluded { - resumeNextBytes = int64(p.results.sizeOf(len(p.curRawKey), len(p.curValue))) - } - return resumeSpan, p.resumeReason, resumeNextBytes, nil + return resumeSpan, p.resumeReason, p.resumeNextBytes, nil } return nil, 0, 0, nil } @@ -362,7 +547,7 @@ func (p *pebbleMVCCScanner) getAndAdvance(ctx context.Context) bool { if p.curUnsafeKey.Timestamp.Less(p.ts) { // 1. Fast path: there is no intent and our read timestamp is newer // than the most recent version's timestamp. - return p.addAndAdvance(ctx, p.curRawKey, p.curValue) + return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.curRawKey, p.curValue) } // ts == read_ts @@ -383,7 +568,7 @@ func (p *pebbleMVCCScanner) getAndAdvance(ctx context.Context) bool { // 3. There is no intent and our read timestamp is equal to the most // recent version's timestamp. 
- return p.addAndAdvance(ctx, p.curRawKey, p.curValue) + return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.curRawKey, p.curValue) } // ts > read_ts @@ -434,7 +619,7 @@ func (p *pebbleMVCCScanner) getAndAdvance(ctx context.Context) bool { } if len(p.meta.RawBytes) != 0 { // 7. Emit immediately if the value is inline. - return p.addAndAdvance(ctx, p.curRawKey, p.meta.RawBytes) + return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.curRawKey, p.meta.RawBytes) } if p.meta.Txn == nil { @@ -559,7 +744,7 @@ func (p *pebbleMVCCScanner) getAndAdvance(ctx context.Context) bool { // addAndAdvance to take an MVCCKey explicitly. p.curUnsafeKey.Timestamp = metaTS p.keyBuf = EncodeKeyToBuf(p.keyBuf[:0], p.curUnsafeKey) - return p.addAndAdvance(ctx, p.keyBuf, value) + return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.keyBuf, value) } // 13. If no value in the intent history has a sequence number equal to // or less than the read, we must ignore the intents laid down by the @@ -710,28 +895,68 @@ func (p *pebbleMVCCScanner) advanceKeyAtNewKey(key []byte) bool { // Adds the specified key and value to the result set, excluding tombstones unless // p.tombstones is true. Advances to the next key unless we've reached the max // results limit. -func (p *pebbleMVCCScanner) addAndAdvance(ctx context.Context, rawKey []byte, val []byte) bool { +func (p *pebbleMVCCScanner) addAndAdvance( + ctx context.Context, key roachpb.Key, rawKey []byte, val []byte, +) bool { // Don't include deleted versions len(val) == 0, unless we've been instructed // to include tombstones in the results. - if len(val) > 0 || p.tombstones { - // Check if we should apply the targetBytes limit at all. We do this either - // if this is not the first result or if allowEmpty is true. - if p.targetBytes > 0 && (p.results.count > 0 || p.allowEmpty) { - size := p.results.bytes - nextSize := int64(p.results.sizeOf(len(rawKey), len(val))) - // Check if we actually exceeded the limit. 
- if size >= p.targetBytes || (p.targetBytesAvoidExcess && size+nextSize > p.targetBytes) { - p.curExcluded = true - p.resumeReason = roachpb.RESUME_BYTE_LIMIT - return false + if len(val) == 0 && !p.tombstones { + return p.advanceKey() + } + + // Check if adding the key would exceed a limit. + if p.targetBytes > 0 && (p.results.bytes >= p.targetBytes || (p.targetBytesAvoidExcess && + p.results.bytes+int64(p.results.sizeOf(len(rawKey), len(val))) > p.targetBytes)) { + p.resumeReason = roachpb.RESUME_BYTE_LIMIT + p.resumeNextBytes = int64(p.results.sizeOf(len(rawKey), len(val))) + + } else if p.maxKeys > 0 && p.results.count >= p.maxKeys { + p.resumeReason = roachpb.RESUME_KEY_LIMIT + } + + if p.resumeReason != 0 { + // If we exceeded a limit, but we're not allowed to return an empty result, + // then make sure we include the first key in the result. If wholeRows is + // enabled, then also make sure we complete the first SQL row. + if !p.allowEmpty && + (p.results.count == 0 || (p.wholeRows && p.results.continuesFirstRow(key))) { + p.resumeReason = 0 + p.resumeNextBytes = 0 + } else { + p.resumeKey = key + + // If requested, remove any partial SQL rows from the end of the result. + if p.wholeRows { + trimmedKey, err := p.results.maybeTrimPartialLastRow(key) + if err != nil { + p.err = err + return false + } + if trimmedKey != nil { + p.resumeKey = trimmedKey + } } - } - if err := p.results.put(ctx, rawKey, val, p.memAccount); err != nil { - p.err = errors.Wrapf(err, "scan with start key %s", p.start) return false } - if p.maxKeys > 0 && p.results.count >= p.maxKeys { - p.curExcluded = false + } + + if err := p.results.put(ctx, rawKey, val, p.memAccount); err != nil { + p.err = errors.Wrapf(err, "scan with start key %s", p.start) + return false + } + + // Check if we hit the key limit just now to avoid scanning further before + // checking the key limit above on the next iteration. 
This has a small cost + // (~0.5% for large scans), but avoids the potentially large cost of scanning + // lots of garbage before the next key -- especially when maxKeys is small. + if p.maxKeys > 0 && p.results.count >= p.maxKeys { + // If we're not allowed to return partial SQL rows, check whether the last + // KV pair in the result has the maximum column family ID of the row. If so, + // we can return early. However, if it doesn't then we can't know yet + // whether the row is complete or not, because the final column families of + // the row may have been omitted (if they are all NULL values) -- to find + // out, we must continue scanning to the next key and handle it above. + if !p.wholeRows || p.results.lastRowHasFinalColumnFamily() { p.resumeReason = roachpb.RESUME_KEY_LIMIT return false } @@ -765,7 +990,7 @@ func (p *pebbleMVCCScanner) seekVersion( if p.curUnsafeKey.Timestamp.LessEq(seekTS) { p.incrementItersBeforeSeek() if !uncertaintyCheck || p.curUnsafeKey.Timestamp.LessEq(p.ts) { - return p.addAndAdvance(ctx, p.curRawKey, p.curValue) + return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.curRawKey, p.curValue) } // Iterate through uncertainty interval. Though we found a value in // the interval, it may not be uncertainty. This is because seekTS @@ -790,7 +1015,7 @@ func (p *pebbleMVCCScanner) seekVersion( return p.advanceKeyAtNewKey(origKey) } if !uncertaintyCheck || p.curUnsafeKey.Timestamp.LessEq(p.ts) { - return p.addAndAdvance(ctx, p.curRawKey, p.curValue) + return p.addAndAdvance(ctx, p.curUnsafeKey.Key, p.curRawKey, p.curValue) } // Iterate through uncertainty interval. 
See the comment above about why // a value in this interval is not necessarily cause for an uncertainty diff --git a/pkg/storage/pebble_mvcc_scanner_test.go b/pkg/storage/pebble_mvcc_scanner_test.go index dda15f0f5973..779567407007 100644 --- a/pkg/storage/pebble_mvcc_scanner_test.go +++ b/pkg/storage/pebble_mvcc_scanner_test.go @@ -92,7 +92,7 @@ func TestMVCCScanWithManyVersionsAndSeparatedIntents(t *testing.T) { tombstones: false, failOnMoreRecent: false, } - mvccScanner.init(nil /* txn */, uncertainty.Interval{}) + mvccScanner.init(nil /* txn */, uncertainty.Interval{}, 0 /* trackLastOffsets */) _, _, _, err = mvccScanner.scan(context.Background()) require.NoError(t, err) @@ -151,7 +151,7 @@ func TestMVCCScanWithLargeKeyValue(t *testing.T) { end: roachpb.Key("e"), ts: ts, } - mvccScanner.init(nil /* txn */, uncertainty.Interval{}) + mvccScanner.init(nil /* txn */, uncertainty.Interval{}, 0 /* trackLastOffsets */) _, _, _, err := mvccScanner.scan(context.Background()) require.NoError(t, err) @@ -228,7 +228,7 @@ func TestMVCCScanWithMemoryAccounting(t *testing.T) { end: makeKey(nil, 11), ts: hlc.Timestamp{WallTime: 50}, } - scanner.init(&txn1, ui1) + scanner.init(&txn1, ui1, 0 /* trackLastOffsets */) cleanup := scannerWithAccount(ctx, st, scanner, 6000) resumeSpan, resumeReason, resumeNextBytes, err := scanner.scan(ctx) require.Nil(t, resumeSpan) @@ -244,7 +244,7 @@ func TestMVCCScanWithMemoryAccounting(t *testing.T) { end: makeKey(nil, 11), ts: hlc.Timestamp{WallTime: 50}, } - scanner.init(&txn1, ui1) + scanner.init(&txn1, ui1, 0 /* trackLastOffsets */) cleanup = scannerWithAccount(ctx, st, scanner, 6000) resumeSpan, resumeReason, resumeNextBytes, err = scanner.scan(ctx) require.Nil(t, resumeSpan) @@ -264,7 +264,7 @@ func TestMVCCScanWithMemoryAccounting(t *testing.T) { ts: hlc.Timestamp{WallTime: 50}, inconsistent: inconsistent, } - scanner.init(nil, uncertainty.Interval{}) + scanner.init(nil, uncertainty.Interval{}, 0 /* trackLastOffsets */) cleanup = 
scannerWithAccount(ctx, st, scanner, 100) resumeSpan, resumeReason, resumeNextBytes, err = scanner.scan(ctx) require.Nil(t, resumeSpan) diff --git a/pkg/storage/testdata/mvcc_histories/max_keys b/pkg/storage/testdata/mvcc_histories/max_keys index f75a7d5dc689..d1ff72a1191c 100644 --- a/pkg/storage/testdata/mvcc_histories/max_keys +++ b/pkg/storage/testdata/mvcc_histories/max_keys @@ -7,6 +7,12 @@ with ts=1,0 put k=aa v=val-aa put k=c v=val-c put k=e v=val-e + put k=/row1/0 v=r1a + put k=/row1/1 v=r1b + put k=/row1/4 v=r1e # column family 2-3 omitted (e.g. if all NULLs) + put k=/row2 v=r2a + put k=/row3 v=r3a + put k=/row3/1 v=r3b del k=aa ts=2,0 ---- >> at end: @@ -15,6 +21,12 @@ data: "aa"/2.000000000,0 -> / data: "aa"/1.000000000,0 -> /BYTES/val-aa data: "c"/1.000000000,0 -> /BYTES/val-c data: "e"/1.000000000,0 -> /BYTES/val-e +data: /Table/1/1/"row1"/0/1.000000000,0 -> /BYTES/r1a +data: /Table/1/1/"row1"/1/1/1.000000000,0 -> /BYTES/r1b +data: /Table/1/1/"row1"/4/1/1.000000000,0 -> /BYTES/r1e +data: /Table/1/1/"row2"/0/1.000000000,0 -> /BYTES/r2a +data: /Table/1/1/"row3"/0/1.000000000,0 -> /BYTES/r3a +data: /Table/1/1/"row3"/1/1/1.000000000,0 -> /BYTES/r3b # Limit 1 works. 
run ok @@ -135,6 +147,12 @@ meta: "m"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=11.000000000, data: "m"/11.000000000,0 -> /BYTES/c meta: "n"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=30} ts=11.000000000,0 del=false klen=12 vlen=6 ih={{10 /BYTES/a}{20 /BYTES/b}} mergeTs= txnDidNotUpdateMeta=false data: "n"/11.000000000,0 -> /BYTES/c +data: /Table/1/1/"row1"/0/1.000000000,0 -> /BYTES/r1a +data: /Table/1/1/"row1"/1/1/1.000000000,0 -> /BYTES/r1b +data: /Table/1/1/"row1"/4/1/1.000000000,0 -> /BYTES/r1e +data: /Table/1/1/"row2"/0/1.000000000,0 -> /BYTES/r2a +data: /Table/1/1/"row3"/0/1.000000000,0 -> /BYTES/r3a +data: /Table/1/1/"row3"/1/1/1.000000000,0 -> /BYTES/r3b run ok with t=A ts=11,0 max=3 @@ -155,6 +173,12 @@ data: "k"/11.000000000,0 -> /BYTES/c data: "l"/11.000000000,0 -> /BYTES/c data: "m"/11.000000000,0 -> /BYTES/c data: "n"/11.000000000,0 -> /BYTES/c +data: /Table/1/1/"row1"/0/1.000000000,0 -> /BYTES/r1a +data: /Table/1/1/"row1"/1/1/1.000000000,0 -> /BYTES/r1b +data: /Table/1/1/"row1"/4/1/1.000000000,0 -> /BYTES/r1e +data: /Table/1/1/"row2"/0/1.000000000,0 -> /BYTES/r2a +data: /Table/1/1/"row3"/0/1.000000000,0 -> /BYTES/r3a +data: /Table/1/1/"row3"/1/1/1.000000000,0 -> /BYTES/r3b # Same case as above, except with a committed value at the key after MaxKeys. @@ -197,3 +221,117 @@ meta: "m"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=12.000000000, data: "m"/12.000000000,0 -> /BYTES/c data: "m"/11.000000000,0 -> /BYTES/c data: "n"/11.000000000,0 -> /BYTES/c +data: /Table/1/1/"row1"/0/1.000000000,0 -> /BYTES/r1a +data: /Table/1/1/"row1"/1/1/1.000000000,0 -> /BYTES/r1b +data: /Table/1/1/"row1"/4/1/1.000000000,0 -> /BYTES/r1e +data: /Table/1/1/"row2"/0/1.000000000,0 -> /BYTES/r2a +data: /Table/1/1/"row3"/0/1.000000000,0 -> /BYTES/r3a +data: /Table/1/1/"row3"/1/1/1.000000000,0 -> /BYTES/r3b + +# Whole SQL rows. 
+run ok +with ts=12,0 + scan k=/ end=/z max=2 + scan k=/ end=/z max=2 wholeRows + scan k=/ end=/z max=2 wholeRows allowEmpty +---- +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @1.000000000,0 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @1.000000000,0 +scan: resume span [/Table/1/1/"row1"/4/1,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @1.000000000,0 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @1.000000000,0 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @1.000000000,0 +scan: resume span [/Table/1/1/"row2"/0,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: resume span [/Table/1/1/"row1"/0,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> + +run ok +with ts=12,0 + scan k=/ end=/z max=1 wholeRows allowEmpty + scan k=/ end=/z max=2 wholeRows allowEmpty + scan k=/ end=/z max=3 wholeRows allowEmpty + scan k=/ end=/z max=4 wholeRows allowEmpty + scan k=/ end=/z max=5 wholeRows allowEmpty + scan k=/ end=/z max=6 wholeRows allowEmpty +---- +scan: resume span [/Table/1/1/"row1"/0,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> +scan: resume span [/Table/1/1/"row1"/0,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @1.000000000,0 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @1.000000000,0 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @1.000000000,0 +scan: resume span [/Table/1/1/"row2"/0,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @1.000000000,0 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @1.000000000,0 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @1.000000000,0 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @1.000000000,0 +scan: resume span [/Table/1/1/"row3"/0,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @1.000000000,0 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @1.000000000,0 +scan: /Table/1/1/"row1"/4/1 
-> /BYTES/r1e @1.000000000,0 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @1.000000000,0 +scan: resume span [/Table/1/1/"row3"/0,/Table/1/1/"z"/0) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @1.000000000,0 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @1.000000000,0 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @1.000000000,0 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @1.000000000,0 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @1.000000000,0 +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 + +# Whole SQL rows in reverse. +run ok +with ts=12,0 + scan k=/ end=/z max=1 reverse + scan k=/ end=/z max=1 reverse wholeRows + scan k=/ end=/z max=1 reverse wholeRows allowEmpty +---- +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row3"/0/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @1.000000000,0 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row2"/0/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row3"/1/1/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> + +run ok +with ts=12,0 + scan k=/ end=/z max=1 reverse wholeRows allowEmpty + scan k=/ end=/z max=2 reverse wholeRows allowEmpty + scan k=/ end=/z max=3 reverse wholeRows allowEmpty + scan k=/ end=/z max=4 reverse wholeRows allowEmpty + scan k=/ end=/z max=5 reverse wholeRows allowEmpty + scan k=/ end=/z max=6 reverse wholeRows allowEmpty +---- +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row3"/1/1/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @1.000000000,0 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row2"/0/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @1.000000000,0 
+scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @1.000000000,0 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row1"/4/1/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @1.000000000,0 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @1.000000000,0 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row1"/4/1/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @1.000000000,0 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @1.000000000,0 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row1"/4/1/NULL) RESUME_KEY_LIMIT nextBytes=0 +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @1.000000000,0 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @1.000000000,0 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @1.000000000,0 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @1.000000000,0 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @1.000000000,0 +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @1.000000000,0 + +# WholeRows on non-SQL row data. +run ok +with ts=10,0 + scan k=a end=z max=3 wholeRows +---- +scan: "a" -> /BYTES/val-a @1.000000000,0 +scan: "c" -> /BYTES/val-c @1.000000000,0 +scan: "e" -> /BYTES/val-e @1.000000000,0 diff --git a/pkg/storage/testdata/mvcc_histories/target_bytes b/pkg/storage/testdata/mvcc_histories/target_bytes index 85793b99b770..705c95b8b192 100644 --- a/pkg/storage/testdata/mvcc_histories/target_bytes +++ b/pkg/storage/testdata/mvcc_histories/target_bytes @@ -19,6 +19,12 @@ with ts=123,45 put k=a v=abcdef put k=c v=ghijkllkjihg put k=e v=mnopqr + put k=/row1/0 v=r1a + put k=/row1/1 v=r1b + put k=/row1/4 v=r1e # column family 2-3 omitted (i.e. 
if all NULLs) + put k=/row2 v=r2a + put k=/row3 v=r3a + put k=/row3/1 v=r3b ---- >> at end: data: "a"/123.000000000,45 -> /BYTES/abcdef @@ -28,6 +34,12 @@ data: "aa"/1.000000000,0 -> /BYTES/willbetombstoned data: "c"/123.000000000,45 -> /BYTES/ghijkllkjihg data: "e"/123.000000000,45 -> /BYTES/mnopqr data: "e"/1.000000000,0 -> /BYTES/sameasabove +data: /Table/1/1/"row1"/0/123.000000000,45 -> /BYTES/r1a +data: /Table/1/1/"row1"/1/1/123.000000000,45 -> /BYTES/r1b +data: /Table/1/1/"row1"/4/1/123.000000000,45 -> /BYTES/r1e +data: /Table/1/1/"row2"/0/123.000000000,45 -> /BYTES/r2a +data: /Table/1/1/"row3"/0/123.000000000,45 -> /BYTES/r3a +data: /Table/1/1/"row3"/1/1/123.000000000,45 -> /BYTES/r3b # Scans without or with a large target size return all results. @@ -577,6 +589,173 @@ scan: "aa" -> / @250.000000000,1 scan: "a" -> /BYTES/abcdef @123.000000000,45 scan: 98 bytes (target 98) +# Whole SQL rows. +run ok +with ts=300,0 avoidExcess + scan k=/ end=/z targetbytes=1 + scan k=/ end=/z targetbytes=1 wholeRows + scan k=/ end=/z targetbytes=1 wholeRows allowEmpty +---- +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: resume span [/Table/1/1/"row1"/1/1,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=41 +scan: 40 bytes (target 1) +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: resume span [/Table/1/1/"row2"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 122 bytes (target 1) +scan: resume span [/Table/1/1/"row1"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 0 bytes (target 1) +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> + +run ok +with ts=300,0 avoidExcess allowEmpty wholeRows + scan k=/ end=/z targetbytes=121 + scan k=/ end=/z targetbytes=122 + scan k=/ end=/z targetbytes=123 +---- +scan: resume span [/Table/1/1/"row1"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=41 +scan: 0 bytes (target 121) 
+scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: resume span [/Table/1/1/"row2"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 122 bytes (target 122) +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: resume span [/Table/1/1/"row2"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 122 bytes (target 123) + +run ok +with ts=300,0 avoidExcess allowEmpty wholeRows + scan k=/ end=/z targetbytes=161 + scan k=/ end=/z targetbytes=162 + scan k=/ end=/z targetbytes=163 +---- +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: resume span [/Table/1/1/"row2"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 122 bytes (target 161) +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: resume span [/Table/1/1/"row3"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 162 bytes (target 162) +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: resume span [/Table/1/1/"row3"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 162 bytes (target 163) + +run ok +with ts=300,0 avoidExcess allowEmpty wholeRows + scan k=/ end=/z targetbytes=242 + scan k=/ end=/z targetbytes=243 +---- +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a 
@123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: resume span [/Table/1/1/"row3"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=41 +scan: 162 bytes (target 242) +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: 243 bytes (target 243) + +# Whole SQL rows in reverse. +run ok +with ts=300,0 avoidExcess + scan k=/ end=/z targetbytes=1 reverse + scan k=/ end=/z targetbytes=1 reverse wholeRows + scan k=/ end=/z targetbytes=1 reverse wholeRows allowEmpty +---- +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row3"/0/NULL) RESUME_BYTE_LIMIT nextBytes=40 +scan: 41 bytes (target 1) +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row2"/0/NULL) RESUME_BYTE_LIMIT nextBytes=40 +scan: 81 bytes (target 1) +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row3"/1/1/NULL) RESUME_BYTE_LIMIT nextBytes=41 +scan: 0 bytes (target 1) +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> + +run ok +with ts=300,0 reverse avoidExcess allowEmpty wholeRows + scan k=/ end=/z targetbytes=80 + scan k=/ end=/z targetbytes=81 + scan k=/ end=/z targetbytes=82 +---- +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row3"/1/1/NULL) RESUME_BYTE_LIMIT nextBytes=40 +scan: 0 bytes (target 80) +scan: /Table/1/1/""/0-/Table/1/1/"z"/0 -> +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: resume span 
[/Table/1/1/""/0,/Table/1/1/"row2"/0/NULL) RESUME_BYTE_LIMIT nextBytes=40 +scan: 81 bytes (target 81) +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row2"/0/NULL) RESUME_BYTE_LIMIT nextBytes=40 +scan: 81 bytes (target 82) + +run ok +with ts=300,0 reverse avoidExcess allowEmpty wholeRows + scan k=/ end=/z targetbytes=120 + scan k=/ end=/z targetbytes=121 + scan k=/ end=/z targetbytes=122 +---- +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row2"/0/NULL) RESUME_BYTE_LIMIT nextBytes=40 +scan: 81 bytes (target 120) +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row1"/4/1/NULL) RESUME_BYTE_LIMIT nextBytes=41 +scan: 121 bytes (target 121) +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row1"/4/1/NULL) RESUME_BYTE_LIMIT nextBytes=41 +scan: 121 bytes (target 122) + +run ok +with ts=300,0 reverse avoidExcess allowEmpty wholeRows + scan k=/ end=/z targetbytes=242 + scan k=/ end=/z targetbytes=243 +---- +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: resume span [/Table/1/1/""/0,/Table/1/1/"row1"/4/1/NULL) RESUME_BYTE_LIMIT nextBytes=40 +scan: 121 bytes (target 242) +scan: /Table/1/1/"row3"/1/1 -> /BYTES/r3b @123.000000000,45 +scan: /Table/1/1/"row3"/0 -> /BYTES/r3a @123.000000000,45 +scan: /Table/1/1/"row2"/0 -> /BYTES/r2a @123.000000000,45 +scan: 
/Table/1/1/"row1"/4/1 -> /BYTES/r1e @123.000000000,45 +scan: /Table/1/1/"row1"/1/1 -> /BYTES/r1b @123.000000000,45 +scan: /Table/1/1/"row1"/0 -> /BYTES/r1a @123.000000000,45 +scan: 243 bytes (target 243) + +# WholeRows on non-SQL row data. +run ok +with ts=300,0 + scan k=a end=/z targetbytes=110 avoidExcess wholeRows +---- +scan: "a" -> /BYTES/abcdef @123.000000000,45 +scan: "c" -> /BYTES/ghijkllkjihg @123.000000000,45 +scan: "e" -> /BYTES/mnopqr @123.000000000,45 +scan: resume span [/Table/1/1/"row1"/0,/Table/1/1/"z"/0) RESUME_BYTE_LIMIT nextBytes=40 +scan: 108 bytes (target 110) # Regression test for a bug simiar to #46652: Test appropriate termination when # the TargetBytes-th kv pair is in the intent history. @@ -607,17 +786,17 @@ with t=A ts=11,0 targetbytes=32 ---- scan: "k" -> /BYTES/b @11.000000000,0 scan: "l" -> /BYTES/b @11.000000000,0 -scan: resume span ["m","o") RESUME_BYTE_LIMIT nextBytes=82 +scan: resume span ["m","o") RESUME_BYTE_LIMIT nextBytes=25 scan: 50 bytes (target 32) scan: "k" -> /BYTES/b @11.000000000,0 -scan: resume span ["l","o") RESUME_BYTE_LIMIT nextBytes=82 +scan: resume span ["l","o") RESUME_BYTE_LIMIT nextBytes=25 scan: 25 bytes (target 32) scan: "n" -> /BYTES/b @11.000000000,0 scan: "m" -> /BYTES/b @11.000000000,0 -scan: resume span ["k","l\x00") RESUME_BYTE_LIMIT nextBytes=82 +scan: resume span ["k","l\x00") RESUME_BYTE_LIMIT nextBytes=25 scan: 50 bytes (target 32) scan: "n" -> /BYTES/b @11.000000000,0 -scan: resume span ["k","m\x00") RESUME_BYTE_LIMIT nextBytes=82 +scan: resume span ["k","m\x00") RESUME_BYTE_LIMIT nextBytes=25 scan: 25 bytes (target 32) >> at end: txn: "A" meta={id=00000000 key=/Min pri=0.00000000 epo=0 ts=11.000000000,0 min=0,0 seq=20} lock=true stat=PENDING rts=11.000000000,0 wto=false gul=0,0 @@ -636,3 +815,9 @@ meta: "m"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=11.000000000, data: "m"/11.000000000,0 -> /BYTES/c meta: "n"/0,0 -> txn={id=00000000 key=/Min pri=0.00000000 epo=0 ts=11.000000000,0 
min=0,0 seq=30} ts=11.000000000,0 del=false klen=12 vlen=6 ih={{10 /BYTES/a}{20 /BYTES/b}} mergeTs= txnDidNotUpdateMeta=false data: "n"/11.000000000,0 -> /BYTES/c +data: /Table/1/1/"row1"/0/123.000000000,45 -> /BYTES/r1a +data: /Table/1/1/"row1"/1/1/123.000000000,45 -> /BYTES/r1b +data: /Table/1/1/"row1"/4/1/123.000000000,45 -> /BYTES/r1e +data: /Table/1/1/"row2"/0/123.000000000,45 -> /BYTES/r2a +data: /Table/1/1/"row3"/0/123.000000000,45 -> /BYTES/r3a +data: /Table/1/1/"row3"/1/1/123.000000000,45 -> /BYTES/r3b