From 23586068e5543ca07a5c4df0a69cbfe4d02e23f4 Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Wed, 4 Sep 2019 11:25:27 -0400 Subject: [PATCH] storage: ensure Replica objects never change replicaID WARNING: this change needs more testing to target the changes it makes, and at least some of the disabled tests should be reworked. This is a big and scary change at this point in the cycle, so I'm getting it out before I'm really happy with it. There are some known TODOs. On the plus side, it does not seem to reproduce any crashes in hours of running `partitionccl.TestRepartitioning`, which readily produces crashes on master when run under roachprod stress within ~20 minutes. We've seen instability recently due to invariants being violated as replicas catch up across periods of being removed and re-added to a range. Due to learner replicas and their rollback behavior this is now a relatively common case. Rather than handle all of these scenarios, this PR prevents them from occurring by actively removing replicas when we determine that they must have been removed. Here's a high-level overview of the change: * Once a Replica object has a non-zero Replica.mu.replicaID it will not change. * If a raft message or snapshot addressed to a higher replica ID is received, the current replica will be removed completely. * If a replica sees a ChangeReplicasTrigger which removes it, then it completely removes itself while applying that command. * Replica.mu.destroyStatus is used to meaningfully signify the removal state of a Replica. Replicas about to be synchronously removed are in destroyReasonRemovalPending. * The queues are now replica ID aware. If the replica that was added to the queue and the replica found when popping are not the same, and we knew the replica ID when we added it, then we do not process it. This hopefully gives us some new invariants: * There is at most one *Replica which IsAlive() for a range on a store at any time. * Once a *Replica has a non-zero ReplicaID it never changes. The change also introduces some new complexity. Namely, we now allow removal of uninitialized replicas, including their hard state. This allows us to catch up across a split even when we know the RHS must have been removed. Fixes #40367. Release justification: This commit is safe for 19.2 because it fixes release blockers. Release note (bug fix): Fix crashes by preventing replica ID change. 
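To illustrate the rule described above (a raft message or snapshot addressed to a higher replica ID causes removal of the current replica), here is a rough sketch only; the real handling lives in store.go and is more involved. The helper name handleStaleReplica is made up for illustration, while ReplicaID(), removeReplicaImpl, and RemoveOptions are pieces this patch actually touches; the sketch assumes it lives in the storage package.

    // handleStaleReplica sketches the check: an incoming message addressed to
    // a replica ID higher than the one we hold means our replica was removed
    // and re-added, so we remove it (leaving a tombstone at the newer ID)
    // rather than letting the existing *Replica change its replica ID.
    func handleStaleReplica(
        ctx context.Context, s *Store, r *Replica, msgReplicaID roachpb.ReplicaID,
    ) error {
        if cur := r.ReplicaID(); cur != 0 && msgReplicaID > cur {
            return s.removeReplicaImpl(ctx, r, msgReplicaID, RemoveOptions{DestroyData: true})
        }
        // The message is for our current replica ID (or we don't know ours yet).
        return nil
    }
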
--- pkg/storage/apply/task.go | 5 + pkg/storage/client_merge_test.go | 84 +--- pkg/storage/client_metrics_test.go | 6 +- pkg/storage/client_raft_helpers_test.go | 115 +++++ pkg/storage/client_raft_test.go | 75 ++- pkg/storage/client_replica_gc_test.go | 5 +- pkg/storage/client_split_test.go | 11 +- pkg/storage/client_test.go | 15 + pkg/storage/helpers_test.go | 15 +- pkg/storage/merge_queue.go | 2 - pkg/storage/queue.go | 38 +- pkg/storage/queue_concurrency_test.go | 4 +- pkg/storage/queue_helpers_testutil.go | 2 +- pkg/storage/queue_test.go | 38 ++ pkg/storage/replica.go | 10 + pkg/storage/replica_application_result.go | 41 +- .../replica_application_state_machine.go | 95 +++- pkg/storage/replica_destroy.go | 73 ++- pkg/storage/replica_gc_queue.go | 36 +- pkg/storage/replica_init.go | 67 ++- pkg/storage/replica_learner_test.go | 16 +- pkg/storage/replica_proposal_buf.go | 6 +- pkg/storage/replica_raft.go | 68 ++- pkg/storage/replica_raftstorage.go | 24 +- pkg/storage/replica_test.go | 111 ----- pkg/storage/split_delay_helper.go | 2 +- pkg/storage/store.go | 438 ++++++++++++------ pkg/storage/store_snapshot.go | 69 +-- pkg/storage/store_test.go | 112 +---- pkg/storage/testing_knobs.go | 6 + 30 files changed, 953 insertions(+), 636 deletions(-) create mode 100644 pkg/storage/client_raft_helpers_test.go diff --git a/pkg/storage/apply/task.go b/pkg/storage/apply/task.go index 8bfa62acd0a1..26e9ef9511dd 100644 --- a/pkg/storage/apply/task.go +++ b/pkg/storage/apply/task.go @@ -12,6 +12,7 @@ package apply import ( "context" + "errors" "go.etcd.io/etcd/raft/raftpb" ) @@ -54,6 +55,10 @@ type StateMachine interface { ApplySideEffects(CheckedCommand) (AppliedCommand, error) } +// ErrRemoved can be returned from ApplySideEffects which will stop the +// task from processing more commands and return immediately. +var ErrRemoved = errors.New("replica removed") + // Batch accumulates a series of updates from Commands and performs them // all at once to its StateMachine when applied. 
Groups of Commands will be // staged in the Batch such that one or more trivial Commands are staged or diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index f6073a23ec68..f3ee1338ae62 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -57,7 +57,6 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" ) @@ -1638,6 +1637,7 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) { storeCfg.TestingKnobs.DisableReplicateQueue = true storeCfg.TestingKnobs.DisableReplicaGCQueue = true storeCfg.TestingKnobs.DisableMergeQueue = true + storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true mtc := &multiTestContext{storeConfig: &storeCfg} mtc.Start(t, 2) defer mtc.Stop() @@ -1662,10 +1662,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) { for _, rangeID := range []roachpb.RangeID{lhsDesc.RangeID, rhsDesc.RangeID} { repl, err := store1.GetReplica(rangeID) if err != nil { - t.Fatal(err) + continue } if err := store1.ManualReplicaGC(repl); err != nil { - t.Fatal(err) + t.Logf("replica was already removed: %v", err) } if _, err := store1.GetReplica(rangeID); err == nil { t.Fatalf("replica of r%d not gc'd from s1", rangeID) @@ -2041,6 +2041,7 @@ func TestStoreRangeMergeAbandonedFollowers(t *testing.T) { storeCfg.TestingKnobs.DisableReplicaGCQueue = true storeCfg.TestingKnobs.DisableSplitQueue = true storeCfg.TestingKnobs.DisableMergeQueue = true + storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true mtc := &multiTestContext{storeConfig: &storeCfg} mtc.Start(t, 3) defer mtc.Stop() @@ -2808,74 +2809,6 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) { } } -// unreliableRaftHandler drops all Raft messages that are addressed to the -// specified rangeID, but lets all other messages through. -type unreliableRaftHandler struct { - rangeID roachpb.RangeID - storage.RaftMessageHandler - // If non-nil, can return false to avoid dropping a msg to rangeID - dropReq func(*storage.RaftMessageRequest) bool - dropHB func(*storage.RaftHeartbeat) bool - dropResp func(*storage.RaftMessageResponse) bool -} - -func (h *unreliableRaftHandler) HandleRaftRequest( - ctx context.Context, - req *storage.RaftMessageRequest, - respStream storage.RaftMessageResponseStream, -) *roachpb.Error { - if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 { - reqCpy := *req - req = &reqCpy - req.Heartbeats = h.filterHeartbeats(req.Heartbeats) - req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps) - if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 { - // Entirely filtered. 
- return nil - } - } else if req.RangeID == h.rangeID { - if h.dropReq == nil || h.dropReq(req) { - log.Infof( - ctx, - "dropping Raft message %s", - raft.DescribeMessage(req.Message, func([]byte) string { - return "" - }), - ) - - return nil - } - } - return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) -} - -func (h *unreliableRaftHandler) filterHeartbeats( - hbs []storage.RaftHeartbeat, -) []storage.RaftHeartbeat { - if len(hbs) == 0 { - return hbs - } - var cpy []storage.RaftHeartbeat - for i := range hbs { - hb := &hbs[i] - if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) { - cpy = append(cpy, *hb) - } - } - return cpy -} - -func (h *unreliableRaftHandler) HandleRaftResponse( - ctx context.Context, resp *storage.RaftMessageResponse, -) error { - if resp.RangeID == h.rangeID { - if h.dropResp == nil || h.dropResp(resp) { - return nil - } - } - return h.RaftMessageHandler.HandleRaftResponse(ctx, resp) -} - func TestStoreRangeMergeRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3353,9 +3286,12 @@ func TestMergeQueue(t *testing.T) { t.Run("non-collocated", func(t *testing.T) { reset(t) verifyUnmerged(t) - mtc.replicateRange(rhs().RangeID, 1) - mtc.transferLease(ctx, rhs().RangeID, 0, 1) - mtc.unreplicateRange(rhs().RangeID, 0) + rhsRangeID := rhs().RangeID + mtc.replicateRange(rhsRangeID, 1) + mtc.transferLease(ctx, rhsRangeID, 0, 1) + mtc.unreplicateRange(rhsRangeID, 0) + require.NoError(t, mtc.waitForUnreplicated(rhsRangeID, 0)) + clearRange(t, lhsStartKey, rhsEndKey) store.MustForceMergeScanAndProcess() verifyMerged(t) diff --git a/pkg/storage/client_metrics_test.go b/pkg/storage/client_metrics_test.go index 4fd43fcbcc21..c91d386c9e81 100644 --- a/pkg/storage/client_metrics_test.go +++ b/pkg/storage/client_metrics_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) func checkGauge(t *testing.T, id string, g *metric.Gauge, e int64) { @@ -313,8 +314,9 @@ func TestStoreMetrics(t *testing.T) { return mtc.unreplicateRangeNonFatal(replica.RangeID, 0) }) - // Force GC Scan on store 0 in order to fully remove range. - mtc.stores[1].MustForceReplicaGCScanAndProcess() + // Wait until we're sure that store 0 has successfully processed its removal. + require.NoError(t, mtc.waitForUnreplicated(replica.RangeID, 0)) + mtc.waitForValues(roachpb.Key("z"), []int64{0, 5, 5}) // Verify range count is as expected. diff --git a/pkg/storage/client_raft_helpers_test.go b/pkg/storage/client_raft_helpers_test.go new file mode 100644 index 000000000000..2e05bd9c4602 --- /dev/null +++ b/pkg/storage/client_raft_helpers_test.go @@ -0,0 +1,115 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage_test + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "go.etcd.io/etcd/raft" +) + +// unreliableRaftHandler drops all Raft messages that are addressed to the +// specified rangeID, but lets all other messages through. 
+type unreliableRaftHandler struct { + rangeID roachpb.RangeID + storage.RaftMessageHandler + // If non-nil, can return false to avoid dropping a msg to rangeID + dropReq func(*storage.RaftMessageRequest) bool + dropHB func(*storage.RaftHeartbeat) bool + dropResp func(*storage.RaftMessageResponse) bool +} + +func (h *unreliableRaftHandler) HandleRaftRequest( + ctx context.Context, + req *storage.RaftMessageRequest, + respStream storage.RaftMessageResponseStream, +) *roachpb.Error { + if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 { + reqCpy := *req + req = &reqCpy + req.Heartbeats = h.filterHeartbeats(req.Heartbeats) + req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps) + if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 { + // Entirely filtered. + return nil + } + } else if req.RangeID == h.rangeID { + if h.dropReq == nil || h.dropReq(req) { + log.Infof( + ctx, + "dropping Raft message %s", + raft.DescribeMessage(req.Message, func([]byte) string { + return "" + }), + ) + + return nil + } + } + return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) +} + +func (h *unreliableRaftHandler) filterHeartbeats( + hbs []storage.RaftHeartbeat, +) []storage.RaftHeartbeat { + if len(hbs) == 0 { + return hbs + } + var cpy []storage.RaftHeartbeat + for i := range hbs { + hb := &hbs[i] + if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) { + cpy = append(cpy, *hb) + } + } + return cpy +} + +func (h *unreliableRaftHandler) HandleRaftResponse( + ctx context.Context, resp *storage.RaftMessageResponse, +) error { + if resp.RangeID == h.rangeID { + if h.dropResp == nil || h.dropResp(resp) { + return nil + } + } + return h.RaftMessageHandler.HandleRaftResponse(ctx, resp) +} + +// mtcStoreRaftMessageHandler exists to allows a store to be stopped and +// restarted while maintaining a partition using an unreliableRaftHandler. +type mtcStoreRaftMessageHandler struct { + mtc *multiTestContext + storeIdx int +} + +func (h *mtcStoreRaftMessageHandler) HandleRaftRequest( + ctx context.Context, + req *storage.RaftMessageRequest, + respStream storage.RaftMessageResponseStream, +) *roachpb.Error { + return h.mtc.Store(h.storeIdx).HandleRaftRequest(ctx, req, respStream) +} + +func (h *mtcStoreRaftMessageHandler) HandleRaftResponse( + ctx context.Context, resp *storage.RaftMessageResponse, +) error { + return h.mtc.Store(h.storeIdx).HandleRaftResponse(ctx, resp) +} + +func (h *mtcStoreRaftMessageHandler) HandleSnapshot( + header *storage.SnapshotRequest_Header, respStream storage.SnapshotResponseStream, +) error { + return h.mtc.Store(h.storeIdx).HandleSnapshot(header, respStream) +} diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index e37ea59247de..a53866764b0b 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -1192,15 +1192,12 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { return err } - if err := replicateRHS(); !testutils.IsError(err, storage.IntersectingSnapshotMsg) { - t.Fatalf("unexpected error %v", err) - } - - // Enable the replica GC queue so that the next attempt to replicate the RHS - // to store 2 will cause the obsolete replica to be GC'd allowing a - // subsequent replication to succeed. - mtc.stores[2].SetReplicaGCQueueActive(true) - + // This used to fail with IntersectingSnapshotMsg because we relied on replica + // GC to remove the LHS and that queue is disabled. 
Now we will detect that + // the LHS is not part of the range because of a ReplicaTooOldError and then + // we'll replicaGC the LHS in response. + // TODO(ajwerner): filter the reponses to node 2 or disable this eager + // replicaGC. testutils.SucceedsSoon(t, replicateRHS) } @@ -2992,6 +2989,52 @@ func TestReplicateRogueRemovedNode(t *testing.T) { defer mtc.Stop() mtc.Start(t, 3) + // We're going to set up the cluster with partitioning so that we can + // partition node 0 from the others. We do this by installing + // unreliableRaftHandler listeners on all three Stores which we can enable + // and disable with an atomic. The handler on the partitioned store filters + // out all messages while the handler on the other two stores only filters + // out messages from the partitioned store. When activated the configuration + // looks like: + // + // [0] + // x x + // / \ + // x x + // [1]<---->[2] + const partStore = 0 + var partitioned atomic.Value + partitioned.Store(false) + partRepl, err := mtc.stores[partStore].GetReplica(1) + if err != nil { + t.Fatal(err) + } + partReplDesc, err := partRepl.GetReplicaDescriptor() + if err != nil { + t.Fatal(err) + } + for _, s := range []int{0, 1, 2} { + s := s + h := &unreliableRaftHandler{ + rangeID: 1, + RaftMessageHandler: &mtcStoreRaftMessageHandler{ + mtc: mtc, + storeIdx: s, + }, + } + // Only filter messages from the partitioned store on the other + // two stores. + h.dropReq = func(req *storage.RaftMessageRequest) bool { + return partitioned.Load().(bool) && + (s == partStore || req.FromReplica.StoreID == partRepl.StoreID()) + } + h.dropHB = func(hb *storage.RaftHeartbeat) bool { + return partitioned.Load().(bool) && + (s == partStore || hb.FromReplicaID == partReplDesc.ReplicaID) + } + mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h) + } + // First put the range on all three nodes. raftID := roachpb.RangeID(1) mtc.replicateRange(raftID, 1, 2) @@ -3036,7 +3079,9 @@ func TestReplicateRogueRemovedNode(t *testing.T) { } return nil }) - + // Partition nodes 1 and 2 from node 0. Otherwise they'd get a + // ReplicaTooOldError from node 0 and proceed to remove themselves. + partitioned.Store(true) // Bring node 2 back up. mtc.restartStore(2) @@ -3539,6 +3584,7 @@ func TestRemoveRangeWithoutGC(t *testing.T) { sc := storage.TestStoreConfig(nil) sc.TestingKnobs.DisableReplicaGCQueue = true + sc.TestingKnobs.DisableEagerReplicaRemoval = true mtc := &multiTestContext{storeConfig: &sc} defer mtc.Stop() mtc.Start(t, 2) @@ -3555,18 +3601,19 @@ func TestRemoveRangeWithoutGC(t *testing.T) { if err != nil { return err } - desc := rep.Desc() - if len(desc.InternalReplicas) != 1 { - return errors.Errorf("range has %d replicas", len(desc.InternalReplicas)) + if _, err := rep.IsDestroyed(); err == nil { + return errors.Errorf("range is still alive") } return nil }) // The replica's data is still on disk. + // We use an inconsistent scan because there's going to be an intent on the + // range descriptor to remove this replica. 
var desc roachpb.RangeDescriptor descKey := keys.RangeDescriptorKey(roachpb.RKeyMin) if ok, err := engine.MVCCGetProto(context.Background(), mtc.stores[0].Engine(), descKey, - mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{}); err != nil { + mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{Inconsistent: true}); err != nil { t.Fatal(err) } else if !ok { t.Fatal("expected range descriptor to be present") diff --git a/pkg/storage/client_replica_gc_test.go b/pkg/storage/client_replica_gc_test.go index 7b8e41cf1b36..4162ffca8d85 100644 --- a/pkg/storage/client_replica_gc_test.go +++ b/pkg/storage/client_replica_gc_test.go @@ -148,8 +148,11 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) { // removes a range from a store that no longer should have a replica. func TestReplicaGCQueueDropReplicaGCOnScan(t *testing.T) { defer leaktest.AfterTest(t)() - mtc := &multiTestContext{} + cfg := storage.TestStoreConfig(nil) + cfg.TestingKnobs.DisableEagerReplicaRemoval = true + mtc := &multiTestContext{storeConfig: &cfg} + defer mtc.Stop() mtc.Start(t, 3) // Disable the replica gc queue to prevent direct removal of replica. diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go index 6db3d819b5f9..f3cb67ba9fc8 100644 --- a/pkg/storage/client_split_test.go +++ b/pkg/storage/client_split_test.go @@ -3314,8 +3314,12 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { // different replicaID than the split trigger expects. add := func() { _, err := tc.AddReplicas(kRHS, tc.Target(1)) - if !testutils.IsError(err, `snapshot intersects existing range`) { - t.Fatalf(`expected snapshot intersects existing range" error got: %+v`, err) + // The "snapshot intersects existing range" error is expected if the store + // has not heard a raft message addressed to a later replica ID while the + // "was not found on" error is expected if the store has heard that it has + // a newer replica ID before receiving the snapshot. + if !testutils.IsError(err, `snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+`) { + t.Fatalf(`expected "snapshot intersects existing range" or "was not found on" error, got: %+v`, err) } } for i := 0; i < 5; i++ { @@ -3361,7 +3365,8 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { if err != nil { return err } - if desc := repl.Desc(); !descLHS.Equal(desc) { + if desc := repl.Desc(); desc.IsInitialized() && !descLHS.Equal(desc) { + require.NoError(t, store.ManualReplicaGC(repl)) return errors.Errorf("expected %s got %s", &descLHS, desc) } return nil diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 11c57695ff08..ff93915bb36e 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -1243,6 +1243,21 @@ func (m *multiTestContext) unreplicateRangeNonFatal(rangeID roachpb.RangeID, des return err } +func (m *multiTestContext) waitForUnreplicated(rangeID roachpb.RangeID, dest int) error { + // Wait for the unreplication to complete on the destination node. + return retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error { + _, err := m.stores[dest].GetReplica(rangeID) + switch err.(type) { + case nil: + return fmt.Errorf("replica still exists on dest %d", dest) + case *roachpb.RangeNotFoundError: + return nil + default: + return err + } + }) +} + // readIntFromEngines reads the current integer value at the given key // from all configured engines, filling in zeros when the value is not // found. Returns a slice of the same length as mtc.engines. 
diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index 3e3530971c45..ea54e03d5bd6 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -221,16 +221,6 @@ func NewTestStorePool(cfg StoreConfig) *StorePool { ) } -func (r *Replica) ReplicaID() roachpb.ReplicaID { - r.mu.RLock() - defer r.mu.RUnlock() - return r.ReplicaIDLocked() -} - -func (r *Replica) ReplicaIDLocked() roachpb.ReplicaID { - return r.mu.replicaID -} - func (r *Replica) AssertState(ctx context.Context, reader engine.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() @@ -269,10 +259,13 @@ func (r *Replica) InitQuotaPool(quota uint64) error { r.mu.Lock() defer r.mu.Unlock() var appliedIndex uint64 - err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) { + isRemoved, err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) { appliedIndex = r.BasicStatus().Applied return false, nil }) + if isRemoved { + _, err = r.IsDestroyed() + } if err != nil { return err } diff --git a/pkg/storage/merge_queue.go b/pkg/storage/merge_queue.go index 208499998750..7db6813b146b 100644 --- a/pkg/storage/merge_queue.go +++ b/pkg/storage/merge_queue.go @@ -291,13 +291,11 @@ func (mq *mergeQueue) process( log.VEventf(ctx, 2, `%v`, err) return err } - rhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, store, rhsDesc) if err != nil { log.VEventf(ctx, 2, `%v`, err) return err } - rhsDesc, err = removeLearners(ctx, db, rhsDesc) if err != nil { log.VEventf(ctx, 2, `%v`, err) diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index 3ef925cc0304..ca3a996f2210 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -59,8 +59,9 @@ type processCallback func(error) // A replicaItem holds a replica and metadata about its queue state and // processing state. type replicaItem struct { - value roachpb.RangeID - seq int // enforce FIFO order for equal priorities + value roachpb.RangeID + replicaID roachpb.ReplicaID + seq int // enforce FIFO order for equal priorities // fields used when a replicaItem is enqueued in a priority queue. priority float64 @@ -89,6 +90,12 @@ func (i *replicaItem) registerCallback(cb processCallback) { i.callbacks = append(i.callbacks, cb) } +// sameReplica returns true if the passed replicaID matches the item's replica +// ID or the item's replica ID is zero. +func (i *replicaItem) sameReplica(replicaID roachpb.ReplicaID) bool { + return i.replicaID == 0 || i.replicaID == replicaID +} + // A priorityQueue implements heap.Interface and holds replicaItems. type priorityQueue struct { seqGen int @@ -180,6 +187,7 @@ func shouldQueueAgain(now, last hlc.Timestamp, minInterval time.Duration) (bool, // extraction. Establish a sane interface and use that. type replicaInQueue interface { AnnotateCtx(context.Context) context.Context + ReplicaID() roachpb.ReplicaID StoreID() roachpb.StoreID GetRangeID() roachpb.RangeID IsInitialized() bool @@ -487,7 +495,7 @@ func (h baseQueueHelper) MaybeAdd(ctx context.Context, repl replicaInQueue, now } func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio float64) { - _, err := h.bq.addInternal(ctx, repl.Desc(), prio) + _, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio) if err != nil && log.V(1) { log.Infof(ctx, "during Add: %s", err) } @@ -595,7 +603,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. 
if !should { return } - if _, err := bq.addInternal(ctx, repl.Desc(), priority); !isExpectedQueueError(err) { + if _, err := bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority); !isExpectedQueueError(err) { log.Errorf(ctx, "unable to add: %+v", err) } } @@ -612,7 +620,7 @@ func (bq *baseQueue) requiresSplit(cfg *config.SystemConfig, repl replicaInQueue // the replica is already queued at a lower priority, updates the existing // priority. Expects the queue lock to be held by caller. func (bq *baseQueue) addInternal( - ctx context.Context, desc *roachpb.RangeDescriptor, priority float64, + ctx context.Context, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, priority float64, ) (bool, error) { // NB: this is intentionally outside of bq.mu to avoid having to consider // lock ordering constraints. @@ -665,7 +673,7 @@ func (bq *baseQueue) addInternal( if log.V(3) { log.Infof(ctx, "adding: priority=%0.3f", priority) } - item = &replicaItem{value: desc.RangeID, priority: priority} + item = &replicaItem{value: desc.RangeID, replicaID: replicaID, priority: priority} bq.addLocked(item) // If adding this replica has pushed the queue past its maximum size, @@ -856,6 +864,9 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er if !repl.IsInitialized() { // We checked this when adding the replica, but we need to check it again // in case this is a different replica with the same range ID (see #14193). + // This is possible in the case where the replica was enqueued while not + // having a replica ID, perhaps due to a pre-emptive snapshot, and has + // since been removed and re-added at a different replica ID. return errors.New("cannot process uninitialized replica") } @@ -1092,21 +1103,21 @@ func (bq *baseQueue) addToPurgatoryLocked( // Remove all items from purgatory into a copied slice. bq.mu.Lock() - ranges := make([]roachpb.RangeID, 0, len(bq.mu.purgatory)) + ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) for rangeID := range bq.mu.purgatory { item := bq.mu.replicas[rangeID] if item == nil { log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) } item.setProcessing() - ranges = append(ranges, item.value) + ranges = append(ranges, item) bq.removeFromPurgatoryLocked(item) } bq.mu.Unlock() - for _, id := range ranges { - repl, err := bq.getReplica(id) - if err != nil { + for _, item := range ranges { + repl, err := bq.getReplica(item.value) + if err != nil || !item.sameReplica(repl.ReplicaID()) { continue } annotatedCtx := repl.AnnotateCtx(ctx) @@ -1168,11 +1179,12 @@ func (bq *baseQueue) pop() replicaInQueue { bq.mu.Unlock() repl, _ := bq.getReplica(item.value) - if repl != nil { + if repl != nil && item.sameReplica(repl.ReplicaID()) { return repl } - // Replica not found, remove from set and try again. + // Replica not found or was recreated with a new replica ID, remove from + // set and try again. 
bq.mu.Lock() bq.removeFromReplicaSetLocked(item.value) } diff --git a/pkg/storage/queue_concurrency_test.go b/pkg/storage/queue_concurrency_test.go index b87d4a8f1c27..afe28799175d 100644 --- a/pkg/storage/queue_concurrency_test.go +++ b/pkg/storage/queue_concurrency_test.go @@ -145,7 +145,8 @@ func (fakeQueueImpl) purgatoryChan() <-chan time.Time { } type fakeReplica struct { - id roachpb.RangeID + id roachpb.RangeID + replicaID roachpb.ReplicaID } func (fr *fakeReplica) AnnotateCtx(ctx context.Context) context.Context { return ctx } @@ -153,6 +154,7 @@ func (fr *fakeReplica) StoreID() roachpb.StoreID { return 1 } func (fr *fakeReplica) GetRangeID() roachpb.RangeID { return fr.id } +func (fr *fakeReplica) ReplicaID() roachpb.ReplicaID { return fr.replicaID } func (fr *fakeReplica) IsInitialized() bool { return true } func (fr *fakeReplica) IsDestroyed() (DestroyReason, error) { return destroyReasonAlive, nil } func (fr *fakeReplica) Desc() *roachpb.RangeDescriptor { diff --git a/pkg/storage/queue_helpers_testutil.go b/pkg/storage/queue_helpers_testutil.go index 687bebeb68be..addb45b3331d 100644 --- a/pkg/storage/queue_helpers_testutil.go +++ b/pkg/storage/queue_helpers_testutil.go @@ -23,7 +23,7 @@ import ( func (bq *baseQueue) testingAdd( ctx context.Context, repl replicaInQueue, priority float64, ) (bool, error) { - return bq.addInternal(ctx, repl.Desc(), priority) + return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority) } func forceScanAndProcess(s *Store, q *baseQueue) error { diff --git a/pkg/storage/queue_test.go b/pkg/storage/queue_test.go index df2cd9bf4cf1..51d8dc4e18dd 100644 --- a/pkg/storage/queue_test.go +++ b/pkg/storage/queue_test.go @@ -13,6 +13,7 @@ package storage import ( "container/heap" "context" + "fmt" "strconv" "sync/atomic" "testing" @@ -33,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) // testQueueImpl implements queueImpl with a closure for shouldQueue. @@ -1099,6 +1101,42 @@ func TestBaseQueueProcessConcurrently(t *testing.T) { assertProcessedAndProcessing(3, 0) } +// TestBaseQueueReplicaChange ensures that if a replica is added to the queue +// with a non-zero replica ID then it is only popped if the retrieved replica +// from the getReplica() function has the same replica ID. +func TestBaseQueueChangeReplicaID(t *testing.T) { + defer leaktest.AfterTest(t)() + // The testContext exists only to construct the baseQueue. 
+ tc := testContext{} + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + tc.Start(t, stopper) + testQueue := &testQueueImpl{ + shouldQueueFn: func(now hlc.Timestamp, r *Replica) (shouldQueue bool, priority float64) { + return true, 1.0 + }, + } + bq := makeTestBaseQueue("test", testQueue, tc.store, tc.gossip, queueConfig{ + maxSize: defaultQueueMaxSize, + acceptsUnsplitRanges: true, + }) + r := &fakeReplica{id: 1, replicaID: 1} + bq.mu.Lock() + bq.getReplica = func(rangeID roachpb.RangeID) (replicaInQueue, error) { + if rangeID != 1 { + panic(fmt.Errorf("expected range id 1, got %d", rangeID)) + } + return r, nil + } + bq.mu.Unlock() + bq.maybeAdd(ctx, r, tc.store.Clock().Now()) + require.Equal(t, r, bq.pop()) + bq.maybeAdd(ctx, r, tc.store.Clock().Now()) + r.replicaID = 2 + require.Nil(t, bq.pop()) +} + func TestBaseQueueRequeue(t *testing.T) { defer leaktest.AfterTest(t)() tc := testContext{} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index f6eaf43f0116..546cc0124db1 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -242,6 +242,7 @@ type Replica struct { syncutil.RWMutex // The destroyed status of a replica indicating if it's alive, corrupt, // scheduled for destruction or has been GCed. + // destroyStatus should only be set while also holding the raftMu. destroyStatus // Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce // whenever a Raft operation is performed. @@ -346,6 +347,7 @@ type Replica struct { // The minimum allowed ID for this replica. Initialized from // RaftTombstone.NextReplicaID. minReplicaID roachpb.ReplicaID + // The ID of the leader replica within the Raft group. Used to determine // when the leadership changes. leaderID roachpb.ReplicaID @@ -610,6 +612,14 @@ func (r *Replica) String() string { return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr) } +// ReplicaID returns the ID for the Replica. It may be zero if the replica does +// not know its ID. Once a Replica has a non-zero ReplicaID it will never change. +func (r *Replica) ReplicaID() roachpb.ReplicaID { + r.mu.RLock() + defer r.mu.RUnlock() + return r.mu.replicaID +} + // cleanupFailedProposal cleans up after a proposal that has failed. It // clears any references to the proposal and releases associated quota. // It requires that both Replica.mu and Replica.raftMu are exclusively held. diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index aa3a0116a841..0e0b8afcfcb9 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -293,22 +293,35 @@ func (r *Replica) handleComputeChecksumResult(ctx context.Context, cc *storagepb r.computeChecksumPostApply(ctx, *cc) } -func (r *Replica) handleChangeReplicasResult(ctx context.Context, chng *storagepb.ChangeReplicas) { - storeID := r.store.StoreID() - var found bool - for _, rDesc := range chng.Replicas() { - if rDesc.StoreID == storeID { - found = true - break - } +func (r *Replica) handleChangeReplicasResult( + ctx context.Context, chng *storagepb.ChangeReplicas, +) (changeRemovedReplica bool) { + // If this command removes us then we need to go through the process of + // removing our replica from the store. After this method returns the code + // should roughly return all the way up to whoever called handleRaftReady + // and this Replica should never be heard from again. 
We can detect if this + // change removed us by inspecting the replica's destroyStatus. We check the + // destroy status before processing a raft ready so if we find ourselves with + // removal pending at this point then we know that this command must be + // responsible. + if ds, _ := r.IsDestroyed(); ds != destroyReasonRemovalPending { + return false + } + if r.store.TestingKnobs().DisableEagerReplicaRemoval { + return true + } + if log.V(1) { + log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng) + } + if err := r.postDestroyRaftMuLocked(ctx, r.GetMVCCStats()); err != nil { + log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err) } - if !found { - // This wants to run as late as possible, maximizing the chances - // that the other nodes have finished this command as well (since - // processing the removal from the queue looks up the Range at the - // lease holder, being too early here turns this into a no-op). - r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) + if err := r.store.removeReplicaImpl(ctx, r, chng.Desc.NextReplicaID, RemoveOptions{ + DestroyData: false, // We already destroyed the data when the batch committed. + }); err != nil { + log.Fatalf(ctx, "failed to remove replica: %v", err) } + return true } func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64) { diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index ac8432388c09..a6b83c3c846b 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -13,6 +13,7 @@ package storage import ( "context" "fmt" + "math" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -375,6 +376,9 @@ type replicaAppBatch struct { // triggered a migration to the replica applied state key. If so, this // migration will be performed when the application batch is committed. migrateToAppliedStateKey bool + // changeRemovesReplica tracks whether the command in the batch (there must + // be only one) removes this replica from the range. + changeRemovesReplica bool // Statistics. entries int @@ -514,6 +518,20 @@ func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *replicatedCm return nil } +// changeRemovesStore returns true if any of the removals in this change have storeID. +func changeRemovesStore( + desc *roachpb.RangeDescriptor, change *storagepb.ChangeReplicas, storeID roachpb.StoreID, +) bool { + _, existsInDesc := desc.GetReplicaDescriptor(storeID) + // NB: if we're catching up from a preemptive snapshot then we won't + // exist in the current descriptor. + if !existsInDesc { + return false + } + _, existsInChange := change.Desc.GetReplicaDescriptor(storeID) + return !existsInChange +} + // runPreApplyTriggers runs any triggers that must fire before a command is // applied. It may modify the command's ReplicatedEvalResult. func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicatedCmd) error { @@ -560,16 +578,23 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat if merge := res.Merge; merge != nil { // Merges require the subsumed range to be atomically deleted when the // merge transaction commits. + + // If our range currently has a non-zero replica ID then we know we're + // safe to commit this merge because of the invariants provided to us + // by the merge protocol. 
Namely if this committed we know that if the + // command committed then all of the replicas in the range descriptor + // are collocated when this command commits. If we do not have a non-zero + // replica ID then the logic in Stage should detect that and destroy our + // preemptive snapshot so we shouldn't ever get here. rhsRepl, err := b.r.store.GetReplica(merge.RightDesc.RangeID) if err != nil { return wrapWithNonDeterministicFailure(err, "unable to get replica for merge") } - const rangeIDLocalOnly = true const mustClearRange = false if err := rhsRepl.preDestroyRaftMuLocked( - ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange, + ctx, b.batch, b.batch, roachpb.ReplicaID(math.MaxInt32), clearRangeIDLocalOnly, mustClearRange, ); err != nil { - return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge") + return wrapWithNonDeterministicFailure(err, "unable to destroy replica before merge") } } @@ -597,6 +622,41 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat } } + // Detect if this command will remove us from the range. + // If so we stage the removal of all of our range data into this batch. + // We'll complete the removal when it commits. Later logic detects the + // removal by inspecting the destroy status + if change := res.ChangeReplicas; change != nil && + changeRemovesStore(b.state.Desc, change, b.r.store.StoreID()) { + // Delete all of the local data. We're going to delete this hard state too. + // In order for this to be safe we need code above this to promise that we're + // never going to write hard state in response to a message for a later + // replica (with a different replica ID) to this range state. + // Furthermore we mark the replica as destroyed so that new commands are not + // accepted. The replica will be destroyed in handleChangeReplicas. + // Note that we must be holding the raftMu here because we're in the + // midst of application. + b.r.mu.Lock() + b.r.mu.destroyStatus.Set( + roachpb.NewRangeNotFoundError(b.r.RangeID, b.r.store.StoreID()), + destroyReasonRemovalPending) + b.r.mu.Unlock() + b.changeRemovesReplica = true + const mustUseRangeDeletionTombstone = true + if !b.r.store.TestingKnobs().DisableEagerReplicaRemoval { + if err := b.r.preDestroyRaftMuLocked( + ctx, + b.batch, + b.batch, + change.Desc.NextReplicaID, + clearAll, + mustUseRangeDeletionTombstone, + ); err != nil { + return wrapWithNonDeterministicFailure(err, "unable to destroy replica before removal") + } + } + } + // Provide the command's corresponding logical operations to the Replica's // rangefeed. Only do so if the WriteBatch is non-nil, in which case the // rangefeed requires there to be a corresponding logical operation log or @@ -609,6 +669,7 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat } else if cmd.raftCmd.LogicalOpLog != nil { log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd) } + return nil } @@ -729,6 +790,9 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // batch's RocksDB batch. This records the highest raft and lease index that // have been applied as of this batch. It also records the Range's mvcc stats. 
func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error { + if b.changeRemovesReplica { + return nil + } loader := &b.r.raftMu.stateLoader if b.migrateToAppliedStateKey { // A Raft command wants us to begin using the RangeAppliedState key @@ -857,14 +921,18 @@ func (sm *replicaStateMachine) ApplySideEffects( // before notifying a potentially waiting client. clearTrivialReplicatedEvalResultFields(cmd.replicatedResult()) if !cmd.IsTrivial() { - shouldAssert := sm.handleNonTrivialReplicatedEvalResult(ctx, *cmd.replicatedResult()) + shouldAssert, isRemoved := sm.handleNonTrivialReplicatedEvalResult(ctx, *cmd.replicatedResult()) + + if isRemoved { + return nil, apply.ErrRemoved + } // NB: Perform state assertion before acknowledging the client. // Some tests (TestRangeStatsInit) assumes that once the store has started // and the first range has a lease that there will not be a later hard-state. if shouldAssert { + sm.r.mu.Lock() // Assert that the on-disk state doesn't diverge from the in-memory // state as a result of the side effects. - sm.r.mu.Lock() sm.r.assertStateLocked(ctx, sm.r.store.Engine()) sm.r.mu.Unlock() sm.stats.stateAssertions++ @@ -923,7 +991,7 @@ func (sm *replicaStateMachine) ApplySideEffects( // to pass a replicatedResult that does not imply any side-effects. func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( ctx context.Context, rResult storagepb.ReplicatedEvalResult, -) (shouldAssert bool) { +) (shouldAssert, isRemoved bool) { // Assert that this replicatedResult implies at least one side-effect. if rResult.Equal(storagepb.ReplicatedEvalResult{}) { log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleNonTrivialReplicatedEvalResult") @@ -955,7 +1023,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( // we want to assert that these two states do not diverge. shouldAssert = !rResult.Equal(storagepb.ReplicatedEvalResult{}) if !shouldAssert { - return false + return false, sm.batch.changeRemovesReplica } if rResult.Split != nil { @@ -995,7 +1063,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( } if rResult.ChangeReplicas != nil { - sm.r.handleChangeReplicasResult(ctx, rResult.ChangeReplicas) + isRemoved = sm.r.handleChangeReplicasResult(ctx, rResult.ChangeReplicas) rResult.ChangeReplicas = nil } @@ -1007,7 +1075,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( if !rResult.Equal(storagepb.ReplicatedEvalResult{}) { log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagepb.ReplicatedEvalResult{})) } - return true + return true, isRemoved } func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *replicatedCmd) error { @@ -1023,10 +1091,17 @@ func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *re // to raft. return nil } - return sm.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + // If we're removed then we know that must have happened due to this + // command and so we're happy to not apply this conf change. 
+ isRemoved, err := sm.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { raftGroup.ApplyConfChange(cmd.confChange.ConfChangeI) return true, nil }) + if isRemoved { + _, err = sm.r.IsDestroyed() + err = wrapWithNonDeterministicFailure(err, "failed to apply config change because replica was already removed") + } + return err default: panic("unexpected") } } diff --git a/pkg/storage/replica_destroy.go b/pkg/storage/replica_destroy.go index 0fb60c307472..efd25b4de1b2 100644 --- a/pkg/storage/replica_destroy.go +++ b/pkg/storage/replica_destroy.go @@ -12,6 +12,7 @@ package storage import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -29,7 +30,10 @@ type DestroyReason int const ( // The replica is alive. destroyReasonAlive DestroyReason = iota - // The replica has been marked for GC, but hasn't been GCed yet. + // The replica is in the process of being removed but has not been removed + // yet. This state exists to avoid races between two threads which may decide + // to destroy a replica (e.g. processing a ChangeReplicasTrigger removing the + // range and receiving a raft message with a higher replica ID). destroyReasonRemovalPending // The replica has been GCed. destroyReasonRemoved @@ -43,15 +47,15 @@ type destroyStatus struct { err error } +func (s destroyStatus) String() string { + return fmt.Sprintf("{%v %d}", s.err, s.reason) +} + func (s *destroyStatus) Set(err error, reason DestroyReason) { s.err = err s.reason = reason } -func (s *destroyStatus) Reset() { - s.Set(nil, destroyReasonAlive) -} - // IsAlive returns true when a replica is alive. func (s destroyStatus) IsAlive() bool { return s.reason == destroyReasonAlive @@ -62,16 +66,22 @@ func (s destroyStatus) Removed() bool { return s.reason == destroyReasonRemoved } +// RemovalPending returns whether the replica is removed or in the process of +// being removed. +func (s destroyStatus) RemovalPending() bool { + return s.reason == destroyReasonRemovalPending || s.reason == destroyReasonRemoved +} + func (r *Replica) preDestroyRaftMuLocked( ctx context.Context, reader engine.Reader, writer engine.Writer, nextReplicaID roachpb.ReplicaID, - rangeIDLocalOnly bool, + clearOpt clearRangeOption, mustClearRange bool, ) error { desc := r.Desc() - err := clearRangeData(desc, reader, writer, rangeIDLocalOnly, mustClearRange) + err := clearRangeData(desc, reader, writer, clearOpt, mustClearRange) if err != nil { return err } @@ -114,6 +124,53 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS return nil } +// destroyUninitializedReplicaRaftMuLocked is called when we know that an +// uninitialized replica (which knows its replica ID but not its key range) has +// been removed and re-added as a different replica (with a new replica +// ID). We're safe to GC its hard state because nobody cares about its votes +// anymore. We can't GC the range's data because we don't know where it is. +// Fortunately this isn't a problem: the range cannot have data, because a +// replica must apply a snapshot before having data, and if we had applied a +// snapshot then we'd be initialized. This replica may have been +// created in anticipation of a split, in which case we'll clear its data when +// the split trigger is applied. 
+func (r *Replica) destroyUninitializedReplicaRaftMuLocked( + ctx context.Context, nextReplicaID roachpb.ReplicaID, +) error { + batch := r.Engine().NewWriteOnlyBatch() + defer batch.Close() + + // Clear the range ID local data including the hard state. + // We don't know about any user data so we can't clear any user data. + // See the comment on this method for why this is safe. + if err := r.preDestroyRaftMuLocked( + ctx, + r.Engine(), + batch, + nextReplicaID, + clearRangeIDLocalOnly, + false, /* mustClearRange */ + ); err != nil { + return err + } + + // We need to sync here because we are potentially deleting sideloaded + // proposals from the file system next. We could write the tombstone only in + // a synchronous batch first and then delete the data alternatively, but + // then need to handle the case in which there is both the tombstone and + // leftover replica data. + if err := batch.Commit(true); err != nil { + return err + } + + if r.raftMu.sideloaded != nil { + if err := r.raftMu.sideloaded.Clear(ctx); err != nil { + log.Warningf(ctx, "failed to remove sideload storage for %v: %v", r, err) + } + } + return nil +} + // destroyRaftMuLocked deletes data associated with a replica, leaving a // tombstone. func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb.ReplicaID) error { @@ -128,7 +185,7 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb r.Engine(), batch, nextReplicaID, - false, /* rangeIDLocalOnly */ + clearAll, false, /* mustClearRange */ ); err != nil { return err diff --git a/pkg/storage/replica_gc_queue.go b/pkg/storage/replica_gc_queue.go index 33d64df45dca..27878c876758 100644 --- a/pkg/storage/replica_gc_queue.go +++ b/pkg/storage/replica_gc_queue.go @@ -114,13 +114,13 @@ func newReplicaGCQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *repl // in the past. func (rgcq *replicaGCQueue) shouldQueue( ctx context.Context, now hlc.Timestamp, repl *Replica, _ *config.SystemConfig, -) (bool, float64) { +) (shouldQ bool, prio float64) { + lastCheck, err := repl.GetLastReplicaGCTimestamp(ctx) if err != nil { log.Errorf(ctx, "could not read last replica GC timestamp: %+v", err) return false, 0 } - if _, currentMember := repl.Desc().GetReplicaDescriptor(repl.store.StoreID()); !currentMember { return true, replicaGCPriorityRemoved } @@ -215,6 +215,11 @@ func (rgcq *replicaGCQueue) process( } replyDesc := rs[0] + repl.mu.RLock() + replicaID := repl.mu.replicaID + ticks := repl.mu.ticks + repl.mu.RUnlock() + // Now check whether the replica is meant to still exist. // Maybe it was deleted "under us" by being moved. currentDesc, currentMember := replyDesc.GetReplicaDescriptor(repl.store.StoreID()) @@ -234,11 +239,6 @@ func (rgcq *replicaGCQueue) process( // We are no longer a member of this range, but the range still exists. // Clean up our local data. - repl.mu.RLock() - replicaID := repl.mu.replicaID - ticks := repl.mu.ticks - repl.mu.RUnlock() - if replicaID == 0 { // This is a preemptive replica. 
GC'ing a preemptive replica is a // good idea if and only if the up-replication that it was a part of @@ -284,13 +284,24 @@ rgcq.metrics.RemoveReplicaCount.Inc(1) log.VEventf(ctx, 1, "destroying local data") + + nextReplicaID := replyDesc.NextReplicaID + if currentMember { + // If we're a member of the current range descriptor then we must not put + // down a tombstone at replyDesc.NextReplicaID as that would prevent us + // from getting a snapshot and becoming a part of the range. + nextReplicaID = currentDesc.ReplicaID + } // Note that this seems racy - we didn't hold any locks between reading // the range descriptor above and deciding to remove the replica - but // we pass in the NextReplicaID to detect situations in which the // replica became "non-gc'able" in the meantime by checking (with raftMu // held throughout) whether the replicaID is still smaller than the - // NextReplicaID. Given that non-zero replica IDs don't change, this is only + // possible if we currently think we're processing a pre-emptive snapshot + // but discover in RemoveReplica that this range has since been added and + // the store now knows its replica ID. + if err := repl.store.RemoveReplica(ctx, repl, nextReplicaID, RemoveOptions{ - if err := repl.store.RemoveReplica(ctx, repl, replyDesc.NextReplicaID, RemoveOptions{ DestroyData: true, }); err != nil { return err @@ -328,13 +339,6 @@ } } - // We don't have the last NextReplicaID for the subsumed range, nor can we - // obtain it, but that's OK: we can just be conservative and use the maximum - // possible replica ID. store.RemoveReplica will write a tombstone using - // this maximum possible replica ID, which would normally be problematic, as - // it would prevent this store from ever having a new replica of the removed - // range. In this case, however, it's copacetic, as subsumed ranges _can't_ - // have new replicas. const nextReplicaID = math.MaxInt32 if err := repl.store.RemoveReplica(ctx, repl, nextReplicaID, RemoveOptions{ DestroyData: true, diff --git a/pkg/storage/replica_init.go b/pkg/storage/replica_init.go index b28a3a33591b..e8deb1f9181d 100644 --- a/pkg/storage/replica_init.go +++ b/pkg/storage/replica_init.go @@ -150,26 +150,25 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked( } r.rangeStr.store(replicaID, r.mu.state.Desc) r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey)) - if err := r.setReplicaIDRaftMuLockedMuLocked(replicaID); err != nil { - return err + if r.mu.replicaID == 0 { + if err := r.setReplicaIDRaftMuLockedMuLocked(ctx, replicaID); err != nil { + return err + } + } else if r.mu.replicaID != replicaID { + log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d", + r.mu.replicaID, replicaID) } - r.assertStateLocked(ctx, r.store.Engine()) return nil } -func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error { - r.raftMu.Lock() - defer r.raftMu.Unlock() - r.mu.Lock() - defer r.mu.Unlock() - return r.setReplicaIDRaftMuLockedMuLocked(replicaID) -} - -func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID) error { - if r.mu.replicaID == replicaID { +func (r *Replica) setReplicaIDRaftMuLockedMuLocked( + ctx context.Context, replicaID roachpb.ReplicaID, +) error { + if r.mu.replicaID != 0 { + log.Fatalf(ctx, "cannot set replica ID from anything other than 0, currently %d", + r.mu.replicaID) // The common case: the replica ID is unchanged. 
- return nil } if replicaID == 0 { // If the incoming message does not have a new replica ID it is a @@ -183,17 +182,11 @@ func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID) if r.mu.replicaID > replicaID { return errors.Errorf("replicaID cannot move backwards from %d to %d", r.mu.replicaID, replicaID) } - - if r.mu.destroyStatus.reason == destroyReasonRemovalPending { - // An earlier incarnation of this replica was removed, but apparently it has been re-added - // now, so reset the status. - r.mu.destroyStatus.Reset() + if r.mu.destroyStatus.RemovalPending() { + // This replica has been marked for removal and we're trying to resurrect it. + log.Fatalf(ctx, "cannot resurrect replica %d", r.mu.replicaID) } - // if r.mu.replicaID != 0 { - // // TODO(bdarnell): clean up previous raftGroup (update peers) - // } - // Initialize or update the sideloaded storage. If the sideloaded storage // already exists (which is iff the previous replicaID was non-zero), then // we have to move the contained files over (this corresponds to the case in @@ -220,24 +213,13 @@ func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID) return errors.Wrap(err, "while initializing sideloaded storage") } - previousReplicaID := r.mu.replicaID r.mu.replicaID = replicaID + r.mu.minReplicaID = replicaID + 1 - if replicaID >= r.mu.minReplicaID { - r.mu.minReplicaID = replicaID + 1 - } - // Reset the raft group to force its recreation on next usage. - r.mu.internalRaftGroup = nil - - // If there was a previous replica, repropose its pending commands under - // this new incarnation. - if previousReplicaID != 0 { - if log.V(1) { - log.Infof(r.AnnotateCtx(context.TODO()), "changed replica ID from %d to %d", - previousReplicaID, replicaID) - } - // repropose all pending commands under new replicaID. - r.refreshProposalsLocked(0, reasonReplicaIDChanged) + // Sanity check that we do not already have a raft group as we did not + // know our replica ID before this call. + if r.mu.internalRaftGroup != nil { + log.Fatalf(ctx, "somehow had an initialized raft group on a zero-valued replica") } return nil @@ -270,8 +252,9 @@ func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) { // If this replica hasn't initialized the Raft group, create it and // unquiesce and wake the leader to ensure the replica comes up to date. initialized := r.mu.internalRaftGroup != nil + removed := !r.mu.destroyStatus.IsAlive() r.mu.RUnlock() - if initialized { + if initialized || removed { return } @@ -281,7 +264,9 @@ func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) { r.mu.Lock() defer r.mu.Unlock() - if err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + // If we raced on checking the destroyStatus above that's fine, as + // the below withRaftGroupLocked will no-op. + if _, err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { return true, nil }); err != nil { log.VErrEventf(ctx, 1, "unable to initialize raft group: %s", err) diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go index 6f2a5ff6825a..b8d3564f7a0e 100644 --- a/pkg/storage/replica_learner_test.go +++ b/pkg/storage/replica_learner_test.go @@ -429,13 +429,25 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) { require.Contains(t, tracing.FormatRecordedSpans(trace), msg) return tc.LookupRangeOrFatal(t, scratchStartKey) } - desc := checkNoGC() // Make sure it didn't collect the learner. 
require.NotEmpty(t, desc.Replicas().Learners()) // Now get the range into a joint config. tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(1)) // remove learner + // We need to wait until the LHS range has been GC'd on server 1 before the + // RHS can be successfully added. Wait for the RHS to exist. + testutils.SucceedsSoon(t, func() error { + s := tc.Server(1) + firstStore, err := s.GetStores().(*storage.Stores).GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + repl := firstStore.LookupReplica(roachpb.RKey(scratchStartKey)) + if repl != nil { + return fmt.Errorf("replica has not yet been GC'd") + } + return nil + }) + ltk.withStopAfterJointConfig(func() { desc = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1)) require.Len(t, desc.Replicas().Filter(predIncoming), 1, desc) @@ -549,7 +561,7 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) { // that the descriptor has changed since the AdminChangeReplicas command // started. close(blockSnapshotsCh) - if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) { + if err := g.Wait(); !testutils.IsError(err, `descriptor changed|raft group deleted`) { t.Fatalf(`expected "descriptor changed" error got: %+v`, err) } desc = tc.LookupRangeOrFatal(t, scratchStartKey) diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go index c60a48d35816..5235d5b0e5ca 100644 --- a/pkg/storage/replica_proposal_buf.go +++ b/pkg/storage/replica_proposal_buf.go @@ -618,12 +618,16 @@ func (rp *replicaProposer) enqueueUpdateCheck() { func (rp *replicaProposer) withGroupLocked(fn func(*raft.RawNode) error) error { // Pass true for mayCampaignOnWake because we're about to propose a command. - return (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + isRemoved, err := (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { // We're proposing a command here so there is no need to wake the leader // if we were quiesced. However, we should make sure we are unquiesced. (*Replica)(rp).unquiesceLocked() return false /* unquiesceLocked */, fn(raftGroup) }) + if isRemoved { + err = rp.mu.destroyStatus.err + } + return err } func (rp *replicaProposer) registerProposalLocked(p *ProposalData) { diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 52b577da569c..aa68401ea88a 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -382,7 +382,7 @@ func (r *Replica) hasPendingProposalsRLocked() bool { // stepRaftGroup calls Step on the replica's RawNode with the provided request's // message. Before doing so, it assures that the replica is unquiesced and ready // to handle the request. -func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error { +func (r *Replica) stepRaftGroup(req *RaftMessageRequest) (isRemoved bool, _ error) { // We're processing an incoming raft message (from a batch that may // include MsgVotes), so don't campaign if we wake up our raft // group. 
@@ -445,14 +445,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked( var hasReady bool var rd raft.Ready r.mu.Lock() - lastIndex := r.mu.lastIndex // used for append below lastTerm := r.mu.lastTerm raftLogSize := r.mu.raftLogSize leaderID := r.mu.leaderID lastLeaderID := leaderID - - err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + isRemoved, err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { if err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup); err != nil { return false, err } @@ -466,6 +464,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "while checking raft group for Ready" return stats, expl, errors.Wrap(err, expl) } + // If we've been removed then return. + if isRemoved { + return stats, "", nil + } if !hasReady { // We must update the proposal quota even if we don't have a ready. @@ -723,7 +725,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Might have gone negative if node was recently restarted. raftLogSize = 0 } - } // Update protected state - last index, last term, raft log size, and raft @@ -756,10 +757,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked( applicationStart := timeutil.Now() if len(rd.CommittedEntries) > 0 { - if err := appTask.ApplyCommittedEntries(ctx); err != nil { + err := appTask.ApplyCommittedEntries(ctx) + stats.applyCommittedEntriesStats = sm.moveStats() + switch err { + case nil: + case apply.ErrRemoved: + // We know that our replica has been removed. We also know that we've + // stepped the state machine up to the point where it's been removed. + // TODO(ajwerner): decide if there's more to be done here. + return stats, "", err + default: return stats, err.(*nonDeterministicFailure).safeExpl, err } - stats.applyCommittedEntriesStats = sm.moveStats() // etcd raft occasionally adds a nil entry (our own commands are never // empty). This happens in two situations: When a new leader is elected, and @@ -789,11 +798,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.mu.Unlock() } - // TODO(bdarnell): need to check replica id and not Advance if it - // has changed. Or do we need more locking to guarantee that replica - // ID cannot change during handleRaftReady? + // NB: if we just processed a command which removed this replica from the + // raft group we will early return before this point. This, combined with + // the fact that we'll refuse to process messages intended for a higher + // replica ID ensures that our replica ID could not have changed. const expl = "during advance" - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + isRemoved, err = r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { raftGroup.Advance(rd) // If the Raft group still has more to process then we immediately @@ -804,9 +814,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.enqueueRaftUpdateCheck(r.RangeID) } return true, nil - }); err != nil { + }) + if err != nil { return stats, expl, errors.Wrap(err, expl) } + // If we removed ourselves during this call we would have early returned + // above with the apply.ErrRemoved. + if isRemoved { + return stats, expl, errors.Errorf("replica unexpectedly removed") + } // NB: All early returns other than the one due to not having a ready // which also makes the below call are due to fatal errors. 
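With the changes above, handleRaftReadyRaftMuLocked returns apply.ErrRemoved when command application tore the replica down, and its callers switch on that sentinel rather than treating it as fatal. A small, self-contained sketch of that control flow, with a toy stand-in for the apply loop rather than the real state machine:

package main

import (
	"errors"
	"fmt"
)

// errRemoved models apply.ErrRemoved: a sentinel that stops command
// application and tells the raft-ready loop that the replica deleted itself.
var errRemoved = errors.New("replica removed")

// applyCommittedEntries stands in for the state machine apply loop.
func applyCommittedEntries(cmds []string) error {
	for _, c := range cmds {
		if c == "change-replicas-removing-self" {
			// The replica saw itself removed by a ChangeReplicasTrigger; stop
			// processing further commands immediately.
			return errRemoved
		}
	}
	return nil
}

// handleReady models the caller: removal is a clean early return, anything
// else non-nil is fatal.
func handleReady(cmds []string) {
	switch err := applyCommittedEntries(cmds); err {
	case nil:
		fmt.Println("applied all entries")
	case errRemoved:
		fmt.Println("replica removed; not advancing the raft group")
	default:
		panic(err)
	}
}

func main() {
	handleReady([]string{"put", "change-replicas-removing-self", "put"})
}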
@@ -1153,11 +1169,11 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { Message: msg, RangeStartKey: startKey, // usually nil }) { - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + if isRemoved, err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { r.mu.droppedMessages++ raftGroup.ReportUnreachable(msg.To) return true, nil - }); err != nil { + }); !isRemoved && err != nil { log.Fatal(ctx, err) } } @@ -1197,10 +1213,10 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID snapStatus = raft.SnapshotFailure } - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + if isRemoved, err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { raftGroup.ReportSnapshot(uint64(to), snapStatus) return true, nil - }); err != nil { + }); !isRemoved && err != nil { log.Fatal(ctx, err) } } @@ -1322,19 +1338,22 @@ func (s pendingCmdSlice) Less(i, j int) bool { // varies. // // Requires that Replica.mu is held. +// +// If this Replica is in the process of being removed this method will return +// true and a nil error. func (r *Replica) withRaftGroupLocked( mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), -) error { - if r.mu.destroyStatus.Removed() { +) (isRemoved bool, _ error) { + if r.mu.destroyStatus.RemovalPending() { // Silently ignore all operations on destroyed replicas. We can't return an // error here as all errors returned from this method are considered fatal. - return nil + return true, nil } if r.mu.replicaID == 0 { // The replica's raft group has not yet been configured (i.e. the replica // was created from a preemptive snapshot). - return nil + return false, nil } if r.mu.internalRaftGroup == nil { @@ -1347,7 +1366,7 @@ func (r *Replica) withRaftGroupLocked( &raftLogger{ctx: ctx}, )) if err != nil { - return err + return false, err } r.mu.internalRaftGroup = raftGroup @@ -1364,7 +1383,7 @@ func (r *Replica) withRaftGroupLocked( if unquiesce { r.unquiesceAndWakeLeaderLocked() } - return err + return false, err } // withRaftGroup calls the supplied function with the (lazily initialized) @@ -1378,9 +1397,12 @@ func (r *Replica) withRaftGroupLocked( // should not initiate an election while handling incoming raft // messages (which may include MsgVotes from an election in progress, // and this election would be disrupted if we started our own). +// +// If this Replica is in the process of being removed this method will return +// true and a nil error. func (r *Replica) withRaftGroup( mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), -) error { +) (isRemoved bool, _ error) { r.mu.Lock() defer r.mu.Unlock() return r.withRaftGroupLocked(mayCampaignOnWake, f) diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 7132eae707e0..aa0e62b93de2 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -678,6 +678,14 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { return nil } +type clearRangeOption int + +const ( + clearRangeIDLocalOnly clearRangeOption = iota + clearReplicatedOnly + clearAll +) + // clearRangeData clears the data associated with a range descriptor. If // rangeIDLocalOnly is true, then only the range-id local keys are deleted. 
// Otherwise, the range-id local keys, range local keys, and user keys are all @@ -689,16 +697,20 @@ func clearRangeData( desc *roachpb.RangeDescriptor, eng engine.Reader, writer engine.Writer, - rangeIDLocalOnly bool, + opt clearRangeOption, mustClearRange bool, ) error { var keyRanges []rditer.KeyRange - if rangeIDLocalOnly { - keyRanges = []rditer.KeyRange{rditer.MakeRangeIDLocalKeyRange(desc.RangeID, false)} - } else { + switch opt { + case clearRangeIDLocalOnly: + keyRanges = []rditer.KeyRange{ + rditer.MakeRangeIDLocalKeyRange(desc.RangeID, false), + } + case clearReplicatedOnly: + keyRanges = rditer.MakeReplicatedKeyRanges(desc) + case clearAll: keyRanges = rditer.MakeAllKeyRanges(desc) } - var clearRangeFn func(engine.Reader, engine.Writer, engine.MVCCKey, engine.MVCCKey) error if mustClearRange { clearRangeFn = func(eng engine.Reader, writer engine.Writer, start, end engine.MVCCKey) error { @@ -1013,7 +1025,7 @@ func (r *Replica) clearSubsumedReplicaDiskData( r.store.Engine(), &subsumedReplSST, subsumedNextReplicaID, - true, /* rangeIDLocalOnly */ + clearRangeIDLocalOnly, true, /* mustClearRange */ ); err != nil { subsumedReplSST.Close() diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 05490d143468..bcbf3a983dea 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7278,117 +7278,6 @@ func TestSyncSnapshot(t *testing.T) { } } -// TestReplicaIDChangePending verifies that on a replica ID change, pending -// commands are re-proposed on the new raft group. -func TestReplicaIDChangePending(t *testing.T) { - defer leaktest.AfterTest(t)() - - tc := testContext{} - cfg := TestStoreConfig(nil) - // Disable ticks to avoid automatic reproposals after a timeout, which - // would pass this test. - cfg.RaftTickInterval = math.MaxInt32 - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - tc.StartWithStoreConfig(t, stopper, cfg) - repl := tc.repl - - // Stop the command from being proposed to the raft group and being removed. - proposedOnOld := make(chan struct{}, 1) - repl.mu.Lock() - repl.mu.proposalBuf.testing.submitProposalFilter = func(*ProposalData) (drop bool, _ error) { - select { - case proposedOnOld <- struct{}{}: - default: - } - return true, nil - } - lease := *repl.mu.state.Lease - repl.mu.Unlock() - - // Add a command to the pending list and wait for it to be proposed. - magicTS := tc.Clock().Now() - ba := roachpb.BatchRequest{} - ba.Timestamp = magicTS - ba.Add(&roachpb.PutRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: roachpb.Key("a"), - }, - Value: roachpb.MakeValueFromBytes([]byte("val")), - }) - _, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, &allSpans, endCmds{}) - if err != nil { - t.Fatal(err) - } - <-proposedOnOld - - // Set the raft command handler so we can tell if the command has been - // re-proposed. - proposedOnNew := make(chan struct{}, 1) - repl.mu.Lock() - repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) { - if p.Request.Timestamp == magicTS { - select { - case proposedOnNew <- struct{}{}: - default: - } - } - return false, nil - } - repl.mu.Unlock() - - // Set the ReplicaID on the replica. 
- if err := repl.setReplicaID(2); err != nil { - t.Fatal(err) - } - - <-proposedOnNew -} - -func TestSetReplicaID(t *testing.T) { - defer leaktest.AfterTest(t)() - - tsc := TestStoreConfig(nil) - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - tc.StartWithStoreConfig(t, stopper, tsc) - - repl := tc.repl - - testCases := []struct { - replicaID roachpb.ReplicaID - minReplicaID roachpb.ReplicaID - newReplicaID roachpb.ReplicaID - expectedMinReplicaID roachpb.ReplicaID - expectedErr string - }{ - {0, 0, 1, 2, ""}, - {0, 1, 1, 2, ""}, - {0, 2, 1, 2, "raft group deleted"}, - {1, 2, 1, 2, ""}, // not an error; replicaID == newReplicaID is checked first - {2, 0, 1, 0, "replicaID cannot move backwards"}, - } - for _, c := range testCases { - t.Run("", func(t *testing.T) { - repl.mu.Lock() - repl.mu.replicaID = c.replicaID - repl.mu.minReplicaID = c.minReplicaID - repl.mu.Unlock() - - err := repl.setReplicaID(c.newReplicaID) - repl.mu.Lock() - if repl.mu.minReplicaID != c.expectedMinReplicaID { - t.Errorf("expected minReplicaID=%d, but found %d", c.expectedMinReplicaID, repl.mu.minReplicaID) - } - repl.mu.Unlock() - if !testutils.IsError(err, c.expectedErr) { - t.Fatalf("expected %q, but found %v", c.expectedErr, err) - } - }) - } -} - func TestReplicaRetryRaftProposal(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/split_delay_helper.go b/pkg/storage/split_delay_helper.go index 7f864f1f77b4..f91b4057e8ef 100644 --- a/pkg/storage/split_delay_helper.go +++ b/pkg/storage/split_delay_helper.go @@ -46,7 +46,7 @@ func (sdh *splitDelayHelper) RaftStatus(ctx context.Context) (roachpb.RangeID, * func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) { r := (*Replica)(sdh) r.raftMu.Lock() - _ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) { + _, _ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) { // NB: intentionally ignore the error (which can be ErrProposalDropped // when there's an SST inflight). data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 99fa19230a5a..068270f9bcc6 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/storage/apply" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/closedts/container" "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" @@ -1142,7 +1143,7 @@ func (s *Store) IsStarted() bool { return atomic.LoadInt32(&s.started) == 1 } -// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing ( such as +// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such as // RaftHardStateKey, RaftTombstoneKey, and many others). Such keys could in principle exist at any // RangeID, and this helper efficiently discovers all the keys of the desired type (as specified by // the supplied `keyFn`) and, for each key-value pair discovered, unmarshals it into `msg` and then @@ -2117,38 +2118,20 @@ func splitPostApply( rightRng.mu.Lock() // The right hand side of the split may have been removed and re-added // in the meantime, and the replicaID in RightDesc may be stale. - // Consequently the call below may fail with a RaftGroupDeletedError. 
In - // general, this protects earlier incarnations of the replica that were + // In general, this protects earlier incarnations of the replica that were // since replicaGC'ed from reneging on promises made earlier (for // example, once the HardState is removed, a replica could cast a // different vote for the same term). // - // It is safe to circumvent that in this case because the RHS must have - // remained uninitialized (the LHS blocks all user data, and since our - // Raft logs start at a nonzero index a snapshot must go through before - // any log entries are appended). This means the data in that range is - // just a HardState which we "logically" snapshot by assigning it data - // formerly located within the LHS. + // The RHS must have remained uninitialized (the LHS blocks all user data, + // and since our Raft logs start at a nonzero index a snapshot must go + // through before any log entries are appended). This means the data in + // that range is just a HardState which we "logically" snapshot by + // assigning it data formerly located within the LHS. // - // Note that if we ever have a way to replicaGC uninitialized replicas, - // the RHS may have been gc'ed and so the HardState would be gone. In - // that case, the requirement that the HardState remains would have been - // violated if the bypass below were used, which is why we place an - // assertion. - // - // See: - // https://github.com/cockroachdb/cockroach/issues/21146#issuecomment-365757329 - // - // TODO(tbg): this argument is flawed - it's possible for a tombstone - // to exist on the RHS: - // https://github.com/cockroachdb/cockroach/issues/40470 - // Morally speaking, this means that we should throw away the data we - // moved from the LHS to the RHS (depending on the tombstone). - // Realistically speaking it will probably be easier to create the RHS - // anyway, even though there's a tombstone and it may just get gc'ed - // again. Note that for extra flavor, we may not even know whether the - // RHS is currently supposed to exist or not, lending more weight to the - // second approach. + // We detect this case and pass the old range descriptor for the RHS + // to SplitRange below which will clear the RHS data rather than installing + // the RHS in the store. tombstoneKey := keys.RaftTombstoneKey(rightRng.RangeID) var tombstone roachpb.RaftTombstone if ok, err := engine.MVCCGetProto( @@ -2156,49 +2139,56 @@ func splitPostApply( ); err != nil { log.Fatalf(ctx, "unable to load tombstone for RHS: %+v", err) } else if ok { - log.Fatalf(ctx, "split trigger found right-hand side with tombstone %+v: %v", tombstone, rightRng) + log.Warningf(ctx, "split trigger found right-hand side with tombstone %+v, "+ + "RHS must have been removed at this ID: %v", tombstone, rightRng) } rightDesc, ok := split.RightDesc.GetReplicaDescriptor(r.StoreID()) if !ok { - // This is yet another special quirky case. The local store is not - // necessarily a member of the split; this can occur if this store - // wasn't a member at the time of the split, but is nevertheless - // catching up across the split. For example, add a learner, and - // while it is being caught up via a snapshot, remove the learner - // again, then execute a split, and re-add it. Upon being re-added - // the learner will likely catch up from where the snapshot left it, - // and it will see itself get removed, then we hit this branch when - // the split trigger is applied, and eventually there's a - // ChangeReplicas that re-adds the local store under a new - // replicaID.
- // - // So our trigger will have a smaller replicaID for our RHS, which - // will blow up in initRaftMuLockedReplicaMuLocked. We still want - // to force the RHS to accept the descriptor, even though that - // rewinds the replicaID. To do that we want to change the existing - // replicaID, but we didn't find one -- zero is then the only reasonable - // choice. Note that this is also the replicaID a replica that is - // not reflected in its own descriptor will have, i.e. we're cooking - // up less of a special case here than you'd expect at first glance. - // - // Note that futzing with the replicaID is a high-risk operation as - // it is what the raft peer will believe itself to be identified by. - // Under no circumstances must we use a replicaID that belongs to - // someone else, or a byzantine situation will result. Zero is - // special-cased and will never init a raft group until the real - // ID is known from inbound raft traffic. - rightDesc.ReplicaID = 0 // for clarity only; it's already zero + // The only time that this case can happen is if we are currently not + // a part of this range (i.e. catching up from a preemptive snapshot). + // In this case it's fine and the logic below is reasonable. + _, lhsExists := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()) + if lhsExists { + log.Fatalf(ctx, "we can't have had the local store change replica ID: %d %v", r.mu.replicaID, rightRng) + } } + // If we're in this case then we know that the RHS has since been removed + // and re-added with a higher replica ID. We know we've never processed a + // snapshot for the right range because up to this point it would overlap + // with the left and ranges cannot move rightwards. Furthermore, if it didn't + // overlap because the LHS used to be smaller (it still hasn't + // processed a merge), then there's no way we could possibly be processing + // this split. + // + // We might, however, have already voted at a higher term. In general + // this shouldn't happen because we add learners and then promote them + // only after we snapshot, but for the sake of mixed-version clusters and + // other potential behavior changes we pass the desc for the known-to-be + // old RHS and have SplitRange clear its data. This call to SplitRange + // short-circuits the rest of this call, which would set up the RHS. if rightRng.mu.replicaID > rightDesc.ReplicaID { - rightRng.mu.replicaID = rightDesc.ReplicaID + rightRng.mu.Unlock() + err := r.store.SplitRange(ctx, r, rightRng, split.LeftDesc, &split.RightDesc) + if err != nil { + log.Fatal(ctx, err) + } + return + } + // If we are uninitialized and have a zero-value replica ID then we're safe + // to ignore this tombstone because we couldn't have ever promised anything + // as the newer replica. This is true because we would have needed a + // snapshot we couldn't possibly have received. If the tombstone does not + // exceed the rightDesc.ReplicaID then we have no need to reset + // minReplicaID. This case can occur if we created the RHS due to raft + // messages before the LHS acquired the split lock and then the RHS + // discovered that it was removed and re-added at a higher replica ID, which + // led to a tombstone being laid down, but then the node forgot about the + // higher replica ID. + if tombstone.NextReplicaID > rightDesc.ReplicaID { + rightRng.mu.minReplicaID = 0 } - // NB: the safety argument above implies that we don't have to worry - // about restoring the existing minReplicaID if it's nonzero.
No - // promises have been made, so none need to be kept. So we clear this - // unconditionally, making sure that it doesn't block us from init'ing - // the RHS. - rightRng.mu.minReplicaID = 0 err := rightRng.initRaftMuLockedReplicaMuLocked(&split.RightDesc, r.store.Clock(), 0) + rightRng.mu.Unlock() if err != nil { log.Fatal(ctx, err) @@ -2237,10 +2227,17 @@ func splitPostApply( // until it receives a Raft message addressed to the right-hand range. But // since new replicas start out quiesced, unless we explicitly awaken the // Raft group, there might not be any Raft traffic for quite a while. - if err := rightRng.withRaftGroup(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { + rightRngIsRemoved, err := rightRng.withRaftGroup(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { return true, nil - }); err != nil { + }) + if err != nil { log.Fatalf(ctx, "unable to create raft group for right-hand range in split: %+v", err) + } else if rightRngIsRemoved { + // This case should not be possible because the destroyStatus for a replica + // can only change while holding the raftMu and in order to have reached + // this point we know that we must be holding the raftMu for the RHS + // because we acquired it beneath raft. + log.Fatalf(ctx, "unable to create raft group for right-hand range in split: range is removed") } // Invoke the leasePostApply method to ensure we properly initialize @@ -2251,7 +2248,7 @@ func splitPostApply( // Add the RHS replica to the store. This step atomically updates // the EndKey of the LHS replica and also adds the RHS replica // to the store's replica map. - if err := r.store.SplitRange(ctx, r, rightRng, split.LeftDesc); err != nil { + if err := r.store.SplitRange(ctx, r, rightRng, split.LeftDesc, nil /* oldRightDesc */); err != nil { // Our in-memory state has diverged from the on-disk state. log.Fatalf(ctx, "%s: failed to update Store after split: %+v", r, err) } @@ -2289,11 +2286,23 @@ func splitPostApply( // // This is only called from the split trigger in the context of the execution // of a Raft command. +// +// The funky case is if rightDesc is non-nil. If so then we know that the right +// replica has been removed and re-added before applying this split and we need +// to leave it uninitialized. It's not possible for it to have become +// initialized as it would have needed a snapshot which must have overlapped the +// leftRepl. The user data in oldRightDesc will be cleared. func (s *Store) SplitRange( - ctx context.Context, leftRepl, rightRepl *Replica, newLeftDesc roachpb.RangeDescriptor, + ctx context.Context, + leftRepl, rightRepl *Replica, + newLeftDesc roachpb.RangeDescriptor, + oldRightDesc *roachpb.RangeDescriptor, ) error { oldLeftDesc := leftRepl.Desc() - rightDesc := rightRepl.Desc() + rightDesc := oldRightDesc + if rightDesc == nil { + rightDesc = rightRepl.Desc() + } if !bytes.Equal(oldLeftDesc.EndKey, rightDesc.EndKey) || bytes.Compare(oldLeftDesc.StartKey, rightDesc.StartKey) >= 0 { @@ -2308,11 +2317,18 @@ func (s *Store) SplitRange( if exRng != rightRepl { log.Fatalf(ctx, "found unexpected uninitialized replica: %s vs %s", exRng, rightRepl) } - // NB: We only remove from uninitReplicas and the replicaQueues maps here - // so that we don't leave open a window where a replica is temporarily not - // present in Store.mu.replicas. 
- delete(s.mu.uninitReplicas, rightDesc.RangeID) - s.replicaQueues.Delete(int64(rightDesc.RangeID)) + // If oldRightDesc is not nil then we know that exRng refers to a later + // replica in the RHS range and should remain uninitialized. + if oldRightDesc == nil { + // NB: We only remove from uninitReplicas and the replicaQueues maps here + // so that we don't leave open a window where a replica is temporarily not + // present in Store.mu.replicas. + delete(s.mu.uninitReplicas, rightDesc.RangeID) + s.replicaQueues.Delete(int64(rightDesc.RangeID)) + } + } else if oldRightDesc != nil { + log.Fatalf(ctx, "found initialized replica despite knowing that the post "+ + "split replica has been removed") } leftRepl.setDesc(ctx, &newLeftDesc) @@ -2336,22 +2352,40 @@ func (s *Store) SplitRange( // Clear the original range's request stats, since they include requests for // spans that are now owned by the new range. leftRepl.leaseholderStats.resetRequestCounts() - leftRepl.writeStats.splitRequestCounts(rightRepl.writeStats) + if oldRightDesc == nil { + leftRepl.writeStats.splitRequestCounts(rightRepl.writeStats) - if err := s.addReplicaInternalLocked(rightRepl); err != nil { - return errors.Errorf("unable to add replica %v: %s", rightRepl, err) - } + if err := s.addReplicaInternalLocked(rightRepl); err != nil { + return errors.Errorf("unable to add replica %v: %s", rightRepl, err) + } - // Update the replica's cached byte thresholds. This is a no-op if the system - // config is not available, in which case we rely on the next gossip update - // to perform the update. - if err := rightRepl.updateRangeInfo(rightRepl.Desc()); err != nil { - return err + // Update the replica's cached byte thresholds. This is a no-op if the system + // config is not available, in which case we rely on the next gossip update + // to perform the update. + if err := rightRepl.updateRangeInfo(rightRepl.Desc()); err != nil { + return err + } + // Add the range to metrics and maybe gossip on capacity change. + s.metrics.ReplicaCount.Inc(1) + s.maybeGossipOnCapacityChange(ctx, rangeAddEvent) + } else { + if rightRepl.IsInitialized() { + log.Fatalf(ctx, "refusing to clear replicated data for an initialized range %v in SplitRange", rightRepl) + } + // We need to clear the data which the RHS would have inherited. + // The uninitialized RHS doesn't know about this data so we must clear it + // lest it be leaked potentially forever. + batch := rightRepl.Engine().NewWriteOnlyBatch() + defer batch.Close() + if err := clearRangeData(oldRightDesc, rightRepl.Engine(), batch, clearReplicatedOnly, false); err != nil { + log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) + } + if err := batch.Commit(true); err != nil { + log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) + } + return nil } - // Add the range to metrics and maybe gossip on capacity change. - s.metrics.ReplicaCount.Inc(1) - s.maybeGossipOnCapacityChange(ctx, rangeAddEvent) return nil } @@ -2580,7 +2614,12 @@ func (s *Store) removeReplicaImpl( // We check both rep.mu.ReplicaID and rep.mu.state.Desc's replica ID because // they can differ in cases when a replica's ID is increased due to an // incoming raft message (see #14231 for background). + // TODO(ajwerner): reconsider some of this sanity checking. 
rep.mu.Lock() + if rep.mu.destroyStatus.Removed() { + rep.mu.Unlock() + return nil + } replicaID := rep.mu.replicaID if rep.mu.replicaID >= nextReplicaID { rep.mu.Unlock() @@ -2600,12 +2639,7 @@ func (s *Store) removeReplicaImpl( } if !rep.IsInitialized() { - // The split trigger relies on the fact that it can bypass the tombstone - // check for the RHS, but this is only true as long as we never delete - // its HardState. - // - // See the comment in splitPostApply for details. - log.Fatalf(ctx, "can not replicaGC uninitialized replicas") + log.Fatalf(ctx, "can not replicaGC uninitialized replicas in this method") } // During merges, the context might have the subsuming range, so we explicitly @@ -2666,7 +2700,74 @@ func (s *Store) removeReplicaImpl( // TODO(peter): Could release s.mu.Lock() here. s.maybeGossipOnCapacityChange(ctx, rangeRemoveEvent) s.scanner.RemoveReplica(rep) + return nil +} + +func (s *Store) removeUninitializedReplicaRaftMuLocked( + ctx context.Context, rep *Replica, nextReplicaID roachpb.ReplicaID, +) error { + rep.raftMu.AssertHeld() + + // Sanity check this removal. + rep.mu.RLock() + ds := rep.mu.destroyStatus + isInitialized := rep.isInitializedRLocked() + rep.mu.RUnlock() + // Somebody already removed this Replica. + if ds.Removed() { + return nil + } + + if !ds.RemovalPending() { + log.Fatalf(ctx, "cannot remove uninitialized replica which is not removal pending: %v", ds) + } + + // When we're in this state we should have already had our destroy status set + // so it shouldn't have been possible to process any raft messages or apply + // any snapshots. + if isInitialized { + log.Fatalf(ctx, "previously uninitialized replica became initialized before removal") + } + + // Proceed with the removal. + rep.readOnlyCmdMu.Lock() + rep.mu.Lock() + rep.cancelPendingCommandsLocked() + rep.mu.internalRaftGroup = nil + rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.store.StoreID()), destroyReasonRemoved) + rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() + + if err := rep.destroyUninitializedReplicaRaftMuLocked(ctx, nextReplicaID); err != nil { + log.Fatalf(ctx, "failed to remove uninitialized replica %v: %v", rep, err) + } + + s.mu.Lock() + defer s.mu.Unlock() + // Sanity check, could be removed. + value, stillExists := s.mu.replicas.Load(int64(rep.RangeID)) + if !stillExists { + log.Fatalf(ctx, "uninitialized replica was removed in the meantime") + } + existing := (*Replica)(value) + if existing == rep { + log.Infof(ctx, "removing uninitialized replica %v", rep) + } else { + log.Fatalf(ctx, "uninitialized replica %v was already removed", rep) + } + + s.metrics.ReplicaCount.Dec(1) + + // Only an uninitialized replica can have a placeholder since, by + // definition, an initialized replica will be present in the + // replicasByKey map. While the replica will usually consume the + // placeholder itself, that isn't guaranteed and so this invocation + // here is crucial (i.e. don't remove it). 
+ if s.removePlaceholderLocked(ctx, rep.RangeID) { + atomic.AddInt32(&s.counts.droppedPlaceholders, 1) + } + s.unlinkReplicaByRangeIDLocked(rep.RangeID) return nil } @@ -3428,7 +3529,11 @@ func (s *Store) processRaftRequestWithReplica( drop := maybeDropMsgApp(ctx, (*replicaMsgAppDropper)(r), &req.Message, req.RangeStartKey) if !drop { - if err := r.stepRaftGroup(req); err != nil { + isRemoved, err := r.stepRaftGroup(req) + if isRemoved { + _, err = r.IsDestroyed() + } + if err != nil { return roachpb.NewError(err) } } @@ -3501,10 +3606,13 @@ func (s *Store) processRaftSnapshotRequest( }() } - if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil { + isRemoved, err := r.stepRaftGroup(&snapHeader.RaftMessageRequest) + if isRemoved { + _, err = r.IsDestroyed() + } + if err != nil { return roachpb.NewError(err) } - if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap); err != nil { fatalOnRaftReadyErr(ctx, expl, err) } @@ -3543,33 +3651,40 @@ func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageRespons repl.raftMu.Lock() defer repl.raftMu.Unlock() repl.mu.Lock() - defer repl.mu.Unlock() // If the replica ID in the error does not match then we know // that the replica has been removed and re-added quickly. In // that case, we don't want to add it to the replicaGCQueue. - if tErr.ReplicaID != repl.mu.replicaID { - log.Infof(ctx, "replica too old response with old replica ID: %s", tErr.ReplicaID) + // If the replica is not alive then we also should ignore this error. + if tErr.ReplicaID != repl.mu.replicaID || !repl.mu.destroyStatus.IsAlive() { + repl.mu.Unlock() return nil } - // If the replica ID in the error does match, we know the replica - // will be removed and we can cancel any pending commands. This is - // sometimes necessary to unblock PushTxn operations that are - // necessary for the replica GC to succeed. - repl.cancelPendingCommandsLocked() - // The replica will be garbage collected soon (we are sure // since our replicaID is definitely too old), but in the meantime we // already want to bounce all traffic from it. Note that the replica - // could be re-added with a higher replicaID, in which this error is - // cleared in setReplicaIDRaftMuLockedMuLocked. - if repl.mu.destroyStatus.IsAlive() { - storeID := repl.store.StoreID() - repl.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(repl.RangeID, storeID), destroyReasonRemovalPending) + // could be re-added with a higher replicaID, but we want to clear the + // replica's data before that happens. + if log.V(1) { + log.Infof(ctx, "setting local replica to destroyed due to ReplicaTooOld error") } - s.replicaGCQueue.AddAsync(ctx, repl, replicaGCPriorityRemoved) + storeID := repl.store.StoreID() + // NB: We know that there's a later copy of this range on this store but + // to introduce another more specific error in this case is overkill so + // we return RangeNotFoundError which the recipient will properly + // ignore. + repl.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(repl.RangeID, storeID), + destroyReasonRemovalPending) + repl.mu.Unlock() + nextReplicaID := tErr.ReplicaID + 1 + if !repl.IsInitialized() { + return s.removeUninitializedReplicaRaftMuLocked(ctx, repl, nextReplicaID) + } + return s.removeReplicaImpl(ctx, repl, nextReplicaID, RemoveOptions{ + DestroyData: true, + }) case *roachpb.RaftGroupDeletedError: if replErr != nil { // RangeNotFoundErrors are expected here; nothing else is. 
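The removal paths above all funnel through the same destroyStatus life cycle: a replica is first marked destroyReasonRemovalPending while its mutex is held (for example in HandleRaftResponse on a ReplicaTooOldError), and only transitions to destroyReasonRemoved once its data (or, for an uninitialized replica, just its hard state) has actually been destroyed. A toy restatement of those states, with the method names taken from this patch but everything else simplified:

package main

import "fmt"

type destroyReason int

const (
	destroyReasonAlive destroyReason = iota
	destroyReasonRemovalPending
	destroyReasonRemoved
)

// destroyStatus is a simplified model of the field carried on Replica.mu.
type destroyStatus struct {
	reason destroyReason
	err    error
}

func (s destroyStatus) IsAlive() bool        { return s.reason == destroyReasonAlive }
func (s destroyStatus) RemovalPending() bool { return s.reason == destroyReasonRemovalPending }
func (s destroyStatus) Removed() bool        { return s.reason == destroyReasonRemoved }

func (s *destroyStatus) Set(err error, reason destroyReason) {
	s.err = err
	s.reason = reason
}

func main() {
	var ds destroyStatus
	// Marking the replica pending stops new traffic (raft messages, snapshots,
	// proposals) from being accepted...
	ds.Set(fmt.Errorf("r1 was not found"), destroyReasonRemovalPending)
	fmt.Println(ds.RemovalPending()) // true
	// ...and the removal path finishes the job once the data is cleared.
	ds.Set(ds.err, destroyReasonRemoved)
	fmt.Println(ds.Removed()) // true
}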
@@ -3632,7 +3747,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // giving up the lock. Set lastRepl to nil, so we don't handle it // down below as well. lastRepl = nil - if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, noSnap); err != nil { + switch _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, noSnap); err { + case nil: + case apply.ErrRemoved: + // return hopefully never to be heard from again. + default: fatalOnRaftReadyErr(ctx, expl, err) } } @@ -3677,7 +3796,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // handleRaftReadyRaftMuLocked) since racing to handle Raft Ready won't // have any undesirable results. ctx = lastRepl.AnnotateCtx(ctx) - if _, expl, err := lastRepl.handleRaftReady(ctx, noSnap); err != nil { + switch _, expl, err := lastRepl.handleRaftReady(ctx, noSnap); err { + case nil: + case apply.ErrRemoved: + // return hopefully never to be heard from again. + default: fatalOnRaftReadyErr(ctx, expl, err) } } @@ -3693,8 +3816,12 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { ctx = r.AnnotateCtx(ctx) start := timeutil.Now() stats, expl, err := r.handleRaftReady(ctx, noSnap) - if err != nil { - log.Fatalf(ctx, "%s: %+v", log.Safe(expl), err) // TODO(bdarnell) + switch err { + case nil: + case apply.ErrRemoved: + return + default: + fatalOnRaftReadyErr(ctx, expl, err) } elapsed := timeutil.Since(start) s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds()) @@ -3937,7 +4064,12 @@ func (s *Store) getOrCreateReplica( replicaID roachpb.ReplicaID, creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { + r := retry.Start(retry.Options{ + InitialBackoff: time.Microsecond, + MaxBackoff: 10 * time.Millisecond, + }) for { + r.Next() r, created, err := s.tryGetOrCreateReplica( ctx, rangeID, @@ -3965,33 +4097,77 @@ func (s *Store) tryGetOrCreateReplica( rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, creatingReplica *roachpb.ReplicaDescriptor, -) (_ *Replica, created bool, _ error) { - // The common case: look up an existing (initialized) replica. - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - repl := (*Replica)(value) - repl.raftMu.Lock() // not unlocked - repl.mu.Lock() - defer repl.mu.Unlock() - - var replTooOldErr error - if creatingReplica != nil { +) (_ *Replica, created bool, err error) { + var ( + handleFromReplicaTooOld = func(repl *Replica) error { + if creatingReplica == nil { + return nil + } // Drop messages that come from a node that we believe was once a member of // the group but has been removed. desc := repl.mu.state.Desc _, found := desc.GetReplicaDescriptorByID(creatingReplica.ReplicaID) // It's not a current member of the group. Is it from the past? 
if !found && creatingReplica.ReplicaID < desc.NextReplicaID { - replTooOldErr = roachpb.NewReplicaTooOldError(creatingReplica.ReplicaID) + return roachpb.NewReplicaTooOldError(creatingReplica.ReplicaID) } + return nil + } + handleToReplicaTooOld = func(repl *Replica) error { + if replicaID == 0 || repl.mu.replicaID == 0 || repl.mu.replicaID >= replicaID { + return nil + } + if log.V(1) { + log.Infof(ctx, "found message for newer replica ID: %v %v %v %v %v", repl.mu.replicaID, replicaID, repl.mu.minReplicaID, repl.mu.state.Desc, &repl.mu.destroyStatus) + } + repl.mu.destroyStatus.Set(err, destroyReasonRemovalPending) + isInitialized := repl.isInitializedRLocked() + repl.mu.Unlock() + defer repl.mu.Lock() + if !isInitialized { + if err := s.removeUninitializedReplicaRaftMuLocked(ctx, repl, replicaID); err != nil { + log.Fatalf(ctx, "failed to remove uninitialized replica: %v", err) + } + } else { + if err := s.removeReplicaImpl(ctx, repl, replicaID, RemoveOptions{ + DestroyData: true, + }); err != nil { + log.Fatal(ctx, err) + } + } + return errRetry + } + ) + // The common case: look up an existing (initialized) replica. + if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { + repl := (*Replica)(value) + repl.raftMu.Lock() // not unlocked on success + repl.mu.Lock() + defer repl.mu.Unlock() + if err := handleFromReplicaTooOld(repl); err != nil { + repl.raftMu.Unlock() + return nil, false, err + } + if repl.mu.destroyStatus.RemovalPending() { + repl.raftMu.Unlock() + return nil, false, errRetry + } + if err := handleToReplicaTooOld(repl); err != nil { + repl.raftMu.Unlock() + return nil, false, err } var err error - if replTooOldErr != nil { - err = replTooOldErr - } else if ds := repl.mu.destroyStatus; ds.reason == destroyReasonRemoved { - err = errRetry - } else { - err = repl.setReplicaIDRaftMuLockedMuLocked(replicaID) + if repl.mu.replicaID == 0 { + // This message is telling us about our replica ID. + // This is a common case when dealing with preemptive snapshots. + err = repl.setReplicaIDRaftMuLockedMuLocked(repl.AnnotateCtx(ctx), replicaID) + } else if replicaID != 0 && repl.mu.replicaID > replicaID { + // TODO(ajwerner): probably just silently drop this message. + err = roachpb.NewRangeNotFoundError(rangeID, s.StoreID()) + } else if replicaID != 0 && repl.mu.replicaID != replicaID { + log.Fatalf(ctx, "we should never update the replica id based on a message %v %v", + repl.mu.replicaID, replicaID) } if err != nil { repl.raftMu.Unlock() diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 3fb7ff8b74d5..2674ed25f2e9 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -628,7 +628,9 @@ func (s *Store) canApplySnapshotLocked( existingRepl.raftMu.AssertHeld() existingRepl.mu.RLock() - existingIsInitialized := existingRepl.isInitializedRLocked() + existingDesc := existingRepl.mu.state.Desc + existingIsInitialized := existingDesc.IsInitialized() + existingDestroyStatus := existingRepl.mu.destroyStatus existingRepl.mu.RUnlock() if existingIsInitialized { @@ -637,15 +639,19 @@ func (s *Store) canApplySnapshotLocked( // in Replica.maybeAcquireSnapshotMergeLock for how this is // made safe. // - // NB: we expect the replica to know its replicaID at this point - // (i.e. 
!existingIsPreemptive), though perhaps it's possible - // that this isn't true if the leader initiates a Raft snapshot - // (that would provide a range descriptor with this replica in - // it) but this node reboots (temporarily forgetting its - // replicaID) before the snapshot arrives. + // NB: The snapshot must be intended for this replica as + // withReplicaForRequest ensures that requests with a non-zero replica + // id are passed to a replica with a matching id. Given this is not a + // preemptive snapshot we know that its id must be non-zero. return nil, nil } + // If we are not alive then we should not apply a snapshot as our removal + // is imminent. + if existingDestroyStatus.RemovalPending() { + return nil, existingDestroyStatus.err + } + // We have a key range [desc.StartKey,desc.EndKey) which we want to apply a // snapshot for. Is there a conflicting existing placeholder or an // overlapping range? @@ -670,7 +676,7 @@ func (s *Store) checkSnapshotOverlapLocked( // NB: this check seems redundant since placeholders are also represented in // replicasByKey (and thus returned in getOverlappingKeyRangeLocked). if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok { - return errors.Errorf("%s: canApplySnapshotLocked: cannot add placeholder, have an existing placeholder %s", s, exRng) + return errors.Errorf("%s: canApplySnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica) } // TODO(benesch): consider discovering and GC'ing *all* overlapping ranges, @@ -732,47 +738,20 @@ func (s *Store) checkSnapshotOverlapLocked( // snapshot. func (s *Store) shouldAcceptSnapshotData( ctx context.Context, snapHeader *SnapshotRequest_Header, -) error { +) (err error) { if snapHeader.IsPreemptive() { return crdberrors.AssertionFailedf(`expected a raft or learner snapshot`) } - - s.mu.Lock() - defer s.mu.Unlock() - - // TODO(tbg): see the comment on desc.Generation for what seems to be a much - // saner way to handle overlap via generational semantics. - desc := *snapHeader.State.Desc - - // First, check for an existing Replica. - if v, ok := s.mu.replicas.Load( - int64(desc.RangeID), - ); ok { - existingRepl := (*Replica)(v) - existingRepl.mu.RLock() - existingIsInitialized := existingRepl.isInitializedRLocked() - existingRepl.mu.RUnlock() - - if existingIsInitialized { - // Regular Raft snapshots can't be refused at this point, - // even if they widen the existing replica. See the comments - // in Replica.maybeAcquireSnapshotMergeLock for how this is - // made safe. - // - // NB: we expect the replica to know its replicaID at this point - // (i.e. !existingIsPreemptive), though perhaps it's possible - // that this isn't true if the leader initiates a Raft snapshot - // (that would provide a range descriptor with this replica in - // it) but this node reboots (temporarily forgetting its - // replicaID) before the snapshot arrives. + pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, + func(ctx context.Context, r *Replica) *roachpb.Error { + if !r.IsInitialized() { + s.mu.Lock() + defer s.mu.Unlock() + return roachpb.NewError(s.checkSnapshotOverlapLocked(ctx, snapHeader)) + } return nil - } - } - - // We have a key range [desc.StartKey,desc.EndKey) which we want to apply a - // snapshot for. Is there a conflicting existing placeholder or an - // overlapping range? 
- return s.checkSnapshotOverlapLocked(ctx, snapHeader) + }) + return pErr.GoError() } // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 8829fd7a15cb..dc57a80cba01 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -569,8 +569,8 @@ func TestStoreAddRemoveRanges(t *testing.T) { // Try to remove range 1 again. if err := store.RemoveReplica(context.Background(), repl1, repl1.Desc().NextReplicaID, RemoveOptions{ DestroyData: true, - }); err == nil { - t.Fatal("expected error re-removing same range") + }); err != nil { + t.Fatalf("didn't expect error re-removing same range: %v", err) } // Try to add a range with previously-used (but now removed) ID. repl2Dup := createReplica(store, 1, roachpb.RKey("a"), roachpb.RKey("b")) @@ -712,11 +712,11 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { // Verify that removal of a replica marks it as destroyed so that future raft // commands on the Replica will silently be dropped. - if err := repl1.withRaftGroup(true, func(r *raft.RawNode) (bool, error) { + isRemoved, err := repl1.withRaftGroup(true, func(r *raft.RawNode) (bool, error) { return true, errors.Errorf("unexpectedly created a raft group") - }); err != nil { - t.Fatal(err) - } + }) + require.True(t, isRemoved) + require.NoError(t, err) repl1.mu.Lock() expErr := roachpb.NewError(repl1.mu.destroyStatus.err) @@ -1354,7 +1354,7 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep require.NoError(t, err) newLeftDesc := *repl.Desc() newLeftDesc.EndKey = splitKey - err = store.SplitRange(repl.AnnotateCtx(context.TODO()), repl, newRng, newLeftDesc) + err = store.SplitRange(repl.AnnotateCtx(context.TODO()), repl, newRng, newLeftDesc, nil) require.NoError(t, err) return newRng } @@ -2953,104 +2953,6 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { }) } -// Test that we set proper tombstones for removed replicas and use the -// tombstone to reject attempts to create a replica with a lesser ID. -func TestRemovedReplicaTombstone(t *testing.T) { - defer leaktest.AfterTest(t)() - - const rangeID = 1 - creatingReplica := roachpb.ReplicaDescriptor{ - NodeID: 2, - StoreID: 2, - ReplicaID: 2, - } - - // All test cases assume that the starting replica ID is 1. This assumption - // is enforced by a check within the test logic. 
- testCases := []struct { - setReplicaID roachpb.ReplicaID // set the existing replica to this before removing it - descNextReplicaID roachpb.ReplicaID // the descriptor's NextReplicaID during replica removal - createReplicaID roachpb.ReplicaID // try creating a replica at this ID - expectCreated bool - }{ - {1, 2, 2, true}, - {1, 2, 1, false}, - {1, 2, 1, false}, - {1, 3, 1, false}, - {1, 3, 2, false}, - {1, 3, 3, true}, - {1, 99, 98, false}, - {1, 99, 99, true}, - {2, 2, 2, false}, - {2, 2, 3, true}, - {2, 2, 99, true}, - {98, 2, 98, false}, - {98, 2, 99, true}, - } - for _, c := range testCases { - t.Run("", func(t *testing.T) { - tc := testContext{} - stopper := stop.NewStopper() - ctx := context.TODO() - defer stopper.Stop(ctx) - tc.Start(t, stopper) - s := tc.store - - repl1, err := s.GetReplica(rangeID) - if err != nil { - t.Fatal(err) - } - repl1.mu.Lock() - if repl1.mu.replicaID != 1 { - repl1.mu.Unlock() - t.Fatalf("test precondition not met; expected ReplicaID=1, got %d", repl1.mu.replicaID) - } - repl1.mu.Unlock() - - // Try to trigger a race where the replica ID gets increased during the GC - // process by taking the store lock and inserting a short sleep to cause - // the goroutine to start running the setReplicaID call. - errChan := make(chan error) - - func() { - repl1.raftMu.Lock() - defer repl1.raftMu.Unlock() - s.mu.Lock() - defer s.mu.Unlock() - repl1.mu.Lock() - defer repl1.mu.Unlock() - - go func() { - errChan <- s.RemoveReplica(ctx, repl1, c.descNextReplicaID, RemoveOptions{DestroyData: true}) - }() - - time.Sleep(1 * time.Millisecond) - - if err := repl1.setReplicaIDRaftMuLockedMuLocked(c.setReplicaID); err != nil { - t.Fatal(err) - } - }() - - if err := <-errChan; testutils.IsError(err, "replica ID has changed") { - // We didn't trigger the race, so just return success. - return - } else if err != nil { - t.Fatal(err) - } - - _, created, err := s.getOrCreateReplica(ctx, rangeID, c.createReplicaID, &creatingReplica) - if created != c.expectCreated { - t.Errorf("expected s.getOrCreateReplica(%d, %d, %v).created=%v, got %v", - rangeID, c.createReplicaID, creatingReplica, c.expectCreated, created) - } - if !c.expectCreated && !testutils.IsError(err, "raft group deleted") { - t.Errorf("expected s.getOrCreateReplica(%d, %d, %v).err='raft group deleted', got %v", - rangeID, c.createReplicaID, creatingReplica, err) - } - }) - } -} - type fakeSnapshotStream struct { nextResp *SnapshotResponse nextErr error diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 7ee8dccf6953..91d5b48e8f9f 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -139,6 +139,12 @@ type StoreTestingKnobs struct { // DisableRefreshReasonTicks disables refreshing pending commands // periodically. DisableRefreshReasonTicks bool + // DisableEagerReplicaRemoval prevents the Replica from destroying itself + // when it encounters a ChangeReplicasTrigger which would remove it. + // This option can lead to nasty cases during shutdown where a replica will + // spin attempting to acquire a split or merge lock on a RHS which will + // always fail and is generally not safe but is useful for testing. + DisableEagerReplicaRemoval bool // RefreshReasonTicksPeriod overrides the default period over which // pending commands are refreshed. The period is specified as a multiple // of Raft group ticks.
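As a hypothetical usage example (not part of this patch), a test that wants to keep a removed replica around for inspection would plumb the new DisableEagerReplicaRemoval knob through the usual TestingKnobs machinery roughly like this; the test name and cluster size here are illustrative only:

package storage_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// TestEagerRemovalDisabled is a sketch, not a test added by this change; it
// only shows how the knob would be wired up.
func TestEagerRemovalDisabled(t *testing.T) {
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{
				Store: &storage.StoreTestingKnobs{
					// Keep a replica alive even after it applies a
					// ChangeReplicasTrigger that removes it.
					DisableEagerReplicaRemoval: true,
				},
			},
		},
	})
	defer tc.Stopper().Stop(context.TODO())
}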