diff --git a/pkg/storage/apply/task.go b/pkg/storage/apply/task.go index 8bfa62acd0a1..26e9ef9511dd 100644 --- a/pkg/storage/apply/task.go +++ b/pkg/storage/apply/task.go @@ -12,6 +12,7 @@ package apply import ( "context" + "errors" "go.etcd.io/etcd/raft/raftpb" ) @@ -54,6 +55,10 @@ type StateMachine interface { ApplySideEffects(CheckedCommand) (AppliedCommand, error) } +// ErrRemoved can be returned from ApplySideEffects which will stop the +// task from processing more commands and return immediately. +var ErrRemoved = errors.New("replica removed") + // Batch accumulates a series of updates from Commands and performs them // all at once to its StateMachine when applied. Groups of Commands will be // staged in the Batch such that one or more trivial Commands are staged or diff --git a/pkg/storage/client_merge_test.go b/pkg/storage/client_merge_test.go index f6073a23ec68..f3ee1338ae62 100644 --- a/pkg/storage/client_merge_test.go +++ b/pkg/storage/client_merge_test.go @@ -57,7 +57,6 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.etcd.io/etcd/raft" "go.etcd.io/etcd/raft/raftpb" ) @@ -1638,6 +1637,7 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) { storeCfg.TestingKnobs.DisableReplicateQueue = true storeCfg.TestingKnobs.DisableReplicaGCQueue = true storeCfg.TestingKnobs.DisableMergeQueue = true + storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true mtc := &multiTestContext{storeConfig: &storeCfg} mtc.Start(t, 2) defer mtc.Stop() @@ -1662,10 +1662,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) { for _, rangeID := range []roachpb.RangeID{lhsDesc.RangeID, rhsDesc.RangeID} { repl, err := store1.GetReplica(rangeID) if err != nil { - t.Fatal(err) + continue } if err := store1.ManualReplicaGC(repl); err != nil { - t.Fatal(err) + t.Logf("replica was already removed: %v", err) } if _, err := store1.GetReplica(rangeID); err == nil { t.Fatalf("replica of r%d not gc'd from s1", rangeID) @@ -2041,6 +2041,7 @@ func TestStoreRangeMergeAbandonedFollowers(t *testing.T) { storeCfg.TestingKnobs.DisableReplicaGCQueue = true storeCfg.TestingKnobs.DisableSplitQueue = true storeCfg.TestingKnobs.DisableMergeQueue = true + storeCfg.TestingKnobs.DisableEagerReplicaRemoval = true mtc := &multiTestContext{storeConfig: &storeCfg} mtc.Start(t, 3) defer mtc.Stop() @@ -2808,74 +2809,6 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) { } } -// unreliableRaftHandler drops all Raft messages that are addressed to the -// specified rangeID, but lets all other messages through. -type unreliableRaftHandler struct { - rangeID roachpb.RangeID - storage.RaftMessageHandler - // If non-nil, can return false to avoid dropping a msg to rangeID - dropReq func(*storage.RaftMessageRequest) bool - dropHB func(*storage.RaftHeartbeat) bool - dropResp func(*storage.RaftMessageResponse) bool -} - -func (h *unreliableRaftHandler) HandleRaftRequest( - ctx context.Context, - req *storage.RaftMessageRequest, - respStream storage.RaftMessageResponseStream, -) *roachpb.Error { - if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 { - reqCpy := *req - req = &reqCpy - req.Heartbeats = h.filterHeartbeats(req.Heartbeats) - req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps) - if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 { - // Entirely filtered. 
- return nil - } - } else if req.RangeID == h.rangeID { - if h.dropReq == nil || h.dropReq(req) { - log.Infof( - ctx, - "dropping Raft message %s", - raft.DescribeMessage(req.Message, func([]byte) string { - return "" - }), - ) - - return nil - } - } - return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) -} - -func (h *unreliableRaftHandler) filterHeartbeats( - hbs []storage.RaftHeartbeat, -) []storage.RaftHeartbeat { - if len(hbs) == 0 { - return hbs - } - var cpy []storage.RaftHeartbeat - for i := range hbs { - hb := &hbs[i] - if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) { - cpy = append(cpy, *hb) - } - } - return cpy -} - -func (h *unreliableRaftHandler) HandleRaftResponse( - ctx context.Context, resp *storage.RaftMessageResponse, -) error { - if resp.RangeID == h.rangeID { - if h.dropResp == nil || h.dropResp(resp) { - return nil - } - } - return h.RaftMessageHandler.HandleRaftResponse(ctx, resp) -} - func TestStoreRangeMergeRaftSnapshot(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3353,9 +3286,12 @@ func TestMergeQueue(t *testing.T) { t.Run("non-collocated", func(t *testing.T) { reset(t) verifyUnmerged(t) - mtc.replicateRange(rhs().RangeID, 1) - mtc.transferLease(ctx, rhs().RangeID, 0, 1) - mtc.unreplicateRange(rhs().RangeID, 0) + rhsRangeID := rhs().RangeID + mtc.replicateRange(rhsRangeID, 1) + mtc.transferLease(ctx, rhsRangeID, 0, 1) + mtc.unreplicateRange(rhsRangeID, 0) + require.NoError(t, mtc.waitForUnreplicated(rhsRangeID, 0)) + clearRange(t, lhsStartKey, rhsEndKey) store.MustForceMergeScanAndProcess() verifyMerged(t) diff --git a/pkg/storage/client_metrics_test.go b/pkg/storage/client_metrics_test.go index 4fd43fcbcc21..c91d386c9e81 100644 --- a/pkg/storage/client_metrics_test.go +++ b/pkg/storage/client_metrics_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) func checkGauge(t *testing.T, id string, g *metric.Gauge, e int64) { @@ -313,8 +314,9 @@ func TestStoreMetrics(t *testing.T) { return mtc.unreplicateRangeNonFatal(replica.RangeID, 0) }) - // Force GC Scan on store 0 in order to fully remove range. - mtc.stores[1].MustForceReplicaGCScanAndProcess() + // Wait until we're sure that store 0 has successfully processed its removal. + require.NoError(t, mtc.waitForUnreplicated(replica.RangeID, 0)) + mtc.waitForValues(roachpb.Key("z"), []int64{0, 5, 5}) // Verify range count is as expected. diff --git a/pkg/storage/client_raft_helpers_test.go b/pkg/storage/client_raft_helpers_test.go new file mode 100644 index 000000000000..2e05bd9c4602 --- /dev/null +++ b/pkg/storage/client_raft_helpers_test.go @@ -0,0 +1,115 @@ +// Copyright 2019 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage_test + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "go.etcd.io/etcd/raft" +) + +// unreliableRaftHandler drops all Raft messages that are addressed to the +// specified rangeID, but lets all other messages through. 
+type unreliableRaftHandler struct { + rangeID roachpb.RangeID + storage.RaftMessageHandler + // If non-nil, can return false to avoid dropping a msg to rangeID + dropReq func(*storage.RaftMessageRequest) bool + dropHB func(*storage.RaftHeartbeat) bool + dropResp func(*storage.RaftMessageResponse) bool +} + +func (h *unreliableRaftHandler) HandleRaftRequest( + ctx context.Context, + req *storage.RaftMessageRequest, + respStream storage.RaftMessageResponseStream, +) *roachpb.Error { + if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 { + reqCpy := *req + req = &reqCpy + req.Heartbeats = h.filterHeartbeats(req.Heartbeats) + req.HeartbeatResps = h.filterHeartbeats(req.HeartbeatResps) + if len(req.Heartbeats)+len(req.HeartbeatResps) == 0 { + // Entirely filtered. + return nil + } + } else if req.RangeID == h.rangeID { + if h.dropReq == nil || h.dropReq(req) { + log.Infof( + ctx, + "dropping Raft message %s", + raft.DescribeMessage(req.Message, func([]byte) string { + return "" + }), + ) + + return nil + } + } + return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) +} + +func (h *unreliableRaftHandler) filterHeartbeats( + hbs []storage.RaftHeartbeat, +) []storage.RaftHeartbeat { + if len(hbs) == 0 { + return hbs + } + var cpy []storage.RaftHeartbeat + for i := range hbs { + hb := &hbs[i] + if hb.RangeID != h.rangeID || (h.dropHB != nil && !h.dropHB(hb)) { + cpy = append(cpy, *hb) + } + } + return cpy +} + +func (h *unreliableRaftHandler) HandleRaftResponse( + ctx context.Context, resp *storage.RaftMessageResponse, +) error { + if resp.RangeID == h.rangeID { + if h.dropResp == nil || h.dropResp(resp) { + return nil + } + } + return h.RaftMessageHandler.HandleRaftResponse(ctx, resp) +} + +// mtcStoreRaftMessageHandler exists to allows a store to be stopped and +// restarted while maintaining a partition using an unreliableRaftHandler. +type mtcStoreRaftMessageHandler struct { + mtc *multiTestContext + storeIdx int +} + +func (h *mtcStoreRaftMessageHandler) HandleRaftRequest( + ctx context.Context, + req *storage.RaftMessageRequest, + respStream storage.RaftMessageResponseStream, +) *roachpb.Error { + return h.mtc.Store(h.storeIdx).HandleRaftRequest(ctx, req, respStream) +} + +func (h *mtcStoreRaftMessageHandler) HandleRaftResponse( + ctx context.Context, resp *storage.RaftMessageResponse, +) error { + return h.mtc.Store(h.storeIdx).HandleRaftResponse(ctx, resp) +} + +func (h *mtcStoreRaftMessageHandler) HandleSnapshot( + header *storage.SnapshotRequest_Header, respStream storage.SnapshotResponseStream, +) error { + return h.mtc.Store(h.storeIdx).HandleSnapshot(header, respStream) +} diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index e37ea59247de..a53866764b0b 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -1192,15 +1192,12 @@ func TestReplicateAfterRemoveAndSplit(t *testing.T) { return err } - if err := replicateRHS(); !testutils.IsError(err, storage.IntersectingSnapshotMsg) { - t.Fatalf("unexpected error %v", err) - } - - // Enable the replica GC queue so that the next attempt to replicate the RHS - // to store 2 will cause the obsolete replica to be GC'd allowing a - // subsequent replication to succeed. - mtc.stores[2].SetReplicaGCQueueActive(true) - + // This used to fail with IntersectingSnapshotMsg because we relied on replica + // GC to remove the LHS and that queue is disabled. 
Now we will detect that + // the LHS is not part of the range because of a ReplicaTooOldError and then + // we'll replicaGC the LHS in response. + // TODO(ajwerner): filter the reponses to node 2 or disable this eager + // replicaGC. testutils.SucceedsSoon(t, replicateRHS) } @@ -2992,6 +2989,52 @@ func TestReplicateRogueRemovedNode(t *testing.T) { defer mtc.Stop() mtc.Start(t, 3) + // We're going to set up the cluster with partitioning so that we can + // partition node 0 from the others. We do this by installing + // unreliableRaftHandler listeners on all three Stores which we can enable + // and disable with an atomic. The handler on the partitioned store filters + // out all messages while the handler on the other two stores only filters + // out messages from the partitioned store. When activated the configuration + // looks like: + // + // [0] + // x x + // / \ + // x x + // [1]<---->[2] + const partStore = 0 + var partitioned atomic.Value + partitioned.Store(false) + partRepl, err := mtc.stores[partStore].GetReplica(1) + if err != nil { + t.Fatal(err) + } + partReplDesc, err := partRepl.GetReplicaDescriptor() + if err != nil { + t.Fatal(err) + } + for _, s := range []int{0, 1, 2} { + s := s + h := &unreliableRaftHandler{ + rangeID: 1, + RaftMessageHandler: &mtcStoreRaftMessageHandler{ + mtc: mtc, + storeIdx: s, + }, + } + // Only filter messages from the partitioned store on the other + // two stores. + h.dropReq = func(req *storage.RaftMessageRequest) bool { + return partitioned.Load().(bool) && + (s == partStore || req.FromReplica.StoreID == partRepl.StoreID()) + } + h.dropHB = func(hb *storage.RaftHeartbeat) bool { + return partitioned.Load().(bool) && + (s == partStore || hb.FromReplicaID == partReplDesc.ReplicaID) + } + mtc.transport.Listen(mtc.stores[s].Ident.StoreID, h) + } + // First put the range on all three nodes. raftID := roachpb.RangeID(1) mtc.replicateRange(raftID, 1, 2) @@ -3036,7 +3079,9 @@ func TestReplicateRogueRemovedNode(t *testing.T) { } return nil }) - + // Partition nodes 1 and 2 from node 0. Otherwise they'd get a + // ReplicaTooOldError from node 0 and proceed to remove themselves. + partitioned.Store(true) // Bring node 2 back up. mtc.restartStore(2) @@ -3539,6 +3584,7 @@ func TestRemoveRangeWithoutGC(t *testing.T) { sc := storage.TestStoreConfig(nil) sc.TestingKnobs.DisableReplicaGCQueue = true + sc.TestingKnobs.DisableEagerReplicaRemoval = true mtc := &multiTestContext{storeConfig: &sc} defer mtc.Stop() mtc.Start(t, 2) @@ -3555,18 +3601,19 @@ func TestRemoveRangeWithoutGC(t *testing.T) { if err != nil { return err } - desc := rep.Desc() - if len(desc.InternalReplicas) != 1 { - return errors.Errorf("range has %d replicas", len(desc.InternalReplicas)) + if _, err := rep.IsDestroyed(); err == nil { + return errors.Errorf("range is still alive") } return nil }) // The replica's data is still on disk. + // We use an inconsistent scan because there's going to be an intent on the + // range descriptor to remove this replica. 
 	var desc roachpb.RangeDescriptor
 	descKey := keys.RangeDescriptorKey(roachpb.RKeyMin)
 	if ok, err := engine.MVCCGetProto(context.Background(), mtc.stores[0].Engine(), descKey,
-		mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{}); err != nil {
+		mtc.stores[0].Clock().Now(), &desc, engine.MVCCGetOptions{Inconsistent: true}); err != nil {
 		t.Fatal(err)
 	} else if !ok {
 		t.Fatal("expected range descriptor to be present")
diff --git a/pkg/storage/client_replica_gc_test.go b/pkg/storage/client_replica_gc_test.go
index 7b8e41cf1b36..4162ffca8d85 100644
--- a/pkg/storage/client_replica_gc_test.go
+++ b/pkg/storage/client_replica_gc_test.go
@@ -148,8 +148,11 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
 // removes a range from a store that no longer should have a replica.
 func TestReplicaGCQueueDropReplicaGCOnScan(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	mtc := &multiTestContext{}
+	cfg := storage.TestStoreConfig(nil)
+	cfg.TestingKnobs.DisableEagerReplicaRemoval = true
+	mtc := &multiTestContext{storeConfig: &cfg}
+
 	defer mtc.Stop()
 	mtc.Start(t, 3)
 	// Disable the replica gc queue to prevent direct removal of replica.
diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go
index 6db3d819b5f9..f3cb67ba9fc8 100644
--- a/pkg/storage/client_split_test.go
+++ b/pkg/storage/client_split_test.go
@@ -3314,8 +3314,12 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
 	// different replicaID than the split trigger expects.
 	add := func() {
 		_, err := tc.AddReplicas(kRHS, tc.Target(1))
-		if !testutils.IsError(err, `snapshot intersects existing range`) {
-			t.Fatalf(`expected snapshot intersects existing range" error got: %+v`, err)
+		// The "snapshot intersects existing range" error is expected if the store
+		// has not heard a raft message addressed to a later replica ID, while the
+		// "was not found on" error is expected if the store has heard that it has
+		// a newer replica ID before receiving the snapshot.
+		if !testutils.IsError(err, `snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+`) {
+			t.Fatalf(`expected "snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+" error got: %+v`, err)
 		}
 	}
 	for i := 0; i < 5; i++ {
@@ -3361,7 +3365,8 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
 		if err != nil {
 			return err
 		}
-		if desc := repl.Desc(); !descLHS.Equal(desc) {
+		if desc := repl.Desc(); desc.IsInitialized() && !descLHS.Equal(desc) {
+			require.NoError(t, store.ManualReplicaGC(repl))
 			return errors.Errorf("expected %s got %s", &descLHS, desc)
 		}
 		return nil
diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go
index 11c57695ff08..ff93915bb36e 100644
--- a/pkg/storage/client_test.go
+++ b/pkg/storage/client_test.go
@@ -1243,6 +1243,21 @@ func (m *multiTestContext) unreplicateRangeNonFatal(rangeID roachpb.RangeID, des
 	return err
 }
 
+func (m *multiTestContext) waitForUnreplicated(rangeID roachpb.RangeID, dest int) error {
+	// Wait for the unreplication to complete on the destination node.
+	return retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
+		_, err := m.stores[dest].GetReplica(rangeID)
+		switch err.(type) {
+		case nil:
+			return fmt.Errorf("replica still exists on dest %d", dest)
+		case *roachpb.RangeNotFoundError:
+			return nil
+		default:
+			return err
+		}
+	})
+}
+
 // readIntFromEngines reads the current integer value at the given key
 // from all configured engines, filling in zeros when the value is not
 // found. Returns a slice of the same length as mtc.engines.
diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index 3e3530971c45..ea54e03d5bd6 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -221,16 +221,6 @@ func NewTestStorePool(cfg StoreConfig) *StorePool { ) } -func (r *Replica) ReplicaID() roachpb.ReplicaID { - r.mu.RLock() - defer r.mu.RUnlock() - return r.ReplicaIDLocked() -} - -func (r *Replica) ReplicaIDLocked() roachpb.ReplicaID { - return r.mu.replicaID -} - func (r *Replica) AssertState(ctx context.Context, reader engine.Reader) { r.raftMu.Lock() defer r.raftMu.Unlock() @@ -269,10 +259,13 @@ func (r *Replica) InitQuotaPool(quota uint64) error { r.mu.Lock() defer r.mu.Unlock() var appliedIndex uint64 - err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) { + isRemoved, err := r.withRaftGroupLocked(false, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, err error) { appliedIndex = r.BasicStatus().Applied return false, nil }) + if isRemoved { + _, err = r.IsDestroyed() + } if err != nil { return err } diff --git a/pkg/storage/merge_queue.go b/pkg/storage/merge_queue.go index 208499998750..7db6813b146b 100644 --- a/pkg/storage/merge_queue.go +++ b/pkg/storage/merge_queue.go @@ -291,13 +291,11 @@ func (mq *mergeQueue) process( log.VEventf(ctx, 2, `%v`, err) return err } - rhsDesc, err = maybeLeaveAtomicChangeReplicas(ctx, store, rhsDesc) if err != nil { log.VEventf(ctx, 2, `%v`, err) return err } - rhsDesc, err = removeLearners(ctx, db, rhsDesc) if err != nil { log.VEventf(ctx, 2, `%v`, err) diff --git a/pkg/storage/queue.go b/pkg/storage/queue.go index 3ef925cc0304..ca3a996f2210 100644 --- a/pkg/storage/queue.go +++ b/pkg/storage/queue.go @@ -59,8 +59,9 @@ type processCallback func(error) // A replicaItem holds a replica and metadata about its queue state and // processing state. type replicaItem struct { - value roachpb.RangeID - seq int // enforce FIFO order for equal priorities + value roachpb.RangeID + replicaID roachpb.ReplicaID + seq int // enforce FIFO order for equal priorities // fields used when a replicaItem is enqueued in a priority queue. priority float64 @@ -89,6 +90,12 @@ func (i *replicaItem) registerCallback(cb processCallback) { i.callbacks = append(i.callbacks, cb) } +// sameReplica returns true if the passed replicaID matches the item's replica +// ID or the item's replica ID is zero. +func (i *replicaItem) sameReplica(replicaID roachpb.ReplicaID) bool { + return i.replicaID == 0 || i.replicaID == replicaID +} + // A priorityQueue implements heap.Interface and holds replicaItems. type priorityQueue struct { seqGen int @@ -180,6 +187,7 @@ func shouldQueueAgain(now, last hlc.Timestamp, minInterval time.Duration) (bool, // extraction. Establish a sane interface and use that. type replicaInQueue interface { AnnotateCtx(context.Context) context.Context + ReplicaID() roachpb.ReplicaID StoreID() roachpb.StoreID GetRangeID() roachpb.RangeID IsInitialized() bool @@ -487,7 +495,7 @@ func (h baseQueueHelper) MaybeAdd(ctx context.Context, repl replicaInQueue, now } func (h baseQueueHelper) Add(ctx context.Context, repl replicaInQueue, prio float64) { - _, err := h.bq.addInternal(ctx, repl.Desc(), prio) + _, err := h.bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), prio) if err != nil && log.V(1) { log.Infof(ctx, "during Add: %s", err) } @@ -595,7 +603,7 @@ func (bq *baseQueue) maybeAdd(ctx context.Context, repl replicaInQueue, now hlc. 
if !should { return } - if _, err := bq.addInternal(ctx, repl.Desc(), priority); !isExpectedQueueError(err) { + if _, err := bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority); !isExpectedQueueError(err) { log.Errorf(ctx, "unable to add: %+v", err) } } @@ -612,7 +620,7 @@ func (bq *baseQueue) requiresSplit(cfg *config.SystemConfig, repl replicaInQueue // the replica is already queued at a lower priority, updates the existing // priority. Expects the queue lock to be held by caller. func (bq *baseQueue) addInternal( - ctx context.Context, desc *roachpb.RangeDescriptor, priority float64, + ctx context.Context, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, priority float64, ) (bool, error) { // NB: this is intentionally outside of bq.mu to avoid having to consider // lock ordering constraints. @@ -665,7 +673,7 @@ func (bq *baseQueue) addInternal( if log.V(3) { log.Infof(ctx, "adding: priority=%0.3f", priority) } - item = &replicaItem{value: desc.RangeID, priority: priority} + item = &replicaItem{value: desc.RangeID, replicaID: replicaID, priority: priority} bq.addLocked(item) // If adding this replica has pushed the queue past its maximum size, @@ -856,6 +864,9 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er if !repl.IsInitialized() { // We checked this when adding the replica, but we need to check it again // in case this is a different replica with the same range ID (see #14193). + // This is possible in the case where the replica was enqueued while not + // having a replica ID, perhaps due to a pre-emptive snapshot, and has + // since been removed and re-added at a different replica ID. return errors.New("cannot process uninitialized replica") } @@ -1092,21 +1103,21 @@ func (bq *baseQueue) addToPurgatoryLocked( // Remove all items from purgatory into a copied slice. bq.mu.Lock() - ranges := make([]roachpb.RangeID, 0, len(bq.mu.purgatory)) + ranges := make([]*replicaItem, 0, len(bq.mu.purgatory)) for rangeID := range bq.mu.purgatory { item := bq.mu.replicas[rangeID] if item == nil { log.Fatalf(ctx, "r%d is in purgatory but not in replicas", rangeID) } item.setProcessing() - ranges = append(ranges, item.value) + ranges = append(ranges, item) bq.removeFromPurgatoryLocked(item) } bq.mu.Unlock() - for _, id := range ranges { - repl, err := bq.getReplica(id) - if err != nil { + for _, item := range ranges { + repl, err := bq.getReplica(item.value) + if err != nil || !item.sameReplica(repl.ReplicaID()) { continue } annotatedCtx := repl.AnnotateCtx(ctx) @@ -1168,11 +1179,12 @@ func (bq *baseQueue) pop() replicaInQueue { bq.mu.Unlock() repl, _ := bq.getReplica(item.value) - if repl != nil { + if repl != nil && item.sameReplica(repl.ReplicaID()) { return repl } - // Replica not found, remove from set and try again. + // Replica not found or was recreated with a new replica ID, remove from + // set and try again. 
bq.mu.Lock() bq.removeFromReplicaSetLocked(item.value) } diff --git a/pkg/storage/queue_concurrency_test.go b/pkg/storage/queue_concurrency_test.go index b87d4a8f1c27..afe28799175d 100644 --- a/pkg/storage/queue_concurrency_test.go +++ b/pkg/storage/queue_concurrency_test.go @@ -145,7 +145,8 @@ func (fakeQueueImpl) purgatoryChan() <-chan time.Time { } type fakeReplica struct { - id roachpb.RangeID + id roachpb.RangeID + replicaID roachpb.ReplicaID } func (fr *fakeReplica) AnnotateCtx(ctx context.Context) context.Context { return ctx } @@ -153,6 +154,7 @@ func (fr *fakeReplica) StoreID() roachpb.StoreID { return 1 } func (fr *fakeReplica) GetRangeID() roachpb.RangeID { return fr.id } +func (fr *fakeReplica) ReplicaID() roachpb.ReplicaID { return fr.replicaID } func (fr *fakeReplica) IsInitialized() bool { return true } func (fr *fakeReplica) IsDestroyed() (DestroyReason, error) { return destroyReasonAlive, nil } func (fr *fakeReplica) Desc() *roachpb.RangeDescriptor { diff --git a/pkg/storage/queue_helpers_testutil.go b/pkg/storage/queue_helpers_testutil.go index 687bebeb68be..addb45b3331d 100644 --- a/pkg/storage/queue_helpers_testutil.go +++ b/pkg/storage/queue_helpers_testutil.go @@ -23,7 +23,7 @@ import ( func (bq *baseQueue) testingAdd( ctx context.Context, repl replicaInQueue, priority float64, ) (bool, error) { - return bq.addInternal(ctx, repl.Desc(), priority) + return bq.addInternal(ctx, repl.Desc(), repl.ReplicaID(), priority) } func forceScanAndProcess(s *Store, q *baseQueue) error { diff --git a/pkg/storage/queue_test.go b/pkg/storage/queue_test.go index df2cd9bf4cf1..51d8dc4e18dd 100644 --- a/pkg/storage/queue_test.go +++ b/pkg/storage/queue_test.go @@ -13,6 +13,7 @@ package storage import ( "container/heap" "context" + "fmt" "strconv" "sync/atomic" "testing" @@ -33,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) // testQueueImpl implements queueImpl with a closure for shouldQueue. @@ -1099,6 +1101,42 @@ func TestBaseQueueProcessConcurrently(t *testing.T) { assertProcessedAndProcessing(3, 0) } +// TestBaseQueueReplicaChange ensures that if a replica is added to the queue +// with a non-zero replica ID then it is only popped if the retrieved replica +// from the getReplica() function has the same replica ID. +func TestBaseQueueChangeReplicaID(t *testing.T) { + defer leaktest.AfterTest(t)() + // The testContext exists only to construct the baseQueue. 
+ tc := testContext{} + stopper := stop.NewStopper() + ctx := context.Background() + defer stopper.Stop(ctx) + tc.Start(t, stopper) + testQueue := &testQueueImpl{ + shouldQueueFn: func(now hlc.Timestamp, r *Replica) (shouldQueue bool, priority float64) { + return true, 1.0 + }, + } + bq := makeTestBaseQueue("test", testQueue, tc.store, tc.gossip, queueConfig{ + maxSize: defaultQueueMaxSize, + acceptsUnsplitRanges: true, + }) + r := &fakeReplica{id: 1, replicaID: 1} + bq.mu.Lock() + bq.getReplica = func(rangeID roachpb.RangeID) (replicaInQueue, error) { + if rangeID != 1 { + panic(fmt.Errorf("expected range id 1, got %d", rangeID)) + } + return r, nil + } + bq.mu.Unlock() + bq.maybeAdd(ctx, r, tc.store.Clock().Now()) + require.Equal(t, r, bq.pop()) + bq.maybeAdd(ctx, r, tc.store.Clock().Now()) + r.replicaID = 2 + require.Nil(t, bq.pop()) +} + func TestBaseQueueRequeue(t *testing.T) { defer leaktest.AfterTest(t)() tc := testContext{} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index f6eaf43f0116..546cc0124db1 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -242,6 +242,7 @@ type Replica struct { syncutil.RWMutex // The destroyed status of a replica indicating if it's alive, corrupt, // scheduled for destruction or has been GCed. + // destroyStatus should only be set while also holding the raftMu. destroyStatus // Is the range quiescent? Quiescent ranges are not Tick()'d and unquiesce // whenever a Raft operation is performed. @@ -346,6 +347,7 @@ type Replica struct { // The minimum allowed ID for this replica. Initialized from // RaftTombstone.NextReplicaID. minReplicaID roachpb.ReplicaID + // The ID of the leader replica within the Raft group. Used to determine // when the leadership changes. leaderID roachpb.ReplicaID @@ -610,6 +612,14 @@ func (r *Replica) String() string { return fmt.Sprintf("[n%d,s%d,r%s]", r.store.Ident.NodeID, r.store.Ident.StoreID, &r.rangeStr) } +// ReplicaID returns the ID for the Replica. It may be zero if the replica does +// not know its ID. Once a Replica has a non-zero ReplicaID it will never change. +func (r *Replica) ReplicaID() roachpb.ReplicaID { + r.mu.RLock() + defer r.mu.RUnlock() + return r.mu.replicaID +} + // cleanupFailedProposal cleans up after a proposal that has failed. It // clears any references to the proposal and releases associated quota. // It requires that both Replica.mu and Replica.raftMu are exclusively held. diff --git a/pkg/storage/replica_application_result.go b/pkg/storage/replica_application_result.go index aa3a0116a841..0e0b8afcfcb9 100644 --- a/pkg/storage/replica_application_result.go +++ b/pkg/storage/replica_application_result.go @@ -293,22 +293,35 @@ func (r *Replica) handleComputeChecksumResult(ctx context.Context, cc *storagepb r.computeChecksumPostApply(ctx, *cc) } -func (r *Replica) handleChangeReplicasResult(ctx context.Context, chng *storagepb.ChangeReplicas) { - storeID := r.store.StoreID() - var found bool - for _, rDesc := range chng.Replicas() { - if rDesc.StoreID == storeID { - found = true - break - } +func (r *Replica) handleChangeReplicasResult( + ctx context.Context, chng *storagepb.ChangeReplicas, +) (changeRemovedReplica bool) { + // If this command removes us then we need to go through the process of + // removing our replica from the store. After this method returns the code + // should roughly return all the way up to whoever called handleRaftReady + // and this Replica should never be heard from again. 
We can detect if this + // change removed us by inspecting the replica's destroyStatus. We check the + // destroy status before processing a raft ready so if we find ourselves with + // removal pending at this point then we know that this command must be + // responsible. + if ds, _ := r.IsDestroyed(); ds != destroyReasonRemovalPending { + return false + } + if r.store.TestingKnobs().DisableEagerReplicaRemoval { + return true + } + if log.V(1) { + log.Infof(ctx, "removing replica due to ChangeReplicasTrigger: %v", chng) + } + if err := r.postDestroyRaftMuLocked(ctx, r.GetMVCCStats()); err != nil { + log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err) } - if !found { - // This wants to run as late as possible, maximizing the chances - // that the other nodes have finished this command as well (since - // processing the removal from the queue looks up the Range at the - // lease holder, being too early here turns this into a no-op). - r.store.replicaGCQueue.AddAsync(ctx, r, replicaGCPriorityRemoved) + if err := r.store.removeReplicaImpl(ctx, r, chng.Desc.NextReplicaID, RemoveOptions{ + DestroyData: false, // We already destroyed the data when the batch committed. + }); err != nil { + log.Fatalf(ctx, "failed to remove replica: %v", err) } + return true } func (r *Replica) handleRaftLogDeltaResult(ctx context.Context, delta int64) { diff --git a/pkg/storage/replica_application_state_machine.go b/pkg/storage/replica_application_state_machine.go index ac8432388c09..a6b83c3c846b 100644 --- a/pkg/storage/replica_application_state_machine.go +++ b/pkg/storage/replica_application_state_machine.go @@ -13,6 +13,7 @@ package storage import ( "context" "fmt" + "math" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -375,6 +376,9 @@ type replicaAppBatch struct { // triggered a migration to the replica applied state key. If so, this // migration will be performed when the application batch is committed. migrateToAppliedStateKey bool + // changeRemovesReplica tracks whether the command in the batch (there must + // be only one) removes this replica from the range. + changeRemovesReplica bool // Statistics. entries int @@ -514,6 +518,20 @@ func (b *replicaAppBatch) stageWriteBatch(ctx context.Context, cmd *replicatedCm return nil } +// changeRemovesStore returns true if any of the removals in this change have storeID. +func changeRemovesStore( + desc *roachpb.RangeDescriptor, change *storagepb.ChangeReplicas, storeID roachpb.StoreID, +) bool { + _, existsInDesc := desc.GetReplicaDescriptor(storeID) + // NB: if we're catching up from a preemptive snapshot then we won't + // exist in the current descriptor. + if !existsInDesc { + return false + } + _, existsInChange := change.Desc.GetReplicaDescriptor(storeID) + return !existsInChange +} + // runPreApplyTriggers runs any triggers that must fire before a command is // applied. It may modify the command's ReplicatedEvalResult. func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicatedCmd) error { @@ -560,16 +578,23 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat if merge := res.Merge; merge != nil { // Merges require the subsumed range to be atomically deleted when the // merge transaction commits. + + // If our range currently has a non-zero replica ID then we know we're + // safe to commit this merge because of the invariants provided to us + // by the merge protocol. 
Namely if this committed we know that if the + // command committed then all of the replicas in the range descriptor + // are collocated when this command commits. If we do not have a non-zero + // replica ID then the logic in Stage should detect that and destroy our + // preemptive snapshot so we shouldn't ever get here. rhsRepl, err := b.r.store.GetReplica(merge.RightDesc.RangeID) if err != nil { return wrapWithNonDeterministicFailure(err, "unable to get replica for merge") } - const rangeIDLocalOnly = true const mustClearRange = false if err := rhsRepl.preDestroyRaftMuLocked( - ctx, b.batch, b.batch, merge.RightDesc.NextReplicaID, rangeIDLocalOnly, mustClearRange, + ctx, b.batch, b.batch, roachpb.ReplicaID(math.MaxInt32), clearRangeIDLocalOnly, mustClearRange, ); err != nil { - return wrapWithNonDeterministicFailure(err, "unable to destroy range before merge") + return wrapWithNonDeterministicFailure(err, "unable to destroy replica before merge") } } @@ -597,6 +622,41 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat } } + // Detect if this command will remove us from the range. + // If so we stage the removal of all of our range data into this batch. + // We'll complete the removal when it commits. Later logic detects the + // removal by inspecting the destroy status + if change := res.ChangeReplicas; change != nil && + changeRemovesStore(b.state.Desc, change, b.r.store.StoreID()) { + // Delete all of the local data. We're going to delete this hard state too. + // In order for this to be safe we need code above this to promise that we're + // never going to write hard state in response to a message for a later + // replica (with a different replica ID) to this range state. + // Furthermore we mark the replica as destroyed so that new commands are not + // accepted. The replica will be destroyed in handleChangeReplicas. + // Note that we must be holding the raftMu here because we're in the + // midst of application. + b.r.mu.Lock() + b.r.mu.destroyStatus.Set( + roachpb.NewRangeNotFoundError(b.r.RangeID, b.r.store.StoreID()), + destroyReasonRemovalPending) + b.r.mu.Unlock() + b.changeRemovesReplica = true + const mustUseRangeDeletionTombstone = true + if !b.r.store.TestingKnobs().DisableEagerReplicaRemoval { + if err := b.r.preDestroyRaftMuLocked( + ctx, + b.batch, + b.batch, + change.Desc.NextReplicaID, + clearAll, + mustUseRangeDeletionTombstone, + ); err != nil { + return wrapWithNonDeterministicFailure(err, "unable to destroy replica before removal") + } + } + } + // Provide the command's corresponding logical operations to the Replica's // rangefeed. Only do so if the WriteBatch is non-nil, in which case the // rangefeed requires there to be a corresponding logical operation log or @@ -609,6 +669,7 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat } else if cmd.raftCmd.LogicalOpLog != nil { log.Fatalf(ctx, "non-nil logical op log with nil write batch: %v", cmd.raftCmd) } + return nil } @@ -729,6 +790,9 @@ func (b *replicaAppBatch) ApplyToStateMachine(ctx context.Context) error { // batch's RocksDB batch. This records the highest raft and lease index that // have been applied as of this batch. It also records the Range's mvcc stats. 
func (b *replicaAppBatch) addAppliedStateKeyToBatch(ctx context.Context) error { + if b.changeRemovesReplica { + return nil + } loader := &b.r.raftMu.stateLoader if b.migrateToAppliedStateKey { // A Raft command wants us to begin using the RangeAppliedState key @@ -857,14 +921,18 @@ func (sm *replicaStateMachine) ApplySideEffects( // before notifying a potentially waiting client. clearTrivialReplicatedEvalResultFields(cmd.replicatedResult()) if !cmd.IsTrivial() { - shouldAssert := sm.handleNonTrivialReplicatedEvalResult(ctx, *cmd.replicatedResult()) + shouldAssert, isRemoved := sm.handleNonTrivialReplicatedEvalResult(ctx, *cmd.replicatedResult()) + + if isRemoved { + return nil, apply.ErrRemoved + } // NB: Perform state assertion before acknowledging the client. // Some tests (TestRangeStatsInit) assumes that once the store has started // and the first range has a lease that there will not be a later hard-state. if shouldAssert { + sm.r.mu.Lock() // Assert that the on-disk state doesn't diverge from the in-memory // state as a result of the side effects. - sm.r.mu.Lock() sm.r.assertStateLocked(ctx, sm.r.store.Engine()) sm.r.mu.Unlock() sm.stats.stateAssertions++ @@ -923,7 +991,7 @@ func (sm *replicaStateMachine) ApplySideEffects( // to pass a replicatedResult that does not imply any side-effects. func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( ctx context.Context, rResult storagepb.ReplicatedEvalResult, -) (shouldAssert bool) { +) (shouldAssert, isRemoved bool) { // Assert that this replicatedResult implies at least one side-effect. if rResult.Equal(storagepb.ReplicatedEvalResult{}) { log.Fatalf(ctx, "zero-value ReplicatedEvalResult passed to handleNonTrivialReplicatedEvalResult") @@ -955,7 +1023,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( // we want to assert that these two states do not diverge. shouldAssert = !rResult.Equal(storagepb.ReplicatedEvalResult{}) if !shouldAssert { - return false + return false, sm.batch.changeRemovesReplica } if rResult.Split != nil { @@ -995,7 +1063,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( } if rResult.ChangeReplicas != nil { - sm.r.handleChangeReplicasResult(ctx, rResult.ChangeReplicas) + isRemoved = sm.r.handleChangeReplicasResult(ctx, rResult.ChangeReplicas) rResult.ChangeReplicas = nil } @@ -1007,7 +1075,7 @@ func (sm *replicaStateMachine) handleNonTrivialReplicatedEvalResult( if !rResult.Equal(storagepb.ReplicatedEvalResult{}) { log.Fatalf(ctx, "unhandled field in ReplicatedEvalResult: %s", pretty.Diff(rResult, storagepb.ReplicatedEvalResult{})) } - return true + return true, isRemoved } func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *replicatedCmd) error { @@ -1023,10 +1091,17 @@ func (sm *replicaStateMachine) maybeApplyConfChange(ctx context.Context, cmd *re // to raft. return nil } - return sm.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + // If we're removed then we know that must have happened due to this + // command and so we're happy to not apply this conf change. 
+	isRemoved, err := sm.r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
 		raftGroup.ApplyConfChange(cmd.confChange.ConfChangeI)
 		return true, nil
 	})
+	if isRemoved {
+		_, err = sm.r.IsDestroyed()
+		err = wrapWithNonDeterministicFailure(err, "failed to apply config change because replica was already removed")
+	}
+	return err
 	default:
 		panic("unexpected")
 	}
 }
diff --git a/pkg/storage/replica_destroy.go b/pkg/storage/replica_destroy.go
index 0fb60c307472..efd25b4de1b2 100644
--- a/pkg/storage/replica_destroy.go
+++ b/pkg/storage/replica_destroy.go
@@ -12,6 +12,7 @@ package storage
 import (
 	"context"
+	"fmt"
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -29,7 +30,10 @@ type DestroyReason int
 const (
 	// The replica is alive.
 	destroyReasonAlive DestroyReason = iota
-	// The replica has been marked for GC, but hasn't been GCed yet.
+	// The replica is in the process of being removed but has not been removed
+	// yet. It exists to avoid races between two threads which may decide to
+	// destroy a replica (e.g. processing a ChangeReplicasTrigger removing the
+	// range and receiving a raft message with a higher replica ID).
 	destroyReasonRemovalPending
 	// The replica has been GCed.
 	destroyReasonRemoved
@@ -43,15 +47,15 @@ type destroyStatus struct {
 	err    error
 }
 
+func (s destroyStatus) String() string {
+	return fmt.Sprintf("{%v %d}", s.err, s.reason)
+}
+
 func (s *destroyStatus) Set(err error, reason DestroyReason) {
 	s.err = err
 	s.reason = reason
 }
 
-func (s *destroyStatus) Reset() {
-	s.Set(nil, destroyReasonAlive)
-}
-
 // IsAlive returns true when a replica is alive.
 func (s destroyStatus) IsAlive() bool {
 	return s.reason == destroyReasonAlive
@@ -62,16 +66,22 @@ func (s destroyStatus) Removed() bool {
 	return s.reason == destroyReasonRemoved
 }
 
+// RemovalPending returns whether the replica is removed or in the process of
+// being removed.
+func (s destroyStatus) RemovalPending() bool {
+	return s.reason == destroyReasonRemovalPending || s.reason == destroyReasonRemoved
+}
+
 func (r *Replica) preDestroyRaftMuLocked(
 	ctx context.Context,
 	reader engine.Reader,
 	writer engine.Writer,
 	nextReplicaID roachpb.ReplicaID,
-	rangeIDLocalOnly bool,
+	clearOpt clearRangeOption,
 	mustClearRange bool,
 ) error {
 	desc := r.Desc()
-	err := clearRangeData(desc, reader, writer, rangeIDLocalOnly, mustClearRange)
+	err := clearRangeData(desc, reader, writer, clearOpt, mustClearRange)
 	if err != nil {
 		return err
 	}
@@ -114,6 +124,53 @@ func (r *Replica) postDestroyRaftMuLocked(ctx context.Context, ms enginepb.MVCCS
 	return nil
 }
 
+// destroyUninitializedReplicaRaftMuLocked is called when we know that an
+// uninitialized replica (which knows its replica ID but not its key range) has
+// been removed and re-added as a different replica (with a new replica
+// ID). We're safe to GC its hard state because nobody cares about its votes
+// anymore. We can't GC the range's data because we don't know where it is.
+// Fortunately this isn't a problem: the replica cannot have any data, because
+// a replica must apply a snapshot before it has data, and if it had applied a
+// snapshot then it would be initialized. This replica may have been
+// created in anticipation of a split in which case we'll clear its data when
+// the split trigger is applied.
+func (r *Replica) destroyUninitializedReplicaRaftMuLocked( + ctx context.Context, nextReplicaID roachpb.ReplicaID, +) error { + batch := r.Engine().NewWriteOnlyBatch() + defer batch.Close() + + // Clear the range ID local data including the hard state. + // We don't know about any user data so we can't clear any user data. + // See the comment on this method for why this is safe. + if err := r.preDestroyRaftMuLocked( + ctx, + r.Engine(), + batch, + nextReplicaID, + clearRangeIDLocalOnly, + false, /* mustClearRange */ + ); err != nil { + return err + } + + // We need to sync here because we are potentially deleting sideloaded + // proposals from the file system next. We could write the tombstone only in + // a synchronous batch first and then delete the data alternatively, but + // then need to handle the case in which there is both the tombstone and + // leftover replica data. + if err := batch.Commit(true); err != nil { + return err + } + + if r.raftMu.sideloaded != nil { + if err := r.raftMu.sideloaded.Clear(ctx); err != nil { + log.Warningf(ctx, "failed to remove sideload storage for %v: %v", r, err) + } + } + return nil +} + // destroyRaftMuLocked deletes data associated with a replica, leaving a // tombstone. func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb.ReplicaID) error { @@ -128,7 +185,7 @@ func (r *Replica) destroyRaftMuLocked(ctx context.Context, nextReplicaID roachpb r.Engine(), batch, nextReplicaID, - false, /* rangeIDLocalOnly */ + clearAll, false, /* mustClearRange */ ); err != nil { return err diff --git a/pkg/storage/replica_gc_queue.go b/pkg/storage/replica_gc_queue.go index 33d64df45dca..27878c876758 100644 --- a/pkg/storage/replica_gc_queue.go +++ b/pkg/storage/replica_gc_queue.go @@ -114,13 +114,13 @@ func newReplicaGCQueue(store *Store, db *client.DB, gossip *gossip.Gossip) *repl // in the past. func (rgcq *replicaGCQueue) shouldQueue( ctx context.Context, now hlc.Timestamp, repl *Replica, _ *config.SystemConfig, -) (bool, float64) { +) (shouldQ bool, prio float64) { + lastCheck, err := repl.GetLastReplicaGCTimestamp(ctx) if err != nil { log.Errorf(ctx, "could not read last replica GC timestamp: %+v", err) return false, 0 } - if _, currentMember := repl.Desc().GetReplicaDescriptor(repl.store.StoreID()); !currentMember { return true, replicaGCPriorityRemoved } @@ -215,6 +215,11 @@ func (rgcq *replicaGCQueue) process( } replyDesc := rs[0] + repl.mu.RLock() + replicaID := repl.mu.replicaID + ticks := repl.mu.ticks + repl.mu.RUnlock() + // Now check whether the replica is meant to still exist. // Maybe it was deleted "under us" by being moved. currentDesc, currentMember := replyDesc.GetReplicaDescriptor(repl.store.StoreID()) @@ -234,11 +239,6 @@ func (rgcq *replicaGCQueue) process( // We are no longer a member of this range, but the range still exists. // Clean up our local data. - repl.mu.RLock() - replicaID := repl.mu.replicaID - ticks := repl.mu.ticks - repl.mu.RUnlock() - if replicaID == 0 { // This is a preemptive replica. 
GC'ing a preemptive replica is a
 			// good idea if and only if the up-replication that it was a part of
@@ -284,13 +284,24 @@ func (rgcq *replicaGCQueue) process(
 	rgcq.metrics.RemoveReplicaCount.Inc(1)
 	log.VEventf(ctx, 1, "destroying local data")
+
+	nextReplicaID := replyDesc.NextReplicaID
+	if currentMember {
+		// If we're a member of the current range descriptor then we must not put
+		// down a tombstone at replyDesc.NextReplicaID as that would prevent us
+		// from getting a snapshot and becoming a part of the range.
+		nextReplicaID = currentDesc.ReplicaID
+	}
 	// Note that this seems racy - we didn't hold any locks between reading
 	// the range descriptor above and deciding to remove the replica - but
 	// we pass in the NextReplicaID to detect situations in which the
 	// replica became "non-gc'able" in the meantime by checking (with raftMu
 	// held throughout) whether the replicaID is still smaller than the
-	// NextReplicaID.
+	// NextReplicaID. Given that non-zero replica IDs never change, this is only
+	// possible if we currently think we're processing a pre-emptive snapshot
+	// but discover in RemoveReplica that this range has since been added and
+	// knows that.
-	if err := repl.store.RemoveReplica(ctx, repl, replyDesc.NextReplicaID, RemoveOptions{
+	if err := repl.store.RemoveReplica(ctx, repl, nextReplicaID, RemoveOptions{
 		DestroyData: true,
 	}); err != nil {
 		return err
@@ -328,13 +339,6 @@ func (rgcq *replicaGCQueue) process(
 		}
 	}
 
-	// We don't have the last NextReplicaID for the subsumed range, nor can we
-	// obtain it, but that's OK: we can just be conservative and use the maximum
-	// possible replica ID. store.RemoveReplica will write a tombstone using
-	// this maximum possible replica ID, which would normally be problematic, as
-	// it would prevent this store from ever having a new replica of the removed
-	// range. In this case, however, it's copacetic, as subsumed ranges _can't_
-	// have new replicas.
 	const nextReplicaID = math.MaxInt32
 	if err := repl.store.RemoveReplica(ctx, repl, nextReplicaID, RemoveOptions{
 		DestroyData: true,
diff --git a/pkg/storage/replica_init.go b/pkg/storage/replica_init.go
index b28a3a33591b..e8deb1f9181d 100644
--- a/pkg/storage/replica_init.go
+++ b/pkg/storage/replica_init.go
@@ -150,26 +150,25 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(
 	}
 	r.rangeStr.store(replicaID, r.mu.state.Desc)
 	r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
-	if err := r.setReplicaIDRaftMuLockedMuLocked(replicaID); err != nil {
-		return err
+	if r.mu.replicaID == 0 {
+		if err := r.setReplicaIDRaftMuLockedMuLocked(ctx, replicaID); err != nil {
+			return err
+		}
+	} else if r.mu.replicaID != replicaID {
+		log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d",
+			r.mu.replicaID, replicaID)
 	}
-	r.assertStateLocked(ctx, r.store.Engine())
 	return nil
 }
 
-func (r *Replica) setReplicaID(replicaID roachpb.ReplicaID) error {
-	r.raftMu.Lock()
-	defer r.raftMu.Unlock()
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	return r.setReplicaIDRaftMuLockedMuLocked(replicaID)
-}
-
-func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID) error {
-	if r.mu.replicaID == replicaID {
+func (r *Replica) setReplicaIDRaftMuLockedMuLocked(
+	ctx context.Context, replicaID roachpb.ReplicaID,
+) error {
+	if r.mu.replicaID != 0 {
+		log.Fatalf(ctx, "cannot set replica ID from anything other than 0, currently %d",
+			r.mu.replicaID)
-		// The common case: the replica ID is unchanged.
-		return nil
 	}
 	if replicaID == 0 {
 		// If the incoming message does not have a new replica ID it is a
@@ -183,17 +182,11 @@ func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID)
 	if r.mu.replicaID > replicaID {
 		return errors.Errorf("replicaID cannot move backwards from %d to %d", r.mu.replicaID, replicaID)
 	}
-
-	if r.mu.destroyStatus.reason == destroyReasonRemovalPending {
-		// An earlier incarnation of this replica was removed, but apparently it has been re-added
-		// now, so reset the status.
-		r.mu.destroyStatus.Reset()
+	if r.mu.destroyStatus.RemovalPending() {
+		// This replica has been marked for removal and we're trying to resurrect it.
+		log.Fatalf(ctx, "cannot resurrect replica %d", r.mu.replicaID)
 	}
 
-	// if r.mu.replicaID != 0 {
-	// 	// TODO(bdarnell): clean up previous raftGroup (update peers)
-	// }
-
 	// Initialize or update the sideloaded storage. If the sideloaded storage
 	// already exists (which is iff the previous replicaID was non-zero), then
 	// we have to move the contained files over (this corresponds to the case in
@@ -220,24 +213,13 @@ func (r *Replica) setReplicaIDRaftMuLockedMuLocked(replicaID roachpb.ReplicaID)
 		return errors.Wrap(err, "while initializing sideloaded storage")
 	}
 
-	previousReplicaID := r.mu.replicaID
 	r.mu.replicaID = replicaID
+	r.mu.minReplicaID = replicaID + 1
-	if replicaID >= r.mu.minReplicaID {
-		r.mu.minReplicaID = replicaID + 1
-	}
-	// Reset the raft group to force its recreation on next usage.
-	r.mu.internalRaftGroup = nil
-
-	// If there was a previous replica, repropose its pending commands under
-	// this new incarnation.
-	if previousReplicaID != 0 {
-		if log.V(1) {
-			log.Infof(r.AnnotateCtx(context.TODO()), "changed replica ID from %d to %d",
-				previousReplicaID, replicaID)
-		}
-		// repropose all pending commands under new replicaID.
-		r.refreshProposalsLocked(0, reasonReplicaIDChanged)
+	// Sanity check that we do not already have a raft group as we did not
+	// know our replica ID before this call.
+	if r.mu.internalRaftGroup != nil {
+		log.Fatalf(ctx, "somehow had an initialized raft group on a zero valued replica")
 	}
 
 	return nil
@@ -270,8 +252,9 @@ func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) {
 	// If this replica hasn't initialized the Raft group, create it and
 	// unquiesce and wake the leader to ensure the replica comes up to date.
 	initialized := r.mu.internalRaftGroup != nil
+	removed := !r.mu.destroyStatus.IsAlive()
 	r.mu.RUnlock()
-	if initialized {
+	if initialized || removed {
 		return
 	}
 
@@ -281,7 +264,9 @@ func (r *Replica) maybeInitializeRaftGroup(ctx context.Context) {
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
-	if err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
+	// If we raced on checking the destroyStatus above that's fine as
+	// the below withRaftGroupLocked will no-op.
+	if _, err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
 		return true, nil
 	}); err != nil {
 		log.VErrEventf(ctx, 1, "unable to initialize raft group: %s", err)
diff --git a/pkg/storage/replica_learner_test.go b/pkg/storage/replica_learner_test.go
index 6f2a5ff6825a..b8d3564f7a0e 100644
--- a/pkg/storage/replica_learner_test.go
+++ b/pkg/storage/replica_learner_test.go
@@ -429,13 +429,25 @@ func TestReplicaGCQueueSeesLearnerOrJointConfig(t *testing.T) {
 		require.Contains(t, tracing.FormatRecordedSpans(trace), msg)
 		return tc.LookupRangeOrFatal(t, scratchStartKey)
 	}
-
 	desc := checkNoGC()
 	// Make sure it didn't collect the learner.
 	require.NotEmpty(t, desc.Replicas().Learners())
 
 	// Now get the range into a joint config.
 	tc.RemoveReplicasOrFatal(t, scratchStartKey, tc.Target(1)) // remove learner
 
+	// We need to wait until the removed replica has been GC'd on server 1
+	// before a new replica can be successfully added. Wait for it to disappear.
+	testutils.SucceedsSoon(t, func() error {
+		s := tc.Server(1)
+		firstStore, err := s.GetStores().(*storage.Stores).GetStore(s.GetFirstStoreID())
+		require.NoError(t, err)
+		repl := firstStore.LookupReplica(roachpb.RKey(scratchStartKey))
+		if repl != nil {
+			return fmt.Errorf("replica has not yet been GC'd")
+		}
+		return nil
+	})
+
 	ltk.withStopAfterJointConfig(func() {
 		desc = tc.AddReplicasOrFatal(t, scratchStartKey, tc.Target(1))
 		require.Len(t, desc.Replicas().Filter(predIncoming), 1, desc)
@@ -549,7 +561,7 @@ func TestLearnerAdminChangeReplicasRace(t *testing.T) {
 	// that the descriptor has changed since the AdminChangeReplicas command
 	// started.
 	close(blockSnapshotsCh)
-	if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) {
+	if err := g.Wait(); !testutils.IsError(err, `descriptor changed|raft group deleted`) {
 		t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
 	}
 	desc = tc.LookupRangeOrFatal(t, scratchStartKey)
diff --git a/pkg/storage/replica_proposal_buf.go b/pkg/storage/replica_proposal_buf.go
index c60a48d35816..5235d5b0e5ca 100644
--- a/pkg/storage/replica_proposal_buf.go
+++ b/pkg/storage/replica_proposal_buf.go
@@ -618,12 +618,16 @@ func (rp *replicaProposer) enqueueUpdateCheck() {
 
 func (rp *replicaProposer) withGroupLocked(fn func(*raft.RawNode) error) error {
 	// Pass true for mayCampaignOnWake because we're about to propose a command.
-	return (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
+	isRemoved, err := (*Replica)(rp).withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) {
 		// We're proposing a command here so there is no need to wake the leader
 		// if we were quiesced. However, we should make sure we are unquiesced.
 		(*Replica)(rp).unquiesceLocked()
 		return false /* unquiesceLocked */, fn(raftGroup)
 	})
+	if isRemoved {
+		err = rp.mu.destroyStatus.err
+	}
+	return err
 }
 
 func (rp *replicaProposer) registerProposalLocked(p *ProposalData) {
diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go
index 52b577da569c..aa68401ea88a 100644
--- a/pkg/storage/replica_raft.go
+++ b/pkg/storage/replica_raft.go
@@ -382,7 +382,7 @@ func (r *Replica) hasPendingProposalsRLocked() bool {
 // stepRaftGroup calls Step on the replica's RawNode with the provided request's
 // message. Before doing so, it assures that the replica is unquiesced and ready
 // to handle the request.
-func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
+func (r *Replica) stepRaftGroup(req *RaftMessageRequest) (isRemoved bool, _ error) {
 	// We're processing an incoming raft message (from a batch that may
 	// include MsgVotes), so don't campaign if we wake up our raft
 	// group.
@@ -445,14 +445,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked( var hasReady bool var rd raft.Ready r.mu.Lock() - lastIndex := r.mu.lastIndex // used for append below lastTerm := r.mu.lastTerm raftLogSize := r.mu.raftLogSize leaderID := r.mu.leaderID lastLeaderID := leaderID - - err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + isRemoved, err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { if err := r.mu.proposalBuf.FlushLockedWithRaftGroup(raftGroup); err != nil { return false, err } @@ -466,6 +464,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "while checking raft group for Ready" return stats, expl, errors.Wrap(err, expl) } + // If we've been removed then return. + if isRemoved { + return stats, "", nil + } if !hasReady { // We must update the proposal quota even if we don't have a ready. @@ -723,7 +725,6 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Might have gone negative if node was recently restarted. raftLogSize = 0 } - } // Update protected state - last index, last term, raft log size, and raft @@ -756,10 +757,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked( applicationStart := timeutil.Now() if len(rd.CommittedEntries) > 0 { - if err := appTask.ApplyCommittedEntries(ctx); err != nil { + err := appTask.ApplyCommittedEntries(ctx) + stats.applyCommittedEntriesStats = sm.moveStats() + switch err { + case nil: + case apply.ErrRemoved: + // We know that our replica has been removed. We also know that we've + // stepped the state machine up to the point where it's been removed. + // TODO(ajwerner): decide if there's more to be done here. + return stats, "", err + default: return stats, err.(*nonDeterministicFailure).safeExpl, err } - stats.applyCommittedEntriesStats = sm.moveStats() // etcd raft occasionally adds a nil entry (our own commands are never // empty). This happens in two situations: When a new leader is elected, and @@ -789,11 +798,12 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.mu.Unlock() } - // TODO(bdarnell): need to check replica id and not Advance if it - // has changed. Or do we need more locking to guarantee that replica - // ID cannot change during handleRaftReady? + // NB: if we just processed a command which removed this replica from the + // raft group we will early return before this point. This, combined with + // the fact that we'll refuse to process messages intended for a higher + // replica ID ensures that our replica ID could not have changed. const expl = "during advance" - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + isRemoved, err = r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { raftGroup.Advance(rd) // If the Raft group still has more to process then we immediately @@ -804,9 +814,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.enqueueRaftUpdateCheck(r.RangeID) } return true, nil - }); err != nil { + }) + if err != nil { return stats, expl, errors.Wrap(err, expl) } + // If we removed ourselves during this call we would have early returned + // above with the apply.ErrRemoved. + if isRemoved { + return stats, expl, errors.Errorf("replica unexpectedly removed") + } // NB: All early returns other than the one due to not having a ready // which also makes the below call are due to fatal errors. 
@@ -1153,11 +1169,11 @@ func (r *Replica) sendRaftMessage(ctx context.Context, msg raftpb.Message) { Message: msg, RangeStartKey: startKey, // usually nil }) { - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + if isRemoved, err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { r.mu.droppedMessages++ raftGroup.ReportUnreachable(msg.To) return true, nil - }); err != nil { + }); !isRemoved && err != nil { log.Fatal(ctx, err) } } @@ -1197,10 +1213,10 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID snapStatus = raft.SnapshotFailure } - if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { + if isRemoved, err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { raftGroup.ReportSnapshot(uint64(to), snapStatus) return true, nil - }); err != nil { + }); !isRemoved && err != nil { log.Fatal(ctx, err) } } @@ -1322,19 +1338,22 @@ func (s pendingCmdSlice) Less(i, j int) bool { // varies. // // Requires that Replica.mu is held. +// +// If this Replica is in the process of being removed this method will return +// true and a nil error. func (r *Replica) withRaftGroupLocked( mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), -) error { - if r.mu.destroyStatus.Removed() { +) (isRemoved bool, _ error) { + if r.mu.destroyStatus.RemovalPending() { // Silently ignore all operations on destroyed replicas. We can't return an // error here as all errors returned from this method are considered fatal. - return nil + return true, nil } if r.mu.replicaID == 0 { // The replica's raft group has not yet been configured (i.e. the replica // was created from a preemptive snapshot). - return nil + return false, nil } if r.mu.internalRaftGroup == nil { @@ -1347,7 +1366,7 @@ func (r *Replica) withRaftGroupLocked( &raftLogger{ctx: ctx}, )) if err != nil { - return err + return false, err } r.mu.internalRaftGroup = raftGroup @@ -1364,7 +1383,7 @@ func (r *Replica) withRaftGroupLocked( if unquiesce { r.unquiesceAndWakeLeaderLocked() } - return err + return false, err } // withRaftGroup calls the supplied function with the (lazily initialized) @@ -1378,9 +1397,12 @@ func (r *Replica) withRaftGroupLocked( // should not initiate an election while handling incoming raft // messages (which may include MsgVotes from an election in progress, // and this election would be disrupted if we started our own). +// +// If this Replica is in the process of being removed this method will return +// true and a nil error. func (r *Replica) withRaftGroup( mayCampaignOnWake bool, f func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error), -) error { +) (isRemoved bool, _ error) { r.mu.Lock() defer r.mu.Unlock() return r.withRaftGroupLocked(mayCampaignOnWake, f) diff --git a/pkg/storage/replica_raftstorage.go b/pkg/storage/replica_raftstorage.go index 7132eae707e0..aa0e62b93de2 100644 --- a/pkg/storage/replica_raftstorage.go +++ b/pkg/storage/replica_raftstorage.go @@ -678,6 +678,14 @@ func (r *Replica) updateRangeInfo(desc *roachpb.RangeDescriptor) error { return nil } +type clearRangeOption int + +const ( + clearRangeIDLocalOnly clearRangeOption = iota + clearReplicatedOnly + clearAll +) + // clearRangeData clears the data associated with a range descriptor. If // rangeIDLocalOnly is true, then only the range-id local keys are deleted. 
// Otherwise, the range-id local keys, range local keys, and user keys are all @@ -689,16 +697,20 @@ func clearRangeData( desc *roachpb.RangeDescriptor, eng engine.Reader, writer engine.Writer, - rangeIDLocalOnly bool, + opt clearRangeOption, mustClearRange bool, ) error { var keyRanges []rditer.KeyRange - if rangeIDLocalOnly { - keyRanges = []rditer.KeyRange{rditer.MakeRangeIDLocalKeyRange(desc.RangeID, false)} - } else { + switch opt { + case clearRangeIDLocalOnly: + keyRanges = []rditer.KeyRange{ + rditer.MakeRangeIDLocalKeyRange(desc.RangeID, false), + } + case clearReplicatedOnly: + keyRanges = rditer.MakeReplicatedKeyRanges(desc) + case clearAll: keyRanges = rditer.MakeAllKeyRanges(desc) } - var clearRangeFn func(engine.Reader, engine.Writer, engine.MVCCKey, engine.MVCCKey) error if mustClearRange { clearRangeFn = func(eng engine.Reader, writer engine.Writer, start, end engine.MVCCKey) error { @@ -1013,7 +1025,7 @@ func (r *Replica) clearSubsumedReplicaDiskData( r.store.Engine(), &subsumedReplSST, subsumedNextReplicaID, - true, /* rangeIDLocalOnly */ + clearRangeIDLocalOnly, true, /* mustClearRange */ ); err != nil { subsumedReplSST.Close() diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 05490d143468..bcbf3a983dea 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7278,117 +7278,6 @@ func TestSyncSnapshot(t *testing.T) { } } -// TestReplicaIDChangePending verifies that on a replica ID change, pending -// commands are re-proposed on the new raft group. -func TestReplicaIDChangePending(t *testing.T) { - defer leaktest.AfterTest(t)() - - tc := testContext{} - cfg := TestStoreConfig(nil) - // Disable ticks to avoid automatic reproposals after a timeout, which - // would pass this test. - cfg.RaftTickInterval = math.MaxInt32 - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - tc.StartWithStoreConfig(t, stopper, cfg) - repl := tc.repl - - // Stop the command from being proposed to the raft group and being removed. - proposedOnOld := make(chan struct{}, 1) - repl.mu.Lock() - repl.mu.proposalBuf.testing.submitProposalFilter = func(*ProposalData) (drop bool, _ error) { - select { - case proposedOnOld <- struct{}{}: - default: - } - return true, nil - } - lease := *repl.mu.state.Lease - repl.mu.Unlock() - - // Add a command to the pending list and wait for it to be proposed. - magicTS := tc.Clock().Now() - ba := roachpb.BatchRequest{} - ba.Timestamp = magicTS - ba.Add(&roachpb.PutRequest{ - RequestHeader: roachpb.RequestHeader{ - Key: roachpb.Key("a"), - }, - Value: roachpb.MakeValueFromBytes([]byte("val")), - }) - _, _, _, err := repl.evalAndPropose(context.Background(), lease, &ba, &allSpans, endCmds{}) - if err != nil { - t.Fatal(err) - } - <-proposedOnOld - - // Set the raft command handler so we can tell if the command has been - // re-proposed. - proposedOnNew := make(chan struct{}, 1) - repl.mu.Lock() - repl.mu.proposalBuf.testing.submitProposalFilter = func(p *ProposalData) (drop bool, _ error) { - if p.Request.Timestamp == magicTS { - select { - case proposedOnNew <- struct{}{}: - default: - } - } - return false, nil - } - repl.mu.Unlock() - - // Set the ReplicaID on the replica. 
- if err := repl.setReplicaID(2); err != nil { - t.Fatal(err) - } - - <-proposedOnNew -} - -func TestSetReplicaID(t *testing.T) { - defer leaktest.AfterTest(t)() - - tsc := TestStoreConfig(nil) - tc := testContext{} - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - tc.StartWithStoreConfig(t, stopper, tsc) - - repl := tc.repl - - testCases := []struct { - replicaID roachpb.ReplicaID - minReplicaID roachpb.ReplicaID - newReplicaID roachpb.ReplicaID - expectedMinReplicaID roachpb.ReplicaID - expectedErr string - }{ - {0, 0, 1, 2, ""}, - {0, 1, 1, 2, ""}, - {0, 2, 1, 2, "raft group deleted"}, - {1, 2, 1, 2, ""}, // not an error; replicaID == newReplicaID is checked first - {2, 0, 1, 0, "replicaID cannot move backwards"}, - } - for _, c := range testCases { - t.Run("", func(t *testing.T) { - repl.mu.Lock() - repl.mu.replicaID = c.replicaID - repl.mu.minReplicaID = c.minReplicaID - repl.mu.Unlock() - - err := repl.setReplicaID(c.newReplicaID) - repl.mu.Lock() - if repl.mu.minReplicaID != c.expectedMinReplicaID { - t.Errorf("expected minReplicaID=%d, but found %d", c.expectedMinReplicaID, repl.mu.minReplicaID) - } - repl.mu.Unlock() - if !testutils.IsError(err, c.expectedErr) { - t.Fatalf("expected %q, but found %v", c.expectedErr, err) - } - }) - } -} - func TestReplicaRetryRaftProposal(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/storage/split_delay_helper.go b/pkg/storage/split_delay_helper.go index 7f864f1f77b4..f91b4057e8ef 100644 --- a/pkg/storage/split_delay_helper.go +++ b/pkg/storage/split_delay_helper.go @@ -46,7 +46,7 @@ func (sdh *splitDelayHelper) RaftStatus(ctx context.Context) (roachpb.RangeID, * func (sdh *splitDelayHelper) ProposeEmptyCommand(ctx context.Context) { r := (*Replica)(sdh) r.raftMu.Lock() - _ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) { + _, _ = r.withRaftGroup(true /* campaignOnWake */, func(rawNode *raft.RawNode) (bool, error) { // NB: intentionally ignore the error (which can be ErrProposalDropped // when there's an SST inflight). data := encodeRaftCommand(raftVersionStandard, makeIDKey(), nil) diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 99fa19230a5a..068270f9bcc6 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/storage/apply" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/closedts/container" "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" @@ -1142,7 +1143,7 @@ func (s *Store) IsStarted() bool { return atomic.LoadInt32(&s.started) == 1 } -// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing ( such as +// IterateIDPrefixKeys helps visit system keys that use RangeID prefixing (such as // RaftHardStateKey, RaftTombstoneKey, and many others). Such keys could in principle exist at any // RangeID, and this helper efficiently discovers all the keys of the desired type (as specified by // the supplied `keyFn`) and, for each key-value pair discovered, unmarshals it into `msg` and then @@ -2117,38 +2118,20 @@ func splitPostApply( rightRng.mu.Lock() // The right hand side of the split may have been removed and re-added // in the meantime, and the replicaID in RightDesc may be stale. - // Consequently the call below may fail with a RaftGroupDeletedError. 
In - // general, this protects earlier incarnations of the replica that were + // In general, this protects earlier incarnations of the replica that were // since replicaGC'ed from reneging on promises made earlier (for // example, once the HardState is removed, a replica could cast a // different vote for the same term). // - // It is safe to circumvent that in this case because the RHS must have - // remained uninitialized (the LHS blocks all user data, and since our - // Raft logs start at a nonzero index a snapshot must go through before - // any log entries are appended). This means the data in that range is - // just a HardState which we "logically" snapshot by assigning it data - // formerly located within the LHS. + // The RHS must have remained uninitialized (the LHS blocks all user data, + // and since our Raft logs start at a nonzero index a snapshot must go + // through before any log entries are appended). This means the data in + // that range is just a HardState which we "logically" snapshot by + // assigning it data formerly located within the LHS. // - // Note that if we ever have a way to replicaGC uninitialized replicas, - // the RHS may have been gc'ed and so the HardState would be gone. In - // that case, the requirement that the HardState remains would have been - // violated if the bypass below were used, which is why we place an - // assertion. - // - // See: - // https://github.com/cockroachdb/cockroach/issues/21146#issuecomment-365757329 - // - // TODO(tbg): this argument is flawed - it's possible for a tombstone - // to exist on the RHS: - // https://github.com/cockroachdb/cockroach/issues/40470 - // Morally speaking, this means that we should throw away the data we - // moved from the LHS to the RHS (depending on the tombstone). - // Realistically speaking it will probably be easier to create the RHS - // anyway, even though there's a tombstone and it may just get gc'ed - // again. Note that for extra flavor, we may not even know whether the - // RHS is currently supposed to exist or not, lending more weight to the - // second approach. + // We detect this case and pass the old range descriptor for the RHS + // to SplitRange below which will clear the RHS data rather than installing + // the RHS in the store. tombstoneKey := keys.RaftTombstoneKey(rightRng.RangeID) var tombstone roachpb.RaftTombstone if ok, err := engine.MVCCGetProto( @@ -2156,49 +2139,56 @@ func splitPostApply( ); err != nil { log.Fatalf(ctx, "unable to load tombstone for RHS: %+v", err) } else if ok { - log.Fatalf(ctx, "split trigger found right-hand side with tombstone %+v: %v", tombstone, rightRng) + log.Warningf(ctx, "split trigger found right-hand side with tombstone %+v, "+ + "RHS must have been removed at this ID: %v", tombstone, rightRng) } rightDesc, ok := split.RightDesc.GetReplicaDescriptor(r.StoreID()) if !ok { - // This is yet another special quirky case. The local store is not - // necessarily a member of the split; this can occur if this store - // wasn't a member at the time of the split, but is nevertheless - // catching up across the split. For example, add a learner, and - // while it is being caught up via a snapshot, remove the learner - // again, then execute a split, and re-add it. Upon being re-added - // the learner will likely catch up from where the snapshot left it, - // and it will see itself get removed, then we hit this branch when - // the split trigger is applied, and eventually there's a - // ChangeReplicas that re-adds the local store under a new - // replicaID.
- // - // So our trigger will have a smaller replicaID for our RHS, which - // will blow up in initRaftMuLockedReplicaMuLocked. We still want - // to force the RHS to accept the descriptor, even though that - // rewinds the replicaID. To do that we want to change the existing - // replicaID, but we didn't find one -- zero is then the only reasonable - // choice. Note that this is also the replicaID a replica that is - // not reflected in its own descriptor will have, i.e. we're cooking - // up less of a special case here than you'd expect at first glance. - // - // Note that futzing with the replicaID is a high-risk operation as - // it is what the raft peer will believe itself to be identified by. - // Under no circumstances must we use a replicaID that belongs to - // someone else, or a byzantine situation will result. Zero is - // special-cased and will never init a raft group until the real - // ID is known from inbound raft traffic. - rightDesc.ReplicaID = 0 // for clarity only; it's already zero + // The only time that this case can happen is if we are currently not + // a part of this range (i.e. catching up from a preemptive snapshot). + // In this case it's fine and the logic below is reasonable. + _, lhsExists := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()) + if lhsExists { + log.Fatalf(ctx, "we can't have had the local store change replica ID: %d %v", r.mu.replicaID, rightRng) + } } + // If we're in this case then we know that the RHS has since been removed + // and re-added with a higher replica ID. We know we've never processed a + // snapshot for the right range because up to this point it would overlap + // with the left and ranges cannot move rightwards. Furthermore if it didn't + // overlap because the LHS used to be smaller because it still hasn't + // processed a merge then there's no way we could possibly be processing + // this split. + // + // We might however have already voted at a higher term. In general + // this shouldn't happen because we add learners and then promote them + // only after we snapshot but for the sake of mixed version clusters and + // other potential behavior changes we pass the desc for the known to be + // old RHS and have SplitRange clear its data. This call to SplitRange + // short circuits the rest of this call which would set up the RHS. if rightRng.mu.replicaID > rightDesc.ReplicaID { - rightRng.mu.replicaID = rightDesc.ReplicaID + rightRng.mu.Unlock() + err := r.store.SplitRange(ctx, r, rightRng, split.LeftDesc, &split.RightDesc) + if err != nil { + log.Fatal(ctx, err) + } + return + } + // If we are uninitialized and have a zero-value replica ID then we're safe + // to ignore this tombstone because we couldn't have ever promised anything + // as the newer replica. This is true because we would have needed a + // snapshot we couldn't possibly have received. If the tombstone does not + // exceed the rightDesc.ReplicaID then we have no need to reset + // minReplicaID. This case can occur if we created the RHS due to raft + // messages before the LHS acquired the split lock and then the RHS + // discovered that it was removed and re-added at a higher replica ID which + // led to a tombstone being laid down but then the node forgot about the + // higher replica ID. + if tombstone.NextReplicaID > rightDesc.ReplicaID { + rightRng.mu.minReplicaID = 0 } - // NB: the safety argument above implies that we don't have to worry - // about restoring the existing minReplicaID if it's nonzero.
No - // promises have been made, so none need to be kept. So we clear this - // unconditionally, making sure that it doesn't block us from init'ing - // the RHS. - rightRng.mu.minReplicaID = 0 err := rightRng.initRaftMuLockedReplicaMuLocked(&split.RightDesc, r.store.Clock(), 0) + rightRng.mu.Unlock() if err != nil { log.Fatal(ctx, err) @@ -2237,10 +2227,17 @@ func splitPostApply( // until it receives a Raft message addressed to the right-hand range. But // since new replicas start out quiesced, unless we explicitly awaken the // Raft group, there might not be any Raft traffic for quite a while. - if err := rightRng.withRaftGroup(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { + rightRngIsRemoved, err := rightRng.withRaftGroup(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { return true, nil - }); err != nil { + }) + if err != nil { log.Fatalf(ctx, "unable to create raft group for right-hand range in split: %+v", err) + } else if rightRngIsRemoved { + // This case should not be possible because the destroyStatus for a replica + // can only change while holding the raftMu and in order to have reached + // this point we know that we must be holding the raftMu for the RHS + // because we acquired it beneath raft. + log.Fatalf(ctx, "unable to create raft group for right-hand range in split: range is removed") } // Invoke the leasePostApply method to ensure we properly initialize @@ -2251,7 +2248,7 @@ func splitPostApply( // Add the RHS replica to the store. This step atomically updates // the EndKey of the LHS replica and also adds the RHS replica // to the store's replica map. - if err := r.store.SplitRange(ctx, r, rightRng, split.LeftDesc); err != nil { + if err := r.store.SplitRange(ctx, r, rightRng, split.LeftDesc, nil /* oldRightDesc */); err != nil { // Our in-memory state has diverged from the on-disk state. log.Fatalf(ctx, "%s: failed to update Store after split: %+v", r, err) } @@ -2289,11 +2286,23 @@ func splitPostApply( // // This is only called from the split trigger in the context of the execution // of a Raft command. +// +// The funky case is if rightDesc is non-nil. If so then we know that the right +// replica has been removed and re-added before applying this split and we need +// to leave it uninitialized. It's not possible for it to have become +// initialized as it would have needed a snapshot which must have overlapped the +// leftRepl. The user data in oldRightDesc will be cleared. func (s *Store) SplitRange( - ctx context.Context, leftRepl, rightRepl *Replica, newLeftDesc roachpb.RangeDescriptor, + ctx context.Context, + leftRepl, rightRepl *Replica, + newLeftDesc roachpb.RangeDescriptor, + oldRightDesc *roachpb.RangeDescriptor, ) error { oldLeftDesc := leftRepl.Desc() - rightDesc := rightRepl.Desc() + rightDesc := oldRightDesc + if rightDesc == nil { + rightDesc = rightRepl.Desc() + } if !bytes.Equal(oldLeftDesc.EndKey, rightDesc.EndKey) || bytes.Compare(oldLeftDesc.StartKey, rightDesc.StartKey) >= 0 { @@ -2308,11 +2317,18 @@ func (s *Store) SplitRange( if exRng != rightRepl { log.Fatalf(ctx, "found unexpected uninitialized replica: %s vs %s", exRng, rightRepl) } - // NB: We only remove from uninitReplicas and the replicaQueues maps here - // so that we don't leave open a window where a replica is temporarily not - // present in Store.mu.replicas. 
- delete(s.mu.uninitReplicas, rightDesc.RangeID) - s.replicaQueues.Delete(int64(rightDesc.RangeID)) + // If oldRightDesc is not nil then we know that exRng refers to a later + // replica in the RHS range and should remain uninitialized. + if oldRightDesc == nil { + // NB: We only remove from uninitReplicas and the replicaQueues maps here + // so that we don't leave open a window where a replica is temporarily not + // present in Store.mu.replicas. + delete(s.mu.uninitReplicas, rightDesc.RangeID) + s.replicaQueues.Delete(int64(rightDesc.RangeID)) + } + } else if oldRightDesc != nil { + log.Fatalf(ctx, "found initialized replica despite knowing that the post "+ + "split replica has been removed") } leftRepl.setDesc(ctx, &newLeftDesc) @@ -2336,22 +2352,40 @@ func (s *Store) SplitRange( // Clear the original range's request stats, since they include requests for // spans that are now owned by the new range. leftRepl.leaseholderStats.resetRequestCounts() - leftRepl.writeStats.splitRequestCounts(rightRepl.writeStats) + if oldRightDesc == nil { + leftRepl.writeStats.splitRequestCounts(rightRepl.writeStats) - if err := s.addReplicaInternalLocked(rightRepl); err != nil { - return errors.Errorf("unable to add replica %v: %s", rightRepl, err) - } + if err := s.addReplicaInternalLocked(rightRepl); err != nil { + return errors.Errorf("unable to add replica %v: %s", rightRepl, err) + } - // Update the replica's cached byte thresholds. This is a no-op if the system - // config is not available, in which case we rely on the next gossip update - // to perform the update. - if err := rightRepl.updateRangeInfo(rightRepl.Desc()); err != nil { - return err + // Update the replica's cached byte thresholds. This is a no-op if the system + // config is not available, in which case we rely on the next gossip update + // to perform the update. + if err := rightRepl.updateRangeInfo(rightRepl.Desc()); err != nil { + return err + } + // Add the range to metrics and maybe gossip on capacity change. + s.metrics.ReplicaCount.Inc(1) + s.maybeGossipOnCapacityChange(ctx, rangeAddEvent) + } else { + if rightRepl.IsInitialized() { + log.Fatalf(ctx, "refusing to clear replicated data for an initialized range %v in SplitRange", rightRepl) + } + // We need to clear the data which the RHS would have inherited. + // The uninitialized RHS doesn't know about this data so we must clear it + // lest it be leaked potentially forever. + batch := rightRepl.Engine().NewWriteOnlyBatch() + defer batch.Close() + if err := clearRangeData(oldRightDesc, rightRepl.Engine(), batch, clearReplicatedOnly, false); err != nil { + log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) + } + if err := batch.Commit(true); err != nil { + log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) + } + return nil } - // Add the range to metrics and maybe gossip on capacity change. - s.metrics.ReplicaCount.Inc(1) - s.maybeGossipOnCapacityChange(ctx, rangeAddEvent) return nil } @@ -2580,7 +2614,12 @@ func (s *Store) removeReplicaImpl( // We check both rep.mu.ReplicaID and rep.mu.state.Desc's replica ID because // they can differ in cases when a replica's ID is increased due to an // incoming raft message (see #14231 for background). + // TODO(ajwerner): reconsider some of this sanity checking. 
rep.mu.Lock() + if rep.mu.destroyStatus.Removed() { + rep.mu.Unlock() + return nil + } replicaID := rep.mu.replicaID if rep.mu.replicaID >= nextReplicaID { rep.mu.Unlock() @@ -2600,12 +2639,7 @@ func (s *Store) removeReplicaImpl( } if !rep.IsInitialized() { - // The split trigger relies on the fact that it can bypass the tombstone - // check for the RHS, but this is only true as long as we never delete - // its HardState. - // - // See the comment in splitPostApply for details. - log.Fatalf(ctx, "can not replicaGC uninitialized replicas") + log.Fatalf(ctx, "can not replicaGC uninitialized replicas in this method") } // During merges, the context might have the subsuming range, so we explicitly @@ -2666,7 +2700,74 @@ func (s *Store) removeReplicaImpl( // TODO(peter): Could release s.mu.Lock() here. s.maybeGossipOnCapacityChange(ctx, rangeRemoveEvent) s.scanner.RemoveReplica(rep) + return nil +} + +func (s *Store) removeUninitializedReplicaRaftMuLocked( + ctx context.Context, rep *Replica, nextReplicaID roachpb.ReplicaID, +) error { + rep.raftMu.AssertHeld() + + // Sanity check this removal. + rep.mu.RLock() + ds := rep.mu.destroyStatus + isInitialized := rep.isInitializedRLocked() + rep.mu.RUnlock() + // Somebody already removed this Replica. + if ds.Removed() { + return nil + } + + if !ds.RemovalPending() { + log.Fatalf(ctx, "cannot remove uninitialized replica which is not removal pending: %v", ds) + } + + // When we're in this state we should have already had our destroy status set + // so it shouldn't have been possible to process any raft messages or apply + // any snapshots. + if isInitialized { + log.Fatalf(ctx, "previously uninitialized replica became initialized before removal") + } + + // Proceed with the removal. + rep.readOnlyCmdMu.Lock() + rep.mu.Lock() + rep.cancelPendingCommandsLocked() + rep.mu.internalRaftGroup = nil + rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.store.StoreID()), destroyReasonRemoved) + rep.mu.Unlock() + rep.readOnlyCmdMu.Unlock() + + if err := rep.destroyUninitializedReplicaRaftMuLocked(ctx, nextReplicaID); err != nil { + log.Fatalf(ctx, "failed to remove uninitialized replica %v: %v", rep, err) + } + + s.mu.Lock() + defer s.mu.Unlock() + // Sanity check, could be removed. + value, stillExists := s.mu.replicas.Load(int64(rep.RangeID)) + if !stillExists { + log.Fatalf(ctx, "uninitialized replica was removed in the meantime") + } + existing := (*Replica)(value) + if existing == rep { + log.Infof(ctx, "removing uninitialized replica %v", rep) + } else { + log.Fatalf(ctx, "uninitialized replica %v was already removed", rep) + } + + s.metrics.ReplicaCount.Dec(1) + + // Only an uninitialized replica can have a placeholder since, by + // definition, an initialized replica will be present in the + // replicasByKey map. While the replica will usually consume the + // placeholder itself, that isn't guaranteed and so this invocation + // here is crucial (i.e. don't remove it). 
+ if s.removePlaceholderLocked(ctx, rep.RangeID) { + atomic.AddInt32(&s.counts.droppedPlaceholders, 1) + } + s.unlinkReplicaByRangeIDLocked(rep.RangeID) return nil } @@ -3428,7 +3529,11 @@ func (s *Store) processRaftRequestWithReplica( drop := maybeDropMsgApp(ctx, (*replicaMsgAppDropper)(r), &req.Message, req.RangeStartKey) if !drop { - if err := r.stepRaftGroup(req); err != nil { + isRemoved, err := r.stepRaftGroup(req) + if isRemoved { + _, err = r.IsDestroyed() + } + if err != nil { return roachpb.NewError(err) } } @@ -3501,10 +3606,13 @@ func (s *Store) processRaftSnapshotRequest( }() } - if err := r.stepRaftGroup(&snapHeader.RaftMessageRequest); err != nil { + isRemoved, err := r.stepRaftGroup(&snapHeader.RaftMessageRequest) + if isRemoved { + _, err = r.IsDestroyed() + } + if err != nil { return roachpb.NewError(err) } - if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, inSnap); err != nil { fatalOnRaftReadyErr(ctx, expl, err) } @@ -3543,33 +3651,40 @@ func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageRespons repl.raftMu.Lock() defer repl.raftMu.Unlock() repl.mu.Lock() - defer repl.mu.Unlock() // If the replica ID in the error does not match then we know // that the replica has been removed and re-added quickly. In // that case, we don't want to add it to the replicaGCQueue. - if tErr.ReplicaID != repl.mu.replicaID { - log.Infof(ctx, "replica too old response with old replica ID: %s", tErr.ReplicaID) + // If the replica is not alive then we also should ignore this error. + if tErr.ReplicaID != repl.mu.replicaID || !repl.mu.destroyStatus.IsAlive() { + repl.mu.Unlock() return nil } - // If the replica ID in the error does match, we know the replica - // will be removed and we can cancel any pending commands. This is - // sometimes necessary to unblock PushTxn operations that are - // necessary for the replica GC to succeed. - repl.cancelPendingCommandsLocked() - // The replica will be garbage collected soon (we are sure // since our replicaID is definitely too old), but in the meantime we // already want to bounce all traffic from it. Note that the replica - // could be re-added with a higher replicaID, in which this error is - // cleared in setReplicaIDRaftMuLockedMuLocked. - if repl.mu.destroyStatus.IsAlive() { - storeID := repl.store.StoreID() - repl.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(repl.RangeID, storeID), destroyReasonRemovalPending) + // could be re-added with a higher replicaID, but we want to clear the + // replica's data before that happens. + if log.V(1) { + log.Infof(ctx, "setting local replica to destroyed due to ReplicaTooOld error") } - s.replicaGCQueue.AddAsync(ctx, repl, replicaGCPriorityRemoved) + storeID := repl.store.StoreID() + // NB: We know that there's a later copy of this range on this store but + // to introduce another more specific error in this case is overkill so + // we return RangeNotFoundError which the recipient will properly + // ignore. + repl.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(repl.RangeID, storeID), + destroyReasonRemovalPending) + repl.mu.Unlock() + nextReplicaID := tErr.ReplicaID + 1 + if !repl.IsInitialized() { + return s.removeUninitializedReplicaRaftMuLocked(ctx, repl, nextReplicaID) + } + return s.removeReplicaImpl(ctx, repl, nextReplicaID, RemoveOptions{ + DestroyData: true, + }) case *roachpb.RaftGroupDeletedError: if replErr != nil { // RangeNotFoundErrors are expected here; nothing else is. 
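The ReplicaTooOldError handling above marks the replica with destroyReasonRemovalPending before the removal path later flips it to removed, and the checks in withRaftGroupLocked, canApplySnapshotLocked, and removeReplicaImpl key off those predicates. A toy model of that three-state lifecycle follows; the real destroyStatus in pkg/storage differs in detail, this is only meant to illustrate the transitions:

package main

import (
	"errors"
	"fmt"
)

type destroyReason int

const (
	destroyReasonAlive destroyReason = iota
	destroyReasonRemovalPending
	destroyReasonRemoved
)

// destroyStatus is a toy model of the replica's destroy status: an error plus
// a reason describing how far along the removal is.
type destroyStatus struct {
	reason destroyReason
	err    error
}

func (s *destroyStatus) Set(err error, reason destroyReason) { s.err, s.reason = err, reason }
func (s destroyStatus) IsAlive() bool                        { return s.reason == destroyReasonAlive }
func (s destroyStatus) RemovalPending() bool                 { return s.reason == destroyReasonRemovalPending }
func (s destroyStatus) Removed() bool                        { return s.reason == destroyReasonRemoved }

func main() {
	var ds destroyStatus
	fmt.Println(ds.IsAlive()) // true: no removal has been requested

	// A ReplicaTooOld response marks the replica as pending removal first...
	ds.Set(errors.New("range not found"), destroyReasonRemovalPending)
	fmt.Println(ds.RemovalPending(), ds.Removed()) // true false

	// ...and the removal path flips it to removed once the data is cleared.
	ds.Set(errors.New("range not found"), destroyReasonRemoved)
	fmt.Println(ds.Removed()) // true
}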
@@ -3632,7 +3747,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // giving up the lock. Set lastRepl to nil, so we don't handle it // down below as well. lastRepl = nil - if _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, noSnap); err != nil { + switch _, expl, err := r.handleRaftReadyRaftMuLocked(ctx, noSnap); err { + case nil: + case apply.ErrRemoved: + // return hopefully never to be heard from again. + default: fatalOnRaftReadyErr(ctx, expl, err) } } @@ -3677,7 +3796,11 @@ func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID // handleRaftReadyRaftMuLocked) since racing to handle Raft Ready won't // have any undesirable results. ctx = lastRepl.AnnotateCtx(ctx) - if _, expl, err := lastRepl.handleRaftReady(ctx, noSnap); err != nil { + switch _, expl, err := lastRepl.handleRaftReady(ctx, noSnap); err { + case nil: + case apply.ErrRemoved: + // return hopefully never to be heard from again. + default: fatalOnRaftReadyErr(ctx, expl, err) } } @@ -3693,8 +3816,12 @@ func (s *Store) processReady(ctx context.Context, rangeID roachpb.RangeID) { ctx = r.AnnotateCtx(ctx) start := timeutil.Now() stats, expl, err := r.handleRaftReady(ctx, noSnap) - if err != nil { - log.Fatalf(ctx, "%s: %+v", log.Safe(expl), err) // TODO(bdarnell) + switch err { + case nil: + case apply.ErrRemoved: + return + default: + fatalOnRaftReadyErr(ctx, expl, err) } elapsed := timeutil.Since(start) s.metrics.RaftWorkingDurationNanos.Inc(elapsed.Nanoseconds()) @@ -3937,7 +4064,12 @@ func (s *Store) getOrCreateReplica( replicaID roachpb.ReplicaID, creatingReplica *roachpb.ReplicaDescriptor, ) (_ *Replica, created bool, _ error) { + r := retry.Start(retry.Options{ + InitialBackoff: time.Microsecond, + MaxBackoff: 10 * time.Millisecond, + }) for { + r.Next() r, created, err := s.tryGetOrCreateReplica( ctx, rangeID, @@ -3965,33 +4097,77 @@ func (s *Store) tryGetOrCreateReplica( rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, creatingReplica *roachpb.ReplicaDescriptor, -) (_ *Replica, created bool, _ error) { - // The common case: look up an existing (initialized) replica. - if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { - repl := (*Replica)(value) - repl.raftMu.Lock() // not unlocked - repl.mu.Lock() - defer repl.mu.Unlock() - - var replTooOldErr error - if creatingReplica != nil { +) (_ *Replica, created bool, err error) { + var ( + handleFromReplicaTooOld = func(repl *Replica) error { + if creatingReplica == nil { + return nil + } // Drop messages that come from a node that we believe was once a member of // the group but has been removed. desc := repl.mu.state.Desc _, found := desc.GetReplicaDescriptorByID(creatingReplica.ReplicaID) // It's not a current member of the group. Is it from the past? 
if !found && creatingReplica.ReplicaID < desc.NextReplicaID { - replTooOldErr = roachpb.NewReplicaTooOldError(creatingReplica.ReplicaID) + return roachpb.NewReplicaTooOldError(creatingReplica.ReplicaID) } + return nil + } + handleToReplicaTooOld = func(repl *Replica) error { + if replicaID == 0 || repl.mu.replicaID == 0 || repl.mu.replicaID >= replicaID { + return nil + } + if log.V(1) { + log.Infof(ctx, "found message for newer replica ID: %v %v %v %v %v", repl.mu.replicaID, replicaID, repl.mu.minReplicaID, repl.mu.state.Desc, &repl.mu.destroyStatus) + } + repl.mu.destroyStatus.Set(err, destroyReasonRemovalPending) + isInitialized := repl.isInitializedRLocked() + repl.mu.Unlock() + defer repl.mu.Lock() + if !isInitialized { + if err := s.removeUninitializedReplicaRaftMuLocked(ctx, repl, replicaID); err != nil { + log.Fatalf(ctx, "failed to remove uninitialized replica: %v", err) + } + } else { + if err := s.removeReplicaImpl(ctx, repl, replicaID, RemoveOptions{ + DestroyData: true, + }); err != nil { + log.Fatal(ctx, err) + } + } + return errRetry + } + ) + // The common case: look up an existing (initialized) replica. + if value, ok := s.mu.replicas.Load(int64(rangeID)); ok { + repl := (*Replica)(value) + repl.raftMu.Lock() // not unlocked on success + repl.mu.Lock() + defer repl.mu.Unlock() + if err := handleFromReplicaTooOld(repl); err != nil { + repl.raftMu.Unlock() + return nil, false, err + } + if repl.mu.destroyStatus.RemovalPending() { + repl.raftMu.Unlock() + return nil, false, errRetry + } + if err := handleToReplicaTooOld(repl); err != nil { + repl.raftMu.Unlock() + return nil, false, err } var err error - if replTooOldErr != nil { - err = replTooOldErr - } else if ds := repl.mu.destroyStatus; ds.reason == destroyReasonRemoved { - err = errRetry - } else { - err = repl.setReplicaIDRaftMuLockedMuLocked(replicaID) + if repl.mu.replicaID == 0 { + // This message is telling us about our replica ID. + // This is a common case when dealing with preemptive snapshots. + err = repl.setReplicaIDRaftMuLockedMuLocked(repl.AnnotateCtx(ctx), replicaID) + } else if replicaID != 0 && repl.mu.replicaID > replicaID { + // TODO(ajwerner): probably just silently drop this message. + err = roachpb.NewRangeNotFoundError(rangeID, s.StoreID()) + } else if replicaID != 0 && repl.mu.replicaID != replicaID { + log.Fatalf(ctx, "we should never update the replica id based on a message %v %v", + repl.mu.replicaID, replicaID) } if err != nil { repl.raftMu.Unlock() diff --git a/pkg/storage/store_snapshot.go b/pkg/storage/store_snapshot.go index 3fb7ff8b74d5..2674ed25f2e9 100644 --- a/pkg/storage/store_snapshot.go +++ b/pkg/storage/store_snapshot.go @@ -628,7 +628,9 @@ func (s *Store) canApplySnapshotLocked( existingRepl.raftMu.AssertHeld() existingRepl.mu.RLock() - existingIsInitialized := existingRepl.isInitializedRLocked() + existingDesc := existingRepl.mu.state.Desc + existingIsInitialized := existingDesc.IsInitialized() + existingDestroyStatus := existingRepl.mu.destroyStatus existingRepl.mu.RUnlock() if existingIsInitialized { @@ -637,15 +639,19 @@ func (s *Store) canApplySnapshotLocked( // in Replica.maybeAcquireSnapshotMergeLock for how this is // made safe. // - // NB: we expect the replica to know its replicaID at this point - // (i.e. 
!existingIsPreemptive), though perhaps it's possible - // that this isn't true if the leader initiates a Raft snapshot - // (that would provide a range descriptor with this replica in - // it) but this node reboots (temporarily forgetting its - // replicaID) before the snapshot arrives. + // NB: The snapshot must be intended for this replica as + // withReplicaForRequest ensures that requests with a non-zero replica + // id are passed to a replica with a matching id. Given this is not a + // preemptive snapshot we know that its id must be non-zero. return nil, nil } + // If we are not alive then we should not apply a snapshot as our removal + // is imminent. + if existingDestroyStatus.RemovalPending() { + return nil, existingDestroyStatus.err + } + // We have a key range [desc.StartKey,desc.EndKey) which we want to apply a // snapshot for. Is there a conflicting existing placeholder or an // overlapping range? @@ -670,7 +676,7 @@ func (s *Store) checkSnapshotOverlapLocked( // NB: this check seems redundant since placeholders are also represented in // replicasByKey (and thus returned in getOverlappingKeyRangeLocked). if exRng, ok := s.mu.replicaPlaceholders[desc.RangeID]; ok { - return errors.Errorf("%s: canApplySnapshotLocked: cannot add placeholder, have an existing placeholder %s", s, exRng) + return errors.Errorf("%s: canApplySnapshotLocked: cannot add placeholder, have an existing placeholder %s %v", s, exRng, snapHeader.RaftMessageRequest.FromReplica) } // TODO(benesch): consider discovering and GC'ing *all* overlapping ranges, @@ -732,47 +738,20 @@ func (s *Store) checkSnapshotOverlapLocked( // snapshot. func (s *Store) shouldAcceptSnapshotData( ctx context.Context, snapHeader *SnapshotRequest_Header, -) error { +) (err error) { if snapHeader.IsPreemptive() { return crdberrors.AssertionFailedf(`expected a raft or learner snapshot`) } - - s.mu.Lock() - defer s.mu.Unlock() - - // TODO(tbg): see the comment on desc.Generation for what seems to be a much - // saner way to handle overlap via generational semantics. - desc := *snapHeader.State.Desc - - // First, check for an existing Replica. - if v, ok := s.mu.replicas.Load( - int64(desc.RangeID), - ); ok { - existingRepl := (*Replica)(v) - existingRepl.mu.RLock() - existingIsInitialized := existingRepl.isInitializedRLocked() - existingRepl.mu.RUnlock() - - if existingIsInitialized { - // Regular Raft snapshots can't be refused at this point, - // even if they widen the existing replica. See the comments - // in Replica.maybeAcquireSnapshotMergeLock for how this is - // made safe. - // - // NB: we expect the replica to know its replicaID at this point - // (i.e. !existingIsPreemptive), though perhaps it's possible - // that this isn't true if the leader initiates a Raft snapshot - // (that would provide a range descriptor with this replica in - // it) but this node reboots (temporarily forgetting its - // replicaID) before the snapshot arrives. + pErr := s.withReplicaForRequest(ctx, &snapHeader.RaftMessageRequest, + func(ctx context.Context, r *Replica) *roachpb.Error { + if !r.IsInitialized() { + s.mu.Lock() + defer s.mu.Unlock() + return roachpb.NewError(s.checkSnapshotOverlapLocked(ctx, snapHeader)) + } return nil - } - } - - // We have a key range [desc.StartKey,desc.EndKey) which we want to apply a - // snapshot for. Is there a conflicting existing placeholder or an - // overlapping range? 
- return s.checkSnapshotOverlapLocked(ctx, snapHeader) + }) + return pErr.GoError() } // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 8829fd7a15cb..dc57a80cba01 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -569,8 +569,8 @@ func TestStoreAddRemoveRanges(t *testing.T) { // Try to remove range 1 again. if err := store.RemoveReplica(context.Background(), repl1, repl1.Desc().NextReplicaID, RemoveOptions{ DestroyData: true, - }); err == nil { - t.Fatal("expected error re-removing same range") + }); err != nil { + t.Fatalf("didn't expect error re-removing same range: %v", err) } // Try to add a range with previously-used (but now removed) ID. repl2Dup := createReplica(store, 1, roachpb.RKey("a"), roachpb.RKey("b")) @@ -712,11 +712,11 @@ func TestStoreRemoveReplicaDestroy(t *testing.T) { // Verify that removal of a replica marks it as destroyed so that future raft // commands on the Replica will silently be dropped. - if err := repl1.withRaftGroup(true, func(r *raft.RawNode) (bool, error) { + isRemoved, err := repl1.withRaftGroup(true, func(r *raft.RawNode) (bool, error) { return true, errors.Errorf("unexpectedly created a raft group") - }); err != nil { - t.Fatal(err) - } + }) + require.True(t, isRemoved) + require.NoError(t, err) repl1.mu.Lock() expErr := roachpb.NewError(repl1.mu.destroyStatus.err) @@ -1354,7 +1354,7 @@ func splitTestRange(store *Store, key, splitKey roachpb.RKey, t *testing.T) *Rep require.NoError(t, err) newLeftDesc := *repl.Desc() newLeftDesc.EndKey = splitKey - err = store.SplitRange(repl.AnnotateCtx(context.TODO()), repl, newRng, newLeftDesc) + err = store.SplitRange(repl.AnnotateCtx(context.TODO()), repl, newRng, newLeftDesc, nil) require.NoError(t, err) return newRng } @@ -2953,104 +2953,6 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { }) } -// Test that we set proper tombstones for removed replicas and use the -// tombstone to reject attempts to create a replica with a lesser ID. -func TestRemovedReplicaTombstone(t *testing.T) { - defer leaktest.AfterTest(t)() - - const rangeID = 1 - creatingReplica := roachpb.ReplicaDescriptor{ - NodeID: 2, - StoreID: 2, - ReplicaID: 2, - } - - // All test cases assume that the starting replica ID is 1. This assumption - // is enforced by a check within the test logic. 
- testCases := []struct { - setReplicaID roachpb.ReplicaID // set the existing replica to this before removing it - descNextReplicaID roachpb.ReplicaID // the descriptor's NextReplicaID during replica removal - createReplicaID roachpb.ReplicaID // try creating a replica at this ID - expectCreated bool - }{ - {1, 2, 2, true}, - {1, 2, 1, false}, - {1, 2, 1, false}, - {1, 3, 1, false}, - {1, 3, 2, false}, - {1, 3, 3, true}, - {1, 99, 98, false}, - {1, 99, 99, true}, - {2, 2, 2, false}, - {2, 2, 3, true}, - {2, 2, 99, true}, - {98, 2, 98, false}, - {98, 2, 99, true}, - } - for _, c := range testCases { - t.Run("", func(t *testing.T) { - tc := testContext{} - stopper := stop.NewStopper() - ctx := context.TODO() - defer stopper.Stop(ctx) - tc.Start(t, stopper) - s := tc.store - - repl1, err := s.GetReplica(rangeID) - if err != nil { - t.Fatal(err) - } - repl1.mu.Lock() - if repl1.mu.replicaID != 1 { - repl1.mu.Unlock() - t.Fatalf("test precondition not met; expected ReplicaID=1, got %d", repl1.mu.replicaID) - } - repl1.mu.Unlock() - - // Try to trigger a race where the replica ID gets increased during the GC - // process by taking the store lock and inserting a short sleep to cause - // the goroutine to start running the setReplicaID call. - errChan := make(chan error) - - func() { - repl1.raftMu.Lock() - defer repl1.raftMu.Unlock() - s.mu.Lock() - defer s.mu.Unlock() - repl1.mu.Lock() - defer repl1.mu.Unlock() - - go func() { - errChan <- s.RemoveReplica(ctx, repl1, c.descNextReplicaID, RemoveOptions{DestroyData: true}) - }() - - time.Sleep(1 * time.Millisecond) - - if err := repl1.setReplicaIDRaftMuLockedMuLocked(c.setReplicaID); err != nil { - t.Fatal(err) - } - }() - - if err := <-errChan; testutils.IsError(err, "replica ID has changed") { - // We didn't trigger the race, so just return success. - return - } else if err != nil { - t.Fatal(err) - } - - _, created, err := s.getOrCreateReplica(ctx, rangeID, c.createReplicaID, &creatingReplica) - if created != c.expectCreated { - t.Errorf("expected s.getOrCreateReplica(%d, %d, %v).created=%v, got %v", - rangeID, c.createReplicaID, creatingReplica, c.expectCreated, created) - } - if !c.expectCreated && !testutils.IsError(err, "raft group deleted") { - t.Errorf("expected s.getOrCreateReplica(%d, %d, %v).err='raft group deleted', got %v", - rangeID, c.createReplicaID, creatingReplica, err) - } - }) - } -} - type fakeSnapshotStream struct { nextResp *SnapshotResponse nextErr error diff --git a/pkg/storage/testing_knobs.go b/pkg/storage/testing_knobs.go index 7ee8dccf6953..91d5b48e8f9f 100644 --- a/pkg/storage/testing_knobs.go +++ b/pkg/storage/testing_knobs.go @@ -139,6 +139,12 @@ type StoreTestingKnobs struct { // DisableRefreshReasonTicks disables refreshing pending commands // periodically. DisableRefreshReasonTicks bool + // DisableEagerReplicaRemoval prevents the Replica from destroying itself + // when it encounters a ChangeReplicasTrigger which would remove it. + // This option can lead to nasty cases during shutdown where a replica will + // spin attempting to acquire a split or merge lock on a RHS which will + // always fail and is generally not safe but is useful for testing. + DisableEagerReplicaRemoval bool // RefreshReasonTicksPeriod overrides the default period over which // pending commands are refreshed. The period is specified as a multiple // of Raft group ticks.
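Earlier in this diff, getOrCreateReplica gains a capped backoff so it can spin while a conflicting replica finishes being destroyed (tryGetOrCreateReplica returns errRetry in that window). A self-contained sketch of that retry shape, using plain time.Sleep in place of the util/retry helper and a hypothetical tryGetOrCreate stand-in:

package main

import (
	"errors"
	"fmt"
	"time"
)

// errRetry is a hypothetical sentinel meaning "a conflicting replica is still
// being destroyed; look the range up again shortly".
var errRetry = errors.New("retry: removal still pending")

// tryGetOrCreate stands in for tryGetOrCreateReplica: it fails with errRetry
// until the old replica has finished being removed.
func tryGetOrCreate(attempt int) (string, error) {
	if attempt < 3 {
		return "", errRetry
	}
	return "replica created", nil
}

func main() {
	backoff := time.Microsecond
	const maxBackoff = 10 * time.Millisecond
	for attempt := 0; ; attempt++ {
		repl, err := tryGetOrCreate(attempt)
		if errors.Is(err, errRetry) {
			// Back off briefly and retry, doubling the wait up to the cap.
			time.Sleep(backoff)
			backoff *= 2
			if backoff > maxBackoff {
				backoff = maxBackoff
			}
			continue
		}
		if err != nil {
			panic(err)
		}
		fmt.Println(repl)
		return
	}
}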