Skip to content

Commit

Permalink
kvserver: test interaction between ComputeChecksum and range merges
Browse files Browse the repository at this point in the history
This commit adds a "sanity check" test around the assertion at the
end of Replica.propose() that ensures that the lease applied index
of a subsumed range is never bumped.

Release note: None
  • Loading branch information
aayushshah15 committed Aug 4, 2020
1 parent 4ed88b9 commit 1458427
Showing 1 changed file with 103 additions and 0 deletions.
103 changes: 103 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,19 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) {
}
}

func checkConsistencyArgs(desc *roachpb.RangeDescriptor) *roachpb.CheckConsistencyRequest {
return &roachpb.CheckConsistencyRequest{
RequestHeader: roachpb.RequestHeader{
Key: desc.StartKey.AsRawKey(),
EndKey: desc.EndKey.AsRawKey(),
},
WithDiff: false,
Mode: 1,
Checkpoint: false,
Terminate: nil,
}
}

// TestStoreRangeMergeRHSLeaseExpiration verifies that, if the right-hand range
// in a merge loses its lease while a merge is in progress, the new leaseholder
// does not incorrectly serve traffic before the merge completes.
Expand Down Expand Up @@ -1527,6 +1540,96 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) {
}
}

// TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the the following:
// 1. While a range is subsumed, ComputeChecksum requests wait until the merge
// is complete before proceeding.
// 2. Once a merge is aborted, pending (and future) requests will be allowed to
// be proposed. An assertion at the end of Replica.propose() ensures that the
// lease applied index of a range cannot be bumped while it is subsumed. A large
// comment block at the end of Subsume() in cmd_subsume.go explains the hazard
// in detail. This test is meant as a sanity check for this assertion.
func TestStoreRangeMergeCheckConsistencyAfterSubsumption(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
storeCfg := kvserver.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true

// Install a hook to control when the merge transaction aborts.
mergeEndTxnReceived := make(chan *roachpb.Transaction, 10) // headroom in case the merge transaction retries
abortMergeTxn := make(chan struct{})
storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
for _, r := range ba.Requests {
if et := r.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil {
mergeEndTxnReceived <- ba.Txn
<-abortMergeTxn
return &roachpb.Error{
Message: "abort the merge for test",
}
}
}
return nil
}

mtc := &multiTestContext{
storeConfig: &storeCfg,
startWithSingleRange: true,
}

mtc.Start(t, 2)
defer mtc.Stop()

// Create the ranges to be merged. Put both ranges on both stores, but give
// the second store the lease on the RHS.
lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.stores[0])
if err != nil {
t.Fatal(err)
}
mtc.replicateRange(lhsDesc.RangeID, 1)
mtc.replicateRange(rhsDesc.RangeID, 1)
mtc.transferLease(ctx, rhsDesc.RangeID, 0, 1)

// Launch the merge.
mergeErr := make(chan *roachpb.Error)
go func() {
args := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
_, pErr := kv.SendWrapped(ctx, mtc.stores[0].TestSender(), args)
mergeErr <- pErr
}()

// Wait for the merge transaction to send its EndTxn request. It won't
// be able to complete just yet, thanks to the hook we installed above.
<-mergeEndTxnReceived

checkConsistencyResp := make(chan interface{})
go func() {
args := checkConsistencyArgs(rhsDesc)
_, pErr := kv.SendWrapped(ctx, mtc.stores[1].TestSender(), args)
checkConsistencyResp <- pErr
}()

select {
case <-checkConsistencyResp:
t.Fatalf("expected the consistency check to wait until the merge was complete")
case <-time.After(1 * time.Second):
}

// Let the merge abort, and then ensure that the consistency check
// successfully goes through.
close(abortMergeTxn)

pErr := <-mergeErr
require.IsType(t, &roachpb.Error{}, pErr)
require.Regexp(t, "abort the merge for test", pErr.Message)

testutils.SucceedsSoon(t, func() error {
pErr := <-checkConsistencyResp
require.Nil(t, pErr)
return nil
})
}

// TestStoreRangeMergeConcurrentRequests tests merging ranges that are serving
// other traffic concurrently.
func TestStoreRangeMergeConcurrentRequests(t *testing.T) {
Expand Down

0 comments on commit 1458427

Please sign in to comment.