Skip to content

Commit

Permalink
kvserver: assert that lease applied index of a subsumed range cannot …
Browse files Browse the repository at this point in the history
…be bumped

The fix to #44878 ensures that the follower replicas of a subsumed range cannot
activate closed timestamps broadcast by it until the merge aborts (note that if
merge commits, the subsumed range is destroyed). The implicit assumption we
relied on is that the lease applied index of a range is never bumped while it
is subsumed.

This commit adds an assertion to make sure this invariant is always upheld. It
also adds a test that acts as a sanity check to ensure that, if the merge is
aborted, future and pending requests are allowed to go through just fine.

Release note: None
  • Loading branch information
aayushshah15 committed Jun 26, 2020
1 parent 86f49d0 commit 5f0b494
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 0 deletions.
102 changes: 102 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1317,6 +1317,19 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
}
}

func checkConsistencyArgs(desc *roachpb.RangeDescriptor) *roachpb.CheckConsistencyRequest {
return &roachpb.CheckConsistencyRequest{
RequestHeader: roachpb.RequestHeader{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
},
WithDiff: false,
Mode: 1,
Checkpoint: false,
Terminate: nil,
}
}

// TestStoreRangeMergeRHSLeaseExpiration verifies that, if the right-hand range
// in a merge loses its lease while a merge is in progress, the new leaseholder
// does not incorrectly serve traffic before the merge completes.
Expand Down Expand Up @@ -1513,6 +1526,95 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
}
}

// TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the the following:
// 1. While a range is subsumed, CheckConsistency requests wait until the merge
// is complete before proceeding.
// 2. Once a merge is aborted, pending (and future) requests will be allowed to
// be proposed. This is meant as a sanity check for the assertion at the
// end of Replica.propose().
func TestStoreRangeMergeCheckConsistencyAfterSubsumption(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
storeCfg := kvserver.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true

// Install a hook to control when the merge transaction aborts.
mergeEndTxnReceived := make(chan *roachpb.Transaction, 10) // headroom in case the merge transaction retries
abortMergeTxn := make(chan struct{})
storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, r := range ba.Requests {
if et := r.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil {
mergeEndTxnReceived <- ba.Txn
<-abortMergeTxn
return &roachpb.Error{
Message: "abort the merge for test",
}
}
}
return nil
}

mtc := &multiTestContext{
storeConfig: &storeCfg,
startWithSingleRange: true,
}

mtc.Start(t, 2)
defer mtc.Stop()

// Create the ranges to be merged. Put both ranges on both stores, but give
// the second store the lease on the RHS.
lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.stores[0])
if err != nil {
t.Fatal(err)
}
mtc.replicateRange(lhsDesc.RangeID, 1)
mtc.replicateRange(rhsDesc.RangeID, 1)
mtc.transferLease(ctx, rhsDesc.RangeID, 0, 1)

// Launch the merge.
mergeErr := make(chan *roachpb.Error)
go func() {
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := kv.SendWrapped(ctx, mtc.stores[0].TestSender(), args)
mergeErr <- pErr
}()

// Wait for the merge transaction to send its EndTxn request. It won't
// be able to complete just yet, thanks to the hook we installed above.
<-mergeEndTxnReceived

checkConsistencyResp := make(chan interface{})
go func() {
args := checkConsistencyArgs(rhsDesc)
_, pErr := kv.SendWrapped(ctx, mtc.stores[1].TestSender(), args)
checkConsistencyResp <- pErr
}()

select {
case <-checkConsistencyResp:
t.Fatalf("expected the consistency check to wait until the merge was complete")
case <-time.After(1 * time.Second):
}

// Let the merge abort, and then ensure that the consistency check
// successfully goes through.
close(abortMergeTxn)

pErr := <-mergeErr
require.IsType(t, &roachpb.Error{}, pErr)
require.Regexp(t, "abort the merge for test", pErr.Message)

select {
case pErr := <-checkConsistencyResp:
require.Nil(t, pErr)
case <-time.After(1 * time.Second):
t.Fatal("consistency check did not successfully complete after the merge was complete")
}
}

// TestStoreRangeMergeConcurrentRequests tests merging ranges that are serving
// other traffic concurrently.
func TestStoreRangeMergeConcurrentRequests(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,22 @@ func (r *Replica) propose(ctx context.Context, p *ProposalData) (index int64, pE
if err != nil {
return 0, roachpb.NewError(err)
}
if maxLeaseIndex != 0 {
r.mu.RLock()
defer r.mu.RUnlock()
if r.mergeInProgressRLocked() {
// The correctness of range merges relies on the invariant that the
// LeaseAppliedIndex of the range is not bumped while a range is in its
// subsumed state. If this invariant is ever violated, the follower
// replicas of the subsumed range (RHS) are free to activate any future
// closed timestamp updates even before the merge completes. This would be
// a serializability violation.
//
// See comment block in executeReadOnlyBatchWithServerSideRefreshes for
// details.
log.Fatalf(ctx, "lease applied index bumped while the range was subsumed")
}
}
return int64(maxLeaseIndex), nil
}

Expand Down

0 comments on commit 5f0b494

Please sign in to comment.