Skip to content

Commit

Permalink
storage: eagerly GC replicas on raft transport errors
Browse files Browse the repository at this point in the history
When the Raft transport stream returns an error we can use that error as
an signal that the replica may need to be GC'd.

Suggested in cockroachdb#8130.
Fixes cockroachdb#5789.
  • Loading branch information
petermattis committed Aug 1, 2016
1 parent 1e94f89 commit bda9196
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build/check-style.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TestVet() {
grep -vE '^vet: cannot process directory \.git' | \
grep -vE '\.pb\.gw\.go:[0-9]+: declaration of "?ctx"? shadows' | \
grep -vE 'declaration of "?(pE|e)rr"? shadows' | \
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries))\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries)|storage/replica)\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
# To return proper HTTP error codes (e.g. 404 Not Found), we need to use
# grpc.Errorf, which has an error code as its first parameter. 'go vet'
# doesn't like that the first parameter isn't a format string.
Expand Down
54 changes: 53 additions & 1 deletion storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
},
})

if err := <-errChan; !testutils.IsError(err, "older than NextReplicaID") {
if err := <-errChan; !testutils.IsError(err, "sender replica too old, discarding message") {
t.Fatalf("got unexpected error: %v", err)
}

Expand All @@ -1577,6 +1577,58 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
}
}

func TestReplicaTooOldGC(t *testing.T) {
defer leaktest.AfterTest(t)()

mtc := startMultiTestContext(t, 4)
defer mtc.Stop()

// Replicate the first range onto all of the nodes.
const rangeID = 1
mtc.replicateRange(rangeID, 1, 2, 3)

// Put some data in the range so we'll have something to test for.
incArgs := incrementArgs([]byte("a"), 5)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}
// Wait for all nodes to catch up.
mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5})

// Verify store 3 has the replica.
if _, err := mtc.stores[3].GetReplica(rangeID); err != nil {
t.Fatal(err)
}

// Stop node 3; while it is down remove the range from it. Since the node is
// down it won't see the removal and won't clean up its replica.
mtc.stopStore(3)
mtc.unreplicateRange(rangeID, 3)

// Perform another write.
incArgs = incrementArgs([]byte("a"), 11)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}
mtc.waitForValues(roachpb.Key("a"), []int64{16, 16, 16, 5})

// Restart node 3. The removed replica will start talking to the other
// replicas and determine it needs to be GC'd.
mtc.restartStore(3)
mtc.stores[3].SetReplicaScannerDisabled(true)

util.SucceedsSoon(t, func() error {
replica, err := mtc.stores[3].GetReplica(rangeID)
if err != nil {
if _, ok := err.(*roachpb.RangeNotFoundError); ok {
return nil
}
return err
}
return errors.Errorf("found %s, waiting for it to be GC'd", replica)
})
}

func TestReplicateReAddAfterDown(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
30 changes: 27 additions & 3 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/kr/pretty"
Expand All @@ -46,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/storage/storagebase"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/grpcutil"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/protoutil"
Expand Down Expand Up @@ -145,6 +149,8 @@ func updatesTimestampCache(r roachpb.Request) bool {
return updatesTimestampCacheMethods[m]
}

var errReplicaTooOld = grpc.Errorf(codes.Aborted, "sender replica too old, discarding message")

// A pendingCmd holds a done channel for a command sent to Raft. Once
// committed to the Raft log, the command is executed and the result returned
// via the done channel.
Expand Down Expand Up @@ -333,11 +339,29 @@ func NewReplica(desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.R
RangeID: desc.RangeID,
store: store,
abortCache: NewAbortCache(desc.RangeID),
raftSender: store.ctx.Transport.MakeSender(func(err error, toReplica roachpb.ReplicaDescriptor) {
log.Warningf(context.TODO(), "range %d: outgoing raft transport stream to %s closed by the remote: %v", desc.RangeID, toReplica, err)
}),
}

r.raftSender = store.ctx.Transport.MakeSender(
func(err error, toReplica roachpb.ReplicaDescriptor) {
ctx := context.TODO() // plumb the context from transport
if grpcutil.ErrorEqual(err, errReplicaTooOld) {
r.mu.Lock()
repID := r.mu.replicaID
r.mu.Unlock()
log.Infof(ctx, "%s: replica %d too old, adding to replica GC queue", r, repID)

if err := r.store.replicaGCQueue.Add(r, 1.0); err != nil {
log.Errorf(ctx, "%s: unable to add replica %d to GC queue: %s", r, repID, err)
}
return
}
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Warningf(ctx,
"%s: outgoing raft transport stream to %s closed by the remote: %s",
r, toReplica, err)
}
})

if err := r.newReplicaInner(desc, store.Clock(), replicaID); err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2356,8 +2356,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
}
// It's not a current member of the group. Is it from the past?
if !found && req.FromReplica.ReplicaID < desc.NextReplicaID {
return errors.Errorf("range %s: discarding message from %+v, older than NextReplicaID %d",
req.RangeID, req.FromReplica, desc.NextReplicaID)
return errReplicaTooOld
}
}

Expand Down
8 changes: 8 additions & 0 deletions util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func IsClosedConnection(err error) bool {
}
return netutil.IsClosedConnection(err)
}

// ErrorEqual checks two grpc errors for equality. We check structural
// equality, not pointer equality as you would get by comparing the error
// interface.
func ErrorEqual(err1, err2 error) bool {
return grpc.Code(err1) == grpc.Code(err2) &&
grpc.ErrorDesc(err1) == grpc.ErrorDesc(err2)
}

0 comments on commit bda9196

Please sign in to comment.