storage: eager GC for replicas blocking snapshots
Consider the following situation on a Range X:

- the range lives on nodes 1,2,3
- gets rebalanced to nodes 1,2,4 while node 3 is down
- node 3 restarts, but the (now removed) replica remains quiescent
- the range splits
- the right hand side of the split gets rebalanced to nodes 1,2,3.

In order to receive a snapshot in the last step, node 3 needs to have
garbage-collected its old pre-split replica. If it weren't for node 3's
downtime, this would have happened eagerly, as its peers would have
informed the replica of its fate. In this scenario, however, one has to
wait until either a) a client request creates the Raft group (unlikely,
since the replica is no longer being addressed) or b) a queue picks it
up (which can take a long time).

Instead, when a snapshot is rejected because it overlaps an existing
replica and that replica appears to be inactive, we add the replica to
the GC queue (which in turn activates it). For the situation above, we
could instead merely create the Raft group, which would likely (but not
necessarily) bring the replica into contact with its ex-peers. However,
in larger clusters none of the old members may still be around, so
going through the GC queue directly is the better option.
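
The notion of "inactive" used by the new check deserves spelling out: a
replica counts as inactive if its Raft group has not been instantiated
(i.e. it is quiescent, as after node 3's restart above), or if neither
its active nor its pending lease covers the current time. Below is a
minimal, self-contained sketch of that predicate with simplified
stand-in types; the real check (in the store.go hunk further down) uses
RaftStatus, roachpb.Lease.Covers, and HLC timestamps.

package main

import "fmt"

// lease is a simplified stand-in for roachpb.Lease; the real code
// compares hlc.Timestamps rather than wall-clock nanos.
type lease struct {
	start, expiration int64
}

// covers reports whether the lease is in effect at time now.
// A nil lease covers nothing.
func (l *lease) covers(now int64) bool {
	return l != nil && l.start <= now && now < l.expiration
}

// inactive mirrors the predicate added to canApplySnapshotLocked:
// a replica is inactive if its Raft group was never created, or if
// neither its active nor its pending lease covers the current time.
func inactive(hasRaftGroup bool, active, pending *lease, now int64) bool {
	if !hasRaftGroup {
		return true
	}
	return !active.covers(now) && !pending.covers(now)
}

func main() {
	// An expired lease and no pending one: the replica is inactive.
	fmt.Println(inactive(true, &lease{start: 0, expiration: 10}, nil, 100)) // true
}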
petermattis committed Nov 3, 2016
1 parent 7dde9b4 commit 6f66401
Showing 2 changed files with 91 additions and 2 deletions.
68 changes: 68 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -681,6 +681,74 @@ func TestConcurrentRaftSnapshots(t *testing.T) {
 	mtc.waitForValues(key, []int64{incAB, incAB, incAB, incAB, incAB})
 }
 
+func TestReplicateAfterRemoveAndSplit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sc := storage.TestStoreConfig()
+	sc.TestingKnobs.DisableReplicaGCQueue = true
+	mtc := &multiTestContext{storeConfig: &sc}
+	mtc.Start(t, 3)
+	defer mtc.Stop()
+	rng1, err := mtc.stores[0].GetReplica(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	mtc.replicateRange(1, 1, 2)
+
+	// Kill store 2.
+	mtc.stopStore(2)
+
+	// Remove store 2 from the range to simulate removal of a dead node.
+	mtc.unreplicateRange(1, 2)
+
+	// Split the range.
+	splitKey := roachpb.Key("m")
+	splitArgs := adminSplitArgs(splitKey, splitKey)
+	if _, err := rng1.AdminSplit(context.Background(), splitArgs, rng1.Desc()); err != nil {
+		t.Fatal(err)
+	}
+
+	// Restart store 2.
+	mtc.restartStore(2)
+
+	replicateRange2 := func() error {
+		// Try to up-replicate range 2 to store 2. We can't use replicateRange
+		// because this should fail on the first attempt and then eventually
+		// succeed.
+		const rangeID = 2
+		startKey := mtc.findStartKeyLocked(rangeID)
+
+		var desc roachpb.RangeDescriptor
+		if err := mtc.dbs[0].GetProto(context.TODO(), keys.RangeDescriptorKey(startKey), &desc); err != nil {
+			t.Fatal(err)
+		}
+
+		rep, err := mtc.findMemberStoreLocked(desc).GetReplica(rangeID)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		return rep.ChangeReplicas(
+			context.Background(),
+			roachpb.ADD_REPLICA,
+			roachpb.ReplicaDescriptor{
+				NodeID:  mtc.stores[2].Ident.NodeID,
+				StoreID: mtc.stores[2].Ident.StoreID,
+			},
+			&desc,
+		)
+	}
+
+	expected := "snapshot intersects existing range"
+	if err := replicateRange2(); !testutils.IsError(err, expected) {
+		t.Fatalf("expected error, but found %v", err)
+	}
+	mtc.stores[2].SetReplicaGCQueueActive(true)
+
+	util.SucceedsSoon(t, replicateRange2)
+}
+
 // Test various mechanism for refreshing pending commands.
 func TestRefreshPendingCommands(t *testing.T) {
 	defer leaktest.AfterTest(t)()
25 changes: 23 additions & 2 deletions pkg/storage/store.go
@@ -3491,12 +3491,33 @@ func (s *Store) canApplySnapshotLocked(
 		// When such a conflict exists, it will be resolved by one range
 		// either being split or garbage collected.
 		exReplica, err := s.getReplicaLocked(exRange.Desc().RangeID)
+		msg := "snapshot intersects existing range"
+		ctx := s.AnnotateCtx(context.TODO())
 		if err != nil {
-			ctx := s.AnnotateCtx(context.TODO())
 			log.Warning(ctx, errors.Wrapf(
 				err, "unable to look up overlapping replica on %s", exReplica))
+		} else {
+			inactive := func(r *Replica) bool {
+				if r.RaftStatus() == nil {
+					return true
+				}
+				lease, pendingLease := r.getLease()
+				now := s.Clock().Now()
+				return (lease == nil || !lease.Covers(now)) &&
+					(pendingLease == nil || !pendingLease.Covers(now))
+			}
+
+			// If the existing range shows no signs of recent activity, give it a GC
+			// run.
+			if inactive(exReplica) {
+				if _, err := s.replicaGCQueue.Add(exReplica, replicaGCPriorityCandidate); err != nil {
+					log.Errorf(ctx, "%s: unable to add replica to GC queue: %s", exReplica, err)
+				} else {
+					msg += "; initiated GC: "
+				}
+			}
 		}
-		return nil, errors.Errorf("snapshot intersects existing range %s", exReplica)
+		return nil, errors.Errorf("%s %v", msg, exReplica) // exReplica can be nil
 	}
 
 	placeholder := &ReplicaPlaceholder{
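
Why is a trip through the GC queue sufficient even when, as the commit
message notes, none of the ex-peers may still be around? Because the
replica GC queue does not depend on hearing from former peers: it looks
up the authoritative range descriptor in the meta index and destroys
the local replica if this store is no longer a member. Below is a
rough, self-contained sketch of that decision with hypothetical
simplified types; the actual logic lives in
pkg/storage/replica_gc_queue.go and is not part of this diff.

package main

import "fmt"

// Simplified stand-ins for roachpb.ReplicaDescriptor and
// roachpb.RangeDescriptor.
type replicaDescriptor struct{ storeID int32 }

type rangeDescriptor struct {
	rangeID  int64
	replicas []replicaDescriptor
}

// shouldGC decides a local replica's fate from the canonical
// descriptor fetched from the meta index: if the keyspace is now
// covered by a different range (e.g. after a split), or this store is
// no longer listed as a member, the local replica is garbage.
func shouldGC(canonical rangeDescriptor, localRangeID int64, storeID int32) bool {
	if canonical.rangeID != localRangeID {
		return true // keyspace taken over by another range
	}
	for _, rd := range canonical.replicas {
		if rd.storeID == storeID {
			return false // still a member; keep the replica
		}
	}
	return true // removed from the range; safe to destroy
}

func main() {
	// Store 3's replica became stale when range X moved to stores 1,2,4.
	desc := rangeDescriptor{rangeID: 1, replicas: []replicaDescriptor{{1}, {2}, {4}}}
	fmt.Println(shouldGC(desc, 1, 3)) // true: eligible for GC
}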
