diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 7d1adb9ec861..b95b513d50f0 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -87,10 +87,11 @@ var ( // LocalRangeAppliedStateSuffix is the suffix for the range applied state // key. LocalRangeAppliedStateSuffix = []byte("rask") - // LocalRaftTruncatedStateSuffix is the suffix for the + // This was previously used for the replicated RaftTruncatedState. It is no + // longer used and this key has been removed via a migration. See + // LocalRaftTruncatedStateSuffix for the corresponding unreplicated // RaftTruncatedState. - // Note: This suffix is also used for unreplicated Range-ID keys. - LocalRaftTruncatedStateSuffix = []byte("rftt") + _ = []byte("rftt") // LocalRangeLeaseSuffix is the suffix for a range lease. LocalRangeLeaseSuffix = []byte("rll-") // LocalRangePriorReadSummarySuffix is the suffix for a range's prior read @@ -122,6 +123,13 @@ var ( localRaftLastIndexSuffix = []byte("rfti") // LocalRaftLogSuffix is the suffix for the raft log. LocalRaftLogSuffix = []byte("rftl") + // LocalRaftReplicaIDSuffix is the suffix for the RaftReplicaID. This is + // written when a replica is created. + LocalRaftReplicaIDSuffix = []byte("rftr") + // LocalRaftTruncatedStateSuffix is the suffix for the unreplicated + // RaftTruncatedState. + LocalRaftTruncatedStateSuffix = []byte("rftt") + // LocalRangeLastReplicaGCTimestampSuffix is the suffix for a range's last // replica GC timestamp (for GC of old replicas). LocalRangeLastReplicaGCTimestampSuffix = []byte("rlrt") diff --git a/pkg/keys/doc.go b/pkg/keys/doc.go index 7a4248130ed7..47f5fc5dfc58 100644 --- a/pkg/keys/doc.go +++ b/pkg/keys/doc.go @@ -199,6 +199,7 @@ var _ = [...]interface{}{ RangeTombstoneKey, // "rftb" RaftHardStateKey, // "rfth" RaftLogKey, // "rftl" + RaftReplicaIDKey, // "rftr" RaftTruncatedStateKey, // "rftt" RangeLastReplicaGCTimestampKey, // "rlrt" diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index de6eae1dcbb3..8e768cc8eafa 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -331,6 +331,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftLogKey(logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func RaftReplicaIDKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDPrefixBuf(rangeID).RaftReplicaIDKey() +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key { @@ -1007,6 +1012,11 @@ func (b RangeIDPrefixBuf) RaftLogKey(logIndex uint64) roachpb.Key { return encoding.EncodeUint64Ascending(b.RaftLogPrefix(), logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func (b RangeIDPrefixBuf) RaftReplicaIDKey() roachpb.Key { + return append(b.unreplicatedPrefix(), LocalRaftReplicaIDSuffix...) +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. 
func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key { diff --git a/pkg/kv/kvserver/below_raft_protos_test.go b/pkg/kv/kvserver/below_raft_protos_test.go index ab2f072d9c26..f0c7884c8382 100644 --- a/pkg/kv/kvserver/below_raft_protos_test.go +++ b/pkg/kv/kvserver/below_raft_protos_test.go @@ -133,6 +133,13 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{ emptySum: 14695981039346656037, populatedSum: 1187861800212570275, }, + reflect.TypeOf(&roachpb.RaftReplicaID{}): { + populatedConstructor: func(r *rand.Rand) protoutil.Message { + return roachpb.NewPopulatedRaftReplicaID(r, false) + }, + emptySum: 598336668751268149, + populatedSum: 9313101058286450988, + }, } func TestBelowRaftProtos(t *testing.T) { diff --git a/pkg/kv/kvserver/client_store_test.go b/pkg/kv/kvserver/client_store_test.go index f47006c66943..6e997152004d 100644 --- a/pkg/kv/kvserver/client_store_test.go +++ b/pkg/kv/kvserver/client_store_test.go @@ -15,6 +15,8 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -23,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) // TestStoreSetRangesMaxBytes creates a set of ranges via splitting and then @@ -70,3 +73,38 @@ func TestStoreSetRangesMaxBytes(t *testing.T) { return nil }) } + +// TestStoreRaftReplicaID tests that initialized replicas have a +// RaftReplicaID. +func TestStoreRaftReplicaID(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + srv := tc.Server(0) + store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID()) + require.NoError(t, err) + + scratchKey := tc.ScratchRange(t) + desc, err := tc.LookupRange(scratchKey) + require.NoError(t, err) + repl, err := store.GetReplica(desc.RangeID) + require.NoError(t, err) + replicaID, found, err := stateloader.Make(desc.RangeID).LoadRaftReplicaID(ctx, store.Engine()) + require.True(t, found) + require.NoError(t, err) + require.Equal(t, repl.ReplicaID(), replicaID.ReplicaID) + + // RHS of a split also has ReplicaID. + splitKey := append(scratchKey, '0', '0') + _, rhsDesc, err := tc.SplitRange(splitKey) + require.NoError(t, err) + rhsRepl, err := store.GetReplica(rhsDesc.RangeID) + require.NoError(t, err) + rhsReplicaID, found, err := + stateloader.Make(rhsDesc.RangeID).LoadRaftReplicaID(ctx, store.Engine()) + require.True(t, found) + require.NoError(t, err) + require.Equal(t, rhsRepl.ReplicaID(), rhsReplicaID.ReplicaID) +} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index e71f71ec83e1..4cade492df87 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -520,6 +520,8 @@ type Replica struct { // It will not change over the lifetime of this replica. If addressed under // a newer replicaID, the replica immediately replicaGCs itself to make // way for the newer incarnation. + // TODO(sumeer): since this is initialized in newUnloadedReplica and never + // changed, lift this out of the mu struct. replicaID roachpb.ReplicaID // The minimum allowed ID for this replica. 
Initialized from // RangeTombstone.NextReplicaID. @@ -710,9 +712,11 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) { r.store.Ident.NodeID, r.store.Ident.StoreID, r.rangeStr.get()) } -// ReplicaID returns the ID for the Replica. It may be zero if the replica does -// not know its ID. Once a Replica has a non-zero ReplicaID it will never change. +// ReplicaID returns the ID for the Replica. This value is fixed for the +// lifetime of the Replica. func (r *Replica) ReplicaID() roachpb.ReplicaID { + // The locking of mu is unnecessary. It will be removed when we lift + // replicaID out of the mu struct. r.mu.RLock() defer r.mu.RUnlock() return r.mu.replicaID diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ea5711c7bd14..a864cb2f1d70 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -863,6 +863,12 @@ func (r *Replica) applySnapshot( if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") } + // We've cleared all the raft state above, so we are forced to write the + // RaftReplicaID again here. + if err := r.raftMu.stateLoader.SetRaftReplicaID( + ctx, &unreplicatedSST, r.mu.replicaID); err != nil { + return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + } // Update Raft entries. r.store.raftEntryCache.Drop(r.RangeID) diff --git a/pkg/kv/kvserver/stateloader/initial.go b/pkg/kv/kvserver/stateloader/initial.go index f1688147a310..ebf49c47f52b 100644 --- a/pkg/kv/kvserver/stateloader/initial.go +++ b/pkg/kv/kvserver/stateloader/initial.go @@ -96,6 +96,7 @@ func WriteInitialRangeState( ctx context.Context, readWriter storage.ReadWriter, desc roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, replicaVersion roachpb.Version, ) error { initialLease := roachpb.Lease{} @@ -108,7 +109,13 @@ func WriteInitialRangeState( ); err != nil { return err } - if err := Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter); err != nil { + sl := Make(desc.RangeID) + if err := sl.SynthesizeRaftState(ctx, readWriter); err != nil { + return err + } + // Maintain the invariant that any replica (uninitialized or initialized), + // with persistent state, has a RaftReplicaID. + if err := sl.SetRaftReplicaID(ctx, readWriter, replicaID); err != nil { return err } return nil diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 2ab3273f5320..d6bef28ca3eb 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -434,3 +434,29 @@ func (rsl StateLoader) SynthesizeHardState( err := rsl.SetHardState(ctx, readWriter, newHS) return errors.Wrapf(err, "writing HardState %+v", &newHS) } + +// SetRaftReplicaID overwrites the RaftReplicaID. +func (rsl StateLoader) SetRaftReplicaID( + ctx context.Context, writer storage.Writer, replicaID roachpb.ReplicaID, +) error { + rid := roachpb.RaftReplicaID{ReplicaID: replicaID} + // "Blind" because ms == nil and timestamp.IsEmpty(). + return storage.MVCCBlindPutProto( + ctx, + writer, + nil, /* ms */ + rsl.RaftReplicaIDKey(), + hlc.Timestamp{}, /* timestamp */ + &rid, + nil, /* txn */ + ) +} + +// LoadRaftReplicaID loads the RaftReplicaID. 
+func (rsl StateLoader) LoadRaftReplicaID( + ctx context.Context, reader storage.Reader, +) (replicaID roachpb.RaftReplicaID, found bool, err error) { + found, err = storage.MVCCGetProto(ctx, reader, rsl.RaftReplicaIDKey(), + hlc.Timestamp{}, &replicaID, storage.MVCCGetOptions{}) + return +} diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 81f05d939837..6418941e88cd 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -220,6 +220,60 @@ func (s *Store) tryGetOrCreateReplica( } else if hs.Commit != 0 { log.Fatalf(ctx, "found non-zero HardState.Commit on uninitialized replica %s. HS=%+v", repl, hs) } + + // Write the RaftReplicaID for this replica. This is the only place in the + // CockroachDB code where we create a new *uninitialized* replica. + // Note that it is possible that we have already created the HardState for + // an uninitialized replica, then crashed, and on recovery are receiving a + // raft message for the same or a later replica. + // - Same replica: we are overwriting the RaftReplicaID with the same + // value, which is harmless. + // - Later replica: there may be an existing HardState for the older + // uninitialized replica with Commit=0 and non-zero Term and Vote. Using + // the Term and Vote values for that older replica in the context of + // this newer replica is harmless since it just limits the votes for + // this replica. + // + // + // Compatibility: + // - v21.2 and v22.1: v22.1 unilaterally introduces RaftReplicaID (an + // unreplicated range-id local key). If a v22.1 binary is rolled back at + // a node, the fact that RaftReplicaID was written is harmless to a + // v21.2 node since it does not read it. When a v21.2 node drops an + // initialized range, the RaftReplicaID will also be deleted because the + // whole range-ID local key space is deleted. + // + // - v22.2: we will start relying on the presence of RaftReplicaID, and + // remove any uninitialized replicas that have a HardState but no + // RaftReplicaID. This removal will happen in ReplicasStorage.Init and + // allow us to tighten invariants. Additionally, knowing the ReplicaID + // for an uninitialized range could allow a node to somehow contact the + // raft group (say by broadcasting to all nodes in the cluster), and if + // the ReplicaID is stale, would allow the node to remove the HardState + // and RaftReplicaID. See + // https://github.com/cockroachdb/cockroach/issues/75740. + // + // There is a concern that there could be some replica that survived + // from v21.2 to v22.1 to v22.2 in an uninitialized state and will be + // incorrectly removed in ReplicasStorage.Init, causing the loss of the + // HardState.{Term,Vote} and leading to a "split-brain" wrt leader + // election. + // + // Even though this seems theoretically possible, it is considered + // practically impossible, and not just because a replica's vote is + // unlikely to stay relevant across 2 upgrades. For one, we're always + // going through learners and don't promote until caught up, so + // uninitialized replicas generally never get to vote.
Second, even if + // their vote somehow mattered (perhaps we sent a learner a snap which + // was not durably persisted - which we also know is impossible, but + // let's assume it - and then promoted the node and it immediately + // power-cycled, losing the snapshot), the fire-and-forget way in which + // raft votes are requested (in the same raft cycle) makes it extremely + // unlikely that the restarted node would then receive it. + if err := repl.mu.stateLoader.SetRaftReplicaID(ctx, s.Engine(), replicaID); err != nil { + return err + } + return repl.loadRaftMuLockedReplicaMuLocked(uninitializedDesc) }(); err != nil { // Mark the replica as destroyed and remove it from the replicas maps to diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index 33aac9a89072..321c8fc21ac1 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -173,11 +173,12 @@ func WriteInitialClusterData( EndKey: endKey, NextReplicaID: 2, } + const firstReplicaID = 1 replicas := []roachpb.ReplicaDescriptor{ { NodeID: FirstNodeID, StoreID: FirstStoreID, - ReplicaID: 1, + ReplicaID: firstReplicaID, }, } desc.SetReplicas(roachpb.MakeReplicaSet(replicas)) @@ -244,7 +245,8 @@ func WriteInitialClusterData( } } - if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil { + if err := stateloader.WriteInitialRangeState( + ctx, batch, *desc, firstReplicaID, initialReplicaVersion); err != nil { return err } computedStats, err := rditer.ComputeStatsForRange(desc, batch, now.WallTime) diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 2294ca8ecd90..45327e102047 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -42,7 +42,7 @@ func splitPreApply( // // The exception to that is if the DisableEagerReplicaRemoval testing flag is // enabled. - _, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) + rightDesc, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) _, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID()) if !hasRightDesc || !hasLeftDesc { log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v", @@ -100,9 +100,20 @@ func splitPreApply( log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) } if rightRepl != nil { + // We cleared the HardState and RaftReplicaID above, so rewrite them to + // the current values. + // TODO(sumeer): we know HardState.Commit cannot advance since the RHS + // cannot apply a snapshot yet. But there could be a concurrent change + // to HardState.{Term,Vote} that we would accidentally undo here, + // because we are not actually holding the appropriate mutex. See + // https://github.com/cockroachdb/cockroach/issues/75918. if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil { log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err) } + if err := rightRepl.raftMu.stateLoader.SetRaftReplicaID( + ctx, readWriter, rightRepl.ReplicaID()); err != nil { + log.Fatalf(ctx, "failed to set RaftReplicaID for removed rhs: %v", err) + } } return } @@ -114,7 +125,14 @@ func splitPreApply( if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil { log.Fatalf(ctx, "%v", err) } - + // Write the RaftReplicaID for the RHS to maintain the invariant that any + // replica (uninitialized or initialized), with persistent state, has a + // RaftReplicaID.
NB: this invariant will not be universally true until we + // introduce node startup code that will write this value for existing + // ranges. + if err := rsl.SetRaftReplicaID(ctx, readWriter, rightDesc.ReplicaID); err != nil { + log.Fatalf(ctx, "%v", err) + } // Persist the closed timestamp. // // In order to tolerate a nil initClosedTS input, let's forward to diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 9f7d033cff05..03a4b1721772 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2633,7 +2633,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { uninitDesc := roachpb.RangeDescriptor{RangeID: repl1.Desc().RangeID} if err := stateloader.WriteInitialRangeState( - ctx, s.Engine(), uninitDesc, roachpb.Version{}, + ctx, s.Engine(), uninitDesc, 2, roachpb.Version{}, ); err != nil { t.Fatal(err) } @@ -3046,6 +3046,32 @@ func TestManuallyEnqueueUninitializedReplica(t *testing.T) { require.Contains(t, err.Error(), "not enqueueing uninitialized replica") } +// TestStoreGetOrCreateReplicaWritesRaftReplicaID tests that an uninitialized +// replica has a RaftReplicaID. +func TestStoreGetOrCreateReplicaWritesRaftReplicaID(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc := testContext{} + tc.Start(ctx, t, stopper) + + repl, created, err := tc.store.getOrCreateReplica( + ctx, 42, 7, &roachpb.ReplicaDescriptor{ + NodeID: tc.store.NodeID(), + StoreID: tc.store.StoreID(), + ReplicaID: 7, + }) + require.NoError(t, err) + require.True(t, created) + replicaID, found, err := repl.mu.stateLoader.LoadRaftReplicaID(ctx, tc.store.Engine()) + require.NoError(t, err) + require.True(t, found) + require.Equal(t, roachpb.RaftReplicaID{ReplicaID: 7}, replicaID) +} + func BenchmarkStoreGetReplica(b *testing.B) { ctx := context.Background() stopper := stop.NewStopper() diff --git a/pkg/roachpb/internal_raft.proto b/pkg/roachpb/internal_raft.proto index 08d7a2322efc..5837411d43ff 100644 --- a/pkg/roachpb/internal_raft.proto +++ b/pkg/roachpb/internal_raft.proto @@ -49,3 +49,12 @@ message RaftSnapshotData { repeated bytes log_entries = 3; reserved 1; } + +message RaftReplicaID { + option (gogoproto.equal) = true; + option (gogoproto.populate) = true; + + // ReplicaID is the ID of the replica with the corresponding HardState. + optional int32 replica_id = 1 [(gogoproto.nullable) = false, + (gogoproto.customname) = "ReplicaID", (gogoproto.casttype) = "ReplicaID"]; +}
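For reviewers who want to exercise the new stateloader surface in isolation, here is a minimal sketch (not part of the patch) of how SetRaftReplicaID and LoadRaftReplicaID, as added above, round-trip the RaftReplicaID. The file and test names are hypothetical, and it assumes storage.NewDefaultInMemForTesting is available to stand in for a store's engine; in the patch itself the write paths are tryGetOrCreateReplica, WriteInitialRangeState, splitPreApply, and applySnapshot.

// raftreplicaid_roundtrip_test.go (hypothetical, illustration only).
package stateloader_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/stretchr/testify/require"
)

func TestRaftReplicaIDRoundTrip(t *testing.T) {
	ctx := context.Background()
	// Assumption: an in-memory engine standing in for a store's engine.
	eng := storage.NewDefaultInMemForTesting()
	defer eng.Close()

	sl := stateloader.Make(roachpb.RangeID(42))

	// Before anything is written, the unreplicated "rftr" key is absent.
	_, found, err := sl.LoadRaftReplicaID(ctx, eng)
	require.NoError(t, err)
	require.False(t, found)

	// Persist the replica ID, as the replica-creation, split, and snapshot
	// paths in this patch now do.
	require.NoError(t, sl.SetRaftReplicaID(ctx, eng, roachpb.ReplicaID(7)))

	// Read it back; ReplicasStorage.Init is expected to rely on this in v22.2.
	rid, found, err := sl.LoadRaftReplicaID(ctx, eng)
	require.NoError(t, err)
	require.True(t, found)
	require.Equal(t, roachpb.RaftReplicaID{ReplicaID: 7}, rid)
}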