From 0de0aa1df4747bbcab41146545d638d087b58c17 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Sun, 31 Jul 2016 15:42:01 -0400 Subject: [PATCH] storage: eagerly GC replicas on raft transport errors When the Raft transport stream returns an error we can use that error as an signal that the replica may need to be GC'd. Suggested in #8130. Fixes #5789. --- build/check-style.sh | 2 +- storage/client_raft_test.go | 54 ++++++++++++++++++++++++++++++++++++- storage/replica.go | 30 ++++++++++++++++++--- storage/store.go | 3 +-- util/grpcutil/grpc_util.go | 8 ++++++ 5 files changed, 90 insertions(+), 7 deletions(-) diff --git a/build/check-style.sh b/build/check-style.sh index 9af6e1e976cb..f214205f1976 100755 --- a/build/check-style.sh +++ b/build/check-style.sh @@ -108,7 +108,7 @@ TestVet() { grep -vE '^vet: cannot process directory \.git' | \ grep -vE '\.pb\.gw\.go:[0-9]+: declaration of "?ctx"? shadows' | \ grep -vE 'declaration of "?(pE|e)rr"? shadows' | \ - grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries))\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf' + grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries)|storage/replica)\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf' # To return proper HTTP error codes (e.g. 404 Not Found), we need to use # grpc.Errorf, which has an error code as its first parameter. 'go vet' # doesn't like that the first parameter isn't a format string. diff --git a/storage/client_raft_test.go b/storage/client_raft_test.go index bc4ffdd45d80..05c9f43fdf70 100644 --- a/storage/client_raft_test.go +++ b/storage/client_raft_test.go @@ -1565,7 +1565,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) { }, }) - if err := <-errChan; !testutils.IsError(err, "older than NextReplicaID") { + if err := <-errChan; !testutils.IsError(err, "sender replica too old, discarding message") { t.Fatalf("got unexpected error: %v", err) } @@ -1577,6 +1577,58 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) { } } +func TestReplicaTooOldGC(t *testing.T) { + defer leaktest.AfterTest(t)() + + mtc := startMultiTestContext(t, 4) + defer mtc.Stop() + + // Replicate the first range onto all of the nodes. + const rangeID = 1 + mtc.replicateRange(rangeID, 1, 2, 3) + + // Put some data in the range so we'll have something to test for. + incArgs := incrementArgs([]byte("a"), 5) + if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil { + t.Fatal(err) + } + // Wait for all nodes to catch up. + mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5}) + + // Verify store 3 has the replica. + if _, err := mtc.stores[3].GetReplica(rangeID); err != nil { + t.Fatal(err) + } + + // Stop node 3; while it is down remove the range from it. Since the node is + // down it won't see the removal and won't clean up its replica. + mtc.stopStore(3) + mtc.unreplicateRange(rangeID, 3) + + // Perform another write. + incArgs = incrementArgs([]byte("a"), 11) + if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil { + t.Fatal(err) + } + mtc.waitForValues(roachpb.Key("a"), []int64{16, 16, 16, 5}) + + // Restart node 3. The removed replica will start talking to the other + // replicas and determine it needs to be GC'd. + mtc.restartStore(3) + mtc.stores[3].SetReplicaScannerDisabled(true) + + util.SucceedsSoon(t, func() error { + replica, err := mtc.stores[3].GetReplica(rangeID) + if err != nil { + if _, ok := err.(*roachpb.RangeNotFoundError); ok { + return nil + } + return err + } + return errors.Errorf("found %s, waiting for it to be GC'd", replica) + }) +} + func TestReplicateReAddAfterDown(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/storage/replica.go b/storage/replica.go index 2c13617f5a01..880b0ca733f4 100644 --- a/storage/replica.go +++ b/storage/replica.go @@ -30,6 +30,9 @@ import ( "sync/atomic" "time" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/kr/pretty" @@ -46,6 +49,7 @@ import ( "github.com/cockroachdb/cockroach/storage/engine/enginepb" "github.com/cockroachdb/cockroach/storage/storagebase" "github.com/cockroachdb/cockroach/util/encoding" + "github.com/cockroachdb/cockroach/util/grpcutil" "github.com/cockroachdb/cockroach/util/hlc" "github.com/cockroachdb/cockroach/util/log" "github.com/cockroachdb/cockroach/util/protoutil" @@ -145,6 +149,8 @@ func updatesTimestampCache(r roachpb.Request) bool { return updatesTimestampCacheMethods[m] } +var errReplicaTooOld = grpc.Errorf(codes.Aborted, "sender replica too old, discarding message") + // A pendingCmd holds a done channel for a command sent to Raft. Once // committed to the Raft log, the command is executed and the result returned // via the done channel. @@ -333,11 +339,29 @@ func NewReplica(desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.R RangeID: desc.RangeID, store: store, abortCache: NewAbortCache(desc.RangeID), - raftSender: store.ctx.Transport.MakeSender(func(err error, toReplica roachpb.ReplicaDescriptor) { - log.Warningf(context.TODO(), "range %d: outgoing raft transport stream to %s closed by the remote: %v", desc.RangeID, toReplica, err) - }), } + r.raftSender = store.ctx.Transport.MakeSender( + func(err error, toReplica roachpb.ReplicaDescriptor) { + ctx := context.TODO() // plumb the context from transport + if grpcutil.ErrorEqual(err, errReplicaTooOld) { + r.mu.Lock() + repID := r.mu.replicaID + r.mu.Unlock() + log.Infof(ctx, "%s: replica %d too old, adding to replica GC queue", r, repID) + + if err := r.store.replicaGCQueue.Add(r, 1.0); err != nil { + log.Errorf(ctx, "%s: unable to add to GC queue: %s", r, err) + } + return + } + if err != nil && !grpcutil.IsClosedConnection(err) { + log.Warningf(ctx, + "%s: outgoing raft transport stream to %s closed by the remote: %s", + r, toReplica, err) + } + }) + if err := r.newReplicaInner(desc, store.Clock(), replicaID); err != nil { return nil, err } diff --git a/storage/store.go b/storage/store.go index 1c6b3ba7ed39..380945ada608 100644 --- a/storage/store.go +++ b/storage/store.go @@ -2356,8 +2356,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error { } // It's not a current member of the group. Is it from the past? if !found && req.FromReplica.ReplicaID < desc.NextReplicaID { - return errors.Errorf("range %s: discarding message from %+v, older than NextReplicaID %d", - req.RangeID, req.FromReplica, desc.NextReplicaID) + return errReplicaTooOld } } diff --git a/util/grpcutil/grpc_util.go b/util/grpcutil/grpc_util.go index c463b1ad0f43..0529cb1a1d2e 100644 --- a/util/grpcutil/grpc_util.go +++ b/util/grpcutil/grpc_util.go @@ -41,3 +41,11 @@ func IsClosedConnection(err error) bool { } return netutil.IsClosedConnection(err) } + +// ErrorEqual checks two grpc errors for equality. We check structural +// equality, not pointer equality as you would get by comparing the error +// interface. +func ErrorEqual(err1, err2 error) bool { + return grpc.Code(err1) == grpc.Code(err2) && + grpc.ErrorDesc(err1) == grpc.ErrorDesc(err2) +}