Skip to content

Commit

Permalink
Merge pull request #94698 from KaiSun314/push-get-pagination-into-mvcc
Browse files Browse the repository at this point in the history
storage: Refactor pagination for the Get command into the MVCC layer
  • Loading branch information
nvanbenschoten authored Jan 12, 2023
2 parents 5bf7569 + ce24131 commit e8a5d34
Show file tree
Hide file tree
Showing 23 changed files with 398 additions and 294 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,12 @@ func verifyCleanup(
return fmt.Errorf("expected no heartbeat")
}
}
_, intent, err := storage.MVCCGet(ctx, eng, key, hlc.MaxTimestamp, storage.MVCCGetOptions{
intentRes, err := storage.MVCCGet(ctx, eng, key, hlc.MaxTimestamp, storage.MVCCGetOptions{
Inconsistent: true,
})
require.NoError(t, err)
if intent != nil {
return fmt.Errorf("found unexpected write intent: %s", intent)
if intentRes.Intent != nil {
return fmt.Errorf("found unexpected write intent: %s", intentRes.Intent)
}
return nil
})
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,17 +1062,17 @@ func TestPartialRollbackOnEndTransaction(t *testing.T) {

// The second write has been rolled back; verify that the remaining
// value is from the first write.
res, i, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
res, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if i != nil {
t.Errorf("found intent, expected none: %+v", i)
if res.Intent != nil {
t.Errorf("found intent, expected none: %+v", res.Intent)
}
if res == nil {
if res.Value == nil {
t.Errorf("no value found, expected one")
} else {
s, err := res.GetBytes()
s, err := res.Value.GetBytes()
if err != nil {
t.Fatal(err)
}
Expand Down
52 changes: 15 additions & 37 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,27 +32,7 @@ func Get(
h := cArgs.Header
reply := resp.(*roachpb.GetResponse)

if h.MaxSpanRequestKeys < 0 || h.TargetBytes < 0 {
// Receipt of a GetRequest with negative MaxSpanRequestKeys or TargetBytes
// indicates that the request was part of a batch that has already exhausted
// its limit, which means that we should *not* serve the request and return
// a ResumeSpan for this GetRequest.
//
// This mirrors the logic in MVCCScan, though the logic in MVCCScan is
// slightly lower in the stack.
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
if h.MaxSpanRequestKeys < 0 {
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
} else if h.TargetBytes < 0 {
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
}
return result.Result{}, nil
}

var val *roachpb.Value
var intent *roachpb.Intent
var err error
val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
getRes, err := storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
Expand All @@ -61,29 +41,27 @@ func Get(
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
MaxKeys: cArgs.Header.MaxSpanRequestKeys,
TargetBytes: cArgs.Header.TargetBytes,
AllowEmpty: cArgs.Header.AllowEmpty,
})
if err != nil {
return result.Result{}, err
}
if val != nil {
// NB: This calculation is different from Scan, since Scan responses include
// the key/value pair while Get only includes the value.
numBytes := int64(len(val.RawBytes))
if h.TargetBytes > 0 && h.AllowEmpty && numBytes > h.TargetBytes {
reply.ResumeSpan = &roachpb.Span{Key: args.Key}
reply.ResumeReason = roachpb.RESUME_BYTE_LIMIT
reply.ResumeNextBytes = numBytes
return result.Result{}, nil
}
reply.NumKeys = 1
reply.NumBytes = numBytes
reply.ResumeSpan = getRes.ResumeSpan
reply.ResumeReason = getRes.ResumeReason
reply.ResumeNextBytes = getRes.ResumeNextBytes
reply.NumKeys = getRes.NumKeys
reply.NumBytes = getRes.NumBytes
if reply.ResumeSpan != nil {
return result.Result{}, nil
}
var intents []roachpb.Intent
if intent != nil {
intents = append(intents, *intent)
if getRes.Intent != nil {
intents = append(intents, *getRes.Intent)
}

reply.Value = val
reply.Value = getRes.Value
if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
var intentVals []roachpb.KeyValue
// NOTE: MVCCGet uses a Prefix iterator, so we want to use one in
Expand All @@ -103,7 +81,7 @@ func Get(
}

var res result.Result
if args.KeyLocking != lock.None && h.Txn != nil && val != nil {
if args.KeyLocking != lock.None && h.Txn != nil && getRes.Value != nil {
acq := roachpb.MakeLockAcquisition(h.Txn, args.Key, lock.Unreplicated)
res.Local.AcquiredLocks = []roachpb.LockAcquisition{acq}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,25 @@ func Refresh(
// specifying consistent=false. Note that we include tombstones,
// which must be considered as updates on refresh.
log.VEventf(ctx, 2, "refresh %s @[%s-%s]", args.Span(), refreshFrom, refreshTo)
val, intent, err := storage.MVCCGet(ctx, reader, args.Key, refreshTo, storage.MVCCGetOptions{
res, err := storage.MVCCGet(ctx, reader, args.Key, refreshTo, storage.MVCCGetOptions{
Inconsistent: true,
Tombstones: true,
})

if err != nil {
return result.Result{}, err
} else if val != nil {
if ts := val.Timestamp; refreshFrom.Less(ts) {
} else if res.Value != nil {
if ts := res.Value.Timestamp; refreshFrom.Less(ts) {
return result.Result{},
roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_COMMITTED_VALUE, args.Key, ts)
}
}

// Check if an intent which is not owned by this transaction was written
// at or beneath the refresh timestamp.
if intent != nil && intent.Txn.ID != h.Txn.ID {
if res.Intent != nil && res.Intent.Txn.ID != h.Txn.ID {
return result.Result{}, roachpb.NewRefreshFailedError(roachpb.RefreshFailedError_REASON_INTENT,
intent.Key, intent.Txn.WriteTimestamp)
res.Intent.Key, res.Intent.Txn.WriteTimestamp)
}

return result.Result{}, nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_refresh_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ func TestRefreshRangeTimeBoundIterator(t *testing.T) {
// have previously performed a consistent read at the lower time-bound to
// prove that there are no intents present that would be missed by the time-
// bound iterator.
if val, intent, err := storage.MVCCGet(ctx, db, k, ts1, storage.MVCCGetOptions{}); err != nil {
if res, err := storage.MVCCGet(ctx, db, k, ts1, storage.MVCCGetOptions{}); err != nil {
t.Fatal(err)
} else if intent != nil {
} else if res.Intent != nil {
t.Fatalf("got unexpected intent: %v", intent)
} else if !val.EqualTagAndData(v) {
t.Fatalf("expected %v, got %v", v, val)
} else if !res.Value.EqualTagAndData(v) {
t.Fatalf("expected %v, got %v", v, res.Value)
}

// Now the real test: a transaction at ts2 has been pushed to ts3
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/batcheval/cmd_resolve_intent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,17 @@ func TestResolveIntentAfterPartialRollback(t *testing.T) {

// The second write has been rolled back; verify that the remaining
// value is from the first write.
res, i, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
res, err := storage.MVCCGet(ctx, batch, k, ts2, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if i != nil {
t.Errorf("%s: found intent, expected none: %+v", k, i)
if res.Intent != nil {
t.Errorf("%s: found intent, expected none: %+v", k, res.Intent)
}
if res == nil {
if res.Value == nil {
t.Errorf("%s: no value found, expected one", k)
} else {
s, err := res.GetBytes()
s, err := res.Value.GetBytes()
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/batcheval/cmd_subsume.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,17 @@ func Subsume(
// the maximum timestamp to ensure that we see an intent if one exists,
// regardless of what timestamp it is written at.
descKey := keys.RangeDescriptorKey(desc.StartKey)
_, intent, err := storage.MVCCGet(ctx, readWriter, descKey, hlc.MaxTimestamp,
intentRes, err := storage.MVCCGet(ctx, readWriter, descKey, hlc.MaxTimestamp,
storage.MVCCGetOptions{Inconsistent: true})
if err != nil {
return result.Result{}, errors.Wrap(err, "fetching local range descriptor")
} else if intent == nil {
} else if intentRes.Intent == nil {
return result.Result{}, errors.Errorf("range missing intent on its local descriptor")
}
val, _, err := storage.MVCCGetAsTxn(ctx, readWriter, descKey, intent.Txn.WriteTimestamp, intent.Txn)
valRes, err := storage.MVCCGetAsTxn(ctx, readWriter, descKey, intentRes.Intent.Txn.WriteTimestamp, intentRes.Intent.Txn)
if err != nil {
return result.Result{}, errors.Wrap(err, "fetching local range descriptor as txn")
} else if val != nil {
} else if valRes.Value != nil {
return result.Result{}, errors.Errorf("non-deletion intent on local range descriptor")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/batcheval/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,17 @@ func readProvisionalVal(
ctx context.Context, reader storage.Reader, usePrefixIter bool, intent *roachpb.Intent,
) (roachpb.KeyValue, error) {
if usePrefixIter {
val, _, err := storage.MVCCGetAsTxn(
valRes, err := storage.MVCCGetAsTxn(
ctx, reader, intent.Key, intent.Txn.WriteTimestamp, intent.Txn,
)
if err != nil {
return roachpb.KeyValue{}, err
}
if val == nil {
if valRes.Value == nil {
// Intent is a deletion.
return roachpb.KeyValue{}, nil
}
return roachpb.KeyValue{Key: intent.Key, Value: *val}, nil
return roachpb.KeyValue{Key: intent.Key, Value: *valRes.Value}, nil
}
res, err := storage.MVCCScanAsTxn(
ctx, reader, intent.Key, intent.Key.Next(), intent.Txn.WriteTimestamp, intent.Txn,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func mergeWithData(t *testing.T, retries int64) {

// Verify no intents remains on range descriptor keys.
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(lhsDesc.StartKey), keys.RangeDescriptorKey(rhsDesc.StartKey)} {
if _, _, err := storage.MVCCGet(
if _, err := storage.MVCCGet(
ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,12 @@ func TestLeaseholdersRejectClockUpdateWithJump(t *testing.T) {
if advance := ts3.GoTime().Sub(ts2.GoTime()); advance != 0 {
t.Fatalf("expected clock not to advance, but it advanced by %s", advance)
}
val, _, err := storage.MVCCGet(context.Background(), store.Engine(), key, ts3,
valRes, err := storage.MVCCGet(context.Background(), store.Engine(), key, ts3,
storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
if a, e := mustGetInt(val), incArgs.Increment*numCmds; a != e {
if a, e := mustGetInt(valRes.Value), incArgs.Increment*numCmds; a != e {
t.Errorf("expected %d, got %d", e, a)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ func TestStoreRangeSplitIntents(t *testing.T) {
t.Fatal(err)
}
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(roachpb.RKeyMin), keys.RangeDescriptorKey(splitKeyAddr)} {
if _, _, err := storage.MVCCGet(
if _, err := storage.MVCCGet(
ctx, store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Errorf("failed to read consistent range descriptor for key %s: %+v", key, err)
Expand Down Expand Up @@ -667,7 +667,7 @@ func TestStoreRangeSplitIdempotency(t *testing.T) {
t.Fatal(err)
}
for _, key := range []roachpb.Key{keys.RangeDescriptorKey(roachpb.RKeyMin), keys.RangeDescriptorKey(splitKeyAddr)} {
if _, _, err := storage.MVCCGet(
if _, err := storage.MVCCGet(
context.Background(), store.Engine(), key, store.Clock().Now(), storage.MVCCGetOptions{},
); err != nil {
t.Fatal(err)
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/loqrecovery/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,17 +177,17 @@ func applyReplicaUpdate(
// there will be keys not represented by any ranges or vice
// versa).
key := keys.RangeDescriptorKey(update.StartKey.AsRKey())
value, intent, err := storage.MVCCGet(
res, err := storage.MVCCGet(
ctx, readWriter, key, clock.Now(), storage.MVCCGetOptions{Inconsistent: true})
if value == nil {
if res.Value == nil {
return PrepareReplicaReport{}, errors.Errorf(
"failed to find a range descriptor for range %v", key)
}
if err != nil {
return PrepareReplicaReport{}, err
}
var localDesc roachpb.RangeDescriptor
if err := value.GetProto(&localDesc); err != nil {
if err := res.Value.GetProto(&localDesc); err != nil {
return PrepareReplicaReport{}, err
}
// Sanity check that this is indeed the right range.
Expand All @@ -213,7 +213,7 @@ func applyReplicaUpdate(
// we won't be able to do MVCCPut later during recovery for the new
// descriptor. It should have no effect on the recovery process itself as
// transaction would be rolled back anyways.
if intent != nil {
if res.Intent != nil {
// We rely on the property that transactions involving the range
// descriptor always start on the range-local descriptor's key. When there
// is an intent, this means that it is likely that the transaction did not
Expand Down Expand Up @@ -251,24 +251,24 @@ func applyReplicaUpdate(
// plan is trivially achievable, due to any of the above problems. But
// in the common case, we do expect one to exist.
report.AbortedTransaction = true
report.AbortedTransactionID = intent.Txn.ID
report.AbortedTransactionID = res.Intent.Txn.ID

// A crude form of the intent resolution process: abort the
// transaction by deleting its record.
txnKey := keys.TransactionKey(intent.Txn.Key, intent.Txn.ID)
txnKey := keys.TransactionKey(res.Intent.Txn.Key, res.Intent.Txn.ID)
if _, err := storage.MVCCDelete(ctx, readWriter, &ms, txnKey, hlc.Timestamp{}, hlc.ClockTimestamp{}, nil); err != nil {
return PrepareReplicaReport{}, err
}
update := roachpb.LockUpdate{
Span: roachpb.Span{Key: intent.Key},
Txn: intent.Txn,
Span: roachpb.Span{Key: res.Intent.Key},
Txn: res.Intent.Txn,
Status: roachpb.ABORTED,
}
if _, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, &ms, update, storage.MVCCResolveWriteIntentOptions{}); err != nil {
return PrepareReplicaReport{}, err
}
report.AbortedTransaction = true
report.AbortedTransactionID = intent.Txn.ID
report.AbortedTransactionID = res.Intent.Txn.ID
}
newDesc := localDesc
replicas := []roachpb.ReplicaDescriptor{
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1805,18 +1805,18 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
// if one exists, regardless of what timestamp it is written at.
desc := r.descRLocked()
descKey := keys.RangeDescriptorKey(desc.StartKey)
_, intent, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
intentRes, err := storage.MVCCGet(ctx, r.Engine(), descKey, hlc.MaxTimestamp,
storage.MVCCGetOptions{Inconsistent: true})
if err != nil {
return false, err
} else if intent == nil {
} else if intentRes.Intent == nil {
return false, nil
}
val, _, err := storage.MVCCGetAsTxn(
ctx, r.Engine(), descKey, intent.Txn.WriteTimestamp, intent.Txn)
valRes, err := storage.MVCCGetAsTxn(
ctx, r.Engine(), descKey, intentRes.Intent.Txn.WriteTimestamp, intentRes.Intent.Txn)
if err != nil {
return false, err
} else if val != nil {
} else if valRes.Value != nil {
return false, nil
}

Expand All @@ -1833,7 +1833,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
return true, nil
}
r.mu.mergeComplete = mergeCompleteCh
r.mu.mergeTxnID = intent.Txn.ID
r.mu.mergeTxnID = intentRes.Intent.Txn.ID
// The RHS of a merge is not permitted to quiesce while a mergeComplete
// channel is installed. (If the RHS is quiescent when the merge commits, any
// orphaned followers would fail to queue themselves for GC.) Unquiesce the
Expand All @@ -1854,11 +1854,11 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
b := &kv.Batch{}
b.Header.Timestamp = r.Clock().Now()
b.AddRawRequest(&roachpb.PushTxnRequest{
RequestHeader: roachpb.RequestHeader{Key: intent.Txn.Key},
RequestHeader: roachpb.RequestHeader{Key: intentRes.Intent.Txn.Key},
PusherTxn: roachpb.Transaction{
TxnMeta: enginepb.TxnMeta{Priority: enginepb.MinTxnPriority},
},
PusheeTxn: intent.Txn,
PusheeTxn: intentRes.Intent.Txn,
PushType: roachpb.PUSH_ABORT,
})
if err := r.store.DB().Run(ctx, b); err != nil {
Expand All @@ -1882,7 +1882,7 @@ func (r *Replica) maybeWatchForMergeLocked(ctx context.Context) (bool, error) {
switch pushTxnRes.PusheeTxn.Status {
case roachpb.PENDING, roachpb.STAGING:
log.Fatalf(ctx, "PushTxn returned while merge transaction %s was still %s",
intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status)
intentRes.Intent.Txn.ID.Short(), pushTxnRes.PusheeTxn.Status)
case roachpb.COMMITTED:
// If PushTxn claims that the transaction committed, then the transaction
// definitely committed.
Expand Down
Loading

0 comments on commit e8a5d34

Please sign in to comment.