diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 329564d3743b..cee381cc1a7f 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -475,6 +475,31 @@ func TestFailedReplicaChange(t *testing.T) { }) } +// TestPreemptiveSnapshotReleasedAfterApply checks that the preemptive +// snapshot has been released before the ChangeReplicas transaction is run. +func TestPreemptiveSnapshotReleasedAfterApply(t *testing.T) { + defer leaktest.AfterTest(t)() + + var mtc *multiTestContext + sc := storage.TestStoreConfig(nil) + sc.TestingKnobs.TestingCommandFilter = func(filterArgs storagebase.FilterArgs) *roachpb.Error { + if et, ok := filterArgs.Req.(*roachpb.EndTransactionRequest); ok && et.Commit { + if !mtc.stores[filterArgs.Sid-1].AcquireRaftSnapshot() { + // We couldn't acquire the store snapshot, which means the preemptive + // snapshot was not released. Return an error in order to fail the + // test. + return roachpb.NewErrorWithTxn(errors.Errorf("boom"), filterArgs.Hdr.Txn) + } + } + return nil + } + mtc = &multiTestContext{storeConfig: &sc} + mtc.Start(t, 2) + defer mtc.Stop() + + mtc.replicateRange(1, 1) +} + // We can truncate the old log entries and a new replica will be brought up from a snapshot. func TestReplicateAfterTruncation(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index 56be853871d3..42c6fd805f20 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -3163,53 +3163,64 @@ func (r *Replica) ChangeReplicas( // raft operations. Racing with the replica GC queue can still partially // negate the benefits of pre-emptive snapshots, but that is a recoverable // degradation, not a catastrophic failure. 
- snap, err := r.GetSnapshot(ctx) - r.mu.Lock() - r.mu.outSnap.claimed = true - r.mu.Unlock() - defer r.CloseOutSnap() - log.Event(ctx, "generated snapshot") - if err != nil { - return errors.Wrapf(err, "%s: change replicas failed", r) - } + // + // NB: A closure is used here so that we can release the snapshot as soon + // as it has been applied on the remote and before the ChangeReplicas + // operation is processed. This is important to allow other ranges to make + // progress which might be required for this ChangeReplicas operation to + // complete. See #10409. + if err := func() error { + snap, err := r.GetSnapshot(ctx) + r.mu.Lock() + r.mu.outSnap.claimed = true + r.mu.Unlock() + defer r.CloseOutSnap() + log.Event(ctx, "generated snapshot") + if err != nil { + return errors.Wrapf(err, "%s: change replicas failed", r) + } - fromRepDesc, err := r.GetReplicaDescriptor() - if err != nil { - return errors.Wrapf(err, "%s: change replicas failed", r) - } + fromRepDesc, err := r.GetReplicaDescriptor() + if err != nil { + return errors.Wrapf(err, "%s: change replicas failed", r) + } - if repDesc.ReplicaID != 0 { - return errors.Errorf( - "must not specify a ReplicaID (%d) for new Replica", - repDesc.ReplicaID, - ) - } + if repDesc.ReplicaID != 0 { + return errors.Errorf( + "must not specify a ReplicaID (%d) for new Replica", + repDesc.ReplicaID, + ) + } - if err := r.setPendingSnapshotIndex(snap.RaftSnap.Metadata.Index); err != nil { - return err - } + if err := r.setPendingSnapshotIndex(snap.RaftSnap.Metadata.Index); err != nil { + return err + } - req := SnapshotRequest_Header{ - RangeDescriptor: updatedDesc, - RaftMessageRequest: RaftMessageRequest{ - RangeID: r.RangeID, - FromReplica: fromRepDesc, - ToReplica: repDesc, - Message: raftpb.Message{ - Type: raftpb.MsgSnap, - To: 0, // special cased ReplicaID for preemptive snapshots - From: uint64(fromRepDesc.ReplicaID), - Term: snap.RaftSnap.Metadata.Term, - Snapshot: snap.RaftSnap, + req := SnapshotRequest_Header{ + 
RangeDescriptor: updatedDesc, + RaftMessageRequest: RaftMessageRequest{ + RangeID: r.RangeID, + FromReplica: fromRepDesc, + ToReplica: repDesc, + Message: raftpb.Message{ + Type: raftpb.MsgSnap, + To: 0, // special cased ReplicaID for preemptive snapshots + From: uint64(fromRepDesc.ReplicaID), + Term: snap.RaftSnap.Metadata.Term, + Snapshot: snap.RaftSnap, + }, }, - }, - RangeSize: r.GetMVCCStats().Total(), - // Recipients can choose to decline preemptive snapshots. - CanDecline: true, - } - if err := r.store.cfg.Transport.SendSnapshot( - ctx, r.store.allocator.storePool, req, snap, r.store.Engine().NewBatch); err != nil { - return errors.Wrapf(err, "%s: change replicas aborted due to failed preemptive snapshot", r) + RangeSize: r.GetMVCCStats().Total(), + // Recipients can choose to decline preemptive snapshots. + CanDecline: true, + } + if err := r.store.cfg.Transport.SendSnapshot( + ctx, r.store.allocator.storePool, req, snap, r.store.Engine().NewBatch); err != nil { + return errors.Wrapf(err, "%s: change replicas aborted due to failed preemptive snapshot", r) + } + return nil + }(); err != nil { + return err } repDesc.ReplicaID = updatedDesc.NextReplicaID