Skip to content

Commit

Permalink
storage: clean up preemptive snapshot when receiving replica ID as le…
Browse files Browse the repository at this point in the history
…arner

This commit adds an annotation to raft request messages to indicate that the
sender believes the current replica is a learner. If the current replica on
the recipient was created as a preemptive snapshot (it's initialized but not
in the range) then we should remove that replica immediately.

Release note: None
  • Loading branch information
ajwerner committed Sep 17, 2019
1 parent a7c0625 commit 6f7c3f2
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 115 deletions.
227 changes: 133 additions & 94 deletions pkg/storage/raft.pb.go

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions pkg/storage/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ message RaftHeartbeat {
optional uint64 term = 4 [(gogoproto.nullable) = false];
optional uint64 commit = 5 [(gogoproto.nullable) = false];
optional bool quiesce = 6 [(gogoproto.nullable) = false];

// ToIsLearner was added in v19.2 to aid in the transition from preemptive
// snapshots to learner replicas. If a Replica learns its ID from a message
// which indicates that it is a learner and it is not currently a part of the
// range (due to being from a preemptive snapshot) then it must delete all of
// its data.
//
// TODO(ajwerner): remove in 20.2 once we ensure that preemptive snapshots can
// no longer be present and that we're never talking to a 19.2 node.
optional bool to_is_learner = 7 [(gogoproto.nullable) = false];
}

// RaftMessageRequest is the request used to send raft messages using our
Expand Down
20 changes: 19 additions & 1 deletion pkg/storage/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,25 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
log.Event(ctx, "applying command")
}

// Acquire the split or merge lock, if necessary. If a split or merge
// There's a nasty edge case here which only exists in 19.2 (for subtle
// reasons).
//
// Imagine we're catching up from a pre-emptive snapshot and we come across
// a merge; we're not guaranteed that the RHS is going to be present. One
// option might be to destroy the current replica. Unfortunately we might
// also have gotten raft messages and voted, so we'll know our replica ID but
// our current view of the range descriptor will not include the current store.
//
// * One proposal is to refuse to apply the current command and instead spin
// handling raft ready objects without actually applying any commands,
// detecting the illegal merge and truncating the CommittedEntries slice
// to before the merge prior to calling Advance.
// * The other is to permit destroying an initialized replica with
// nextReplicaID == Replica.mu.replicaID if the store is not in
// Replica.mu.state.Desc. We'd need to make sure to preserve the hard state
// in this case.

