Skip to content

Commit

Permalink
Merge pull request #7404 from petermattis/pmattis/replica-destoyed
Browse files Browse the repository at this point in the history
storage: mark Replicas as destroyed
  • Loading branch information
petermattis authored Jun 23, 2016
2 parents 5505c6f + 4583320 commit 646ce6c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
9 changes: 9 additions & 0 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ type Replica struct {
mu struct {
// Protects all fields in the mu struct.
sync.Mutex
// Has the replica been destroyed.
destroyed error
// The state of the Raft state machine.
state storagebase.ReplicaState
// Counter used for assigning lease indexes for proposals.
Expand Down Expand Up @@ -236,6 +238,10 @@ type Replica struct {
// withRaftGroupLocked calls the supplied function with the (lazily
// initialized) Raft group. It assumes that the Replica lock is held.
func (r *Replica) withRaftGroupLocked(f func(r *raft.RawNode) error) error {
if r.mu.destroyed != nil {
return r.mu.destroyed
}

if r.mu.internalRaftGroup == nil {
raftGroup, err := raft.NewRawNode(&raft.Config{
ID: uint64(r.mu.replicaID),
Expand Down Expand Up @@ -368,6 +374,9 @@ func (r *Replica) Destroy(origDesc roachpb.RangeDescriptor) error {
}
// Clear the map.
r.mu.pendingCmds = map[storagebase.CmdIDKey]*pendingCmd{}
r.mu.internalRaftGroup = nil
r.mu.destroyed = util.Errorf("replica %d (range %d) was garbage collected",
r.mu.replicaID, r.RangeID)
r.mu.Unlock()

return r.store.destroyReplicaData(desc)
Expand Down
23 changes: 23 additions & 0 deletions storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/util/retry"
"github.com/cockroachdb/cockroach/util/stop"
"github.com/cockroachdb/cockroach/util/uuid"
"github.com/coreos/etcd/raft"
)

var testIdent = roachpb.StoreIdent{
Expand Down Expand Up @@ -388,6 +389,28 @@ func TestStoreRemoveReplicaOldDescriptor(t *testing.T) {
}
}

func TestStoreRemoveReplicaDestroy(t *testing.T) {
defer leaktest.AfterTest(t)()
store, _, stopper := createTestStore(t)
defer stopper.Stop()

rng1, err := store.GetReplica(1)
if err != nil {
t.Fatal(err)
}
if err := store.RemoveReplica(rng1, *rng1.Desc(), true); err != nil {
t.Fatal(err)
}

// Verify that removal of a replica marks it as destroyed so that future raft
// commands on the Replica structure will fail.
err = rng1.withRaftGroup(func(r *raft.RawNode) error { return nil })
expected := "replica .* was garbage collected"
if !testutils.IsError(err, expected) {
t.Fatalf("expected error %s, but got %v", expected, err)
}
}

func TestStoreRangeSet(t *testing.T) {
defer leaktest.AfterTest(t)()
store, _, stopper := createTestStore(t)
Expand Down

0 comments on commit 646ce6c

Please sign in to comment.