From 9fc61415c27830db87c7262e5497db3befe484b1 Mon Sep 17 00:00:00 2001
From: Pavel Kalinnikov
Date: Wed, 1 Feb 2023 15:33:53 +0000
Subject: [PATCH] kvserver: factor out replica loading phase

This commit factors out Replica state loading and invariant checking so
that they are consolidated, rather than interleaved with Replica creation.
Previously, the Replica loaded its state after construction, only to
assert post factum that the in-memory state matched the state in storage.
Now the loaded state is a prerequisite for, and an input into, creating
the Replica and turning it from uninitialized to initialized.

This change also eliminates the concept of an "unloaded" Replica: the
Replica is now always created in a valid state (either initialized or
uninitialized).

Release note: none
Epic: none
---
 pkg/kv/kvserver/replica_init.go         | 183 +++++++++++++++---
 pkg/kv/kvserver/split_queue_test.go     |   2 +-
 pkg/kv/kvserver/store.go                |   2 +-
 pkg/kv/kvserver/store_create_replica.go |  24 +---
 pkg/kv/kvserver/store_pool_test.go      |   2 +-
 pkg/kv/kvserver/store_split.go          |  14 +-
 pkg/kv/kvserver/store_test.go           |  16 +--
 pkg/kv/kvserver/stores_test.go          |   2 +-
 8 files changed, 141 insertions(+), 104 deletions(-)

diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go
index b5964c917b3b..db59ff390636 100644
--- a/pkg/kv/kvserver/replica_init.go
+++ b/pkg/kv/kvserver/replica_init.go
@@ -20,6 +20,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
@@ -27,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -34,6 +36,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	"go.etcd.io/raft/v3"
+	"go.etcd.io/raft/v3/raftpb"
 )
 
 const (
@@ -41,56 +44,107 @@ const (
 	mergeQueueThrottleDuration = 5 * time.Second
 )
 
-// newReplica constructs a new Replica. If the desc is initialized, the store
-// must be present in it and the corresponding replica descriptor must have
-// replicaID as its ReplicaID.
-func newReplica(
-	ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID,
-) (*Replica, error) {
-	repl := newUnloadedReplica(ctx, desc.RangeID, store, replicaID)
-	repl.raftMu.Lock()
-	defer repl.raftMu.Unlock()
-	repl.mu.Lock()
-	defer repl.mu.Unlock()
-
-	// TODO(pavelkalinnikov): this path is taken only in tests. Remove it and
-	// assert desc.IsInitialized().
+// loadedReplicaState represents the state of a Replica loaded from storage, and
+// is used to initialize the in-memory Replica instance.
+type loadedReplicaState struct {
+	replicaID roachpb.ReplicaID
+	hardState raftpb.HardState
+	lastIndex uint64
+	replState kvserverpb.ReplicaState
+}
+
+// loadReplicaState loads the state necessary to create a Replica from storage.
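+// It also loads and checks the RaftReplicaID: the ID must be present in
+// storage and must match the given replicaID, otherwise loading fails with
+// an assertion error.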
+func loadReplicaState( + ctx context.Context, + eng storage.Reader, + desc *roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, +) (loadedReplicaState, error) { + sl := stateloader.Make(desc.RangeID) + id, found, err := sl.LoadRaftReplicaID(ctx, eng) + if err != nil { + return loadedReplicaState{}, err + } else if !found { + return loadedReplicaState{}, errors.AssertionFailedf( + "r%d: RaftReplicaID not found", desc.RangeID) + } else if loaded := id.ReplicaID; loaded != replicaID { + return loadedReplicaState{}, errors.AssertionFailedf( + "r%d: loaded RaftReplicaID %d does not match %d", desc.RangeID, loaded, replicaID) + } + + ls := loadedReplicaState{replicaID: replicaID} + if ls.hardState, err = sl.LoadHardState(ctx, eng); err != nil { + return loadedReplicaState{}, err + } + if ls.lastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil { + return loadedReplicaState{}, err + } + if ls.replState, err = sl.Load(ctx, eng, desc); err != nil { + return loadedReplicaState{}, err + } + return ls, nil +} + +// check makes sure that the replica invariants hold for the loaded state. +func (r loadedReplicaState) check(storeID roachpb.StoreID) error { + desc := r.replState.Desc + if id := r.replicaID; id == 0 { + return errors.AssertionFailedf("r%d: replicaID is 0", desc.RangeID) + } + if !desc.IsInitialized() { - repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, store.Engine()) - return repl, nil + // An uninitialized replica must have an empty HardState.Commit at all + // times. Failure to maintain this invariant indicates corruption. And yet, + // we have observed this in the wild. See #40213. + if hs := r.hardState; hs.Commit != 0 { + return errors.AssertionFailedf( + "r%d/%d: non-zero HardState.Commit on uninitialized replica: %+v", desc.RangeID, r.replicaID, hs) + } + return nil } + // desc.IsInitialized() == true - if err := repl.loadRaftMuLockedReplicaMuLocked(desc); err != nil { - return nil, err + if replDesc, ok := desc.GetReplicaDescriptor(storeID); !ok { + return errors.AssertionFailedf("%+v does not contain local store s%d", desc, storeID) + } else if replDesc.ReplicaID != r.replicaID { + return errors.AssertionFailedf( + "%+v does not contain replicaID %d for local store s%d", desc, r.replicaID, storeID) } - return repl, nil + return nil } -// newUnloadedReplica partially constructs a Replica. The returned replica is -// assumed to be uninitialized, until Replica.loadRaftMuLockedReplicaMuLocked() -// is called with the correct descriptor. The primary reason this function -// exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is to avoid -// attempting to fully construct a Replica and load it from storage prior to -// proving that it can exist during the delicate synchronization dance in -// Store.tryGetOrCreateReplica(). A Replica returned from this function must not -// be used in any way until the load method has been called. -func newUnloadedReplica( - ctx context.Context, rangeID roachpb.RangeID, store *Store, replicaID roachpb.ReplicaID, -) *Replica { - if replicaID == 0 { - log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", rangeID) +// loadReplica loads and constructs a Replica, after checking its invariants. 
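+// It is called from Store.Start for initialized replicas, and from
+// tryGetOrCreateReplica for uninitialized replicas, whose descriptor
+// contains only the RangeID.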
+func loadReplica( + ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, +) (*Replica, error) { + state, err := loadReplicaState(ctx, store.engine, desc, replicaID) + if err != nil { + return nil, err + } + if err := state.check(store.StoreID()); err != nil { + return nil, err } - uninitState := stateloader.UninitializedReplicaState(rangeID) + return newReplica(ctx, store, state), nil +} + +// newReplica constructs a Replica using its state loaded from storage. It may +// return either an initialized, or an uninitialized Replica, depending on the +// loaded state. If the returned replica is uninitialized, it remains such until +// Replica.initRaftMuLockedReplicaMuLocked() is called. +func newReplica(ctx context.Context, store *Store, loaded loadedReplicaState) *Replica { + state := loaded.replState + rangeID := state.Desc.RangeID r := &Replica{ AmbientContext: store.cfg.AmbientCtx, RangeID: rangeID, - replicaID: replicaID, + replicaID: loaded.replicaID, + startKey: state.Desc.StartKey, creationTime: timeutil.Now(), store: store, abortSpan: abortspan.New(rangeID), concMgr: concurrency.NewManager(concurrency.Config{ NodeDesc: store.nodeDesc, - RangeDesc: uninitState.Desc, + RangeDesc: state.Desc, Settings: store.ClusterSettings(), DB: store.DB(), Clock: store.Clock(), @@ -127,9 +181,23 @@ func newUnloadedReplica( r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) } - // NB: the state will be loaded when the replica gets initialized. - r.mu.state = uninitState - r.rangeStr.store(replicaID, uninitState.Desc) + r.mu.state = state + // Only do this if there was a previous lease. This shouldn't be important + // to do but consider that the first lease which is obtained is back-dated + // to a zero start timestamp (and this de-flakes some tests). If we set the + // min proposed TS here, this lease could not be renewed (by the semantics + // of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits, + // this problem would multiply to a number of replicas at cluster bootstrap. + // Instead, we make the first lease special (which is OK) and the problem + // disappears. + if r.mu.state.Lease.Sequence > 0 { + r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp() + } + r.mu.lastIndex = loaded.lastIndex + r.mu.lastTerm = invalidLastTerm + // NB: mutexes don't need to be locked on Replica struct initialization. + r.setDescLockedRaftMuLocked(r.AnnotateCtx(ctx), state.Desc) + // Add replica log tag - the value is rangeStr.String(). r.AmbientContext.AddLogTag("r", &r.rangeStr) r.raftCtx = logtags.AddTag(r.AnnotateCtx(context.Background()), "raft", nil /* value */) @@ -160,6 +228,7 @@ func newUnloadedReplica( r.breaker = newReplicaCircuitBreaker( store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset, ) + return r } @@ -177,16 +246,15 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) { r.startKey = startKey } -// loadRaftMuLockedReplicaMuLocked loads the state of the initialized replica -// from storage. After this method returns, Replica is initialized, and can not -// be loaded again. +// initRaftMuLockedReplicaMuLocked initializes the Replica using the state +// loaded from storage. Must not be called more than once on a Replica. // -// This method is called in two places: -// -// 1. newReplica - used when the store is initializing and during testing -// 2. splitPostApply - this call initializes a previously uninitialized Replica. 
-func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) error { +// This method is called only in splitPostApply to initialize a previously +// uninitialized Replica. +func (r *Replica) initRaftMuLockedReplicaMuLocked(s loadedReplicaState) error { ctx := r.AnnotateCtx(context.TODO()) + desc := s.replState.Desc + if !desc.IsInitialized() { return errors.AssertionFailedf("r%d: cannot load an uninitialized replica", desc.RangeID) } @@ -204,14 +272,8 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) // group. r.mu.internalRaftGroup = nil - var err error - if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.Engine(), desc); err != nil { - return err - } - r.mu.lastIndex, err = r.mu.stateLoader.LoadLastIndex(ctx, r.Engine()) - if err != nil { - return err - } + r.mu.state = s.replState + r.mu.lastIndex = s.lastIndex r.mu.lastTerm = invalidLastTerm // Ensure that we're not trying to load a replica with a different ID than @@ -229,19 +291,6 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) } r.setDescLockedRaftMuLocked(ctx, desc) - - // Only do this if there was a previous lease. This shouldn't be important - // to do but consider that the first lease which is obtained is back-dated - // to a zero start timestamp (and this de-flakes some tests). If we set the - // min proposed TS here, this lease could not be renewed (by the semantics - // of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits, - // this problem would multiply to a number of replicas at cluster bootstrap. - // Instead, we make the first lease special (which is OK) and the problem - // disappears. - if r.mu.state.Lease.Sequence > 0 { - r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp() - } - r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine()) return nil diff --git a/pkg/kv/kvserver/split_queue_test.go b/pkg/kv/kvserver/split_queue_test.go index 9b7ac669ca7a..fbb4bdc82a5f 100644 --- a/pkg/kv/kvserver/split_queue_test.go +++ b/pkg/kv/kvserver/split_queue_test.go @@ -100,7 +100,7 @@ func TestSplitQueueShouldQueue(t *testing.T) { replicaID := cpy.Replicas().VoterDescriptors()[0].ReplicaID require.NoError(t, logstore.NewStateLoader(cpy.RangeID).SetRaftReplicaID(ctx, tc.store.engine, replicaID)) - repl, err := newReplica(ctx, &cpy, tc.store, replicaID) + repl, err := loadReplica(ctx, tc.store, &cpy, replicaID) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e9c90106b427..3bf1b171c266 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1852,7 +1852,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { return err } for fullID, desc := range engRepls.Initialized { - rep, err := newReplica(ctx, desc, s, fullID.ReplicaID) + rep, err := loadReplica(ctx, s, desc, fullID.ReplicaID) if err != nil { return err } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 8d0162ca673c..13b6fe1687c9 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -222,16 +222,6 @@ func (s *Store) tryGetOrCreateReplica( return nil, false, &roachpb.RaftGroupDeletedError{} } - // An uninitialized replica must have an empty HardState.Commit at all times. - // Failure to maintain this invariant indicates corruption. And yet, we have - // observed this in the wild. See #40213. 
- sl := stateloader.Make(rangeID) - if hs, err := sl.LoadHardState(ctx, s.Engine()); err != nil { - return nil, false, err - } else if hs.Commit != 0 { - log.Fatalf(ctx, "found non-zero HardState.Commit on uninitialized replica r%d/%d. HS=%+v", - rangeID, replicaID, hs) - } // Write the RaftReplicaID for this replica. This is the only place in the // CockroachDB code that we are creating a new *uninitialized* replica. // Note that it is possible that we have already created the HardState for @@ -281,20 +271,18 @@ func (s *Store) tryGetOrCreateReplica( // power-cycled, losing the snapshot) the fire-and-forget way in which // raft votes are requested (in the same raft cycle) makes it extremely // unlikely that the restarted node would then receive it. + sl := stateloader.Make(rangeID) if err := sl.SetRaftReplicaID(ctx, s.Engine(), replicaID); err != nil { return nil, false, err } // Create a new uninitialized replica and lock it for raft processing. - repl := newUnloadedReplica(ctx, rangeID, s, replicaID) + uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID} + repl, err := loadReplica(ctx, s, &uninitDesc, replicaID) + if err != nil { + return nil, false, err + } repl.raftMu.Lock() // not unlocked - repl.mu.Lock() - // TODO(pavelkalinnikov): there is little benefit in this check, since loading - // ReplicaID is a no-op after the above write, and the ReplicaState load is - // only for making sure it's empty. Distill the useful IO and make its result - // the direct input into Replica creation, then this check won't be needed. - repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, s.Engine()) - repl.mu.Unlock() // Install the replica in the store's replica map. s.mu.Lock() diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index c19bb7d981b8..2243d1a21e15 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -223,7 +223,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { const replicaID = 1 require.NoError(t, logstore.NewStateLoader(rg.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - replica, err := newReplica(ctx, &rg, store, replicaID) + replica, err := loadReplica(ctx, store, &rg, replicaID) if err != nil { t.Fatalf("make replica error : %+v", err) } diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index d787397674b0..38758d039261 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -261,8 +261,13 @@ func prepareRightReplicaForSplit( } // Finish initialization of the RHS. - err := rightRepl.loadRaftMuLockedReplicaMuLocked(&split.RightDesc) - if err != nil { + if state, err := loadReplicaState( + ctx, r.store.engine, &split.RightDesc, rightRepl.replicaID, + ); err != nil { + log.Fatalf(ctx, "%v", err) + } else if err := state.check(r.store.StoreID()); err != nil { + log.Fatalf(ctx, "%v", err) + } else if err := rightRepl.initRaftMuLockedReplicaMuLocked(state); err != nil { log.Fatalf(ctx, "%v", err) } @@ -291,10 +296,9 @@ func prepareRightReplicaForSplit( // until it receives a Raft message addressed to the right-hand range. But // since new replicas start out quiesced, unless we explicitly awaken the // Raft group, there might not be any Raft traffic for quite a while. 
- err = rightRepl.withRaftGroupLocked(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { + if err := rightRepl.withRaftGroupLocked(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { return true, nil - }) - if err != nil { + }); err != nil { log.Fatalf(ctx, "unable to create raft group for right-hand range in split: %+v", err) } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 31ce2b1b5071..fd226c3e07f0 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -545,7 +545,7 @@ func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) * ); err != nil { panic(err) } - r, err := newReplica(ctx, desc, s, replicaID) + r, err := loadReplica(ctx, s, desc, replicaID) if err != nil { panic(err) } @@ -834,17 +834,13 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) { } newRangeID := roachpb.RangeID(3) - desc := &roachpb.RangeDescriptor{ - RangeID: newRangeID, - } - const replicaID = 1 require.NoError(t, - logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - r, err := newReplica(ctx, desc, store, replicaID) - if err != nil { - t.Fatal(err) - } + logstore.NewStateLoader(newRangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) + + state := stateloader.UninitializedReplicaState(newRangeID) + desc := state.Desc + r := newReplica(ctx, store, loadedReplicaState{replicaID: replicaID, replState: state}) store.mu.Lock() defer store.mu.Unlock() diff --git a/pkg/kv/kvserver/stores_test.go b/pkg/kv/kvserver/stores_test.go index d26d001042b0..dec9b62eccca 100644 --- a/pkg/kv/kvserver/stores_test.go +++ b/pkg/kv/kvserver/stores_test.go @@ -157,7 +157,7 @@ func TestStoresGetReplicaForRangeID(t *testing.T) { require.NoError(t, logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - replica, err := newReplica(ctx, desc, store, replicaID) + replica, err := loadReplica(ctx, store, desc, replicaID) if err != nil { t.Fatalf("unexpected error when creating replica: %+v", err) }
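
To illustrate the shape of the new flow, here is a minimal, self-contained Go
sketch of the "load, check, then construct" pattern this patch introduces. It
is not kvserver code: miniState, miniStorage, miniReplica, and loadMiniReplica
are hypothetical stand-ins for loadedReplicaState, the storage engine, Replica,
and loadReplica.

package main

import (
	"errors"
	"fmt"
)

// miniState mirrors loadedReplicaState: everything read from storage that is
// needed to construct a replica.
type miniState struct {
	replicaID   int
	commitIndex uint64 // stand-in for raft HardState.Commit
	desc        string // stand-in for the range descriptor; "" == uninitialized
}

// miniStorage is a toy stand-in for the storage engine.
type miniStorage map[int]miniState

// load reads replica state from storage, in the role of loadReplicaState.
func (s miniStorage) load(rangeID, replicaID int) (miniState, error) {
	st, ok := s[rangeID]
	if !ok {
		return miniState{}, fmt.Errorf("r%d: replica state not found", rangeID)
	}
	if st.replicaID != replicaID {
		return miniState{}, fmt.Errorf("r%d: loaded replicaID %d does not match %d",
			rangeID, st.replicaID, replicaID)
	}
	return st, nil
}

// check enforces invariants before any in-memory replica exists, in the role
// of loadedReplicaState.check.
func (st miniState) check() error {
	if st.replicaID == 0 {
		return errors.New("replicaID is 0")
	}
	// An uninitialized replica must not have committed any log entries.
	if st.desc == "" && st.commitIndex != 0 {
		return errors.New("non-zero commit index on uninitialized replica")
	}
	return nil
}

// miniReplica is constructed only from state that has already been checked,
// so it is valid (initialized or uninitialized) from the moment it exists.
type miniReplica struct{ state miniState }

// loadMiniReplica is the analogue of loadReplica: load, check, construct.
func loadMiniReplica(s miniStorage, rangeID, replicaID int) (*miniReplica, error) {
	st, err := s.load(rangeID, replicaID)
	if err != nil {
		return nil, err
	}
	if err := st.check(); err != nil {
		return nil, err
	}
	return &miniReplica{state: st}, nil
}

func main() {
	storage := miniStorage{
		1: {replicaID: 7, commitIndex: 42, desc: "[a,z)"}, // initialized
		2: {replicaID: 3},                                 // uninitialized
	}
	for _, rangeID := range []int{1, 2} {
		r, err := loadMiniReplica(storage, rangeID, storage[rangeID].replicaID)
		fmt.Printf("r%d: %+v err=%v\n", rangeID, r, err)
	}
}

The design point the sketch captures is the one the commit message makes: state
is loaded and validated before the replica object exists, so there is never a
window in which a constructed-but-unloaded replica is observable.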