From 863932d1d8ce1504a55ae9d39b37c048a6f4583f Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Fri, 9 Dec 2022 12:10:41 +0100 Subject: [PATCH] kvserver: also load uninitialized replicas, verify replicaID To support a separate raft log[^1] we need to perform certain reconciliation operations at start-up to recover from a lack of atomicity between the state and log engines. This commit gets us closer to being able to do so by listing all replicas before starting the store, which means we now have a handle on which uninitialized replicas exist in the system. As a first application of this knowledge, we now ensure that every replica has a persisted FullReplicaID. Since this would not necessarily be true for stores that have seen older releases, we backfill the ReplicaID in 23.1 and can then require it to be present in a future release that forces a migration through 23.1. [^1]: https://github.com/cockroachdb/cockroach/issues/16624 Epic: CRDB-220 Release note: None --- pkg/kv/kvserver/replica.go | 10 +++++ pkg/kv/kvserver/store.go | 81 ++++++++++++++++++++++++++++++++------ 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 93ebd3df7b31..ecf02b9e5ba0 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1328,6 +1328,16 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked( log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc) } } + diskReplID, found, err := r.mu.stateLoader.LoadRaftReplicaID(ctx, reader) + if err != nil { + log.Fatalf(ctx, "%s", err) + } + if !found { + log.Fatalf(ctx, "no replicaID persisted") + } + if diskReplID.ReplicaID != r.replicaID { + log.Fatalf(ctx, "disk replicaID %d does not match in-mem %d", diskReplID.ReplicaID, r.replicaID) + } } // TODO(nvanbenschoten): move the following 5 methods to replica_send.go. 
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 2ec6084a73e8..1a7602580b23 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -45,6 +45,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" @@ -64,6 +65,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -1905,19 +1907,46 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent return ident, err } +type engineReplicas struct { + uninitialized map[storage.FullReplicaID]struct{} + initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor +} + +// loadFullReplicaIDsFromDisk discovers all Replicas on this Store. +// There will only be one entry in the map for a given RangeID. +// +// TODO(sep-raft-log): the reader here is for the log engine. 
+func loadFullReplicaIDsFromDisk( + ctx context.Context, reader storage.Reader, +) (map[storage.FullReplicaID]struct{}, error) { + m := map[storage.FullReplicaID]struct{}{} + var msg roachpb.RaftReplicaID + if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key { + return keys.RaftReplicaIDKey(rangeID) + }, &msg, func(rangeID roachpb.RangeID) error { + m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{} + return nil + }); err != nil { + return nil, err + } + + // TODO(sep-raft-log): if there is any other data that we mandate is present here + // (like a HardState), validate that here. + + return m, nil +} + // loadAndReconcileReplicas loads the Replicas present on this // store. It reconciles inconsistent state and runs validation checks. // // TODO(sep-raft-log): also load *uninitialized* Replicas. -func loadAndReconcileReplicas( - ctx context.Context, eng storage.Engine, -) (map[storage.FullReplicaID]*roachpb.RangeDescriptor, error) { +func loadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*engineReplicas, error) { ident, err := ReadStoreIdent(ctx, eng) if err != nil { return nil, err } - m := map[storage.FullReplicaID]*roachpb.RangeDescriptor{} + initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{} // INVARIANT: the latest visible committed version of the RangeDescriptor // (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting // the state of the Replica. @@ -1934,12 +1963,7 @@ func loadAndReconcileReplicas( ident.StoreID, desc) } - // INVARIANT: every Replica has a persisted ReplicaID. For initialized - // Replicas, it matches that of the descriptor. - // - // TODO(sep-raft-log): actually load the persisted ReplicaID and validate - // the above invariant. 
- m[storage.FullReplicaID{ + initM[storage.FullReplicaID{ RangeID: desc.RangeID, ReplicaID: repDesc.ReplicaID, }] = &desc @@ -1948,7 +1972,38 @@ func loadAndReconcileReplicas( return nil, err } - return m, nil + allM, err := loadFullReplicaIDsFromDisk(ctx, eng) + if err != nil { + return nil, err + } + + for id := range initM { + if _, ok := allM[id]; !ok { + // INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk"). + // + // This invariant is true for replicas created in 22.2, but no migration + // was ever written. So we backfill the replicaID here (as of 23.1) and + // remove this code in the future (the follow-up release, assuming it is + // forced to migrate through 23.1, otherwise later). + if buildutil.CrdbTestBuild { + return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id]) + } + if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil { + return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID) + } + log.Eventf(ctx, "backfilled replicaID for %s", id) + } + // `allM` will be our map of uninitialized replicas. + // + // A replica is "uninitialized" if it's not in initM (i.e. is at log position + // zero and has no visible RangeDescriptor). + delete(allM, id) + } + + return &engineReplicas{ + initialized: initM, + uninitialized: allM, // NB: init'ed ones were deleted earlier + }, nil } // Start the engine, set the GC and read the StoreIdent. @@ -2066,11 +2121,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // concurrently. Note that while we can perform this initialization // concurrently, all of the initialization must be performed before we start // listening for Raft messages and starting the process Raft loop. 
- descs, err := loadAndReconcileReplicas(ctx, s.engine) + engRepls, err := loadAndReconcileReplicas(ctx, s.engine) if err != nil { return err } - for fullID, desc := range descs { + for fullID, desc := range engRepls.initialized { rep, err := newReplica(ctx, desc, s, fullID.ReplicaID) if err != nil { return err