storage: ensure Replica objects never change replicaID
We've seen instability recently due to invariants being violated as
replicas catch up across periods of being removed and re-added to a range.
Due to learner replicas and their rollback behavior, this is now a relatively
common case. Rather than handling each of these scenarios individually, this PR
prevents them from occurring by actively removing replicas when we determine
that they must have been removed.

Here's a high level overview of the change:

 * Once a Replica object has a non-zero Replica.mu.replicaID, it will not
   change.
 * If a raft message or snapshot addressed to a higher replica ID is received,
   the current replica is removed completely (see the sketch after this list).
 * If a replica sees a ChangeReplicasTrigger that removes it, it completely
   removes itself while applying that command.
 * Replica.mu.destroyStatus is used to meaningfully signify the removal state
   of a Replica. Replicas about to be synchronously removed are in
   destroyReasonRemovalPending.
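
The rule the first two bullets describe can be illustrated with a small,
self-contained sketch (hypothetical code, not this PR's implementation): a
replica keeps the first non-zero ID it is assigned, and a message addressed to
a higher ID is treated as proof that the current replica was removed and must
be destroyed rather than re-used.

// A minimal sketch, not CockroachDB's actual code.
package main

import "fmt"

type replica struct {
	replicaID int32
	destroyed bool // stands in for a destroyStatus of destroyReasonRemovalPending
}

// handleAddressedTo applies the invariant to a message addressed to the
// given replica ID and reports whether the replica must be removed.
func (r *replica) handleAddressedTo(toReplicaID int32) (removed bool) {
	switch {
	case r.replicaID == 0:
		// An uninitialized replica may adopt its first non-zero ID.
		r.replicaID = toReplicaID
		return false
	case toReplicaID > r.replicaID:
		// The replica was removed and re-added under a higher ID; mark it
		// for removal instead of changing its ID in place.
		r.destroyed = true
		return true
	default:
		return false
	}
}

func main() {
	r := &replica{}
	fmt.Println(r.handleAddressedTo(3)) // false: adopts ID 3
	fmt.Println(r.handleAddressedTo(3)) // false: same ID, nothing to do
	fmt.Println(r.handleAddressedTo(7)) // true: the ID never changes once set
}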

This hopefully gives us some new invariants:

 * There is only ever at most one *Replica that IsAlive() for a range on a
   store at a time.
 * Once a *Replica has a non-zero ReplicaID, it never changes.

The change also introduces some new complexity. Namely, we now allow removal of
uninitialized replicas, including their hard state. This allows us to catch up
across a split even when we know the RHS must have been removed.

Fixes #40367.

Release justification: This commit is safe for 19.2 because it fixes release
blockers.

Release note (bug fix): Fix crashes by preventing replica ID change.
ajwerner committed Sep 18, 2019
1 parent 5c9ea3b commit f579994
Showing 27 changed files with 1,103 additions and 613 deletions.
5 changes: 5 additions & 0 deletions pkg/storage/apply/task.go
@@ -12,6 +12,7 @@ package apply

import (
"context"
"errors"

"go.etcd.io/etcd/raft/raftpb"
)
@@ -54,6 +55,10 @@ type StateMachine interface {
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
}

// ErrRemoved can be returned from ApplySideEffects; it stops the task from
// processing more commands and causes it to return immediately.
var ErrRemoved = errors.New("replica removed")

// Batch accumulates a series of updates from Commands and performs them
// all at once to its StateMachine when applied. Groups of Commands will be
// staged in the Batch such that one or more trivial Commands are staged or
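
For context, here is a minimal, self-contained sketch (hypothetical code, not
this package's actual apply loop) of how a caller might use a sentinel like
ErrRemoved to stop processing further commands once the state machine reports
that the replica has been removed.

package main

import (
	"errors"
	"fmt"
)

// errRemoved mirrors the role of apply.ErrRemoved: a sentinel the state
// machine returns to make the apply task stop and return immediately.
var errRemoved = errors.New("replica removed")

// applyAll applies commands in order and stops as soon as the state machine
// reports that the replica was removed.
func applyAll(cmds []string, apply func(string) error) error {
	for _, c := range cmds {
		if err := apply(c); err != nil {
			// Includes errRemoved: no further commands are processed.
			return err
		}
	}
	return nil
}

func main() {
	cmds := []string{"put a", "remove-replica", "put b"}
	err := applyAll(cmds, func(c string) error {
		fmt.Println("applying", c)
		if c == "remove-replica" {
			return errRemoved
		}
		return nil
	})
	fmt.Println("stopped early:", errors.Is(err, errRemoved))
}
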
80 changes: 8 additions & 72 deletions pkg/storage/client_merge_test.go
@@ -57,7 +57,6 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft"
"go.etcd.io/etcd/raft/raftpb"
)

@@ -1638,6 +1637,7 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
storeCfg.TestingKnobs.DisableReplicateQueue = true
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 2)
defer mtc.Stop()
@@ -2041,6 +2041,7 @@ func TestStoreRangeMergeAbandonedFollowers(t *testing.T) {
storeCfg.TestingKnobs.DisableReplicaGCQueue = true
storeCfg.TestingKnobs.DisableSplitQueue = true
storeCfg.TestingKnobs.DisableMergeQueue = true
storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true
mtc := &multiTestContext{storeConfig: &storeCfg}
mtc.Start(t, 3)
defer mtc.Stop()
@@ -2808,74 +2809,6 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) {
}
}

// unreliableRaftHandler drops all Raft messages that are addressed to the
// specified rangeID, but lets all other messages through.
type unreliableRaftHandler struct {
rangeID roachpb.RangeID
storage.RaftMessageHandler
// If non-nil, can return false to avoid dropping a msg to rangeID
dropReq func(*storage.RaftMessageRequest) bool
dropHB func(*storage.RaftHeartbeat) bool
dropResp func(*storage.RaftMessageResponse) bool
}

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
reqCpy := *req
req = &reqCpy
req.Heartbeats = h.filterHeartbeats(req.Heartbeats)
req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps)
if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 {
// Entirely filtered.
return nil
}
} else if req.RangeID == h.rangeID {
if h.dropReq == nil || h.dropReq(req) {
log.Infof(
ctx,
"dropping Raft message %s",
raft.DescribeMessage(req.Message, func([]byte) string {
return "<omitted>"
}),
)

return nil
}
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func (h *unreliableRaftHandler) filterHeartbeats(
hbs []storage.RaftHeartbeat,
) []storage.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []storage.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
cpy = append(cpy, *hb)
}
}
return cpy
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.dropResp == nil || h.dropResp(resp) {
return nil
}
}
return h.RaftMessageHandler.HandleRaftResponse(ctx, resp)
}

func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

@@ -3353,9 +3286,12 @@ func TestMergeQueue(t *testing.T) {
t.Run("non-collocated", func(t *testing.T) {
reset(t)
verifyUnmerged(t)
mtc.replicateRange(rhs().RangeID, 1)
mtc.transferLease(ctx, rhs().RangeID, 0, 1)
mtc.unreplicateRange(rhs().RangeID, 0)
rhsRangeID := rhs().RangeID
mtc.replicateRange(rhsRangeID, 1)
mtc.transferLease(ctx, rhsRangeID, 0, 1)
mtc.unreplicateRange(rhsRangeID, 0)
require.NoError(t, mtc.waitForUnreplicated(rhsRangeID, 0))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t)
6 changes: 4 additions & 2 deletions pkg/storage/client_metrics_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func checkGauge(t *testing.T, id string, g *metric.Gauge, e int64) {
@@ -313,8 +314,9 @@ func TestStoreMetrics(t *testing.T) {
return mtc.unreplicateRangeNonFatal(replica.RangeID, 0)
})

// Force GC Scan on store 0 in order to fully remove range.
mtc.stores[1].MustForceReplicaGCScanAndProcess()
// Wait until we're sure that store 0 has successfully processed its removal.
require.NoError(t, mtc.waitForUnreplicated(replica.RangeID, 0))

mtc.waitForValues(roachpb.Key("z"), []int64{0, 5, 5})

// Verify range count is as expected.
191 changes: 191 additions & 0 deletions pkg/storage/client_raft_helpers_test.go
@@ -0,0 +1,191 @@
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package storage_test

import (
"context"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"go.etcd.io/etcd/raft"
)

// unreliableRaftHandler drops all Raft messages that are addressed to the
// specified rangeID, but lets all other messages through.
type unreliableRaftHandler struct {
rangeID roachpb.RangeID
storage.RaftMessageHandler
// If non-nil, can return false to avoid dropping a msg to rangeID
dropReq func(*storage.RaftMessageRequest) bool
dropHB func(*storage.RaftHeartbeat) bool
dropResp func(*storage.RaftMessageResponse) bool
}

func (h *unreliableRaftHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 {
reqCpy := *req
req = &reqCpy
req.Heartbeats = h.filterHeartbeats(req.Heartbeats)
req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps)
if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 {
// Entirely filtered.
return nil
}
} else if req.RangeID == h.rangeID {
if h.dropReq == nil || h.dropReq(req) {
log.Infof(
ctx,
"dropping Raft message %s",
raft.DescribeMessage(req.Message, func([]byte) string {
return "<omitted>"
}),
)

return nil
}
}
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}

func (h *unreliableRaftHandler) filterHeartbeats(
hbs []storage.RaftHeartbeat,
) []storage.RaftHeartbeat {
if len(hbs) == 0 {
return hbs
}
var cpy []storage.RaftHeartbeat
for i := range hbs {
hb := &hbs[i]
if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) {
cpy = append(cpy, *hb)
}
}
return cpy
}

func (h *unreliableRaftHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
if resp.RangeID == h.rangeID {
if h.dropResp == nil || h.dropResp(resp) {
return nil
}
}
return h.RaftMessageHandler.HandleRaftResponse(ctx, resp)
}

// mtcStoreRaftMessageHandler exists to allow a store to be stopped and
// restarted while maintaining a partition using an unreliableRaftHandler.
type mtcStoreRaftMessageHandler struct {
mtc *multiTestContext
storeIdx int
}

func (h *mtcStoreRaftMessageHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
return h.mtc.Store(h.storeIdx).HandleRaftRequest(ctx, req, respStream)
}

func (h *mtcStoreRaftMessageHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
return h.mtc.Store(h.storeIdx).HandleRaftResponse(ctx, resp)
}

func (h *mtcStoreRaftMessageHandler) HandleSnapshot(
header *storage.SnapshotRequest_Header, respStream storage.SnapshotResponseStream,
) error {
return h.mtc.Store(h.storeIdx).HandleSnapshot(header, respStream)
}

// mtcPartitionedRange is a convenient abstraction to create a range on a node
// in a multiTestContext which can be partitioned and unpartitioned.
type mtcPartitionedRange struct {
rangeID roachpb.RangeID
partitionedNode int
partitioned atomic.Value
}

// setupPartitionedRange sets up an mtcPartitionedRange for the provided mtc,
// rangeID, and node index in the mtc. The range is initially not partitioned.
//
// We're going to set up the cluster with partitioning so that we can
// partition node p from the others. We do this by installing
// unreliableRaftHandler listeners on all three Stores which we can enable
// and disable with an atomic. The handler on the partitioned store filters
// out all messages while the handler on the other two stores only filters
// out messages from the partitioned store. When activated the configuration
// looks like:
//
// [p]
// x x
// / \
// x x
// [*]<---->[*]
//
func setupPartitionedRange(
mtc *multiTestContext, rangeID roachpb.RangeID, partitionedNode int,
) (*mtcPartitionedRange, error) {
partRange := &mtcPartitionedRange{
rangeID: rangeID,
partitionedNode: partitionedNode,
}
partRange.partitioned.Store(false)
partRepl, err := mtc.stores[partitionedNode].GetReplica(rangeID)
if err != nil {
return nil, err
}
partReplDesc, err := partRepl.GetReplicaDescriptor()
if err != nil {
return nil, err
}
for i := range mtc.stores {
s := i
h := &unreliableRaftHandler{
rangeID: rangeID,
RaftMessageHandler: &mtcStoreRaftMessageHandler{
mtc: mtc,
storeIdx: s,
},
}
// Only filter messages from the partitioned store on the other
// two stores.
h.dropReq = func(req *storage.RaftMessageRequest) bool {
return partRange.partitioned.Load().(bool) &&
(s == partitionedNode || req.FromReplica.StoreID == partRepl.StoreID())
}
h.dropHB = func(hb *storage.RaftHeartbeat) bool {
return partRange.partitioned.Load().(bool) &&
(s == partitionedNode || hb.FromReplicaID == partReplDesc.ReplicaID)
}
mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h)
}
return partRange, nil
}

func (pr *mtcPartitionedRange) activate() {
pr.partitioned.Store(true)
}

func (pr *mtcPartitionedRange) deactivate() {
pr.partitioned.Store(false)
}
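
A hypothetical usage snippet, assuming a test in this package with a running
multiTestContext (the helper below is not part of this commit), might drive
the partition like this:

// Hypothetical test helper: isolate one node's replica of a range, run some
// work through the healthy majority, then heal the partition.
func runWithPartition(
	t *testing.T, mtc *multiTestContext, rangeID roachpb.RangeID, node int, work func(),
) {
	partRange, err := setupPartitionedRange(mtc, rangeID, node)
	if err != nil {
		t.Fatal(err)
	}
	partRange.activate()   // start dropping raft traffic to and from `node` for rangeID
	work()                 // e.g. write through the unpartitioned majority
	partRange.deactivate() // heal the partition; the node catches up via raft
}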
