Skip to content

Commit

Permalink
storage: ensure Replica objects never change replicaID
Browse files Browse the repository at this point in the history
WARNING: this change needs more testing to target the changes it makes
and at least some of the disabled tests should be reworked. This is a big
and scary change at this point in the cycle so I'm getting it out before
I'm really happy with it. There are some known TODOs.

On the plus side it does not seem to reproduce any crashes in hours with the
`partitionccl.TestRepartitioning` which readily produces crashes on master
when run under roachprod stress within ~20 minutes.

We've seen instability recently due to invariants being violated as
replicas catch up across periods of being removed and re-added to a range.
Due to learner replicas and their rollback behavior this is now a relatively
common case. Rather than handle all of these various scenarios this PR prevents
them from occuring by actively removing replicas when we determine that they
must have been removed.

Here's a high level overview of the change:

 * Once a Replica object has a non-zero Replica.mu.replicaID it will not
   change.
 * If a raft message or snapshot addressed to a higher replica ID is received
   the current replica will be removed completely.
 * If a replica sees a ChangeReplicasTrigger which removes it then it
   completely removes itself while applying that command.
 * Replica.mu.destroyStatus is used to meaningfully signify the removal state
   of a Replica. Replicas about to be synchronously removed are in
   destroyReasonRemovalPending.
 * The queues are now replica ID aware. If a replica was added to the queue
   and the replica found when trying to pop are not the same and we knew the
   replica ID of replica when we added it then we should not process it.

This hopefully gives us some new invariants:

 * There is only ever at most 1 *Replica which IsAlive() for a range on a store
   at a time.
 * Once a *Replica has a non-zero ReplicaID is never changes.

The change also introduces some new complexity. Namely we now allow removal of
uninitialized replicas, including their hard state. This allows us to catch up
across a split even when we know the RHS must have been removed.

Fixes #40367.

Release justification: This commit is safe for 19.2 because it fixes release
blockers.

Release note (bug fix): Fix crashes by preventing replica ID change.
  • Loading branch information
