Skip to content

Commit

Permalink
storage: eagerly GC replicas on raft transport errors
Browse files Browse the repository at this point in the history
When the Raft transport stream returns an error we can use that error as
a signal that the replica may need to be GC'd.

Suggested in cockroachdb#8130.
Fixes cockroachdb#5789.
  • Loading branch information
petermattis committed Aug 1, 2016
1 parent 1e94f89 commit 98d673d
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 19 deletions.
2 changes: 1 addition & 1 deletion build/check-style.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TestVet() {
grep -vE '^vet: cannot process directory \.git' | \
grep -vE '\.pb\.gw\.go:[0-9]+: declaration of "?ctx"? shadows' | \
grep -vE 'declaration of "?(pE|e)rr"? shadows' | \
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries))\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries)|storage/replica)\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
# To return proper HTTP error codes (e.g. 404 Not Found), we need to use
# grpc.Errorf, which has an error code as its first parameter. 'go vet'
# doesn't like that the first parameter isn't a format string.
Expand Down
56 changes: 55 additions & 1 deletion storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
},
})

if err := <-errChan; !testutils.IsError(err, "older than NextReplicaID") {
if err := <-errChan; !testutils.IsError(err, "sender replica too old, discarding message") {
t.Fatalf("got unexpected error: %v", err)
}

Expand All @@ -1577,6 +1577,60 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
}
}

func TestReplicaTooOldGC(t *testing.T) {
defer leaktest.AfterTest(t)()

sc := storage.TestStoreContext()
sc.TestingKnobs.DisableScanner = true
mtc := multiTestContext{storeContext: &sc}
mtc.Start(t, 4)
defer mtc.Stop()

// Replicate the first range onto all of the nodes.
const rangeID = 1
mtc.replicateRange(rangeID, 1, 2, 3)

// Put some data in the range so we'll have something to test for.
incArgs := incrementArgs([]byte("a"), 5)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}
// Wait for all nodes to catch up.
mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5})

// Verify store 3 has the replica.
if _, err := mtc.stores[3].GetReplica(rangeID); err != nil {
t.Fatal(err)
}

// Stop node 3; while it is down remove the range from it. Since the node is
// down it won't see the removal and won't clean up its replica.
mtc.stopStore(3)
mtc.unreplicateRange(rangeID, 3)

// Perform another write.
incArgs = incrementArgs([]byte("a"), 11)
if _, err := client.SendWrapped(rg1(mtc.stores[0]), nil, &incArgs); err != nil {
t.Fatal(err)
}
mtc.waitForValues(roachpb.Key("a"), []int64{16, 16, 16, 5})

// Restart node 3. The removed replica will start talking to the other
// replicas and determine it needs to be GC'd.
mtc.restartStore(3)

util.SucceedsSoon(t, func() error {
replica, err := mtc.stores[3].GetReplica(rangeID)
if err != nil {
if _, ok := err.(*roachpb.RangeNotFoundError); ok {
return nil
}
return err
}
return errors.Errorf("found %s, waiting for it to be GC'd", replica)
})
}

