From bcf42cc77b571fe8166af32ce2b2c281f236f2ea Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Mon, 31 Jan 2022 17:29:39 -0500 Subject: [PATCH] keys,kvserver: introduce RaftReplicaID The RaftReplicaIDKey is an unreplicated range-id local key that contains the ReplicaID of the replica whose HardState is represented in the RaftHardStateKey. These two keys share the same lifetime, and are removed atomically when we clear the range-id local keys for a replica. We currently do not utilize this information on node restart to figure out whether we should cleanup stale uninitialized replicas. Doing such cleanup can wait until we implement and start using ReplicasStorage. The change here is meant to set us up to rely on RaftReplicaID from the next release onwards. Informs #75740 Release note: None --- pkg/keys/constants.go | 14 +++++++++++--- pkg/keys/doc.go | 1 + pkg/keys/keys.go | 10 ++++++++++ pkg/kv/kvserver/replica.go | 11 +++++++++-- pkg/kv/kvserver/replica_raft.go | 14 ++++++++++++++ pkg/kv/kvserver/replica_raftstorage.go | 14 ++++++++++++++ pkg/kv/kvserver/stateloader/initial.go | 11 ++++++++++- pkg/kv/kvserver/stateloader/stateloader.go | 17 +++++++++++++++++ pkg/kv/kvserver/store_create_replica.go | 19 +++++++++++++++++++ pkg/kv/kvserver/store_init.go | 6 ++++-- pkg/kv/kvserver/store_split.go | 19 +++++++++++++++++-- pkg/kv/kvserver/store_test.go | 2 +- pkg/roachpb/internal_raft.proto | 7 +++++++ 13 files changed, 134 insertions(+), 11 deletions(-) diff --git a/pkg/keys/constants.go b/pkg/keys/constants.go index 7d1adb9ec861..4db431b498f6 100644 --- a/pkg/keys/constants.go +++ b/pkg/keys/constants.go @@ -87,10 +87,11 @@ var ( // LocalRangeAppliedStateSuffix is the suffix for the range applied state // key. LocalRangeAppliedStateSuffix = []byte("rask") - // LocalRaftTruncatedStateSuffix is the suffix for the + // This was previously used for the replicated RaftTruncatedState. It is no + // longer used and this key has been removed via a migration. See + // LocalRaftTruncatedStateSuffix for the corresponding unreplicated // RaftTruncatedState. - // Note: This suffix is also used for unreplicated Range-ID keys. - LocalRaftTruncatedStateSuffix = []byte("rftt") + _ = []byte("rftt") // LocalRangeLeaseSuffix is the suffix for a range lease. LocalRangeLeaseSuffix = []byte("rll-") // LocalRangePriorReadSummarySuffix is the suffix for a range's prior read @@ -122,6 +123,13 @@ var ( localRaftLastIndexSuffix = []byte("rfti") // LocalRaftLogSuffix is the suffix for the raft log. LocalRaftLogSuffix = []byte("rftl") + // LocalRaftReplicaIDSuffix is the suffix for the RaftReplicaID. This is + // written when the HardState for a particular ReplicaID is first written. + LocalRaftReplicaIDSuffix = []byte("rftr") + // LocalRaftTruncatedStateSuffix is the suffix for the unreplicated + // RaftTruncatedState. + LocalRaftTruncatedStateSuffix = []byte("rftt") + // LocalRangeLastReplicaGCTimestampSuffix is the suffix for a range's last // replica GC timestamp (for GC of old replicas). LocalRangeLastReplicaGCTimestampSuffix = []byte("rlrt") diff --git a/pkg/keys/doc.go b/pkg/keys/doc.go index 7a4248130ed7..47f5fc5dfc58 100644 --- a/pkg/keys/doc.go +++ b/pkg/keys/doc.go @@ -199,6 +199,7 @@ var _ = [...]interface{}{ RangeTombstoneKey, // "rftb" RaftHardStateKey, // "rfth" RaftLogKey, // "rftl" + RaftReplicaIDKey, // "rftr" RaftTruncatedStateKey, // "rftt" RangeLastReplicaGCTimestampKey, // "rlrt" diff --git a/pkg/keys/keys.go b/pkg/keys/keys.go index de6eae1dcbb3..8e768cc8eafa 100644 --- a/pkg/keys/keys.go +++ b/pkg/keys/keys.go @@ -331,6 +331,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key { return MakeRangeIDPrefixBuf(rangeID).RaftLogKey(logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func RaftReplicaIDKey(rangeID roachpb.RangeID) roachpb.Key { + return MakeRangeIDPrefixBuf(rangeID).RaftReplicaIDKey() +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key { @@ -1007,6 +1012,11 @@ func (b RangeIDPrefixBuf) RaftLogKey(logIndex uint64) roachpb.Key { return encoding.EncodeUint64Ascending(b.RaftLogPrefix(), logIndex) } +// RaftReplicaIDKey returns a system-local key for a RaftReplicaID. +func (b RangeIDPrefixBuf) RaftReplicaIDKey() roachpb.Key { + return append(b.unreplicatedPrefix(), LocalRaftReplicaIDSuffix...) +} + // RangeLastReplicaGCTimestampKey returns a range-local key for // the range's last replica GC timestamp. func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key { diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index e71f71ec83e1..1502de6500b1 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -520,7 +520,12 @@ type Replica struct { // It will not change over the lifetime of this replica. If addressed under // a newer replicaID, the replica immediately replicaGCs itself to make // way for the newer incarnation. + // TODO(sumeer): since this is initialized in newUnloadedReplica and never + // changed, lift this out of the mu struct. replicaID roachpb.ReplicaID + // wroteReplicaID transitions once to true, when RaftReplicaID is written + // to the store. + wroteReplicaID bool // The minimum allowed ID for this replica. Initialized from // RangeTombstone.NextReplicaID. tombstoneMinReplicaID roachpb.ReplicaID @@ -710,9 +715,11 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) { r.store.Ident.NodeID, r.store.Ident.StoreID, r.rangeStr.get()) } -// ReplicaID returns the ID for the Replica. It may be zero if the replica does -// not know its ID. Once a Replica has a non-zero ReplicaID it will never change. +// ReplicaID returns the ID for the Replica. This value is fixed for the +// lifetime of the Replica. func (r *Replica) ReplicaID() roachpb.ReplicaID { + // The locking of mu is unnecessary. It will be removed when we lift + // replicaID out of the mu struct. r.mu.RLock() defer r.mu.RUnlock() return r.mu.replicaID diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f2680ff5b043..01a712a8e3d1 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -583,6 +583,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return unquiesceAndWakeLeader, nil }) r.mu.applyingEntries = len(rd.CommittedEntries) > 0 + alreadyWroteReplicaID := r.mu.wroteReplicaID r.mu.Unlock() if errors.Is(err, errRemoved) { // If we've been removed then just return. @@ -796,6 +797,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, expl, errors.Wrap(err, expl) } } + wroteReplicaID := false if !raft.IsEmptyHardState(rd.HardState) { if !r.IsInitialized() && rd.HardState.Commit != 0 { log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) @@ -812,6 +814,15 @@ func (r *Replica) handleRaftReadyRaftMuLocked( const expl = "during setHardState" return stats, expl, errors.Wrap(err, expl) } + // It is possible that we have set HardState for the first time, in which + // case alreadyWroteReplicaID will be false. + if !alreadyWroteReplicaID { + if err := r.raftMu.stateLoader.SetRaftReplicaID(ctx, batch, r.ReplicaID()); err != nil { + const expl = "during setRaftReplicaID" + return stats, expl, errors.Wrap(err, expl) + } + wroteReplicaID = true + } } // Synchronously commit the batch with the Raft log entries and Raft hard // state as we're promising not to lose this data. @@ -870,6 +881,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // previously the leader. becameLeader = r.mu.leaderID == r.mu.replicaID } + if wroteReplicaID { + r.mu.wroteReplicaID = wroteReplicaID + } r.mu.Unlock() // When becoming the leader, proactively add the replica to the replicate diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ea5711c7bd14..a557cb43cc7d 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -859,10 +859,21 @@ func (r *Replica) applySnapshot( return errors.Wrapf(err, "error clearing range of unreplicated SST writer") } + // The HardState and RaftReplicaID should typically already exist for this + // replica, unless this snapshot application is the first time raft.Ready is + // being processed. In that case we must write the RaftReplicaID so that it + // shares the same lifetime as the HardState. Additionally, we've cleared + // all the raft state above, so we are forced to write the RaftReplicaID + // here regardless of what happened before. + // // Update HardState. if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil { return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer") } + if err := r.raftMu.stateLoader.SetRaftReplicaID( + ctx, &unreplicatedSST, r.mu.replicaID); err != nil { + return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer") + } // Update Raft entries. r.store.raftEntryCache.Drop(r.RangeID) @@ -989,6 +1000,9 @@ func (r *Replica) applySnapshot( // Snapshots typically have fewer log entries than the leaseholder. The next // time we hold the lease, recompute the log size before making decisions. r.mu.raftLogSizeTrusted = false + // RaftReplicaID is definitely written due to the earlier logic in this + // function. + r.mu.wroteReplicaID = true // Invoke the leasePostApply method to ensure we properly initialize the // replica according to whether it holds the lease. We allow jumps in the diff --git a/pkg/kv/kvserver/stateloader/initial.go b/pkg/kv/kvserver/stateloader/initial.go index f1688147a310..3ddc0d750226 100644 --- a/pkg/kv/kvserver/stateloader/initial.go +++ b/pkg/kv/kvserver/stateloader/initial.go @@ -96,6 +96,7 @@ func WriteInitialRangeState( ctx context.Context, readWriter storage.ReadWriter, desc roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, replicaVersion roachpb.Version, ) error { initialLease := roachpb.Lease{} @@ -108,7 +109,15 @@ func WriteInitialRangeState( ); err != nil { return err } - if err := Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter); err != nil { + sl := Make(desc.RangeID) + if err := sl.SynthesizeRaftState(ctx, readWriter); err != nil { + return err + } + // It is inconvenient that we cannot set Replica.mu.wroteReplicaID=true + // since we don't have a Replica object yet. This is harmless since we will + // just write the RaftReplicaID again later. The invariant that HardState + // and RaftReplicaID both exist in the store is not being violated. + if err := sl.SetRaftReplicaID(ctx, readWriter, replicaID); err != nil { return err } return nil diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 2ab3273f5320..5790fdd8bed1 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -434,3 +434,20 @@ func (rsl StateLoader) SynthesizeHardState( err := rsl.SetHardState(ctx, readWriter, newHS) return errors.Wrapf(err, "writing HardState %+v", &newHS) } + +// SetRaftReplicaID overwrites the RaftReplicaID. +func (rsl StateLoader) SetRaftReplicaID( + ctx context.Context, writer storage.Writer, replicaID roachpb.ReplicaID, +) error { + rid := roachpb.RaftReplicaID{ReplicaID: replicaID} + // "Blind" because ms == nil and timestamp.IsEmpty(). + return storage.MVCCBlindPutProto( + ctx, + writer, + nil, /* ms */ + rsl.RaftReplicaIDKey(), + hlc.Timestamp{}, /* timestamp */ + &rid, + nil, /* txn */ + ) +} diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 81f05d939837..fc2c413a7cda 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -146,6 +146,25 @@ func (s *Store) tryGetOrCreateReplica( } else if ok && replicaID != 0 && replicaID < tombstone.NextReplicaID { return nil, false, &roachpb.RaftGroupDeletedError{} } + // It is possible that we have already created the HardState for an + // uninitialized replica, then crashed, and on recovery are receiving a raft + // message for the same or later replica. In either case we will create a + // Replica with Replica.mu.wroteReplicaID=false, and will eventually write + // the HardState and RaftReplicaID to the correct value. However, in the + // latter case there is some time interval during which the Replica object + // has a newer ReplicaID than the one in the store, and a stored HardState + // that is for the older ReplicaID. This seems harmless, but to be more + // precise about the invariants we should remove the stale + // persistent state here. + // + // TODO(sumeer): once we have RaftReplicaID being populated for all replicas + // and we have purged replicas that don't populate it, we can read the + // (HardState,RaftReplicaID) here and find one of the following cases: + // - HardState exists, RaftReplicaID not exists: must be a purged replica so + // we can delete HardState. + // - HardState exists, RaftReplicaID exists: if the latter is old, remove + // both HardState and RaftReplicaID. + // - Neither exits: nothing to do. // Create a new replica and lock it for raft processing. uninitializedDesc := &roachpb.RangeDescriptor{ diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index 33aac9a89072..321c8fc21ac1 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -173,11 +173,12 @@ func WriteInitialClusterData( EndKey: endKey, NextReplicaID: 2, } + const firstReplicaID = 1 replicas := []roachpb.ReplicaDescriptor{ { NodeID: FirstNodeID, StoreID: FirstStoreID, - ReplicaID: 1, + ReplicaID: firstReplicaID, }, } desc.SetReplicas(roachpb.MakeReplicaSet(replicas)) @@ -244,7 +245,8 @@ func WriteInitialClusterData( } } - if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil { + if err := stateloader.WriteInitialRangeState( + ctx, batch, *desc, firstReplicaID, initialReplicaVersion); err != nil { return err } computedStats, err := rditer.ComputeStatsForRange(desc, batch, now.WallTime) diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 2294ca8ecd90..5316b3fb242a 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -42,7 +42,7 @@ func splitPreApply( // // The exception to that is if the DisableEagerReplicaRemoval testing flag is // enabled. - _, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) + rightDesc, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID()) _, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID()) if !hasRightDesc || !hasLeftDesc { log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v", @@ -100,9 +100,18 @@ func splitPreApply( log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err) } if rightRepl != nil { + // Cleared the HardState and RaftReplicaID, so rewrite them to the + // current values. + // TODO(sumeer): we know HardState.Commit cannot advance since the RHS + // cannot apply a snapshot yet. But what prevents a concurrent change to + // HardState.{Term,Vote} that we would accidentally undo here. if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil { log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err) } + if err := rightRepl.raftMu.stateLoader.SetRaftReplicaID( + ctx, readWriter, rightRepl.ReplicaID()); err != nil { + log.Fatalf(ctx, "failed to set RaftReplicaID for removed rhs: %v", err) + } } return } @@ -114,7 +123,10 @@ func splitPreApply( if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil { log.Fatalf(ctx, "%v", err) } - + // Write the RaftReplicaID for the RHS. + if err := rsl.SetRaftReplicaID(ctx, readWriter, rightDesc.ReplicaID); err != nil { + log.Fatalf(ctx, "%v", err) + } // Persist the closed timestamp. // // In order to tolerate a nil initClosedTS input, let's forward to @@ -161,6 +173,9 @@ func splitPostApply( log.Fatalf(ctx, "%s: found replica which is RHS of a split "+ "without a valid tenant ID", rightReplOrNil) } + rightReplOrNil.mu.Lock() + rightReplOrNil.mu.wroteReplicaID = true + rightReplOrNil.mu.Unlock() } now := r.store.Clock().NowAsClockTimestamp() diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 9f7d033cff05..17b6019248ea 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2633,7 +2633,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) { uninitDesc := roachpb.RangeDescriptor{RangeID: repl1.Desc().RangeID} if err := stateloader.WriteInitialRangeState( - ctx, s.Engine(), uninitDesc, roachpb.Version{}, + ctx, s.Engine(), uninitDesc, 2, roachpb.Version{}, ); err != nil { t.Fatal(err) } diff --git a/pkg/roachpb/internal_raft.proto b/pkg/roachpb/internal_raft.proto index 08d7a2322efc..2c9ac3d6e724 100644 --- a/pkg/roachpb/internal_raft.proto +++ b/pkg/roachpb/internal_raft.proto @@ -49,3 +49,10 @@ message RaftSnapshotData { repeated bytes log_entries = 3; reserved 1; } + +message RaftReplicaID { + option (gogoproto.equal) = true; + // ReplicaID is the ID of the replica with the corresponding HardState. + optional int32 replica_id = 1 [(gogoproto.nullable) = false, + (gogoproto.customname) = "ReplicaID", (gogoproto.casttype) = "ReplicaID"]; +}