kv/kvserver: allow the merge transaction to be pushed
Historically we have not allowed the merge transaction to be pushed. We
disabled this because of the hazard of attempting to refresh reads on the
RHS of the merge after the SubsumeRequest has been sent: the
`SubsumeRequest` effectively freezes the RHS until the merge commits or
aborts. To side-step this hazard, this change ensures that nothing can
prevent the merge transaction from either committing or aborting.

Release note: None
ajwerner committed Feb 14, 2021
1 parent 0edc56a commit bc1d7b8
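A minimal sketch of the ordering this change establishes inside the merge
transaction, assuming the kv.Txn API used in the diff below; writeMergeIntents
and sendSubsume are hypothetical stand-ins for the real intent-writing and
Subsume steps, which are not reproduced here.

package mergesketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
)

// mergeCriticalPhaseSketch illustrates the order of operations arranged by
// this commit: all refreshing happens before the RHS is frozen, and the
// commit timestamp is fixed so that no later push can force a refresh over
// the frozen RHS. The two function parameters are hypothetical stand-ins.
func mergeCriticalPhaseSketch(
	ctx context.Context,
	txn *kv.Txn,
	writeMergeIntents func(context.Context, *kv.Txn) error,
	sendSubsume func(context.Context, *kv.Txn) error,
) error {
	// 1. Write the merge intents while the transaction is still free to be
	//    pushed and to refresh its reads.
	if err := writeMergeIntents(ctx, txn); err != nil {
		return err
	}
	// 2. Refresh now, so any push absorbed so far is resolved before the RHS
	//    is frozen; a refresh after Subsume would stall against the frozen RHS.
	if err := txn.ForceRefresh(ctx); err != nil {
		return err
	}
	// 3. Fix the commit timestamp so a later push cannot require another
	//    refresh; from here on the transaction can only commit or abort.
	_ = txn.CommitTimestamp()
	// 4. Only now freeze the RHS and collect its consistent snapshot.
	return sendSubsume(ctx, txn)
}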
Showing 2 changed files with 48 additions and 45 deletions.
69 changes: 32 additions & 37 deletions pkg/kv/kvserver/client_merge_test.go
@@ -1241,37 +1241,10 @@ func TestStoreRangeMergeSplitRace_MergeWins(t *testing.T) {
// transaction's only intent so far is on P's local range descriptor, and so the
// split transaction can happily commit.
//
// The merge transaction then continues, writing an intent on Q's local
// descriptor. Since the merge transaction is executing at an earlier timestamp
// than the split transaction, the intent is written "under" the updated
// descriptor written by the split transaction.
//
// In the past, the merge transaction would simply push its commit timestamp
// forward and proceed, even though, upon committing, it would discover that it
// was forbidden from committing with a pushed timestamp and abort instead. (For
// why merge transactions cannot forward their commit timestamps, see the
// discussion on the retry loop within AdminMerge.) This was problematic. Before
// the doomed merge transaction attempted to commit, it would send a Subsume
// request, launching a merge watcher goroutine on Q. This watcher goroutine
// could incorrectly think that the merge transaction committed. Why? To
// determine whether a merge has truly aborted, the watcher goroutine sends a
// Get(/Meta2/QEndKey) request with a read uncommitted isolation level. If the
// Get request returns either nil or a descriptor for a different range, the
// merge is assumed to have committed. In this case, unfortunately, QEndKey is
// Q's end key post-split. After all, the split has committed and updated
// Q's in-memory descriptor. The split transaction's intents are cleaned up
// asynchronously, however, and since the watcher goroutine is not performing a
// consistent read it will not wait for the intents to be cleaned up. So
// Get(/Meta2/QEndKey) might return nil, in which case the watcher goroutine
// will incorrectly infer that the merge committed. (Note that the watcher
// goroutine can't perform a consistent read, as that would look up the
// transaction record on Q and deadlock, since Q is blocked for merging.)
//
// The bug was fixed by updating Q's local descriptor with a conditional put
// instead of a put. This forces the merge transaction to fail early if writing
// the intent would require forwarding the commit timestamp. In other words,
// this ensures that the merge watcher goroutine is never launched if the RHS
// local descriptor is updated while the merge transaction is executing.
// The merge transaction then continues, reading and writing an intent on Q's
// local descriptor. The locking nature of the read request to Q's local
// descriptor ensures that the merge transaction will observe the post-split
// value for Q.
func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1280,20 +1253,34 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {

var distSender *kvcoord.DistSender
var lhsDescKey atomic.Value
var lhsStartKey atomic.Value
var launchSplit int64
var mergeRetries int64
var mergePreSplit atomic.Value
var splitCommit atomic.Value
var mergeEndTxnTimestamp atomic.Value
testingRequestFilter := func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, req := range ba.Requests {
if get := req.GetGet(); get != nil && get.KeyLocking != lock.None {
if v := lhsDescKey.Load(); v != nil && v.(roachpb.Key).Equal(get.Key) {
// If this is the first merge attempt, launch the split
// before the merge's first locking read succeeds.
if atomic.CompareAndSwapInt64(&launchSplit, 1, 0) {
mergePreSplit.Store(ba.Txn.ReadTimestamp)
_, pErr := kv.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("c")))
return pErr
}
// Otherwise, record that the merge retried and proceed.
atomic.AddInt64(&mergeRetries, 1)
// Otherwise, proceed.
}
}
if split := req.GetAdminSplit(); split != nil && split.Key.Equal(roachpb.Key("c")) {
splitCommit.Store(ba.Timestamp)
}
if endTxn := req.GetEndTxn(); endTxn != nil {
ct := endTxn.InternalCommitTrigger
startKey, _ := lhsStartKey.Load().(roachpb.RKey)
if ct != nil && ct.MergeTrigger != nil && startKey != nil &&
startKey.Equal(ct.MergeTrigger.LeftDesc.StartKey) {
mergeEndTxnTimestamp.Store(ba.Txn.ReadTimestamp)
}
}
}
@@ -1321,13 +1308,21 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
}
lhsDescKey.Store(keys.RangeDescriptorKey(lhsDesc.StartKey))
atomic.StoreInt64(&launchSplit, 1)
lhsStartKey.Store(lhsDesc.StartKey)

mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
if _, pErr := kv.SendWrapped(ctx, distSender, mergeArgs); pErr != nil {
_, pErr := kv.SendWrapped(ctx, distSender, mergeArgs)
if pErr != nil {
t.Fatal(pErr)
}
if atomic.LoadInt64(&mergeRetries) == 0 {
t.Fatal("expected merge to retry at least once due to concurrent split")
mergePreSplitTS := mergePreSplit.Load().(hlc.Timestamp)
splitTS := splitCommit.Load().(hlc.Timestamp)
mergePostSplitTS := mergeEndTxnTimestamp.Load().(hlc.Timestamp)
if splitTS.LessEq(mergePreSplitTS) {
t.Fatalf("expected merge to start before concurrent split, %v <= %v", splitTS, mergePreSplitTS)
}
if mergePostSplitTS.LessEq(splitTS) {
t.Fatalf("expected merge to start before concurrent split, %v <= %v", mergePostSplitTS, splitTS)
}
}

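The rewritten test above no longer counts merge retries; it captures three
timestamps and asserts their relative order. A small self-contained
restatement of that invariant (the helper name and error messages are
illustrative, not taken from the patch):

package mergesketch

import (
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/errors"
)

// checkSplitMergeOrdering restates what the test asserts: the merge's first
// locking read happens before the split commits, and the merge's timestamp at
// its EndTxn (carrying the merge trigger) lies above the split's commit
// timestamp, i.e. mergePreSplit < split < mergeEndTxn.
func checkSplitMergeOrdering(mergePreSplit, split, mergeEndTxn hlc.Timestamp) error {
	if split.LessEq(mergePreSplit) {
		return errors.Errorf("merge did not start before the split: %s <= %s", split, mergePreSplit)
	}
	if mergeEndTxn.LessEq(split) {
		return errors.Errorf("merge did not commit after the split: %s <= %s", mergeEndTxn, split)
	}
	return nil
}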
24 changes: 16 additions & 8 deletions pkg/kv/kvserver/replica_command.go
@@ -564,12 +564,6 @@ func (r *Replica) AdminMerge(
log.Event(ctx, "merge txn begins")
txn.SetDebugName(mergeTxnName)

// Observe the commit timestamp to force a client-side retry. See the
// comment on the retry loop after this closure for details.
//
// TODO(benesch): expose a proper API for preventing the fast path.
_ = txn.CommitTimestamp()

// Pipelining might send QueryIntent requests to the RHS after the RHS has
// noticed the merge and started blocking all traffic. This causes the merge
// transaction to deadlock. Just turn pipelining off; the structure of the
@@ -708,6 +702,18 @@ func (r *Replica) AdminMerge(
return err
}

// Refresh the transaction now so that it won't try to refresh its reads
// on the RHS after the RHS is frozen.
if err := txn.ForceRefresh(ctx); err != nil {
return err
}

// Freeze the commit timestamp of the transaction to prevent future pushes
// due to high-priority reads from other transactions. Any attempt to
// refresh reads on the RHS would result in a stalled merge because the
// RHS will be frozen after the Subsume is sent.
_ = txn.CommitTimestamp()

// Intents have been placed, so the merge is now in its critical phase. Get
// a consistent view of the data from the right-hand range. If the merge
// commits, we'll write this data to the left-hand range in the merge
@@ -761,8 +767,10 @@ func (r *Replica) AdminMerge(
// we'll unlock the right-hand range, giving the next, fresh transaction a
// chance to succeed.
//
// Note that client.DB.Txn performs retries using the same transaction, so we
// have to use our own retry loop.
// A second reason to eschew kv.DB.Txn() is that the API to disable pipelining
// is finicky and only allows disabling pipelining before any operations have
// been sent, even in prior epochs. Calling DisablePipelining() on a restarted
// transaction yields an error.
for {
txn := kv.NewTxn(ctx, r.store.DB(), r.NodeID())
err := runMergeTxn(txn)
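To make the preceding comment concrete, here is a rough sketch of the manual
retry loop: a brand-new transaction per attempt, with pipelining disabled
before the transaction sends anything. The runMergeTxn argument stands in for
the closure in the real code, and the use of
roachpb.TransactionRetryWithProtoRefreshError to classify retryable errors is
an assumption, not taken from this diff.

package mergesketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// runWithFreshTxns sketches the retry loop AdminMerge drives itself instead
// of relying on kv.DB.Txn: each retry gets a fresh kv.Txn, and pipelining is
// disabled before that transaction issues any requests.
func runWithFreshTxns(
	ctx context.Context,
	db *kv.DB,
	nodeID roachpb.NodeID,
	runMergeTxn func(context.Context, *kv.Txn) error,
) error {
	for {
		txn := kv.NewTxn(ctx, db, nodeID)
		// Must be called before the transaction sends anything; a restarted
		// transaction reused by kv.DB.Txn would reject this call.
		if err := txn.DisablePipelining(); err != nil {
			return err
		}
		err := runMergeTxn(ctx, txn)
		if err != nil {
			txn.CleanupOnError(ctx, err)
		}
		// Assumption: only this retryable-error type loops around; anything
		// else (including success) returns to the caller.
		if !errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) {
			return err
		}
	}
}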
