Skip to content

Commit

Permalink
Merge #75761
Browse files Browse the repository at this point in the history
75761: keys,kvserver: introduce RaftReplicaID r=tbg a=sumeerbhola

The RaftReplicaIDKey is an unreplicated range-id local key that
contains the ReplicaID of the replica whose HardState is represented
in the RaftHardStateKey. These two keys are removed atomically when
we clear the range-id local keys for a replica. See
store_create_replica.go for a detailed comment on correctness
and version compatibility.

We currently do not utilize this information on node restart
to figure out whether we should cleanup stale uninitialized replicas.
Doing such cleanup can wait until we implement and start using
ReplicasStorage. The change here is meant to set us up to rely
on RaftReplicaID from the next release onwards.

Informs #75740

Release note: None

Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
craig[bot] and sumeerbhola committed Feb 4, 2022
2 parents 5304fed + 0d6e8b6 commit 2065d3d
Show file tree
Hide file tree
Showing 14 changed files with 227 additions and 11 deletions.
14 changes: 11 additions & 3 deletions pkg/keys/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ var (
// LocalRangeAppliedStateSuffix is the suffix for the range applied state
// key.
LocalRangeAppliedStateSuffix = []byte("rask")
// LocalRaftTruncatedStateSuffix is the suffix for the
// This was previously used for the replicated RaftTruncatedState. It is no
// longer used and this key has been removed via a migration. See
// LocalRaftTruncatedStateSuffix for the corresponding unreplicated
// RaftTruncatedState.
// Note: This suffix is also used for unreplicated Range-ID keys.
LocalRaftTruncatedStateSuffix = []byte("rftt")
_ = []byte("rftt")
// LocalRangeLeaseSuffix is the suffix for a range lease.
LocalRangeLeaseSuffix = []byte("rll-")
// LocalRangePriorReadSummarySuffix is the suffix for a range's prior read
Expand Down Expand Up @@ -122,6 +123,13 @@ var (
localRaftLastIndexSuffix = []byte("rfti")
// LocalRaftLogSuffix is the suffix for the raft log.
LocalRaftLogSuffix = []byte("rftl")
// LocalRaftReplicaIDSuffix is the suffix for the RaftReplicaID. This is
// written when a replica is created.
LocalRaftReplicaIDSuffix = []byte("rftr")
// LocalRaftTruncatedStateSuffix is the suffix for the unreplicated
// RaftTruncatedState.
LocalRaftTruncatedStateSuffix = []byte("rftt")

// LocalRangeLastReplicaGCTimestampSuffix is the suffix for a range's last
// replica GC timestamp (for GC of old replicas).
LocalRangeLastReplicaGCTimestampSuffix = []byte("rlrt")
Expand Down
1 change: 1 addition & 0 deletions pkg/keys/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ var _ = [...]interface{}{
RangeTombstoneKey, // "rftb"
RaftHardStateKey, // "rfth"
RaftLogKey, // "rftl"
RaftReplicaIDKey, // "rftr"
RaftTruncatedStateKey, // "rftt"
RangeLastReplicaGCTimestampKey, // "rlrt"

Expand Down
10 changes: 10 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ func RaftLogKey(rangeID roachpb.RangeID, logIndex uint64) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftLogKey(logIndex)
}

// RaftReplicaIDKey returns a system-local key for a RaftReplicaID.
func RaftReplicaIDKey(rangeID roachpb.RangeID) roachpb.Key {
return MakeRangeIDPrefixBuf(rangeID).RaftReplicaIDKey()
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func RangeLastReplicaGCTimestampKey(rangeID roachpb.RangeID) roachpb.Key {
Expand Down Expand Up @@ -1007,6 +1012,11 @@ func (b RangeIDPrefixBuf) RaftLogKey(logIndex uint64) roachpb.Key {
return encoding.EncodeUint64Ascending(b.RaftLogPrefix(), logIndex)
}

// RaftReplicaIDKey returns a system-local key for a RaftReplicaID.
func (b RangeIDPrefixBuf) RaftReplicaIDKey() roachpb.Key {
return append(b.unreplicatedPrefix(), LocalRaftReplicaIDSuffix...)
}

// RangeLastReplicaGCTimestampKey returns a range-local key for
// the range's last replica GC timestamp.
func (b RangeIDPrefixBuf) RangeLastReplicaGCTimestampKey() roachpb.Key {
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
emptySum: 14695981039346656037,
populatedSum: 1187861800212570275,
},
reflect.TypeOf(&roachpb.RaftReplicaID{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
return roachpb.NewPopulatedRaftReplicaID(r, false)
},
emptySum: 598336668751268149,
populatedSum: 9313101058286450988,
},
}

func TestBelowRaftProtos(t *testing.T) {
Expand Down
38 changes: 38 additions & 0 deletions pkg/kv/kvserver/client_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand All @@ -23,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestStoreSetRangesMaxBytes creates a set of ranges via splitting and then
Expand Down Expand Up @@ -70,3 +73,38 @@ func TestStoreSetRangesMaxBytes(t *testing.T) {
return nil
})
}

// TestStoreRaftReplicaID tests that initialized replicas have a
// RaftReplicaID.
func TestStoreRaftReplicaID(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
srv := tc.Server(0)
store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)

scratchKey := tc.ScratchRange(t)
desc, err := tc.LookupRange(scratchKey)
require.NoError(t, err)
repl, err := store.GetReplica(desc.RangeID)
require.NoError(t, err)
replicaID, found, err := stateloader.Make(desc.RangeID).LoadRaftReplicaID(ctx, store.Engine())
require.True(t, found)
require.NoError(t, err)
require.Equal(t, repl.ReplicaID(), replicaID.ReplicaID)

// RHS of a split also has ReplicaID.
splitKey := append(scratchKey, '0', '0')
_, rhsDesc, err := tc.SplitRange(splitKey)
require.NoError(t, err)
rhsRepl, err := store.GetReplica(rhsDesc.RangeID)
require.NoError(t, err)
rhsReplicaID, found, err :=
stateloader.Make(rhsDesc.RangeID).LoadRaftReplicaID(ctx, store.Engine())
require.True(t, found)
require.NoError(t, err)
require.Equal(t, rhsRepl.ReplicaID(), rhsReplicaID.ReplicaID)
}
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ type Replica struct {
// It will not change over the lifetime of this replica. If addressed under
// a newer replicaID, the replica immediately replicaGCs itself to make
// way for the newer incarnation.
// TODO(sumeer): since this is initialized in newUnloadedReplica and never
// changed, lift this out of the mu struct.
replicaID roachpb.ReplicaID
// The minimum allowed ID for this replica. Initialized from
// RangeTombstone.NextReplicaID.
Expand Down Expand Up @@ -710,9 +712,11 @@ func (r *Replica) SafeFormat(w redact.SafePrinter, _ rune) {
r.store.Ident.NodeID, r.store.Ident.StoreID, r.rangeStr.get())
}

// ReplicaID returns the ID for the Replica. It may be zero if the replica does
// not know its ID. Once a Replica has a non-zero ReplicaID it will never change.
// ReplicaID returns the ID for the Replica. This value is fixed for the
// lifetime of the Replica.
func (r *Replica) ReplicaID() roachpb.ReplicaID {
// The locking of mu is unnecessary. It will be removed when we lift
// replicaID out of the mu struct.
r.mu.RLock()
defer r.mu.RUnlock()
return r.mu.replicaID
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,12 @@ func (r *Replica) applySnapshot(
if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := r.raftMu.stateLoader.SetRaftReplicaID(
ctx, &unreplicatedSST, r.mu.replicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)
Expand Down
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/stateloader/initial.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func WriteInitialRangeState(
ctx context.Context,
readWriter storage.ReadWriter,
desc roachpb.RangeDescriptor,
replicaID roachpb.ReplicaID,
replicaVersion roachpb.Version,
) error {
initialLease := roachpb.Lease{}
Expand All @@ -108,7 +109,13 @@ func WriteInitialRangeState(
); err != nil {
return err
}
if err := Make(desc.RangeID).SynthesizeRaftState(ctx, readWriter); err != nil {
sl := Make(desc.RangeID)
if err := sl.SynthesizeRaftState(ctx, readWriter); err != nil {
return err
}
// Maintain the invariant that any replica (uninitialized or initialized),
// with persistent state, has a RaftReplicaID.
if err := sl.SetRaftReplicaID(ctx, readWriter, replicaID); err != nil {
return err
}
return nil
Expand Down
26 changes: 26 additions & 0 deletions pkg/kv/kvserver/stateloader/stateloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,29 @@ func (rsl StateLoader) SynthesizeHardState(
err := rsl.SetHardState(ctx, readWriter, newHS)
return errors.Wrapf(err, "writing HardState %+v", &newHS)
}

// SetRaftReplicaID overwrites the RaftReplicaID.
func (rsl StateLoader) SetRaftReplicaID(
ctx context.Context, writer storage.Writer, replicaID roachpb.ReplicaID,
) error {
rid := roachpb.RaftReplicaID{ReplicaID: replicaID}
// "Blind" because ms == nil and timestamp.IsEmpty().
return storage.MVCCBlindPutProto(
ctx,
writer,
nil, /* ms */
rsl.RaftReplicaIDKey(),
hlc.Timestamp{}, /* timestamp */
&rid,
nil, /* txn */
)
}

// LoadRaftReplicaID loads the RaftReplicaID.
func (rsl StateLoader) LoadRaftReplicaID(
ctx context.Context, reader storage.Reader,
) (replicaID roachpb.RaftReplicaID, found bool, err error) {
found, err = storage.MVCCGetProto(ctx, reader, rsl.RaftReplicaIDKey(),
hlc.Timestamp{}, &replicaID, storage.MVCCGetOptions{})
return
}
54 changes: 54 additions & 0 deletions pkg/kv/kvserver/store_create_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,60 @@ func (s *Store) tryGetOrCreateReplica(
} else if hs.Commit != 0 {
log.Fatalf(ctx, "found non-zero HardState.Commit on uninitialized replica %s. HS=%+v", repl, hs)
}

// Write the RaftReplicaID for this replica. This is the only place in the
// CockroachDB code that we are creating a new *uninitialized* replica.
// Note that it is possible that we have already created the HardState for
// an uninitialized replica, then crashed, and on recovery are receiving a
// raft message for the same or later replica.
// - Same replica: we are overwriting the RaftReplicaID with the same
// value, which is harmless.
// - Later replica: there may be an existing HardState for the older
// uninitialized replica with Commit=0 and non-zero Term and Vote. Using
// the Term and Vote values for that older replica in the context of
// this newer replica is harmless since it just limits the votes for
// this replica.
//
//
// Compatibility:
// - v21.2 and v22.1: v22.1 unilaterally introduces RaftReplicaID (an
// unreplicated range-id local key). If a v22.1 binary is rolled back at
// a node, the fact that RaftReplicaID was written is harmless to a
// v21.2 node since it does not read it. When a v21.2 drops an
// initialized range, the RaftReplicaID will also be deleted because the
// whole range-ID local key space is deleted.
//
// - v22.2: we will start relying on the presence of RaftReplicaID, and
// remove any unitialized replicas that have a HardState but no
// RaftReplicaID. This removal will happen in ReplicasStorage.Init and
// allow us to tighten invariants. Additionally, knowing the ReplicaID
// for an unitialized range could allow a node to somehow contact the
// raft group (say by broadcasting to all nodes in the cluster), and if
// the ReplicaID is stale, would allow the node to remove the HardState
// and RaftReplicaID. See
// https://github.com/cockroachdb/cockroach/issues/75740.
//
// There is a concern that there could be some replica that survived
// from v21.2 to v22.1 to v22.2 in unitialized state and will be
// incorrectly removed in ReplicasStorage.Init causing the loss of the
// HardState.{Term,Vote} and lead to a "split-brain" wrt leader
// election.
//
// Even though this seems theoretically possible, it is considered
// practically impossible, and not just because a replica's vote is
// unlikely to stay relevant across 2 upgrades. For one, we're always
// going through learners and don't promote until caught up, so
// uninitialized replicas generally never get to vote. Second, even if
// their vote somehow mattered (perhaps we sent a learner a snap which
// was not durably persisted - which we also know is impossible, but
// let's assume it - and then promoted the node and it immediately
// power-cycled, losing the snapshot) the fire-and-forget way in which
// raft votes are requested (in the same raft cycle) makes it extremely
// unlikely that the restarted node would then receive it.
if err := repl.mu.stateLoader.SetRaftReplicaID(ctx, s.Engine(), replicaID); err != nil {
return err
}

return repl.loadRaftMuLockedReplicaMuLocked(uninitializedDesc)
}(); err != nil {
// Mark the replica as destroyed and remove it from the replicas maps to
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/store_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,12 @@ func WriteInitialClusterData(
EndKey: endKey,
NextReplicaID: 2,
}
const firstReplicaID = 1
replicas := []roachpb.ReplicaDescriptor{
{
NodeID: FirstNodeID,
StoreID: FirstStoreID,
ReplicaID: 1,
ReplicaID: firstReplicaID,
},
}
desc.SetReplicas(roachpb.MakeReplicaSet(replicas))
Expand Down Expand Up @@ -244,7 +245,8 @@ func WriteInitialClusterData(
}
}

if err := stateloader.WriteInitialRangeState(ctx, batch, *desc, initialReplicaVersion); err != nil {
if err := stateloader.WriteInitialRangeState(
ctx, batch, *desc, firstReplicaID, initialReplicaVersion); err != nil {
return err
}
computedStats, err := rditer.ComputeStatsForRange(desc, batch, now.WallTime)
Expand Down
22 changes: 20 additions & 2 deletions pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func splitPreApply(
//
// The exception to that is if the DisableEagerReplicaRemoval testing flag is
// enabled.
_, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID())
rightDesc, hasRightDesc := split.RightDesc.GetReplicaDescriptor(r.StoreID())
_, hasLeftDesc := split.LeftDesc.GetReplicaDescriptor(r.StoreID())
if !hasRightDesc || !hasLeftDesc {
log.Fatalf(ctx, "cannot process split on s%s which does not exist in the split: %+v",
Expand Down Expand Up @@ -100,9 +100,20 @@ func splitPreApply(
log.Fatalf(ctx, "failed to clear range data for removed rhs: %v", err)
}
if rightRepl != nil {
// Cleared the HardState and RaftReplicaID, so rewrite them to the
// current values.
// TODO(sumeer): we know HardState.Commit cannot advance since the RHS
// cannot apply a snapshot yet. But there could be a concurrent change
// to HardState.{Term,Vote} that we would accidentally undo here,
// because we are not actually holding the appropriate mutex. See
// https://github.com/cockroachdb/cockroach/issues/75918.
if err := rightRepl.raftMu.stateLoader.SetHardState(ctx, readWriter, hs); err != nil {
log.Fatalf(ctx, "failed to set hard state with 0 commit index for removed rhs: %v", err)
}
if err := rightRepl.raftMu.stateLoader.SetRaftReplicaID(
ctx, readWriter, rightRepl.ReplicaID()); err != nil {
log.Fatalf(ctx, "failed to set RaftReplicaID for removed rhs: %v", err)
}
}
return
}
Expand All @@ -114,7 +125,14 @@ func splitPreApply(
if err := rsl.SynthesizeRaftState(ctx, readWriter); err != nil {
log.Fatalf(ctx, "%v", err)
}

// Write the RaftReplicaID for the RHS to maintain the invariant that any
// replica (uninitialized or initialized), with persistent state, has a
// RaftReplicaID. NB: this invariant will not be universally true until we
// introduce node startup code that will write this value for existing
// ranges.
if err := rsl.SetRaftReplicaID(ctx, readWriter, rightDesc.ReplicaID); err != nil {
log.Fatalf(ctx, "%v", err)
}
// Persist the closed timestamp.
//
// In order to tolerate a nil initClosedTS input, let's forward to
Expand Down
28 changes: 27 additions & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2633,7 +2633,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {

uninitDesc := roachpb.RangeDescriptor{RangeID: repl1.Desc().RangeID}
if err := stateloader.WriteInitialRangeState(
ctx, s.Engine(), uninitDesc, roachpb.Version{},
ctx, s.Engine(), uninitDesc, 2, roachpb.Version{},
); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -3046,6 +3046,32 @@ func TestManuallyEnqueueUninitializedReplica(t *testing.T) {
require.Contains(t, err.Error(), "not enqueueing uninitialized replica")
}

// TestStoreGetOrCreateReplicaWritesRaftReplicaID tests that an uninitialized
// replica has a RaftReplicaID.
func TestStoreGetOrCreateReplicaWritesRaftReplicaID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc := testContext{}
tc.Start(ctx, t, stopper)

repl, created, err := tc.store.getOrCreateReplica(
ctx, 42, 7, &roachpb.ReplicaDescriptor{
NodeID: tc.store.NodeID(),
StoreID: tc.store.StoreID(),
ReplicaID: 7,
})
require.NoError(t, err)
require.True(t, created)
replicaID, found, err := repl.mu.stateLoader.LoadRaftReplicaID(ctx, tc.store.Engine())
require.NoError(t, err)
require.True(t, found)
require.Equal(t, roachpb.RaftReplicaID{ReplicaID: 7}, replicaID)
}

func BenchmarkStoreGetReplica(b *testing.B) {
ctx := context.Background()
stopper := stop.NewStopper()
Expand Down
Loading

0 comments on commit 2065d3d

Please sign in to comment.