Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Refactor pagination for the Get command into the MVCC layer #94698

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,12 @@ func verifyCleanup(
return fmt.Errorf("expected no heartbeat")
}
}
_, intent, err := storage.MVCCGet(ctx, eng, key, hlc.MaxTimestamp, storage.MVCCGetOptions{
intentRes, err := storage.MVCCGet(ctx, eng, key, hlc.MaxTimestamp, storage.MVCCGetOptions{
Inconsistent: true,
})
require.NoError(t, err)
if intent != nil {
return fmt.Errorf("found unexpected write intent: %s", intent)
if intentRes.Intent != nil {
return fmt.Errorf("found unexpected write intent: %s", intentRes.Intent)
}
return nil
})
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,17 +1062,17 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {

// The second write has been rolled back; verify that the remaining
// value is from the first write.
res, i, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
res, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if i != nil {
t.Errorf("found intent, expected none: %+v", i)
if res.Intent != nil {
t.Errorf("found intent, expected none: %+v", res.Intent)
}
if res == nil {
if res.Value == nil {
t.Errorf("no value found, expected one")
} else {
s, err := res.GetBytes()
s, err := res.Value.GetBytes()
if err != nil {
t.Fatal(err)
}
Expand Down
52 changes: 15 additions & 37 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,7 @@ func Get(
h := cArgs.Header
reply := resp.(*roachpb.GetResponse)

if h.MaxSpanRequestKeys < 0 || h.TargetBytes < 0 {
// Receipt of a GetRequest with negative MaxSpanRequestKeys or TargetBytes
// indicates that the request was part of a batch that has already exhausted
// its limit, which means that we should *not* serve the request and return
// a ResumeSpan for this GetRequest.
//
// This mirrors the logic in MVCCScan, though the logic in MVCCScan is
// slightly lower in the stack.
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
if h.MaxSpanRequestKeys < 0 {
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
} else if h.TargetBytes < 0 {
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
}
return result.Result{}, nil
}

var val *roachpb.Value
var intent *roachpb.Intent
var err error
val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
getRes, err := storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
Expand All @@ -61,29 +41,27 @@ func Get(
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
MaxKeys: cArgs.Header.MaxSpanRequestKeys,
TargetBytes: cArgs.Header.TargetBytes,
AllowEmpty: cArgs.Header.AllowEmpty,
})
if err != nil {
return result.Result{}, err
}
if val != nil {
// NB: This calculation is different from Scan, since Scan responses include
// the key/value pair while Get only includes the value.
numBytes := int64(len(val.RawBytes))
if h.TargetBytes > 0 && h.AllowEmpty && numBytes > h.TargetBytes {
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
reply.ResumeNextBytes = numBytes
return result.Result{}, nil
}
reply.NumKeys = 1
reply.NumBytes = numBytes
reply.ResumeSpan = getRes.ResumeSpan
reply.ResumeReason = getRes.ResumeReason
reply.ResumeNextBytes = getRes.ResumeNextBytes
reply.NumKeys = getRes.NumKeys
reply.NumBytes = getRes.NumBytes
if reply.ResumeSpan != nil {
return result.Result{}, nil
}
var intents []roachpb.Intent
if intent != nil {
intents = append(intents, *intent)
if getRes.Intent != nil {
intents = append(intents, *getRes.Intent)
}

reply.Value = val
reply.Value = getRes.Value
if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
var intentVals []roachpb.KeyValue
// NOTE: MVCCGet uses a Prefix iterator, so we want to use one in
Expand All @@ -103,7 +81,7 @@ func Get(
}

var res result.Result
if args.KeyLocking != lock.None && h.Txn != nil && val != nil {
if args.KeyLocking != lock.None && h.Txn != nil && getRes.Value != nil {
acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated)
res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,25 @@ func Refresh(
// specifying consistent=false. Note that we include tombstones,
// which must be considered as updates on refresh.
log.VEventf(ctx, 2, "refresh %s @[%s-%s]", args.Span(), refreshFrom, refreshTo)
val, intent, err := storage.MVCCGet(ctx, reader, args.Key, refreshTo, storage.MVCCGetOptions{
res, err := storage.MVCCGet(ctx, reader, args.Key, refreshTo, storage.MVCCGetOptions{
Inconsistent: true,
Tombstones: true,
})

if err != nil {
return result.Result{}, err
} else if val != nil {
if ts := val.Timestamp; refreshFrom.Less(ts) {
} else if res.Value != nil {
if ts := res.Value.Timestamp; refreshFrom.Less(ts) {
return result.Result{},
roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_COMMITTED_VALUE, args.Key, ts)
}
}

// Check if an intent which is not owned by this transaction was written
// at or beneath the refresh timestamp.
if intent != nil && intent.Txn.ID != h.Txn.ID {
if res.Intent != nil && res.Intent.Txn.ID != h.Txn.ID {
return result.Result{}, roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_INTENT,
intent.Key, intent.Txn.WriteTimestamp)
res.Intent.Key, res.Intent.Txn.WriteTimestamp)
}

return result.Result{}, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// have previously performed a consistent read at the lower time-bound to
// prove that there are no intents present that would be missed by the time-
// bound iterator.
if val, intent, err := storage.MVCCGet(ctx, db, k, ts1, storage.MVCCGetOptions{}); err != nil {
if res, err := storage.MVCCGet(ctx, db, k, ts1, storage.MVCCGetOptions{}); err != nil {
t.Fatal(err)
} else if intent != nil {
} else if res.Intent != nil {
t.Fatalf("got unexpected intent: %v", intent)
} else if !val.EqualTagAndData(v) {
t.Fatalf("expected %v, got %v", v, val)
} else if !res.Value.EqualTagAndData(v) {
t.Fatalf("expected %v, got %v", v, res.Value)
}

// Now the real test: a transaction at ts2 has been pushed to ts3
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,17 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {

// The second write has been rolled back; verify that the remaining
// value is from the first write.
res, i, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
res, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if i != nil {
t.Errorf("%s: found intent, expected none: %+v", k, i)
if res.Intent != nil {
t.Errorf("%s: found intent, expected none: %+v", k, res.Intent)
}
if res == nil {
if res.Value == nil {
t.Errorf("%s: no value found, expected one", k)
} else {
s, err := res.GetBytes()
s, err := res.Value.GetBytes()
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ func Subsume(
// the maximum timestamp to ensure that we see an intent if one exists,
// regardless of what timestamp it is written at.
descKey := keys.RangeDescriptorKey(desc.StartKey)
_, intent, err := storage.MVCCGet(ctx, readWriter, descKey, hlc.MaxTimestamp,
intentRes, err := storage.MVCCGet(ctx, readWriter, descKey, hlc.MaxTimestamp,
storage.MVCCGetOptions{Inconsistent: true})
if err != nil {
return result.Result{}, errors.Wrap(err, "fetching local range descriptor")
} else if intent == nil {
} else if intentRes.Intent == nil {
return result.Result{}, errors.Errorf("range missing intent on its local descriptor")
}
val, _, err := storage.MVCCGetAsTxn(ctx, readWriter, descKey, intent.Txn.WriteTimestamp, intent.Txn)
valRes, err := storage.MVCCGetAsTxn(ctx, readWriter, descKey, intentRes.Intent.Txn.WriteTimestamp, intentRes.Intent.Txn)
if err != nil {
return result.Result{}, errors.Wrap(err, "fetching local range descriptor as txn")
} else if val != nil {
} else if valRes.Value != nil {
return result.Result{}, errors.Errorf("non-deletion intent on local range descriptor")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ func readProvisionalVal(
ctx context.Context, reader storage.Reader, usePrefixIter bool, intent *roachpb.Intent,
) (roachpb.KeyValue, error) {
if usePrefixIter {
val, _, err := storage.MVCCGetAsTxn(
valRes, err := storage.MVCCGetAsTxn(
ctx, reader, intent.Key, intent.Txn.WriteTimestamp, intent.Txn,
)
if err != nil {
return roachpb.KeyValue{}, err
}
if val == nil {
if valRes.Value == nil {
// Intent is a deletion.
return roachpb.KeyValue{}, nil
}
return roachpb.KeyValue{Key: intent.Key, Value: *val}, nil
return roachpb.KeyValue{Key: intent.Key, Value: *valRes.Value}, nil
}
res, err := storage.MVCCScanAsTxn(
ctx, reader, intent.Key, intent.Key.Next(), intent.Txn.WriteTimestamp, intent.Txn,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func mergeWithData(t *testing.T, retries int64) {

// Verify no intents remain on range descriptor keys.
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(lhsDesc.StartKey), keys.RangeDescriptorKey(rhsDesc.StartKey)} {
if _, _, err := storage.MVCCGet(
if _, err := storage.MVCCGet(
ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
if advance := ts3.GoTime().Sub(ts2.GoTime()); advance != 0 {
t.Fatalf("expected clock not to advance, but it advanced by %s", advance)
}
val, _, err := storage.MVCCGet(context.Background(), store.Engine(), key, ts3,
valRes, err := storage.MVCCGet(context.Background(), store.Engine(), key, ts3,
storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if a, e := mustGetInt(val), incArgs.Increment*numCmds; a != e {
if a, e := mustGetInt(valRes.Value), incArgs.Increment*numCmds; a != e {
t.Errorf("expected %d, got %d", e, a)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestStoreRangeSplitIntents(t *testing.T) {
t.Fatal(err)
}
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(roachpb.RKeyMin), keys.RangeDescriptorKey(splitKeyAddr)} {
if _, _, err := storage.MVCCGet(
if _, err := storage.MVCCGet(
ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Errorf("failed to read consistent range descriptor for key %s: %+v", key, err)
Expand Down Expand Up @@ -667,7 +667,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) {
t.Fatal(err)
}
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(roachpb.RKeyMin), keys.RangeDescriptorKey(splitKeyAddr)} {
if _, _, err := storage.MVCCGet(
if _, err := storage.MVCCGet(
context.Background(), store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,17 @@ func applyReplicaUpdate(
// there will be keys not represented by any ranges or vice
// versa).
key := keys.RangeDescriptorKey(update.StartKey.AsRKey())
value, intent, err := storage.MVCCGet(
res, err := storage.MVCCGet(
ctx, readWriter, key, clock.Now(), storage.MVCCGetOptions{Inconsistent: true})
if value == nil {
if res.Value == nil {
return PrepareReplicaReport{}, errors.Errorf(
"failed to find a range descriptor for range %v", key)
}
if err != nil {
return PrepareReplicaReport{}, err
}
var localDesc roachpb.RangeDescriptor
if err := value.GetProto(&localDesc); err != nil {
if err := res.Value.GetProto(&localDesc); err != nil {
return PrepareReplicaReport{}, err
}
// Sanity check that this is indeed the right range.
Expand All @@ -213,7 +213,7 @@ func applyReplicaUpdate(
// we won't be able to do MVCCPut later during recovery for the new
// descriptor. It should have no effect on the recovery process itself as
// the transaction would be rolled back anyway.
if intent != nil {
if res.Intent != nil {
// We rely on the property that transactions involving the range
// descriptor always start on the range-local descriptor's key. When there
// is an intent, this means that it is likely that the transaction did not
Expand Down Expand Up @@ -251,24 +251,24 @@ func applyReplicaUpdate(
// plan is trivially achievable, due to any of the above problems. But
// in the common case, we do expect one to exist.
report.AbortedTransaction = true
report.AbortedTransactionID = intent.Txn.ID
report.AbortedTransactionID = res.Intent.Txn.ID

// A crude form of the intent resolution process: abort the
// transaction by deleting its record.
txnKey := keys.TransactionKey(intent.Txn.Key, intent.Txn.ID)
txnKey := keys.TransactionKey(res.Intent.Txn.Key, res.Intent.Txn.ID)
if _, err := storage.MVCCDelete(ctx, readWriter, &ms, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil); err != nil {
return PrepareReplicaReport{}, err
}
update := roachpb.LockUpdate{
Span: roachpb.Span{Key: intent.Key},
Txn: intent.Txn,
Span: roachpb.Span{Key: res.Intent.Key},
Txn: res.Intent.Txn,
Status: roachpb.ABORTED,
}
if _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update); err != nil {
return PrepareReplicaReport{}, err
}
report.AbortedTransaction = true
report.AbortedTransactionID = intent.Txn.ID
report.AbortedTransactionID = res.Intent.Txn.ID
}
newDesc := localDesc
replicas := []roachpb.ReplicaDescriptor{
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,18 +1805,18 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
// if one exists, regardless of what timestamp it is written at.
desc := r.descRLocked()
descKey := keys.RangeDescriptorKey(desc.StartKey)
_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
intentRes, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
storage.MVCCGetOptions{Inconsistent: true})
if err != nil {
return false, err
} else if intent == nil {
} else if intentRes.Intent == nil {
return false, nil
}
val, _, err := storage.MVCCGetAsTxn(
ctx, r.Engine(), descKey, intent.Txn.WriteTimestamp, intent.Txn)
valRes, err := storage.MVCCGetAsTxn(
ctx, r.Engine(), descKey, intentRes.Intent.Txn.WriteTimestamp, intentRes.Intent.Txn)
if err != nil {
return false, err
} else if val != nil {
} else if valRes.Value != nil {
return false, nil
}

Expand All @@ -1833,7 +1833,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
return true, nil
}
r.mu.mergeComplete = mergeCompleteCh
r.mu.mergeTxnID = intent.Txn.ID
r.mu.mergeTxnID = intentRes.Intent.Txn.ID
// The RHS of a merge is not permitted to quiesce while a mergeComplete
// channel is installed. (If the RHS is quiescent when the merge commits, any
// orphaned followers would fail to queue themselves for GC.) Unquiesce the
Expand All @@ -1854,11 +1854,11 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
b := &kv.Batch{}
b.Header.Timestamp = r.Clock().Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: intent.Txn.Key},
RequestHeader: roachpb.RequestHeader{Key: intentRes.Intent.Txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: enginepb.MinTxnPriority},
},
PusheeTxn: intent.Txn,
PusheeTxn: intentRes.Intent.Txn,
PushType: roachpb.PUSH_ABORT,
})
if err := r.store.DB().Run(ctx, b); err != nil {
Expand All @@ -1882,7 +1882,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
switch pushTxnRes.PusheeTxn.Status {
case roachpb.PENDING, roachpb.STAGING:
log.Fatalf(ctx, "PushTxn returned while merge transaction %s was still %s",
intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status)
intentRes.Intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status)
case roachpb.COMMITTED:
// If PushTxn claims that the transaction committed, then the transaction
// definitely committed.
Expand Down
Loading