Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: eagerly GC replicas on raft transport errors #8172

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build/check-style.sh
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TestVet() {
grep -vE '^vet: cannot process directory \.git' | \
grep -vE '\.pb\.gw\.go:[0-9]+: declaration of "?ctx"? shadows' | \
grep -vE 'declaration of "?(pE|e)rr"? shadows' | \
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries))\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
grep -vE '^(server/(serverpb/admin|serverpb/status|admin|status)|(ts|ts/tspb)/(server|timeseries)|storage/replica)\..*\go:[0-9]+: constant [0-9]+ not a string in call to Errorf'
# To return proper HTTP error codes (e.g. 404 Not Found), we need to use
# grpc.Errorf, which has an error code as its first parameter. 'go vet'
# doesn't like that the first parameter isn't a format string.
Expand Down
56 changes: 55 additions & 1 deletion storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1565,7 +1565,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
},
})

if err := <-errChan; !testutils.IsError(err, "older than NextReplicaID") {
if err := <-errChan; !testutils.IsError(err, "sender replica too old, discarding message") {
t.Fatalf("got unexpected error: %v", err)
}

Expand All @@ -1577,6 +1577,60 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
}
}

// TestReplicaTooOldGC checks that a replica which was removed from its range
// while its store was stopped disappears from that store after a restart,
// even though the replica scanner is disabled via the testing knobs.
func TestReplicaTooOldGC(t *testing.T) {
	defer leaktest.AfterTest(t)()

	// Build a four-store cluster with the replica scanner turned off.
	storeCtx := storage.TestStoreContext()
	storeCtx.TestingKnobs.DisableScanner = true
	mtc := multiTestContext{storeContext: &storeCtx}
	mtc.Start(t, 4)
	defer mtc.Stop()

	// Put a copy of the first range on every store.
	const rangeID = 1
	mtc.replicateRange(rangeID, 1, 2, 3)

	// Write a value and wait until every store reports it.
	firstInc := incrementArgs([]byte("a"), 5)
	_, pErr := client.SendWrapped(rg1(mtc.stores[0]), nil, &firstInc)
	if pErr != nil {
		t.Fatal(pErr)
	}
	mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5})

	// Sanity check: store 3 holds a replica of the range at this point.
	if _, getErr := mtc.stores[3].GetReplica(rangeID); getErr != nil {
		t.Fatal(getErr)
	}

	// Take store 3 offline and remove its replica from the range while it is
	// down, so it never observes the removal.
	mtc.stopStore(3)
	mtc.unreplicateRange(rangeID, 3)

	// A second write reaches only the three remaining replicas; store 3 keeps
	// its stale value.
	secondInc := incrementArgs([]byte("a"), 11)
	_, pErr = client.SendWrapped(rg1(mtc.stores[0]), nil, &secondInc)
	if pErr != nil {
		t.Fatal(pErr)
	}
	mtc.waitForValues(roachpb.Key("a"), []int64{16, 16, 16, 5})

	// Bring store 3 back. Its stale replica should contact its former peers
	// and end up being garbage collected.
	mtc.restartStore(3)

	util.SucceedsSoon(t, func() error {
		stale, getErr := mtc.stores[3].GetReplica(rangeID)
		if getErr == nil {
			// Still present: keep retrying.
			return errors.Errorf("found %s, waiting for it to be GC'd", stale)
		}
		if _, gone := getErr.(*roachpb.RangeNotFoundError); gone {
			return nil
		}
		return getErr
	})
}

