Skip to content

Commit

Permalink
storage: Refactor pagination for the Get command into the MVCC layer
Browse files Browse the repository at this point in the history
Informs: cockroachdb#77228

Refactor key and byte pagination for the Get command into the MVCC layer
Previously, pagination was done in pkg/kv/kvserver/batcheval/cmd_get.go,
but to ensure consistency in where pagination logic is located across
all commands, we move the pagination logic for the Get command to the
MVCC layer where the pagination logic for most other commands is. This
also enables better parameter testing in the storage package since we
can leverage e.g. data-driven tests like TestMVCCHistories.

Release note: None
  • Loading branch information
KaiSun314 committed Jan 6, 2023
1 parent 1d7bd69 commit 4b3134a
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 31 deletions.
35 changes: 6 additions & 29 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,6 @@ func Get(
h := cArgs.Header
reply := resp.(*roachpb.GetResponse)

if h.MaxSpanRequestKeys < 0 || h.TargetBytes < 0 {
// Receipt of a GetRequest with negative MaxSpanRequestKeys or TargetBytes
// indicates that the request was part of a batch that has already exhausted
// its limit, which means that we should *not* serve the request and return
// a ResumeSpan for this GetRequest.
//
// This mirrors the logic in MVCCScan, though the logic in MVCCScan is
// slightly lower in the stack.
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
if h.MaxSpanRequestKeys < 0 {
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
} else if h.TargetBytes < 0 {
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
}
return result.Result{}, nil
}

var val *roachpb.Value
var intent *roachpb.Intent
var err error
Expand All @@ -61,22 +44,16 @@ func Get(
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
MaxKeys: cArgs.Header.MaxSpanRequestKeys,
TargetBytes: cArgs.Header.TargetBytes,
AllowEmpty: cArgs.Header.AllowEmpty,
Reply: reply,
})
if err != nil {
return result.Result{}, err
}
if val != nil {
// NB: This calculation is different from Scan, since Scan responses include
// the key/value pair while Get only includes the value.
numBytes := int64(len(val.RawBytes))
if h.TargetBytes > 0 && h.AllowEmpty && numBytes > h.TargetBytes {
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
reply.ResumeNextBytes = numBytes
return result.Result{}, nil
}
reply.NumKeys = 1
reply.NumBytes = numBytes
if reply.ResumeSpan != nil {
return result.Result{}, nil
}
var intents []roachpb.Intent
if intent != nil {
Expand Down
54 changes: 53 additions & 1 deletion pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,22 @@ type MVCCGetOptions struct {
// or not. It is usually set by read-only requests that have resolved their
// conflicts before they begin their MVCC scan.
DontInterleaveIntents bool
// MaxKeys is the maximum number of kv pairs returned from this operation.
// The non-negative value represents an unbounded Get. The value -1 returns
// no keys in the result and a ResumeSpan equal to the request span is
// returned.
MaxKeys int64
// TargetBytes is a byte threshold to limit the amount of data pulled into
// memory during a Get operation. The zero value indicates no limit. The
// value -1 returns no keys in the result. A positive value represents an
// unbounded Get unless AllowEmpty is set. If an empty result is returned,
// then a ResumeSpan equal to the request span is returned.
TargetBytes int64
// AllowEmpty will return an empty result if the request key exceeds the
// TargetBytes limit.
AllowEmpty bool
// Reply is a pointer to the Get response object.
Reply *roachpb.GetResponse
}

func (opts *MVCCGetOptions) validate() error {
Expand Down Expand Up @@ -1001,6 +1017,24 @@ func MVCCGet(
func MVCCGetWithValueHeader(
ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions,
) (*roachpb.Value, *roachpb.Intent, enginepb.MVCCValueHeader, error) {
if opts.MaxKeys < 0 || opts.TargetBytes < 0 {
// Receipt of a GetRequest with negative MaxKeys or TargetBytes indicates
// that the request was part of a batch that has already exhausted its
// limit, which means that we should *not* serve the request and return a
// ResumeSpan for this GetRequest.
//
// This mirrors the logic in MVCCScan, though the logic in MVCCScan is
// slightly lower in the stack.
if opts.Reply != nil {
opts.Reply.ResumeSpan = &roachpb.Span{Key: key}
if opts.MaxKeys < 0 {
opts.Reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
} else if opts.TargetBytes < 0 {
opts.Reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
}
}
return nil, nil, enginepb.MVCCValueHeader{}, nil
}
iter := newMVCCIterator(
reader, timestamp, false /* rangeKeyMasking */, opts.DontInterleaveIntents, IterOptions{
KeyTypes: IterKeyTypePointsAndRanges,
Expand All @@ -1009,7 +1043,25 @@ func MVCCGetWithValueHeader(
)
defer iter.Close()
value, intent, vh, err := mvccGetWithValueHeader(ctx, iter, key, timestamp, opts)
return value.ToPointer(), intent, vh, err
val := value.ToPointer()
if err == nil && val != nil {
// NB: This calculation is different from Scan, since Scan responses include
// the key/value pair while Get only includes the value.
numBytes := int64(len(val.RawBytes))
if opts.TargetBytes > 0 && opts.AllowEmpty && numBytes > opts.TargetBytes {
if opts.Reply != nil {
opts.Reply.ResumeSpan = &roachpb.Span{Key: key}
opts.Reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
opts.Reply.ResumeNextBytes = numBytes
}
return nil, nil, enginepb.MVCCValueHeader{}, nil
}
if opts.Reply != nil {
opts.Reply.NumKeys = 1
opts.Reply.NumBytes = numBytes
}
}
return val, intent, vh, err
}

// gcassert:inline
Expand Down
11 changes: 10 additions & 1 deletion pkg/storage/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var (
// merge [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put [t=<name>] [ts=<int>[,<int>]] [localTs=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> v=<string> [raw]
// put_rangekey ts=<int>[,<int>] [localTs=<int>[,<int>]] k=<key> end=<key>
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]]
// get [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]] [maxKeys=<int>,targetBytes=<int>] [allowEmpty]
// scan [t=<name>] [ts=<int>[,<int>]] [resolve [status=<txnstatus>]] k=<key> [end=<key>] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=<int>[,<int>]] [globalUncertaintyLimit=<int>[,<int>]] [max=<max>] [targetbytes=<target>] [wholeRows[=<int>]] [allowEmpty]
// export [k=<key>] [end=<key>] [ts=<int>[,<int>]] [kTs=<int>[,<int>]] [startTs=<int>[,<int>]] [maxIntents=<int>] [allRevisions] [targetSize=<int>] [maxSize=<int>] [stopMidKey] [fingerprint]
//
Expand Down Expand Up @@ -1230,6 +1230,15 @@ func cmdGet(e *evalCtx) error {
}
opts.Uncertainty.GlobalLimit = txn.GlobalUncertaintyLimit
}
if e.hasArg("maxKeys") {
e.scanArg("maxKeys", &opts.MaxKeys)
}
if e.hasArg("targetBytes") {
e.scanArg("targetBytes", &opts.TargetBytes)
}
if e.hasArg("allowEmpty") {
opts.AllowEmpty = true
}

return e.withReader(func(r storage.Reader) error {
val, intent, err := storage.MVCCGet(e.ctx, r, key, ts, opts)
Expand Down
52 changes: 52 additions & 0 deletions pkg/storage/testdata/mvcc_histories/get_pagination
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Test MaxKeys and TargetBytes for get.

# Put some test data.
run ok
put k=a v=a ts=1
put k=b v=bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb ts=1
----
>> at end:
data: "a"/1.000000000,0 -> /BYTES/a
data: "b"/1.000000000,0 -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb

# Return none since maxKeys < 0.
run ok
get k=a ts=2 maxKeys=-1
----
get: "a" -> <no data>

# Return value since maxKeys >= 0.
run ok
get k=a ts=2 maxKeys=1
----
get: "a" -> /BYTES/a @1.000000000,0

# Return none since targetBytes < 0.
run ok
get k=a ts=2 targetBytes=-1
----
get: "a" -> <no data>

# Return none since targetBytes is insufficient and allowEmpty is true.
run ok
get k=b ts=2 targetBytes=1 allowEmpty
----
get: "b" -> <no data>

# Return value since targetBytes is sufficient and allowEmpty is true.
run ok
get k=a ts=2 targetBytes=100 allowEmpty
----
get: "a" -> /BYTES/a @1.000000000,0

# Return value since targetBytes is insufficient and allowEmpty is false.
run ok
get k=b ts=2 targetBytes=1
----
get: "b" -> /BYTES/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb @1.000000000,0

# Return value since targetBytes is sufficient and allowEmpty is false.
run ok
get k=a ts=2 targetBytes=100
----
get: "a" -> /BYTES/a @1.000000000,0

0 comments on commit 4b3134a

Please sign in to comment.