From 6f66401984cb61ce9fd647f98b8f70b65cf86641 Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Wed, 2 Nov 2016 16:22:19 -0400
Subject: [PATCH] storage: eager GC for replicas blocking snapshots

Consider the following situation on a Range X:

- the range lives on nodes 1,2,3
- it gets rebalanced to nodes 1,2,4 while node 3 is down
- node 3 restarts, but the (now removed) replica remains quiescent
- the range splits
- the right-hand side of the split gets rebalanced to nodes 1,2,3

In order to receive a snapshot in the last step, node 3 needs to have
garbage-collected its old pre-split replica. If it weren't for node 3's
downtime, this would normally happen eagerly, as its peers would inform it
of its fate. In this scenario, however, one would have to wait until
a) a client request creates the Raft group (which is unlikely, as the
replica isn't being addressed any more) or b) a queue picks it up (which
can take a long time).

Instead, when an incoming snapshot overlaps an existing replica and that
replica appears to be inactive, we add it to the GC queue (which in turn
activates the replica). For the situation above, we could also hope to only
create the Raft group (as that would likely, but not necessarily, bring it
into contact with its ex-peers). However, in larger clusters none of the old
members may still be around, so going for the GC queue directly is the
better option.
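As an aside, here is a minimal, standalone sketch of the inactivity
criterion this relies on: a replica is a GC candidate if its Raft group was
never created, or if neither its current nor its pending lease covers the
present time. The lease/replica types and the covers/inactive helpers below
are simplified, hypothetical stand-ins, not the patch itself; the real check
lives in canApplySnapshotLocked in the store.go hunk below.

package main

import (
	"fmt"
	"time"
)

// lease is a hypothetical stand-in for a range lease, reduced to an
// expiration timestamp.
type lease struct {
	expiration time.Time
}

// covers reports whether the lease is still valid at the given time. A nil
// lease covers nothing, mirroring the nil checks in the patch.
func (l *lease) covers(now time.Time) bool {
	return l != nil && now.Before(l.expiration)
}

// replica is a hypothetical stand-in keeping only the fields the
// inactivity check looks at.
type replica struct {
	raftGroupCreated bool // stands in for r.RaftStatus() != nil
	lease            *lease
	pendingLease     *lease
}

// inactive mirrors the predicate added to canApplySnapshotLocked: no Raft
// group, or no lease (current or pending) covering the current time.
func inactive(r *replica, now time.Time) bool {
	if !r.raftGroupCreated {
		return true
	}
	return !r.lease.covers(now) && !r.pendingLease.covers(now)
}

func main() {
	now := time.Now()

	// A quiescent pre-split replica whose Raft group was never created after
	// the restart qualifies for an eager GC run.
	dormant := &replica{}
	fmt.Println("dormant replica inactive:", inactive(dormant, now)) // true

	// A replica holding a live lease must not be queued for GC.
	active := &replica{
		raftGroupCreated: true,
		lease:            &lease{expiration: now.Add(9 * time.Second)},
	}
	fmt.Println("active replica inactive:", inactive(active, now)) // false
}

In the scenario above, the restarted node's old replica trips the first
branch immediately, so the conflicting snapshot triggers a GC pass instead
of waiting for a client request or a queue scan.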
---
 pkg/storage/client_raft_test.go | 68 +++++++++++++++++++++++++++++++++
 pkg/storage/store.go            | 25 +++++++++++-
 2 files changed, 91 insertions(+), 2 deletions(-)

diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go
index c8d5151e915f..20761094e89d 100644
--- a/pkg/storage/client_raft_test.go
+++ b/pkg/storage/client_raft_test.go
@@ -681,6 +681,74 @@ func TestConcurrentRaftSnapshots(t *testing.T) {
 	mtc.waitForValues(key, []int64{incAB, incAB, incAB, incAB, incAB})
 }
 
+func TestReplicateAfterRemoveAndSplit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	sc := storage.TestStoreConfig()
+	sc.TestingKnobs.DisableReplicaGCQueue = true
+	mtc := &multiTestContext{storeConfig: &sc}
+	mtc.Start(t, 3)
+	defer mtc.Stop()
+	rng1, err := mtc.stores[0].GetReplica(1)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	mtc.replicateRange(1, 1, 2)
+
+	// Kill store 2.
+	mtc.stopStore(2)
+
+	// Remove store 2 from the range to simulate removal of a dead node.
+	mtc.unreplicateRange(1, 2)
+
+	// Split the range.
+	splitKey := roachpb.Key("m")
+	splitArgs := adminSplitArgs(splitKey, splitKey)
+	if _, err := rng1.AdminSplit(context.Background(), splitArgs, rng1.Desc()); err != nil {
+		t.Fatal(err)
+	}
+
+	// Restart store 2.
+	mtc.restartStore(2)
+
+	replicateRange2 := func() error {
+		// Try to up-replicate range 2 to store 2. We can't use replicateRange
+		// because this should fail on the first attempt and then eventually
+		// succeed.
+		const rangeID = 2
+		startKey := mtc.findStartKeyLocked(rangeID)
+
+		var desc roachpb.RangeDescriptor
+		if err := mtc.dbs[0].GetProto(context.TODO(), keys.RangeDescriptorKey(startKey), &desc); err != nil {
+			t.Fatal(err)
+		}
+
+		rep, err := mtc.findMemberStoreLocked(desc).GetReplica(rangeID)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		return rep.ChangeReplicas(
+			context.Background(),
+			roachpb.ADD_REPLICA,
+			roachpb.ReplicaDescriptor{
+				NodeID:  mtc.stores[2].Ident.NodeID,
+				StoreID: mtc.stores[2].Ident.StoreID,
+			},
+			&desc,
+		)
+	}
+
+	expected := "snapshot intersects existing range"
+	if err := replicateRange2(); !testutils.IsError(err, expected) {
+		t.Fatalf("expected error, but found %v", err)
+	}
+	mtc.stores[2].SetReplicaGCQueueActive(true)
+
+	util.SucceedsSoon(t, replicateRange2)
+}
+
 // Test various mechanism for refreshing pending commands.
 func TestRefreshPendingCommands(t *testing.T) {
 	defer leaktest.AfterTest(t)()
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index d6bc81339152..24567d94baaf 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -3491,12 +3491,33 @@ func (s *Store) canApplySnapshotLocked(
 		// When such a conflict exists, it will be resolved by one range
 		// either being split or garbage collected.
 		exReplica, err := s.getReplicaLocked(exRange.Desc().RangeID)
+		msg := "snapshot intersects existing range"
+		ctx := s.AnnotateCtx(context.TODO())
 		if err != nil {
-			ctx := s.AnnotateCtx(context.TODO())
 			log.Warning(ctx, errors.Wrapf(
 				err, "unable to look up overlapping replica on %s", exReplica))
+		} else {
+			inactive := func(r *Replica) bool {
+				if r.RaftStatus() == nil {
+					return true
+				}
+				lease, pendingLease := r.getLease()
+				now := s.Clock().Now()
+				return (lease == nil || !lease.Covers(now)) &&
+					(pendingLease == nil || !pendingLease.Covers(now))
+			}
+
+			// If the existing range shows no signs of recent activity, give it a GC
+			// run.
+			if inactive(exReplica) {
+				if _, err := s.replicaGCQueue.Add(exReplica, replicaGCPriorityCandidate); err != nil {
+					log.Errorf(ctx, "%s: unable to add replica to GC queue: %s", exReplica, err)
+				} else {
+					msg += "; initiated GC:"
+				}
+			}
 		}
-		return nil, errors.Errorf("snapshot intersects existing range %s", exReplica)
+		return nil, errors.Errorf("%s %v", msg, exReplica) // exReplica can be nil
 	}
 
 	placeholder := &ReplicaPlaceholder{