ajwerner committed Sep 16, 2019
1 parent 5cee194 commit 6b42756
Show file tree
Hide file tree
Showing 30 changed files with 877 additions and 480 deletions.
13 changes: 7 additions & 6 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,16 @@ func TestBackupRestorePartitioned(t *testing.T) {
// because EXPERIMENTAL_RELOCATE can fail if there are other replication
// changes happening.
testutils.SucceedsSoon(t, func() error {
sqlDB.Exec(t, `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`)
return nil
_, err := sqlDB.DB.ExecContext(ctx, `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 0)`)
return err
})
testutils.SucceedsSoon(t, func() error {
sqlDB.Exec(t, `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 100)`)
return nil
_, err := sqlDB.DB.ExecContext(ctx, `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[2], 100)`)
return err
})
testutils.SucceedsSoon(t, func() error {
sqlDB.Exec(t, `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 200)`)
return nil
_, err := sqlDB.DB.ExecContext(ctx, `ALTER TABLE data.bank EXPERIMENTAL_RELOCATE VALUES (ARRAY[3], 200)`)
return err
})

const localFoo1 = localFoo + "/1"
Expand Down Expand Up @@ -477,6 +477,7 @@ func backupAndRestore(
t.Fatal("unexpected span start at primary index")
}
}

}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/partitionccl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,9 +1121,12 @@ func setupPartitioningTestCluster(
cfg.NumReplicas = proto.Int32(1)

tsArgs := func(attr string) base.TestServerArgs {
s := cluster.MakeTestingClusterSettingsWithVersion(cluster.BinaryMinimumSupportedVersion, cluster.BinaryServerVersion)
return base.TestServerArgs{
Settings: s,
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
BootstrapVersion: &cluster.ClusterVersion{roachpb.Version{Major: 19, Minor: 1, Unstable: 0}},
// Disable LBS because when the scan is happening at the rate it's happening
// below, it's possible that one of the system ranges trigger a split.
DisableLoadBasedSplitting: true,
Expand All @@ -1139,6 +1142,7 @@ func setupPartitioningTestCluster(
UseDatabase: "data",
}
}

tcArgs := base.TestClusterArgs{ServerArgsPerNode: map[int]base.TestServerArgs{
0: tsArgs("n1"),
1: tsArgs("n2"),
Expand Down
5 changes: 5 additions & 0 deletions pkg/storage/apply/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package apply

import (
"context"
"errors"

"go.etcd.io/etcd/raft/raftpb"
)
Expand Down Expand Up @@ -54,6 +55,10 @@ type StateMachine interface {
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
}

// ErrRemoved can be returned from ApplySideEffects which will stop the
// task from processing more commands and return immediately.
var ErrRemoved = errors.New("replica removed")

// Batch accumulates a series of updates from Commands and performs them
// all at once to its StateMachine when applied. Groups of Commands will be
// staged in the Batch such that one or more trivial Commands are staged or
Expand Down
42 changes: 37 additions & 5 deletions pkg/storage/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1633,6 +1633,7 @@ func TestStoreRangeMergeConcurrentRequests(t *testing.T) {
func TestStoreReplicaGCAfterMerge(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("this test seems byzantine, right? ")
ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
Expand Down Expand Up @@ -1662,10 +1663,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
for _, rangeID := range []roachpb.RangeID{lhsDesc.RangeID, rhsDesc.RangeID} {
repl, err := store1.GetReplica(rangeID)
if err != nil {
t.Fatal(err)
continue
}
if err := store1.ManualReplicaGC(repl); err != nil {
t.Fatal(err)
t.Logf("replica was already removed: %v", err)
}
if _, err := store1.GetReplica(rangeID); err == nil {
t.Fatalf("replica of r%d not gc'd from s1", rangeID)
Expand Down Expand Up @@ -2035,6 +2036,7 @@ func TestStoreRangeMergeSlowAbandonedFollower(t *testing.T) {
func TestStoreRangeMergeAbandonedFollowers(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Skip("these invariants are no longer true")
ctx := context.Background()
storeCfg := storage.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
Expand Down Expand Up @@ -2876,6 +2878,33 @@ func (h *unreliableRaftHandler) HandleRaftResponse(
return h.RaftMessageHandler.HandleRaftResponse(ctx, resp)
}

// mtcStoreRaftMessageHandler exists to allows a store to be stopped and
// restarted while maintaining a partition using an unreliableRaftHandler.
type mtcStoreRaftMessageHandler struct {
mtc *multiTestContext
storeIdx int
}

func (h *mtcStoreRaftMessageHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
return h.mtc.stores[h.storeIdx].HandleRaftRequest(ctx, req, respStream)
}

func (h *mtcStoreRaftMessageHandler) HandleRaftResponse(
ctx context.Context, resp *storage.RaftMessageResponse,
) error {
return h.mtc.stores[h.storeIdx].HandleRaftResponse(ctx, resp)
}

func (h *mtcStoreRaftMessageHandler) HandleSnapshot(
header *storage.SnapshotRequest_Header, respStream storage.SnapshotResponseStream,
) error {
return h.mtc.stores[h.storeIdx].HandleSnapshot(header, respStream)
}

func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down Expand Up @@ -3353,9 +3382,12 @@ func TestMergeQueue(t *testing.T) {
t.Run("non-collocated", func(t *testing.T) {
reset(t)
verifyUnmerged(t)
mtc.replicateRange(rhs().RangeID, 1)
mtc.transferLease(ctx, rhs().RangeID, 0, 1)
mtc.unreplicateRange(rhs().RangeID, 0)
rhsRangeID := rhs().RangeID
mtc.replicateRange(rhsRangeID, 1)
mtc.transferLease(ctx, rhsRangeID, 0, 1)
mtc.unreplicateRange(rhsRangeID, 0)
require.NoError(t, mtc.waitForUnreplicated(rhsRangeID, 0))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t)
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
)

func checkGauge(t *testing.T, id string, g *metric.Gauge, e int64) {
Expand Down Expand Up @@ -313,8 +314,9 @@ func TestStoreMetrics(t *testing.T) {
return mtc.unreplicateRangeNonFatal(replica.RangeID, 0)
})

// Force GC Scan on store 0 in order to fully remove range.
mtc.stores[1].MustForceReplicaGCScanAndProcess()
// Wait until we're sure that store 0 has successfully processed its removal.
require.NoError(t, mtc.waitForUnreplicated(replica.RangeID, 0))

mtc.waitForValues(roachpb.Key("z"), []int64{0, 5, 5})

// Verify range count is as expected.
Expand Down
79 changes: 65 additions & 14 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,15 +1192,12 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) {
return err
}

if err := replicateRHS(); !testutils.IsError(err, storage.IntersectingSnapshotMsg) {
t.Fatalf("unexpected error %v", err)
}

// Enable the replica GC queue so that the next attempt to replicate the RHS
// to store 2 will cause the obsolete replica to be GC'd allowing a
// subsequent replication to succeed.
mtc.stores[2].SetReplicaGCQueueActive(true)

// This used to fail with IntersectingSnapshotMsg because we relied on replica
// GC to remove the LHS and that queue is disabled. Now we will detect that
// the LHS is not part of the range because of a ReplicaTooOldError and then
// we'll replicaGC the LHS in response.
// TODO(ajwerner): filter the reponses to node 2 or disable this eager
// replicaGC.
testutils.SucceedsSoon(t, replicateRHS)
}

Expand Down Expand Up @@ -2992,6 +2989,52 @@ func TestReplicateRogueRemovedNode(t *testing.T) {
defer mtc.Stop()
mtc.Start(t, 3)

// We're going to set up the cluster with partitioning so that we can
// partition node 0 from the others. We do this by installing
// unreliableRaftHandler listeners on all three Stores which we can enable
// and disable with an atomic. The handler on the partitioned store filters
// out all messages while the handler on the other two stores only filters
// out messages from the partitioned store. When activated the configuration
// looks like:
//
// [0]
// x x
// / \
// x x
// [1]<---->[2]
const partStore = 0
var partitioned atomic.Value
partitioned.Store(false)
partRepl, err := mtc.stores[partStore].GetReplica(1)
if err != nil {
t.Fatal(err)
}
partReplDesc, err := partRepl.GetReplicaDescriptor()
if err != nil {
t.Fatal(err)
}
for _, s := range []int{0, 1, 2} {
s := s
h := &unreliableRaftHandler{
rangeID: 1,
RaftMessageHandler: &mtcStoreRaftMessageHandler{
mtc: mtc,
storeIdx: s,
},
}
// Only filter messages from the partitioned store on the other
// two stores.
h.dropReq = func(req *storage.RaftMessageRequest) bool {
return partitioned.Load().(bool) &&
(s == partStore || req.FromReplica.StoreID == partRepl.StoreID())
}
h.dropHB = func(hb *storage.RaftHeartbeat) bool {
return partitioned.Load().(bool) &&
(s == partStore || hb.FromReplicaID == partReplDesc.ReplicaID)
}
mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h)
}

// First put the range on all three nodes.
raftID := roachpb.RangeID(1)
mtc.replicateRange(raftID, 1, 2)
Expand Down Expand Up @@ -3036,7 +3079,9 @@ func TestReplicateRogueRemovedNode(t *testing.T) {
}
return nil
})

// Partition nodes 1 and 2 from node 0. Otherwise they'd get a
// ReplicaTooOldError from node 0 and proceed to remove themselves.
partitioned.Store(true)
// Bring node 2 back up.
mtc.restartStore(2)

Expand Down Expand Up @@ -3537,6 +3582,11 @@ func TestRemovedReplicaError(t *testing.T) {
func TestRemoveRangeWithoutGC(t *testing.T) {
defer leaktest.AfterTest(t)()

// TODO(ajwerner): update this test to create the scenario where we do
// not process the remove and then shut down the node and restart it.
// Perhaps add a testing flag.
t.Skip("we now will remove the replica")

sc := storage.TestStoreConfig(nil)
sc.TestingKnobs.DisableReplicaGCQueue = true
mtc := &multiTestContext{storeConfig: &sc}
Expand All @@ -3555,18 +3605,19 @@ func TestRemoveRangeWithoutGC(t *testing.T) {
if err != nil {
return err
}
desc := rep.Desc()
if len(desc.InternalReplicas) != 1 {
return errors.Errorf("range has %d replicas", len(desc.InternalReplicas))
if _, err := rep.IsDestroyed(); err == nil {
return errors.Errorf("range is still alive")
}
return nil
})

// The replica's data is still on disk.
// We use an inconsistent scan because there's going to be an intent on the
// range descriptor to remove this replica.
var desc roachpb.RangeDescriptor
descKey := keys.RangeDescriptorKey(roachpb.RKeyMin)
if ok, err := engine.MVCCGetProto(context.Background(), mtc.stores[0].Engine(), descKey,
mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{}); err != nil {
mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{Inconsistent: true}); err != nil {
t.Fatal(err)
} else if !ok {
t.Fatal("expected range descriptor to be present")
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/client_replica_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
// removes a range from a store that no longer should have a replica.
func TestReplicaGCQueueDropReplicaGCOnScan(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip("the range will be removed synchronously now")

mtc := &multiTestContext{}
defer mtc.Stop()
Expand Down
11 changes: 8 additions & 3 deletions pkg/storage/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3314,8 +3314,12 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
// different replicaID than the split trigger expects.
add := func() {
_, err := tc.AddReplicas(kRHS, tc.Target(1))
if !testutils.IsError(err, `snapshot intersects existing range`) {
t.Fatalf(`expected snapshot intersects existing range" error got: %+v`, err)
// The "snapshot intersects existing range" error is expected if the store
// has not heard a raft message addressed to a later replica ID while the
// "was not found on" error is expected if the store has heard that it has
// a newer replica ID before receiving the snapshot.
if !testutils.IsError(err, `snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+`) {
t.Fatalf(`expected snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+" error got: %+v`, err)
}
}
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -3361,7 +3365,8 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
if err != nil {
return err
}
if desc := repl.Desc(); !descLHS.Equal(desc) {
if desc := repl.Desc(); desc.IsInitialized() && !descLHS.Equal(desc) {
require.NoError(t, store.ManualReplicaGC(repl))
return errors.Errorf("expected %s got %s", &descLHS, desc)
}
return nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,21 @@ func (m *multiTestContext) unreplicateRangeNonFatal(rangeID roachpb.RangeID, des
return err
}

func (m *multiTestContext) waitForUnreplicated(rangeID roachpb.RangeID, dest int) error {
// Wait for the unreplications to complete on destination node.
return retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
_, err := m.stores[dest].GetReplica(rangeID)
switch err.(type) {
case nil:
return fmt.Errorf("replica still exists on dest %d", dest)
case *roachpb.RangeNotFoundError:
return nil
default:
return err
}
})
}

// readIntFromEngines reads the current integer value at the given key
// from all configured engines, filling in zeros when the value is not
// found. Returns a slice of the same length as mtc.engines.
Expand Down
15 changes: 4 additions & 11 deletions pkg/storage/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,6 @@ func NewTestStorePool(cfg StoreConfig) *StorePool {
)
}

func (r *Replica) ReplicaID() roachpb.ReplicaID {
r.mu.RLock()
defer r.mu.RUnlock()
return r.ReplicaIDLocked()
}

func (r *Replica) ReplicaIDLocked() roachpb.ReplicaID {
return r.mu.replicaID
}

func (r *Replica) AssertState(ctx context.Context, reader engine.Reader) {
r.raftMu.Lock()
defer r.raftMu.Unlock()
Expand Down Expand Up @@ -269,10 +259,13 @@ func (r *Replica) InitQuotaPool(quota uint64) error {
r.mu.Lock()
defer r.mu.Unlock()
var appliedIndex uint64
err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) {
isRemoved, err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) {
appliedIndex = r.BasicStatus().Applied
return false, nil
})
if isRemoved {
_, err = r.IsDestroyed()
}
if err != nil {
return err
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/merge_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,11 @@ func (mq *mergeQueue) process(
log.VEventf(ctx, 2, `%v`, err)
return err
}

rhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, store, rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
return err
}

rhsDesc, err = removeLearners(ctx, db, rhsDesc)
if err != nil {
log.VEventf(ctx, 2, `%v`, err)
Expand Down
Loading

0 comments on commit 6b42756

Please sign in to comment.