diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index ad704a7ce425..0fda10e841b4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -1330,6 +1330,19 @@ func TestStoreRangeMergeSplitRace_SplitWins(t *testing.T) { } } +func checkConsistencyArgs(desc *roachpb.RangeDescriptor) *roachpb.CheckConsistencyRequest { + return &roachpb.CheckConsistencyRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: desc.StartKey.AsRawKey(), + EndKey: desc.EndKey.AsRawKey(), + }, + WithDiff: false, + Mode: 1, + Checkpoint: false, + Terminate: nil, + } +} + // TestStoreRangeMergeRHSLeaseExpiration verifies that, if the right-hand range // in a merge loses its lease while a merge is in progress, the new leaseholder // does not incorrectly serve traffic before the merge completes. @@ -1527,6 +1540,96 @@ func TestStoreRangeMergeRHSLeaseExpiration(t *testing.T) { } } +// TestStoreRangeMergeCheckConsistencyAfterSubsumption verifies the the following: +// 1. While a range is subsumed, ComputeChecksum requests wait until the merge +// is complete before proceeding. +// 2. Once a merge is aborted, pending (and future) requests will be allowed to +// be proposed. An assertion at the end of Replica.propose() ensures that the +// lease applied index of a range cannot be bumped while it is subsumed. A large +// comment block at the end of Subsume() in cmd_subsume.go explains the hazard +// in detail. This test is meant as a sanity check for this assertion. +func TestStoreRangeMergeCheckConsistencyAfterSubsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + storeCfg := kvserver.TestStoreConfig(nil) + storeCfg.TestingKnobs.DisableReplicateQueue = true + storeCfg.TestingKnobs.DisableMergeQueue = true + + // Install a hook to control when the merge transaction aborts. + mergeEndTxnReceived := make(chan *roachpb.Transaction, 10) // headroom in case the merge transaction retries + abortMergeTxn := make(chan struct{}) + storeCfg.TestingKnobs.TestingRequestFilter = func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { + for _, r := range ba.Requests { + if et := r.GetEndTxn(); et != nil && et.InternalCommitTrigger.GetMergeTrigger() != nil { + mergeEndTxnReceived <- ba.Txn + <-abortMergeTxn + return &roachpb.Error{ + Message: "abort the merge for test", + } + } + } + return nil + } + + mtc := &multiTestContext{ + storeConfig: &storeCfg, + startWithSingleRange: true, + } + + mtc.Start(t, 2) + defer mtc.Stop() + + // Create the ranges to be merged. Put both ranges on both stores, but give + // the second store the lease on the RHS. + lhsDesc, rhsDesc, err := createSplitRanges(ctx, mtc.stores[0]) + if err != nil { + t.Fatal(err) + } + mtc.replicateRange(lhsDesc.RangeID, 1) + mtc.replicateRange(rhsDesc.RangeID, 1) + mtc.transferLease(ctx, rhsDesc.RangeID, 0, 1) + + // Launch the merge. + mergeErr := make(chan *roachpb.Error) + go func() { + args := adminMergeArgs(lhsDesc.StartKey.AsRawKey()) + _, pErr := kv.SendWrapped(ctx, mtc.stores[0].TestSender(), args) + mergeErr <- pErr + }() + + // Wait for the merge transaction to send its EndTxn request. It won't + // be able to complete just yet, thanks to the hook we installed above. + <-mergeEndTxnReceived + + checkConsistencyResp := make(chan interface{}) + go func() { + args := checkConsistencyArgs(rhsDesc) + _, pErr := kv.SendWrapped(ctx, mtc.stores[1].TestSender(), args) + checkConsistencyResp <- pErr + }() + + select { + case <-checkConsistencyResp: + t.Fatalf("expected the consistency check to wait until the merge was complete") + case <-time.After(1 * time.Second): + } + + // Let the merge abort, and then ensure that the consistency check + // successfully goes through. + close(abortMergeTxn) + + pErr := <-mergeErr + require.IsType(t, &roachpb.Error{}, pErr) + require.Regexp(t, "abort the merge for test", pErr.Message) + + testutils.SucceedsSoon(t, func() error { + pErr := <-checkConsistencyResp + require.Nil(t, pErr) + return nil + }) +} + // TestStoreRangeMergeConcurrentRequests tests merging ranges that are serving // other traffic concurrently. func TestStoreRangeMergeConcurrentRequests(t *testing.T) {