// Acquire the split or merge lock, if necessary. If a split or merge
// command was rejected with a below-Raft forced error then its replicated
// result was just cleared and this will be a no-op.
if splitMergeUnlock, err := b.r.maybeAcquireSplitMergeLock(ctx, cmd.raftCmd); err != nil {
Expand Down
11 changes: 9 additions & 2 deletions pkg/storage/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,7 @@ func (r *Replica) maybeCoalesceHeartbeat(
Term: msg.Term,
Commit: msg.Commit,
Quiesce: quiesce,
ToIsLearner: toReplica.GetType() == roachpb.LEARNER,
}
if log.V(4) {
log.Infof(ctx, "coalescing beat: %+v", beat)
Expand Down Expand Up @@ -1572,7 +1573,7 @@ func (r *Replica) maybeAcquireSplitMergeLock(
func (r *Replica) acquireSplitLock(
ctx context.Context, split *roachpb.SplitTrigger,
) (func(), error) {
rightRng, _, err := r.store.getOrCreateReplica(ctx, split.RightDesc.RangeID, 0, nil)
rightRng, _, err := r.store.getOrCreateReplica(ctx, split.RightDesc.RangeID, 0, nil, true)
if err != nil {
return nil, err
}
Expand All @@ -1594,7 +1595,13 @@ func (r *Replica) acquireMergeLock(
// right-hand replica in place, any snapshots for the right-hand range will
// block on raftMu, waiting for the merge to complete, after which the replica
// will realize it has been destroyed and reject the snapshot.
rightRepl, _, err := r.store.getOrCreateReplica(ctx, merge.RightDesc.RangeID, 0, nil)
//
// There's one edge case to this requirement: the current Replica may be
// catching up from a pre-emptive snapshot. This case only arises in v19.2
// while performing the upgrade. If we detect that we're not a part of this
// range then it's not safe to apply this merge. In that case we need to
// remove the LHS replica synchronously.
rightRepl, _, err := r.store.getOrCreateReplica(ctx, merge.RightDesc.RangeID, 0, nil, true)
if err != nil {
return nil, err
}
Expand Down
62 changes: 45 additions & 17 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3356,6 +3356,10 @@ func (s *Store) HandleSnapshot(
})
}

// learnerReplicaType holds roachpb.LEARNER in a package-level variable so that
// uncoalesceBeats can take its address when marking a replica as a learner,
// which avoids allocating. The same pointer is stored in every request marked
// this way, so the variable must be treated as read-only.
var learnerReplicaType = roachpb.LEARNER

func (s *Store) uncoalesceBeats(
ctx context.Context,
beats []RaftHeartbeat,
Expand Down Expand Up @@ -3393,6 +3397,9 @@ func (s *Store) uncoalesceBeats(
Message: msg,
Quiesce: beat.Quiesce,
}
if beat.ToIsLearner {
beatReqs[i].ToReplica.Type = &learnerReplicaType
}
if log.V(4) {
log.Infof(ctx, "uncoalesced beat: %+v", beatReqs[i])
}
Expand Down Expand Up @@ -3479,6 +3486,7 @@ func (s *Store) withReplicaForRequest(
req.RangeID,
req.ToReplica.ReplicaID,
&req.FromReplica,
req.ToReplica.GetType() == roachpb.VOTER_FULL,
)
if err != nil {
return roachpb.NewError(err)
Expand Down Expand Up @@ -4063,6 +4071,7 @@ func (s *Store) getOrCreateReplica(
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
creatingReplica *roachpb.ReplicaDescriptor,
isVoter bool,
) (_ *Replica, created bool, _ error) {
r := retry.Start(retry.Options{
InitialBackoff: time.Microsecond,
Expand All @@ -4075,6 +4084,7 @@ func (s *Store) getOrCreateReplica(
rangeID,
replicaID,
creatingReplica,
isVoter,
)
if err == errRetry {
continue
Expand All @@ -4097,8 +4107,27 @@ func (s *Store) tryGetOrCreateReplica(
rangeID roachpb.RangeID,
replicaID roachpb.ReplicaID,
creatingReplica *roachpb.ReplicaDescriptor,
isVoter bool,
) (_ *Replica, created bool, err error) {
var (
removeReplica = func(repl *Replica) error {
repl.mu.destroyStatus.Set(err, destroyReasonRemovalPending)
isInitialized := repl.isInitializedRLocked()
repl.mu.Unlock()
defer repl.mu.Lock()
if !isInitialized {
if err := s.removeUninitializedReplicaRaftMuLocked(ctx, repl, replicaID); err != nil {
log.Fatalf(ctx, "failed to remove uninitialized replica: %v", err)
}
} else {
if err := s.removeReplicaImpl(ctx, repl, replicaID, RemoveOptions{
DestroyData: true,
}); err != nil {
log.Fatal(ctx, err)
}
}
return errRetry
}
handleFromReplicaTooOld = func(repl *Replica) error {
if creatingReplica == nil {
return nil
Expand All @@ -4120,22 +4149,16 @@ func (s *Store) tryGetOrCreateReplica(
if log.V(1) {
log.Infof(ctx, "found message for newer replica ID: %v %v %v %v %v", repl.mu.replicaID, replicaID, repl.mu.minReplicaID, repl.mu.state.Desc, &repl.mu.destroyStatus)
}
repl.mu.destroyStatus.Set(err, destroyReasonRemovalPending)
isInitialized := repl.isInitializedRLocked()
repl.mu.Unlock()
defer repl.mu.Lock()
if !isInitialized {
if err := s.removeUninitializedReplicaRaftMuLocked(ctx, repl, replicaID); err != nil {
log.Fatalf(ctx, "failed to remove uninitialized replica: %v", err)
}
} else {
if err := s.removeReplicaImpl(ctx, repl, replicaID, RemoveOptions{
DestroyData: true,
}); err != nil {
log.Fatal(ctx, err)
}
return removeReplica(repl)
}
handleNonVoterWithPreemptiveSnapshot = func(repl *Replica) error {
if repl.mu.replicaID != 0 || replicaID == 0 || !repl.isInitializedRLocked() || isVoter {
return nil
}
return errRetry
if log.V(1) {
log.Infof(ctx, "found message for replica ID %v as non-voter but currently not part of the range, destroying preemptive snapshot", replicaID)
}
return removeReplica(repl)
}
)
// The common case: look up an existing (initialized) replica.
Expand All @@ -4156,11 +4179,16 @@ func (s *Store) tryGetOrCreateReplica(
repl.raftMu.Unlock()
return nil, false, err
}
if err := handleNonVoterWithPreemptiveSnapshot(repl); err != nil {
repl.raftMu.Unlock()
return nil, false, err
}

var err error
if repl.mu.replicaID == 0 {
if repl.mu.replicaID == 0 && replicaID != 0 {
// This message is telling us about our replica ID.
// This is a common case when dealing with preemptive snapshots.
// This is a common case when dealing with preemptive snapshots if we're
// initialized.
err = repl.setReplicaIDRaftMuLockedMuLocked(repl.AnnotateCtx(ctx), replicaID)
} else if replicaID != 0 && repl.mu.replicaID > replicaID {
// TODO(ajwerner): probably just silently drop this message.
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,11 +744,19 @@ func (s *Store) shouldAcceptSnapshotData(
}
pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest,
func(ctx context.Context, r *Replica) *roachpb.Error {
// If the current replica is not initialized then we should accept this
// snapshot if it doesn't overlap existing ranges.
if !r.IsInitialized() {
s.mu.Lock()
defer s.mu.Unlock()
return roachpb.NewError(s.checkSnapshotOverlapLocked(ctx, snapHeader))
}
// If the current range is initialized then we need to accept this
// snapshot. There's a hidden nasty case here during 19.2 where
// our currently initialized range is due to a preemptive snapshot and
// we've now assigned that range a replica ID. Fundamentally at this
// point we want to clear out the pre-emptive snapshot because applying
// a learner snapshot over the top of it is likely going to be problematic.
return nil
})
return pErr.GoError()
Expand Down
14 changes: 13 additions & 1 deletion pkg/storage/store_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestSnapshotPreemptiveOnUninitializedReplica(t *testing.T) {
store, _ := createTestStore(t, testStoreOpts{}, stopper)

// Create an uninitialized replica.
repl, created, err := store.getOrCreateReplica(ctx, 77, 1, nil)
repl, created, err := store.getOrCreateReplica(ctx, 77, 1, nil, true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -141,3 +141,15 @@ func TestSnapshotPreemptiveOnUninitializedReplica(t *testing.T) {
t.Fatal(err)
}
}

// TestPreemptiveSnapshotRemovedAfterMessageTo is intended to test that
// replicas which encounter preemptive snapshots remove all of the data due to
// that preemptive snapshot.
//
// This behavior is critical to provide safety when upgrading from preemptive
// snapshots. Before learner replicas we would not add a store to a range
// until we had successfully sent a preemptive snapshot.
//
// TODO: the original explanation trails off here ("Furthermore we'd only add
// that store assuming that the ..."); finish it when the test is implemented.
// The body below is currently empty, so this test asserts nothing.
func TestPreemptiveSnapshotRemovedAfterMessageTo(t *testing.T) {

}

0 comments on commit 6f7c3f2

Please sign in to comment.