storage: avoid resurrecting dead replicas
Introduce Replica.mu.minReplicaID which enforces the tombstone invariant
that we don't accept messages for previous Replica incarnations.

Make TestRaftRemoveRace more aggressive.

Fixes cockroachdb#9037
petermattis committed Nov 21, 2016
1 parent d75dd1a commit 393b6e1
Showing 4 changed files with 91 additions and 21 deletions.
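The patch turns the persisted RaftTombstone.NextReplicaID into an in-memory floor, Replica.mu.minReplicaID, that every replica-ID change is checked against. Before the per-file diffs, here is a minimal standalone sketch of that invariant; the types and names are simplified stand-ins, not the actual CockroachDB code (the real check is setReplicaIDLocked in the replica.go diff below):

```go
package main

import "fmt"

// replica is a toy stand-in for storage.Replica; the real fields live
// under Replica.mu and are protected by its lock.
type replica struct {
	replicaID    int // current incarnation; 0 until part of a Raft group
	minReplicaID int // floor loaded from the on-disk tombstone
}

// setReplicaID mirrors the invariant the patch enforces: an incoming ID
// below the tombstone floor belongs to a dead incarnation and is refused.
func (r *replica) setReplicaID(id int) error {
	if r.replicaID == id {
		return nil // common case: the replica ID is unchanged
	}
	if id == 0 {
		return nil // preemptive snapshot; tombstone written if accepted
	}
	if id < r.minReplicaID {
		return fmt.Errorf("raft group deleted")
	}
	if r.replicaID > id {
		return fmt.Errorf("replicaID cannot move backwards from %d to %d", r.replicaID, id)
	}
	r.replicaID = id
	return nil
}

func main() {
	r := &replica{minReplicaID: 2} // tombstone says: IDs below 2 are dead
	fmt.Println(r.setReplicaID(1)) // raft group deleted
	fmt.Println(r.setReplicaID(2)) // <nil>: the new incarnation is accepted
}
```

Running main prints "raft group deleted" for the stale ID and <nil> for the new incarnation, which is exactly the case the new TestSetReplicaID below exercises with {0, 2, 1, "raft group deleted"}.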
18 changes: 9 additions & 9 deletions pkg/storage/client_raft_test.go
@@ -1906,19 +1906,19 @@ func TestRaftAfterRemoveRange(t *testing.T) {
 	mtc.expireLeases()
 }
 
-// TestRaftRemoveRace adds and removes a replica repeatedly in an
-// attempt to reproduce a race
-// (https://github.com/cockroachdb/cockroach/issues/1911). Note that
-// 10 repetitions is not enough to reliably reproduce the problem, but
-// it's better than any other tests we have for this (increasing the
-// number of repetitions adds an unacceptable amount of test runtime).
+// TestRaftRemoveRace adds and removes a replica repeatedly in an attempt to
+// reproduce a race (see #1911 and #9037).
 func TestRaftRemoveRace(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	mtc := startMultiTestContext(t, 3)
+	mtc := startMultiTestContext(t, 10)
 	defer mtc.Stop()
 
-	rangeID := roachpb.RangeID(1)
-	mtc.replicateRange(rangeID, 1, 2)
+	const rangeID = roachpb.RangeID(1)
+	// Up-replicate to a bunch of nodes which stresses a condition where a
+	// replica created via a preemptive snapshot receives a message for a
+	// previous incarnation of the replica (i.e. has a smaller replica ID) that
+	// existed on the same node.
+	mtc.replicateRange(rangeID, 1, 2, 3, 4, 5, 6, 7, 8, 9)
 
 	for i := 0; i < 10; i++ {
 		mtc.unreplicateRange(rangeID, 2)
43 changes: 31 additions & 12 deletions pkg/storage/replica.go
@@ -337,6 +337,9 @@ type Replica struct {
 		// Raft group). The replica ID will be non-zero whenever the replica is
 		// part of a Raft group.
 		replicaID roachpb.ReplicaID
+		// The minimum allowed ID for this replica. Initialized from
+		// RaftTombstone.NextReplicaID.
+		minReplicaID roachpb.ReplicaID
 		// The ID of the leader replica within the Raft group. Used to determine
 		// when the leadership changes.
 		leaderID roachpb.ReplicaID
@@ -702,18 +705,27 @@ func (r *Replica) destroyDataRaftMuLocked() error {
 
 	// Save a tombstone. The range cannot be re-replicated onto this
 	// node without having a replica ID of at least desc.NextReplicaID.
-	tombstoneKey := keys.RaftTombstoneKey(desc.RangeID)
-	tombstone := &roachpb.RaftTombstone{
-		NextReplicaID: desc.NextReplicaID,
-	}
 	ctx := r.AnnotateCtx(context.TODO())
-	if err := engine.MVCCPutProto(ctx, batch, nil, tombstoneKey, hlc.ZeroTimestamp, nil, tombstone); err != nil {
+	if err := r.setTombstoneKey(ctx, batch, desc); err != nil {
 		return err
 	}
 
 	return batch.Commit()
 }
 
+func (r *Replica) setTombstoneKey(
+	ctx context.Context, eng engine.ReadWriter, desc *roachpb.RangeDescriptor,
+) error {
+	r.mu.Lock()
+	r.mu.minReplicaID = desc.NextReplicaID
+	r.mu.Unlock()
+	tombstoneKey := keys.RaftTombstoneKey(desc.RangeID)
+	tombstone := &roachpb.RaftTombstone{
+		NextReplicaID: desc.NextReplicaID,
+	}
+	return engine.MVCCPutProto(ctx, eng, nil, tombstoneKey,
+		hlc.ZeroTimestamp, nil, tombstone)
+}
+
 func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error {
 	r.mu.Lock()
 	defer r.mu.Unlock()
@@ -722,16 +734,23 @@ func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error {
 
 // setReplicaIDLocked requires that the replica lock is held.
 func (r *Replica) setReplicaIDLocked(replicaID roachpb.ReplicaID) error {
-	if replicaID == 0 {
-		// If the incoming message didn't give us a new replica ID,
-		// there's nothing to do (this is only expected for preemptive snapshots).
+	if r.mu.replicaID == replicaID {
+		// The common case: the replica ID is unchanged.
 		return nil
 	}
-	if r.mu.replicaID == replicaID {
+	if replicaID == 0 {
+		// If the incoming message does not have a new replica ID it is a
+		// preemptive snapshot. We'll set a tombstone for the old replica ID if the
+		// snapshot is accepted.
 		return nil
-	} else if r.mu.replicaID > replicaID {
+	}
+	if replicaID < r.mu.minReplicaID {
+		return &roachpb.RaftGroupDeletedError{}
+	}
+	if r.mu.replicaID > replicaID {
 		return errors.Errorf("replicaID cannot move backwards from %d to %d", r.mu.replicaID, replicaID)
-	} else if r.mu.replicaID != 0 {
+	}
+	if r.mu.replicaID != 0 {
 		// TODO(bdarnell): clean up previous raftGroup (update peers)
 	}

41 changes: 41 additions & 0 deletions pkg/storage/replica_test.go
@@ -6030,6 +6030,47 @@ func TestReplicaIDChangePending(t *testing.T) {
 	<-commandProposed
 }
 
+func TestSetReplicaID(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	tsc := TestStoreConfig(nil)
+	tc := testContext{}
+	tc.StartWithStoreConfig(t, tsc)
+	defer tc.Stop()
+
+	repl := tc.repl
+
+	testCases := []struct {
+		replicaID    roachpb.ReplicaID
+		minReplicaID roachpb.ReplicaID
+		newReplicaID roachpb.ReplicaID
+		expected     string
+	}{
+		{0, 0, 1, ""},
+		{0, 1, 1, ""},
+		{0, 2, 1, "raft group deleted"},
+		{1, 2, 1, ""},
+		{2, 0, 1, "replicaID cannot move backwards"},
+	}
+	for i, c := range testCases {
+		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+			repl.mu.Lock()
+			repl.mu.replicaID = c.replicaID
+			repl.mu.minReplicaID = c.minReplicaID
+			repl.mu.Unlock()
+
+			err := repl.setReplicaID(c.newReplicaID)
+			if c.expected == "" {
+				if err != nil {
+					t.Fatalf("expected success, but found %v", err)
+				}
+			} else if !testutils.IsError(err, c.expected) {
+				t.Fatalf("expected %s, but found %v", c.expected, err)
+			}
+		})
+	}
+}
+
 func TestReplicaRetryRaftProposal(t *testing.T) {
 	defer leaktest.AfterTest(t)()

10 changes: 10 additions & 0 deletions pkg/storage/store.go
@@ -2877,6 +2877,7 @@ func (s *Store) processRaftRequest(
 		// would limit the effectiveness of RaftTransport.SendSync for
 		// preemptive snapshots.
 		r.mu.internalRaftGroup = nil
+		needTombstone := r.mu.state.Desc.NextReplicaID != 0
 		r.mu.Unlock()
 
 		appliedIndex, _, err := loadAppliedIndex(ctx, r.store.Engine(), r.RangeID)
@@ -2911,6 +2912,14 @@ func (s *Store) processRaftRequest(
 			ready = raftGroup.Ready()
 		}
 
+		if needTombstone {
+			// Write a tombstone key in order to prevent the replica from receiving
+			// messages from its previous incarnation.
+			if err := r.setTombstoneKey(ctx, r.store.Engine(), r.mu.state.Desc); err != nil {
+				return roachpb.NewError(err)
+			}
+		}
+
 		// Apply the snapshot, as Raft told us to.
 		if err := r.applySnapshot(ctx, inSnap, ready.Snapshot, ready.HardState); err != nil {
 			return roachpb.NewError(err)
@@ -3490,6 +3499,7 @@ func (s *Store) tryGetOrCreateReplica(
 	// replica even outside of raft processing. Have to do this after grabbing
 	// Store.mu to maintain lock ordering invariant.
 	repl.mu.Lock()
+	repl.mu.minReplicaID = tombstone.NextReplicaID
 	// Add the range to range map, but not replicasByKey since the range's start
 	// key is unknown. The range will be added to replicasByKey later when a
 	// snapshot is applied. After unlocking Store.mu above, another goroutine
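The floor survives restarts because it round-trips through disk: destroyDataRaftMuLocked persists NextReplicaID via setTombstoneKey, and tryGetOrCreateReplica (above) loads it back into Replica.mu.minReplicaID. A hypothetical sketch of that round trip, with a map standing in for the storage engine and all names invented for illustration:

```go
package main

import "fmt"

// engine is a toy stand-in for the persistent store; the real code keeps
// RaftTombstone.NextReplicaID under a RaftTombstoneKey per range.
type engine map[int64]int // rangeID -> lowest replica ID still allowed

// destroyReplica sketches the destroy path: before the range's data goes
// away, record the minimum replica ID a future incarnation must have.
func destroyReplica(e engine, rangeID int64, nextReplicaID int) {
	e[rangeID] = nextReplicaID
}

// createReplica sketches the creation path: the persisted tombstone seeds
// the in-memory floor, so a dead incarnation cannot be resurrected even
// after the node restarts and loses all in-memory state.
func createReplica(e engine, rangeID int64, replicaID int) (minReplicaID int, err error) {
	floor := e[rangeID]
	if replicaID != 0 && replicaID < floor {
		return 0, fmt.Errorf("raft group deleted (replica ID %d < tombstone %d)", replicaID, floor)
	}
	return floor, nil
}

func main() {
	e := engine{}
	destroyReplica(e, 1, 3) // incarnations of range 1 with ID < 3 are dead
	if _, err := createReplica(e, 1, 2); err != nil {
		fmt.Println(err) // rejected: would resurrect a dead replica
	}
	if floor, err := createReplica(e, 1, 3); err == nil {
		fmt.Printf("created replica with minReplicaID=%d\n", floor)
	}
}
```

The processRaftRequest hunk above covers the remaining window: a replica created from a preemptive snapshot starts with replica ID 0, so when the snapshot replaces an existing replica, a tombstone for the old incarnation is written before the snapshot is applied.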
