diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 8a6506ce11aa..369e6fea0e26 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -32,23 +32,6 @@ func Get( h := cArgs.Header reply := resp.(*roachpb.GetResponse) - if h.MaxSpanRequestKeys < 0 || h.TargetBytes < 0 { - // Receipt of a GetRequest with negative MaxSpanRequestKeys or TargetBytes - // indicates that the request was part of a batch that has already exhausted - // its limit, which means that we should *not* serve the request and return - // a ResumeSpan for this GetRequest. - // - // This mirrors the logic in MVCCScan, though the logic in MVCCScan is - // slightly lower in the stack. - reply.ResumeSpan = &roachpb.Span{Key: args.Key} - if h.MaxSpanRequestKeys < 0 { - reply.ResumeReason = roachpb.RESUME_KEY_LIMIT - } else if h.TargetBytes < 0 { - reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT - } - return result.Result{}, nil - } - var val *roachpb.Value var intent *roachpb.Intent var err error @@ -61,22 +44,16 @@ func Get( MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), LockTable: cArgs.Concurrency, DontInterleaveIntents: cArgs.DontInterleaveIntents, + MaxKeys: cArgs.Header.MaxSpanRequestKeys, + TargetBytes: cArgs.Header.TargetBytes, + AllowEmpty: cArgs.Header.AllowEmpty, + Reply: reply, }) if err != nil { return result.Result{}, err } - if val != nil { - // NB: This calculation is different from Scan, since Scan responses include - // the key/value pair while Get only includes the value. - numBytes := int64(len(val.RawBytes)) - if h.TargetBytes > 0 && h.AllowEmpty && numBytes > h.TargetBytes { - reply.ResumeSpan = &roachpb.Span{Key: args.Key} - reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT - reply.ResumeNextBytes = numBytes - return result.Result{}, nil - } - reply.NumKeys = 1 - reply.NumBytes = numBytes + if reply.ResumeSpan != nil { + return result.Result{}, nil } var intents []roachpb.Intent if intent != nil { diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 43e830b5ff65..06a0c06a37d0 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -900,6 +900,22 @@ type MVCCGetOptions struct { // or not. It is usually set by read-only requests that have resolved their // conflicts before they begin their MVCC scan. DontInterleaveIntents bool + // MaxKeys is the maximum number of kv pairs returned from this operation. + // The non-negative value represents an unbounded Get. The value -1 returns + // no keys in the result and a ResumeSpan equal to the request span is + // returned. + MaxKeys int64 + // TargetBytes is a byte threshold to limit the amount of data pulled into + // memory during a Get operation. The zero value indicates no limit. The + // value -1 returns no keys in the result. A positive value represents an + // unbounded Get unless AllowEmpty is set. If an empty result is returned, + // then a ResumeSpan equal to the request span is returned. + TargetBytes int64 + // AllowEmpty will return an empty result if the request key exceeds the + // TargetBytes limit. + AllowEmpty bool + // Reply is a pointer to the Get response object. + Reply *roachpb.GetResponse } func (opts *MVCCGetOptions) validate() error { @@ -1001,6 +1017,24 @@ func MVCCGet( func MVCCGetWithValueHeader( ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, enginepb.MVCCValueHeader, error) { + if opts.MaxKeys < 0 || opts.TargetBytes < 0 { + // Receipt of a GetRequest with negative MaxKeys or TargetBytes indicates + // that the request was part of a batch that has already exhausted its + // limit, which means that we should *not* serve the request and return a + // ResumeSpan for this GetRequest. + // + // This mirrors the logic in MVCCScan, though the logic in MVCCScan is + // slightly lower in the stack. + if opts.Reply != nil { + opts.Reply.ResumeSpan = &roachpb.Span{Key: key} + if opts.MaxKeys < 0 { + opts.Reply.ResumeReason = roachpb.RESUME_KEY_LIMIT + } else if opts.TargetBytes < 0 { + opts.Reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT + } + } + return nil, nil, enginepb.MVCCValueHeader{}, nil + } iter := newMVCCIterator( reader, timestamp, false /* rangeKeyMasking */, opts.DontInterleaveIntents, IterOptions{ KeyTypes: IterKeyTypePointsAndRanges, @@ -1009,7 +1043,25 @@ func MVCCGetWithValueHeader( ) defer iter.Close() value, intent, vh, err := mvccGetWithValueHeader(ctx, iter, key, timestamp, opts) - return value.ToPointer(), intent, vh, err + val := value.ToPointer() + if err == nil && val != nil { + // NB: This calculation is different from Scan, since Scan responses include + // the key/value pair while Get only includes the value. + numBytes := int64(len(val.RawBytes)) + if opts.TargetBytes > 0 && opts.AllowEmpty && numBytes > opts.TargetBytes { + if opts.Reply != nil { + opts.Reply.ResumeSpan = &roachpb.Span{Key: key} + opts.Reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT + opts.Reply.ResumeNextBytes = numBytes + } + return nil, nil, enginepb.MVCCValueHeader{}, nil + } + if opts.Reply != nil { + opts.Reply.NumKeys = 1 + opts.Reply.NumBytes = numBytes + } + } + return val, intent, vh, err } // gcassert:inline diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index dc931c979183..e5c6f5cc070b 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -95,7 +95,7 @@ var ( // merge [t=] [ts=[,]] [resolve [status=]] k= v= [raw] // put [t=] [ts=[,]] [localTs=[,]] [resolve [status=]] k= v= [raw] // put_rangekey ts=[,] [localTs=[,]] k= end= -// get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] +// get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [maxKeys=,targetBytes=] [allowEmpty] // scan [t=] [ts=[,]] [resolve [status=]] k= [end=] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [max=] [targetbytes=] [wholeRows[=]] [allowEmpty] // export [k=] [end=] [ts=[,]] [kTs=[,]] [startTs=[,]] [maxIntents=] [allRevisions] [targetSize=] [maxSize=] [stopMidKey] [fingerprint] // @@ -1230,6 +1230,15 @@ func cmdGet(e *evalCtx) error { } opts.Uncertainty.GlobalLimit = txn.GlobalUncertaintyLimit } + if e.hasArg("maxKeys") { + e.scanArg("maxKeys", &opts.MaxKeys) + } + if e.hasArg("targetBytes") { + e.scanArg("targetBytes", &opts.TargetBytes) + } + if e.hasArg("allowEmpty") { + opts.AllowEmpty = true + } return e.withReader(func(r storage.Reader) error { val, intent, err := storage.MVCCGet(e.ctx, r, key, ts, opts) diff --git a/pkg/storage/testdata/mvcc_histories/get_pagination b/pkg/storage/testdata/mvcc_histories/get_pagination new file mode 100644 index 000000000000..98fe2539b71c --- /dev/null +++ b/pkg/storage/testdata/mvcc_histories/get_pagination @@ -0,0 +1,52 @@ +# Test MaxKeys and TargetBytes for get. + +# Put some test data. +run ok +put k=a v=a ts=1 +put k=b v=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb ts=1 +---- +>> at end: +data: "a"/1.000000000,0 -> /BYTES/a +data: "b"/1.000000000,0 -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb + +# Return none since maxKeys < 0. +run ok +get k=a ts=2 maxKeys=-1 +---- +get: "a" -> + +# Return value since maxKeys >= 0. +run ok +get k=a ts=2 maxKeys=1 +---- +get: "a" -> /BYTES/a @1.000000000,0 + +# Return none since targetBytes < 0. +run ok +get k=a ts=2 targetBytes=-1 +---- +get: "a" -> + +# Return none since targetBytes is insufficient and allowEmpty is true. +run ok +get k=b ts=2 targetBytes=1 allowEmpty +---- +get: "b" -> + +# Return value since targetBytes is sufficient and allowEmpty is true. +run ok +get k=a ts=2 targetBytes=100 allowEmpty +---- +get: "a" -> /BYTES/a @1.000000000,0 + +# Return value since targetBytes is insufficient and allowEmpty is false. +run ok +get k=b ts=2 targetBytes=1 +---- +get: "b" -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb @1.000000000,0 + +# Return value since targetBytes is sufficient and allowEmpty is false. +run ok +get k=a ts=2 targetBytes=100 +---- +get: "a" -> /BYTES/a @1.000000000,0