Skip to content

Commit

Permalink
kvserver: also load uninitialized replicas, verify replicaID
Browse files Browse the repository at this point in the history
To support a separate raft log[^1] we need to perform certain
reconciliation operations at start-up to recover from a lack
of atomicity between the state and log engines.

This commit gets us closer to being able to do so by listing
all replicas before starting the store, which means we now
have a handle on which uninitialized replicas exist in the
system.

As a first application of this knowledge, we now ensure that every
replica has a persisted FullReplicaID. Since this would not necessarily
be true for stores that have seen older releases, we backfill the
ReplicaID in 23.1 and can then require it to be present in a future
release that forces a migration through 23.1.

[^1]: #16624

Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Dec 16, 2022
1 parent 4c4f26d commit 863932d
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 13 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,16 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc)
}
}
diskReplID, found, err := r.mu.stateLoader.LoadRaftReplicaID(ctx, reader)
if err != nil {
log.Fatalf(ctx, "%s", err)
}
if !found {
log.Fatalf(ctx, "no replicaID persisted")
}
if diskReplID.ReplicaID != r.replicaID {
log.Fatalf(ctx, "disk replicaID %d does not match in-mem %d", diskReplID, r.replicaID)
}
}

// TODO(nvanbenschoten): move the following 5 methods to replica_send.go.
Expand Down
81 changes: 68 additions & 13 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
Expand All @@ -64,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
Expand Down Expand Up @@ -1905,19 +1907,46 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent
return ident, err
}

type engineReplicas struct {
uninitialized map[storage.FullReplicaID]struct{}
initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor
}

// loadFullReplicaIDsFromDisk discovers all Replicas on this Store.
// There will only be one entry in the map for a given RangeID.
//
// TODO(sep-raft-log): the reader here is for the log engine.
func loadFullReplicaIDsFromDisk(
ctx context.Context, reader storage.Reader,
) (map[storage.FullReplicaID]struct{}, error) {
m := map[storage.FullReplicaID]struct{}{}
var msg roachpb.RaftReplicaID
if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftReplicaIDKey(rangeID)
}, &msg, func(rangeID roachpb.RangeID) error {
m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{}
return nil
}); err != nil {
return nil, err
}

// TODO(sep-raft-log): if there is any other data that we mandate is present here
// (like a HardState), validate that here.

return m, nil
}

// loadAndReconcileReplicas loads the Replicas present on this
// store. It reconciles inconsistent state and runs validation checks.
//
// TODO(sep-raft-log): also load *uninitialized* Replicas.
func loadAndReconcileReplicas(
ctx context.Context, eng storage.Engine,
) (map[storage.FullReplicaID]*roachpb.RangeDescriptor, error) {
func loadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*engineReplicas, error) {
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return nil, err
}

m := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
// INVARIANT: the latest visible committed version of the RangeDescriptor
// (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting
// the state of the Replica.
Expand All @@ -1934,12 +1963,7 @@ func loadAndReconcileReplicas(
ident.StoreID, desc)
}

// INVARIANT: every Replica has a persisted ReplicaID. For initialized
// Replicas, it matches that of the descriptor.
//
// TODO(sep-raft-log): actually load the persisted ReplicaID and validate
// the above invariant.
m[storage.FullReplicaID{
initM[storage.FullReplicaID{
RangeID: desc.RangeID,
ReplicaID: repDesc.ReplicaID,
}] = &desc
Expand All @@ -1948,7 +1972,38 @@ func loadAndReconcileReplicas(
return nil, err
}

return m, nil
allM, err := loadFullReplicaIDsFromDisk(ctx, eng)
if err != nil {
return nil, err
}

for id := range initM {
if _, ok := allM[id]; !ok {
// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
//
// This invariant is true for replicas created in 22.2, but no migration
// was ever written. So we backfill the replicaID here (as of 23.1) and
// remove this code in the future (the follow-up release, assuming it is
// forced to migrate through 23.1, otherwise later).
if buildutil.CrdbTestBuild {
return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id])
}
if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
}
log.Eventf(ctx, "backfilled replicaID for %s", id)
}
// `allM` will be our map of uninitialized replicas.
//
// A replica is "uninitialized" if it's not in initM (i.e. is at log position
// zero and has no visible RangeDescriptor).
delete(allM, id)
}

return &engineReplicas{
initialized: initM,
uninitialized: allM, // NB: init'ed ones were deleted earlier
}, nil
}

// Start the engine, set the GC and read the StoreIdent.
Expand Down Expand Up @@ -2066,11 +2121,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// concurrently. Note that while we can perform this initialization
// concurrently, all of the initialization must be performed before we start
// listening for Raft messages and starting the process Raft loop.
descs, err := loadAndReconcileReplicas(ctx, s.engine)
engRepls, err := loadAndReconcileReplicas(ctx, s.engine)
if err != nil {
return err
}
for fullID, desc := range descs {
for fullID, desc := range engRepls.initialized {
rep, err := newReplica(ctx, desc, s, fullID.ReplicaID)
if err != nil {
return err
Expand Down

0 comments on commit 863932d

Please sign in to comment.