func TestReplicateReAddAfterDown(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
12 changes: 0 additions & 12 deletions storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
package storage

import (
"sync/atomic"

"github.com/cockroachdb/cockroach/internal/client"
"github.com/cockroachdb/cockroach/roachpb"
"github.com/cockroachdb/cockroach/storage/engine/enginepb"
Expand Down Expand Up @@ -141,16 +139,6 @@ func (r *Replica) GetLastIndex() (uint64, error) {
return r.LastIndex()
}

// SetDisabled switches replica scanning off (disabled == true) or back on
// (disabled == false) by atomically storing 1 or 0 into rs.disabled. Note
// that removals are still processed while scanning is disabled.
func (rs *replicaScanner) SetDisabled(disabled bool) {
	var flag int32
	if disabled {
		flag = 1
	}
	atomic.StoreInt32(&rs.disabled, flag)
}

// GetLease exposes replica.getLease for tests.
func (r *Replica) GetLease() (*roachpb.Lease, *roachpb.Lease) {
return r.getLease()
Expand Down
34 changes: 31 additions & 3 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import (
"sync/atomic"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/kr/pretty"
Expand All @@ -46,6 +49,7 @@ import (
"github.com/cockroachdb/cockroach/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/storage/storagebase"
"github.com/cockroachdb/cockroach/util/encoding"
"github.com/cockroachdb/cockroach/util/grpcutil"
"github.com/cockroachdb/cockroach/util/hlc"
"github.com/cockroachdb/cockroach/util/log"
"github.com/cockroachdb/cockroach/util/protoutil"
Expand Down Expand Up @@ -145,6 +149,8 @@ func updatesTimestampCache(r roachpb.Request) bool {
return updatesTimestampCacheMethods[m]
}

// errReplicaTooOld is a gRPC error with code Aborted. handleRaftMessage
// returns it when the sending replica is not a current member of the range
// and its replica ID is below the descriptor's NextReplicaID. The raft sender
// callback set up in NewReplica compares incoming transport errors against it
// with grpcutil.ErrorEqual and, on a match, adds the replica to the replica
// GC queue.
var errReplicaTooOld = grpc.Errorf(codes.Aborted, "sender replica too old, discarding message")

// A pendingCmd holds a done channel for a command sent to Raft. Once
// committed to the Raft log, the command is executed and the result returned
// via the done channel.
Expand Down Expand Up @@ -333,11 +339,33 @@ func NewReplica(desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.R
RangeID: desc.RangeID,
store: store,
abortCache: NewAbortCache(desc.RangeID),
raftSender: store.ctx.Transport.MakeSender(func(err error, toReplica roachpb.ReplicaDescriptor) {
log.Warningf(context.TODO(), "range %d: outgoing raft transport stream to %s closed by the remote: %v", desc.RangeID, toReplica, err)
}),
}

r.raftSender = store.ctx.Transport.MakeSender(
func(err error, toReplica roachpb.ReplicaDescriptor) {
ctx := context.TODO() // plumb the context from transport
if grpcutil.ErrorEqual(err, errReplicaTooOld) {
if err := r.store.Stopper().RunTask(func() {
r.mu.Lock()
repID := r.mu.replicaID
r.mu.Unlock()
log.Infof(ctx, "%s: replica %d too old, adding to replica GC queue", r, repID)

if err := r.store.replicaGCQueue.Add(r, 1.0); err != nil {
log.Errorf(ctx, "%s: unable to add replica %d to GC queue: %s", r, repID, err)
}
}); err != nil {
log.Errorf(ctx, "%s: %s", r, err)
}
return
}
if err != nil && !grpcutil.IsClosedConnection(err) {
log.Warningf(ctx,
"%s: outgoing raft transport stream to %s closed by the remote: %s",
r, toReplica, err)
}
})

if err := r.newReplicaInner(desc, store.Clock(), replicaID); err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions storage/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ func (rs *replicaScanner) Count() int64 {
return rs.count
}

// SetDisabled switches replica scanning off (disabled == true) or back on
// (disabled == false) by atomically storing 1 or 0 into rs.disabled. Note
// that removals are still processed while scanning is disabled.
func (rs *replicaScanner) SetDisabled(disabled bool) {
	var flag int32
	if disabled {
		flag = 1
	}
	atomic.StoreInt32(&rs.disabled, flag)
}

// avgScan returns the average scan time of each scan cycle. Used in unittests.
func (rs *replicaScanner) avgScan() time.Duration {
rs.completedScan.L.Lock()
Expand Down
11 changes: 9 additions & 2 deletions storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,8 @@ type StoreTestingKnobs struct {
DisableSplitQueue bool
// DisableReplicateQueue disables the replication queue.
DisableReplicateQueue bool
// DisableScanner disables the replica scanner.
DisableScanner bool
}

var _ base.ModuleTestingKnobs = &StoreTestingKnobs{}
Expand Down Expand Up @@ -911,6 +913,9 @@ func NewStore(ctx StoreContext, eng engine.Engine, nodeDesc *roachpb.NodeDescrip
if ctx.TestingKnobs.DisableReplicateQueue {
s.setReplicateQueueActive(false)
}
if ctx.TestingKnobs.DisableScanner {
s.setScannerActive(false)
}

return s
}
Expand Down Expand Up @@ -2356,8 +2361,7 @@ func (s *Store) handleRaftMessage(req *RaftMessageRequest) error {
}
// It's not a current member of the group. Is it from the past?
if !found && req.FromReplica.ReplicaID < desc.NextReplicaID {
return errors.Errorf("range %s: discarding message from %+v, older than NextReplicaID %d",
req.RangeID, req.FromReplica, desc.NextReplicaID)
return errReplicaTooOld
}
}

Expand Down Expand Up @@ -2923,3 +2927,6 @@ func (s *Store) setReplicateQueueActive(active bool) {
func (s *Store) setSplitQueueActive(active bool) {
	// The queue is controlled through SetDisabled, so the flag is inverted.
	disabled := !active
	s.splitQueue.SetDisabled(disabled)
}
// setScannerActive enables (active == true) or disables (active == false)
// the store's replica scanner by passing the inverted flag to its
// SetDisabled method.
func (s *Store) setScannerActive(active bool) {
	disabled := !active
	s.scanner.SetDisabled(disabled)
}
8 changes: 8 additions & 0 deletions util/grpcutil/grpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func IsClosedConnection(err error) bool {
}
return netutil.IsClosedConnection(err)
}

// ErrorEqual reports whether two grpc errors are structurally equal, i.e.
// whether they carry the same grpc code and the same error description. This
// differs from comparing the error interface values directly, which would
// only test pointer equality.
func ErrorEqual(err1, err2 error) bool {
	if grpc.Code(err1) != grpc.Code(err2) {
		return false
	}
	return grpc.ErrorDesc(err1) == grpc.ErrorDesc(err2)
}