Skip to content

Commit

Permalink
storage: eagerly GC replicas on raft transport errors
Browse files Browse the repository at this point in the history
When the Raft transport stream returns an error we can use that error as
a signal that the replica may need to be GC'd.

Suggested in cockroachdb#8130.
Fixes cockroachdb#5789.
  • Loading branch information
petermattis committed Aug 1, 2016
1 parent 1e94f89 commit 2c84fac
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build/check-style.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TestVet() {
grep -vE '^vet: cannot process directory \.git' | \
grep -vE '\.pb\.gw\.go:[0-9]+: declaration of "?ctx"? shadows' | \
grep -vE 'declaration of "?(pE|e)rr"? shadows' | \
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries))\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries)|storage/replica)\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
# To return proper HTTP error codes (e.g. 404 Not Found), we need to use
# grpc.Errorf, which has an error code as its first parameter. 'go vet'
# doesn't like that the first parameter isn't a format string.
Expand Down
2 changes: 1 addition & 1 deletion storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
},
})

if err := <-errChan; !testutils.IsError(err, "older than NextReplicaID") {
if err := <-errChan; !testutils.IsError(err, "replica too old, discarding message") {
t.Fatalf("got unexpected error: %v", err)
}

Expand Down
34 changes: 31 additions & 3 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/kr/pretty"
Expand All @@ -46,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/storage/storagebase"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/grpcutil"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/protoutil"
Expand Down Expand Up @@ -145,6 +149,10 @@ func updatesTimestampCache(r roachpb.Request) bool {
return updatesTimestampCacheMethods[m]
}

// replicaTooOld is the description carried by errReplicaTooOld. It is kept as
// a separate constant because the raft sender error callback recognizes the
// error by matching the gRPC code and this description rather than by
// comparing error values.
const replicaTooOld = "replica too old, discarding message"

// errReplicaTooOld is a gRPC error with code Aborted. handleRaftMessage
// returns it when a message comes from a replica that was not found in the
// range descriptor and whose ReplicaID is below the descriptor's
// NextReplicaID.
var errReplicaTooOld = grpc.Errorf(codes.Aborted, replicaTooOld)

// A pendingCmd holds a done channel for a command sent to Raft. Once
// committed to the Raft log, the command is executed and the result returned
// via the done channel.
Expand Down Expand Up @@ -333,11 +341,31 @@ func NewReplica(desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.R
RangeID: desc.RangeID,
store: store,
abortCache: NewAbortCache(desc.RangeID),
raftSender: store.ctx.Transport.MakeSender(func(err error, toReplica roachpb.ReplicaDescriptor) {
log.Warningf(context.TODO(), "range %d: outgoing raft transport stream to %s closed by the remote: %v", desc.RangeID, toReplica, err)
}),
}

r.raftSender = store.ctx.Transport.MakeSender(
func(err error, toReplica roachpb.ReplicaDescriptor) {
ctx := context.Background()
// NB: We can't compare err == errReplicaTooOld because grpc.rpcError is
// a pointer and the pointer comparison will fail.
if grpc.Code(err) == codes.Aborted && grpc.ErrorDesc(err) == replicaTooOld {
r.mu.Lock()
repID := r.mu.replicaID
r.mu.Unlock()
log.Infof(ctx, "%s: replica %d too old, adding to replica GC queue", r, repID)

if err := r.store.replicaGCQueue.Add(r, 1.0); err != nil {
log.Errorf(ctx, "%s: unable to add to GC queue: %s", r, err)
}
return
}
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Warningf(ctx,
"%s: outgoing raft transport stream to %s closed by the remote: %v",
r, toReplica, err)
}
})

if err := r.newReplicaInner(desc, store.Clock(), replicaID); err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2356,8 +2356,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
}
// It's not a current member of the group. Is it from the past?
if !found && req.FromReplica.ReplicaID < desc.NextReplicaID {
return errors.Errorf("range %s: discarding message from %+v, older than NextReplicaID %d",
req.RangeID, req.FromReplica, desc.NextReplicaID)
return errReplicaTooOld
}
}

Expand Down

0 comments on commit 2c84fac

Please sign in to comment.