Skip to content

Commit

Permalink
storage: eagerly GC replicas on raft transport errors
Browse files Browse the repository at this point in the history
When the Raft transport stream returns an error we can use that error as
an signal that the replica may need to be GC'd.

Suggested in #8130.
Fixes #5789.
  • Loading branch information
petermattis committed Aug 1, 2016
1 parent 1e94f89 commit 0de0aa1
Showing 5 changed files with 90 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build/check-style.sh
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ TestVet() {
grep -vE '^vet: cannot process directory \.git' | \
grep -vE '\.pb\.gw\.go:[0-9]+: declaration of "?ctx"? shadows' | \
grep -vE 'declaration of "?(pE|e)rr"? shadows' | \
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries))\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries)|storage/replica)\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
# To return proper HTTP error codes (e.g. 404 Not Found), we need to use
# grpc.Errorf, which has an error code as its first parameter. 'go vet'
# doesn't like that the first parameter isn't a format string.
54 changes: 53 additions & 1 deletion storage/client_raft_test.go
Original file line number Diff line number Diff line change
@@ -1565,7 +1565,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
},
})

if err := <-errChan; !testutils.IsError(err, "older than NextReplicaID") {
if err := <-errChan; !testutils.IsError(err, "sender replica too old, discarding message") {
t.Fatalf("got unexpected error: %v", err)
}

@@ -1577,6 +1577,58 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
}
}

func TestReplicaTooOldGC(t *testing.T) {
defer leaktest.AfterTest(t)()

mtc := startMultiTestContext(t, 4)
defer mtc.Stop()

// Replicate the first range onto all of the nodes.
const rangeID = 1
mtc.replicateRange(rangeID, 1, 2, 3)

// Put some data in the range so we'll have something to test for.
incArgs := incrementArgs([]byte("a"), 5)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}
// Wait for all nodes to catch up.
mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5})

// Verify store 3 has the replica.
if _, err := mtc.stores[3].GetReplica(rangeID); err != nil {
t.Fatal(err)
}

// Stop node 3; while it is down remove the range from it. Since the node is
// down it won't see the removal and won't clean up its replica.
mtc.stopStore(3)
mtc.unreplicateRange(rangeID, 3)

// Perform another write.
incArgs = incrementArgs([]byte("a"), 11)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}
mtc.waitForValues(roachpb.Key("a"), []int64{16, 16, 16, 5})

// Restart node 3. The removed replica will start talking to the other
// replicas and determine it needs to be GC'd.
mtc.restartStore(3)
mtc.stores[3].SetReplicaScannerDisabled(true)

util.SucceedsSoon(t, func() error {
replica, err := mtc.stores[3].GetReplica(rangeID)
if err != nil {
if _, ok := err.(*roachpb.RangeNotFoundError); ok {
return nil
}
return err
}
return errors.Errorf("found %s, waiting for it to be GC'd", replica)
})
}

func TestReplicateReAddAfterDown(t *testing.T) {
defer leaktest.AfterTest(t)()

30 changes: 27 additions & 3 deletions storage/replica.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,9 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/kr/pretty"
@@ -46,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/storage/storagebase"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/grpcutil"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/protoutil"
@@ -145,6 +149,8 @@ func updatesTimestampCache(r roachpb.Request) bool {
return updatesTimestampCacheMethods[m]
}

var errReplicaTooOld = grpc.Errorf(codes.Aborted, "sender replica too old, discarding message")

// A pendingCmd holds a done channel for a command sent to Raft. Once
// committed to the Raft log, the command is executed and the result returned
// via the done channel.
@@ -333,11 +339,29 @@ func NewReplica(desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.R
RangeID: desc.RangeID,
store: store,
abortCache: NewAbortCache(desc.RangeID),
raftSender: store.ctx.Transport.MakeSender(func(err error, toReplica roachpb.ReplicaDescriptor) {
log.Warningf(context.TODO(), "range %d: outgoing raft transport stream to %s closed by the remote: %v", desc.RangeID, toReplica, err)
}),
}

r.raftSender = store.ctx.Transport.MakeSender(
func(err error, toReplica roachpb.ReplicaDescriptor) {
ctx := context.TODO() // plumb the context from transport
if grpcutil.ErrorEqual(err, errReplicaTooOld) {
r.mu.Lock()
repID := r.mu.replicaID
r.mu.Unlock()
log.Infof(ctx, "%s: replica %d too old, adding to replica GC queue", r, repID)

if err := r.store.replicaGCQueue.Add(r, 1.0); err != nil {
log.Errorf(ctx, "%s: unable to add to GC queue: %s", r, err)
}
return
}
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Warningf(ctx,
"%s: outgoing raft transport stream to %s closed by the remote: %s",
r, toReplica, err)
}
})

if err := r.newReplicaInner(desc, store.Clock(), replicaID); err != nil {
return nil, err
}
3 changes: 1 addition & 2 deletions storage/store.go
Original file line number Diff line number Diff line change
@@ -2356,8 +2356,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
}
// It's not a current member of the group. Is it from the past?
if !found && req.FromReplica.ReplicaID < desc.NextReplicaID {
return errors.Errorf("range %s: discarding message from %+v, older than NextReplicaID %d",
req.RangeID, req.FromReplica, desc.NextReplicaID)
return errReplicaTooOld
}
}

8 changes: 8 additions & 0 deletions util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
@@ -41,3 +41,11 @@ func IsClosedConnection(err error) bool {
}
return netutil.IsClosedConnection(err)
}

// ErrorEqual checks two grpc errors for equality. We check structural
// equality, not pointer equality as you would get by comparing the error
// interface.
func ErrorEqual(err1, err2 error) bool {
return grpc.Code(err1) == grpc.Code(err2) &&
grpc.ErrorDesc(err1) == grpc.ErrorDesc(err2)
}

0 comments on commit 0de0aa1

Please sign in to comment.