diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 93ebd3df7b31..ecf02b9e5ba0 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -1328,6 +1328,17 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
 			log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc)
 		}
 	}
+	// Assert that a RaftReplicaID is persisted and agrees with the in-memory replicaID.
+	diskReplID, found, err := r.mu.stateLoader.LoadRaftReplicaID(ctx, reader)
+	if err != nil {
+		log.Fatalf(ctx, "%s", err)
+	}
+	if !found {
+		log.Fatalf(ctx, "no replicaID persisted")
+	}
+	if diskReplID.ReplicaID != r.replicaID {
+		log.Fatalf(ctx, "disk replicaID %d does not match in-mem %d", diskReplID.ReplicaID, r.replicaID)
+	}
 }
 
 // TODO(nvanbenschoten): move the following 5 methods to replica_send.go.
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 2ec6084a73e8..1a7602580b23 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -45,6 +45,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
@@ -64,6 +65,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/admission"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/buildutil"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
@@ -1905,19 +1907,51 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent
 	return ident, err
 }
 
+// engineReplicas is the result of loadAndReconcileReplicas. It partitions the
+// replicas found on the engine into two disjoint sets: `initialized` maps each
+// replica for which a RangeDescriptor was found to that descriptor, and
+// `uninitialized` holds the IDs of the remaining replicas, i.e. those with a
+// persisted RaftReplicaID but no RangeDescriptor.
+type engineReplicas struct {
+	uninitialized map[storage.FullReplicaID]struct{}
+	initialized   map[storage.FullReplicaID]*roachpb.RangeDescriptor
+}
+
+// loadFullReplicaIDsFromDisk discovers all Replicas on this Store.
+// There will only be one entry in the map for a given RangeID.
+//
+// TODO(sep-raft-log): the reader here is for the log engine.
+func loadFullReplicaIDsFromDisk(
+	ctx context.Context, reader storage.Reader,
+) (map[storage.FullReplicaID]struct{}, error) {
+	m := map[storage.FullReplicaID]struct{}{}
+	var msg roachpb.RaftReplicaID
+	if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key {
+		return keys.RaftReplicaIDKey(rangeID)
+	}, &msg, func(rangeID roachpb.RangeID) error {
+		m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{}
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	// TODO(sep-raft-log): if there is any other data that we mandate is present here
+	// (like a HardState), validate that here.
+
+	return m, nil
+}
+
 // loadAndReconcileReplicas loads the Replicas present on this
 // store. It reconciles inconsistent state and runs validation checks.
 //
 // TODO(sep-raft-log): also load *uninitialized* Replicas.
-func loadAndReconcileReplicas(
-	ctx context.Context, eng storage.Engine,
-) (map[storage.FullReplicaID]*roachpb.RangeDescriptor, error) {
+func loadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*engineReplicas, error) {
 	ident, err := ReadStoreIdent(ctx, eng)
 	if err != nil {
 		return nil, err
 	}
 
-	m := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
+	initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
 	// INVARIANT: the latest visible committed version of the RangeDescriptor
 	// (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting
 	// the state of the Replica.
@@ -1934,12 +1968,7 @@ func loadAndReconcileReplicas(
 				ident.StoreID, desc)
 		}
 
-		// INVARIANT: every Replica has a persisted ReplicaID. For initialized
-		// Replicas, it matches that of the descriptor.
-		//
-		// TODO(sep-raft-log): actually load the persisted ReplicaID and validate
-		// the above invariant.
-		m[storage.FullReplicaID{
+		initM[storage.FullReplicaID{
 			RangeID:   desc.RangeID,
 			ReplicaID: repDesc.ReplicaID,
 		}] = &desc
@@ -1948,7 +1977,38 @@ func loadAndReconcileReplicas(
 		return nil, err
 	}
 
-	return m, nil
+	allM, err := loadFullReplicaIDsFromDisk(ctx, eng)
+	if err != nil {
+		return nil, err
+	}
+
+	for id := range initM {
+		if _, ok := allM[id]; !ok {
+			// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
+			//
+			// This invariant is true for replicas created in 22.2, but no migration
+			// was ever written. So we backfill the replicaID here (as of 23.1) and
+			// remove this code in the future (the follow-up release, assuming it is
+			// forced to migrate through 23.1, otherwise later).
+			if buildutil.CrdbTestBuild {
+				return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id])
+			}
+			if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
+				return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
+			}
+			log.Eventf(ctx, "backfilled replicaID for %s", id)
+		}
+		// `allM` will be our map of uninitialized replicas.
+		//
+		// A replica is "uninitialized" if it's not in initM (i.e. is at log position
+		// zero and has no visible RangeDescriptor).
+		delete(allM, id)
+	}
+
+	return &engineReplicas{
+		initialized:   initM,
+		uninitialized: allM, // NB: init'ed ones were deleted earlier
+	}, nil
 }
 
 // Start the engine, set the GC and read the StoreIdent.
@@ -2066,11 +2126,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
 	// concurrently. Note that while we can perform this initialization
 	// concurrently, all of the initialization must be performed before we start
 	// listening for Raft messages and starting the process Raft loop.
-	descs, err := loadAndReconcileReplicas(ctx, s.engine)
+	engRepls, err := loadAndReconcileReplicas(ctx, s.engine)
 	if err != nil {
 		return err
 	}
-	for fullID, desc := range descs {
+	for fullID, desc := range engRepls.initialized {
 		rep, err := newReplica(ctx, desc, s, fullID.ReplicaID)
 		if err != nil {
 			return err