func TestReplicateReAddAfterDown(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
12 changes: 0 additions & 12 deletions storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
package storage

import (
"sync/atomic"

"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine/enginepb"
Expand Down Expand Up @@ -141,16 +139,6 @@ func (r *Replica) GetLastIndex() (uint64, error) {
return r.LastIndex()
}

// SetDisabled turns replica scanning off or on as directed. Note that while
// disabled, removals are still processed.
func (rs *replicaScanner) SetDisabled(disabled bool) {
if disabled {
atomic.StoreInt32(&rs.disabled, 1)
} else {
atomic.StoreInt32(&rs.disabled, 0)
}
}

// GetLease exposes replica.getLease for tests.
func (r *Replica) GetLease() (*roachpb.Lease, *roachpb.Lease) {
return r.getLease()
Expand Down
30 changes: 27 additions & 3 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/kr/pretty"
Expand All @@ -46,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/storage/storagebase"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/grpcutil"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/protoutil"
Expand Down Expand Up @@ -145,6 +149,8 @@ func updatesTimestampCache(r roachpb.Request) bool {
return updatesTimestampCacheMethods[m]
}

var errReplicaTooOld = grpc.Errorf(codes.Aborted, "sender replica too old, discarding message")

// A pendingCmd holds a done channel for a command sent to Raft. Once
// committed to the Raft log, the command is executed and the result returned
// via the done channel.
Expand Down Expand Up @@ -333,11 +339,29 @@ func NewReplica(desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.R
RangeID: desc.RangeID,
store: store,
abortCache: NewAbortCache(desc.RangeID),
raftSender: store.ctx.Transport.MakeSender(func(err error, toReplica roachpb.ReplicaDescriptor) {
log.Warningf(context.TODO(), "range %d: outgoing raft transport stream to %s closed by the remote: %v", desc.RangeID, toReplica, err)
}),
}

r.raftSender = store.ctx.Transport.MakeSender(
func(err error, toReplica roachpb.ReplicaDescriptor) {
ctx := context.TODO() // plumb the context from transport
if grpcutil.ErrorEqual(err, errReplicaTooOld) {
r.mu.Lock()
repID := r.mu.replicaID
r.mu.Unlock()
log.Infof(ctx, "%s: replica %d too old, adding to replica GC queue", r, repID)

if err := r.store.replicaGCQueue.Add(r, 1.0); err != nil {
log.Errorf(ctx, "%s: unable to add replica %d to GC queue: %s", r, repID, err)
}
return
}
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Warningf(ctx,
"%s: outgoing raft transport stream to %s closed by the remote: %s",
r, toReplica, err)
}
})

if err := r.newReplicaInner(desc, store.Clock(), replicaID); err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions storage/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ func (rs *replicaScanner) Count() int64 {
return rs.count
}

// SetDisabled turns replica scanning off or on as directed. Note that while
// disabled, removals are still processed.
func (rs *replicaScanner) SetDisabled(disabled bool) {
if disabled {
atomic.StoreInt32(&rs.disabled, 1)
} else {
atomic.StoreInt32(&rs.disabled, 0)
}
}

// avgScan returns the average scan time of each scan cycle. Used in unittests.
func (rs *replicaScanner) avgScan() time.Duration {
rs.completedScan.L.Lock()
Expand Down
11 changes: 9 additions & 2 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ type StoreTestingKnobs struct {
DisableSplitQueue bool
// DisableReplicateQueue disables the replication queue.
DisableReplicateQueue bool
// DisableScanner disables the replica scanner.
DisableScanner bool
}

var _ base.ModuleTestingKnobs = &StoreTestingKnobs{}
Expand Down Expand Up @@ -911,6 +913,9 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *roachpb.NodeDescrip
if ctx.TestingKnobs.DisableReplicateQueue {
s.setReplicateQueueActive(false)
}
if ctx.TestingKnobs.DisableScanner {
s.setScannerActive(false)
}

return s
}
Expand Down Expand Up @@ -2356,8 +2361,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
}
// It's not a current member of the group. Is it from the past?
if !found && req.FromReplica.ReplicaID < desc.NextReplicaID {
return errors.Errorf("range %s: discarding message from %+v, older than NextReplicaID %d",
req.RangeID, req.FromReplica, desc.NextReplicaID)
return errReplicaTooOld
}
}

Expand Down Expand Up @@ -2923,3 +2927,6 @@ func (s *Store) setReplicateQueueActive(active bool) {
func (s *Store) setSplitQueueActive(active bool) {
s.splitQueue.SetDisabled(!active)
}
func (s *Store) setScannerActive(active bool) {
s.scanner.SetDisabled(!active)
}
8 changes: 8 additions & 0 deletions util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func IsClosedConnection(err error) bool {
}
return netutil.IsClosedConnection(err)
}

// ErrorEqual checks two grpc errors for equality. We check structural
// equality, not pointer equality as you would get by comparing the error
// interface.
func ErrorEqual(err1, err2 error) bool {
return grpc.Code(err1) == grpc.Code(err2) &&
grpc.ErrorDesc(err1) == grpc.ErrorDesc(err2)
}

0 comments on commit 98d673d

Please sign in to comment.