From bbfba80425e872156c75e3309893fe35e6b946d6 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 1 Feb 2023 15:33:53 +0000 Subject: [PATCH 1/5] kvserver: factor out replica loading phase This commit factors out Replica state loading and invariant checking so that it is more consolidated rather than interleaved into Replica creation. Replica creation path was loading this state, to post-factum assert that the in-memory state matches the state in storage. Now this is a pre-requisite and input into creating the Replica, and turning it from uninitialized to initialized state. This change also eliminates the concept of "unloaded" Replica. Now the Replica is created in a valid state (either initialized or uninitialized). Release note: none Epic: none --- pkg/kv/kvserver/replica_init.go | 197 +++++++++++++++--------- pkg/kv/kvserver/split_queue_test.go | 2 +- pkg/kv/kvserver/store.go | 5 +- pkg/kv/kvserver/store_create_replica.go | 29 ++-- pkg/kv/kvserver/store_pool_test.go | 2 +- pkg/kv/kvserver/store_split.go | 12 +- pkg/kv/kvserver/store_test.go | 17 +- pkg/kv/kvserver/stores_test.go | 2 +- 8 files changed, 155 insertions(+), 111 deletions(-) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index c2494e35bba7..7a3e4fd0e773 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + 
"github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -34,6 +36,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" ) const ( @@ -41,45 +44,112 @@ const ( mergeQueueThrottleDuration = 5 * time.Second ) -// newReplica constructs a new Replica. If the desc is initialized, the store -// must be present in it and the corresponding replica descriptor must have -// replicaID as its ReplicaID. -func newReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID, -) (*Replica, error) { - repl := newUnloadedReplica(ctx, desc.RangeID, store, replicaID) - repl.raftMu.Lock() - defer repl.raftMu.Unlock() - repl.mu.Lock() - defer repl.mu.Unlock() - - // TODO(pavelkalinnikov): this path is taken only in tests. Remove it and - // assert desc.IsInitialized(). +// loadedReplicaState represents the state of a Replica loaded from storage, and +// is used to initialize the in-memory Replica instance. +// TODO(pavelkalinnikov): move to kvstorage, integrate with kvstorage.Replica. +type loadedReplicaState struct { + replicaID roachpb.ReplicaID + hardState raftpb.HardState + lastIndex uint64 + replState kvserverpb.ReplicaState +} + +// loadReplicaState loads the state necessary to create a Replica with the +// specified range descriptor, which can be either initialized or uninitialized. +// TODO(pavelkalinnikov): integrate with stateloader. 
+func loadReplicaState( + ctx context.Context, + eng storage.Reader, + desc *roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, +) (loadedReplicaState, error) { + sl := stateloader.Make(desc.RangeID) + id, found, err := sl.LoadRaftReplicaID(ctx, eng) + if err != nil { + return loadedReplicaState{}, err + } else if !found { + return loadedReplicaState{}, errors.AssertionFailedf( + "r%d: RaftReplicaID not found", desc.RangeID) + } else if loaded := id.ReplicaID; loaded != replicaID { + return loadedReplicaState{}, errors.AssertionFailedf( + "r%d: loaded RaftReplicaID %d does not match %d", desc.RangeID, loaded, replicaID) + } + + ls := loadedReplicaState{replicaID: replicaID} + if ls.hardState, err = sl.LoadHardState(ctx, eng); err != nil { + return loadedReplicaState{}, err + } + if ls.lastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil { + return loadedReplicaState{}, err + } + if ls.replState, err = sl.Load(ctx, eng, desc); err != nil { + return loadedReplicaState{}, err + } + return ls, nil +} + +// check makes sure that the replica invariants hold for the loaded state. +func (r loadedReplicaState) check(storeID roachpb.StoreID) error { + desc := r.replState.Desc + if r.replicaID == 0 { + return errors.AssertionFailedf("r%d: replicaID is 0", desc.RangeID) + } + if !desc.IsInitialized() { - repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, store.Engine()) - return repl, nil + // An uninitialized replica must have an empty HardState.Commit at all + // times. Failure to maintain this invariant indicates corruption. And yet, + // we have observed this in the wild. See #40213. + if hs := r.hardState; hs.Commit != 0 { + return errors.AssertionFailedf( + "r%d/%d: non-zero HardState.Commit on uninitialized replica: %+v", desc.RangeID, r.replicaID, hs) + } + // TODO(pavelkalinnikov): assert r.lastIndex == 0? + return nil + } + // desc.IsInitialized() == true + + // INVARIANT: a replica's RangeDescriptor always contains the local Store. 
+ if replDesc, ok := desc.GetReplicaDescriptor(storeID); !ok { + return errors.AssertionFailedf("%+v does not contain local store s%d", desc, storeID) + } else if replDesc.ReplicaID != r.replicaID { + return errors.AssertionFailedf( + "%+v does not contain replicaID %d for local store s%d", desc, r.replicaID, storeID) } + return nil +} - if err := repl.loadRaftMuLockedReplicaMuLocked(desc); err != nil { +// loadInitializedReplica loads and constructs an initialized Replica, after +// checking its invariants. +func loadInitializedReplica( + ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, +) (*Replica, error) { + if !desc.IsInitialized() { + return nil, errors.AssertionFailedf("can not load with uninitialized descriptor: %s", desc) + } + state, err := loadReplicaState(ctx, store.engine, desc, replicaID) + if err != nil { return nil, err } - return repl, nil + r := newUninitializedReplica(store, desc.RangeID, replicaID) + r.raftMu.Lock() + defer r.raftMu.Unlock() + r.mu.Lock() + defer r.mu.Unlock() + if err := r.initRaftMuLockedReplicaMuLocked(state); err != nil { + return nil, err + } + return r, nil } -// newUnloadedReplica partially constructs a Replica. The returned replica is -// assumed to be uninitialized, until Replica.loadRaftMuLockedReplicaMuLocked() -// is called with the correct descriptor. The primary reason this function -// exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is to avoid -// attempting to fully construct a Replica and load it from storage prior to -// proving that it can exist during the delicate synchronization dance in -// Store.tryGetOrCreateReplica(). A Replica returned from this function must not -// be used in any way until the load method has been called. -func newUnloadedReplica( - ctx context.Context, rangeID roachpb.RangeID, store *Store, replicaID roachpb.ReplicaID, +// newUninitializedReplica constructs an uninitialized Replica with the given +// range/replica ID. 
The returned replica remains uninitialized until +// Replica.initRaftMuLockedReplicaMuLocked() is called. +// +// TODO(#94912): we actually have another initialization path which should be +// refactored: Store.maybeMarkReplicaInitializedLockedReplLocked(). +func newUninitializedReplica( + store *Store, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID, ) *Replica { - if replicaID == 0 { - log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", rangeID) - } uninitState := stateloader.UninitializedReplicaState(rangeID) r := &Replica{ AmbientContext: store.cfg.AmbientCtx, @@ -177,26 +247,29 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) { r.startKey = startKey } -// loadRaftMuLockedReplicaMuLocked loads the state of the initialized replica -// from storage. After this method returns, Replica is initialized, and can not -// be loaded again. +// initRaftMuLockedReplicaMuLocked initializes the Replica using the state +// loaded from storage. Must not be called more than once on a Replica. // -// This method is called in two places: -// -// 1. newReplica - used when the store is initializing and during testing -// 2. splitPostApply - this call initializes a previously uninitialized Replica. -func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) error { - ctx := r.AnnotateCtx(context.TODO()) - if !desc.IsInitialized() { - return errors.AssertionFailedf("r%d: cannot load an uninitialized replica", desc.RangeID) +// This method is called in: +// - loadInitializedReplica, to finalize creating an initialized replica; +// - splitPostApply, to initialize a previously uninitialized replica. 
+func (r *Replica) initRaftMuLockedReplicaMuLocked(s loadedReplicaState) error { + if err := s.check(r.StoreID()); err != nil { + return err } - if r.IsInitialized() { - return errors.AssertionFailedf("r%d: cannot reinitialize an initialized replica", desc.RangeID) - } else if r.replicaID == 0 { - // NB: This is just a defensive check as r.mu.replicaID should never be 0. - return errors.AssertionFailedf("r%d: cannot initialize replica without a replicaID", - desc.RangeID) + desc := s.replState.Desc + // Ensure that the loaded state corresponds to the same replica. + if desc.RangeID != r.RangeID || s.replicaID != r.replicaID { + return errors.AssertionFailedf( + "%s: trying to init with other replica's state r%d/%d", r, desc.RangeID, s.replicaID) + } + // Ensure that we transition to initialized replica, and do it only once. + if !desc.IsInitialized() { + return errors.AssertionFailedf("%s: cannot init replica with uninitialized descriptor", r) + } else if r.IsInitialized() { + return errors.AssertionFailedf("%s: cannot reinitialize an initialized replica", r) } + r.setStartKeyLocked(desc.StartKey) // Clear the internal raft group in case we're being reset. Since we're @@ -204,31 +277,11 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) // group. r.mu.internalRaftGroup = nil - var err error - if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.Engine(), desc); err != nil { - return err - } - r.mu.lastIndexNotDurable, err = r.mu.stateLoader.LoadLastIndex(ctx, r.Engine()) - if err != nil { - return err - } + r.mu.state = s.replState + r.mu.lastIndexNotDurable = s.lastIndex r.mu.lastTermNotDurable = invalidLastTerm - // Ensure that we're not trying to load a replica with a different ID than - // was used to construct this Replica. 
- var replicaID roachpb.ReplicaID - if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found { - replicaID = replicaDesc.ReplicaID - } else { - return errors.AssertionFailedf("r%d: cannot initialize replica which is not in descriptor %v", - desc.RangeID, desc) - } - if r.replicaID != replicaID { - return errors.AssertionFailedf("attempting to initialize a replica which has ID %d with ID %d", - r.replicaID, replicaID) - } - - r.setDescLockedRaftMuLocked(ctx, desc) + r.setDescLockedRaftMuLocked(r.AnnotateCtx(context.TODO()), desc) // Only do this if there was a previous lease. This shouldn't be important // to do but consider that the first lease which is obtained is back-dated @@ -242,8 +295,6 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp() } - r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine()) - return nil } diff --git a/pkg/kv/kvserver/split_queue_test.go b/pkg/kv/kvserver/split_queue_test.go index 9b7ac669ca7a..53f9503dcdeb 100644 --- a/pkg/kv/kvserver/split_queue_test.go +++ b/pkg/kv/kvserver/split_queue_test.go @@ -100,7 +100,7 @@ func TestSplitQueueShouldQueue(t *testing.T) { replicaID := cpy.Replicas().VoterDescriptors()[0].ReplicaID require.NoError(t, logstore.NewStateLoader(cpy.RangeID).SetRaftReplicaID(ctx, tc.store.engine, replicaID)) - repl, err := newReplica(ctx, &cpy, tc.store, replicaID) + repl, err := loadInitializedReplica(ctx, tc.store, &cpy, replicaID) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 4d9f7fcc829c..c39bf59708a2 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1861,7 +1861,10 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Uninitialized Replicas are not currently instantiated at store start. 
continue } - rep, err := newReplica(ctx, repl.Desc, s, repl.ReplicaID) + // TODO(sep-raft-log): construct the loaded Replica state directly in + // LoadAndReconcileReplicas, which loads and checks most of it already, then + // feed it to Replica creation, to avoid double-loading. + rep, err := loadInitializedReplica(ctx, s, repl.Desc, repl.ReplicaID) if err != nil { return err } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 947b000c39a0..10b4d5efc848 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -222,16 +222,6 @@ func (s *Store) tryGetOrCreateReplica( return nil, false, &roachpb.RaftGroupDeletedError{} } - // An uninitialized replica must have an empty HardState.Commit at all times. - // Failure to maintain this invariant indicates corruption. And yet, we have - // observed this in the wild. See #40213. - sl := stateloader.Make(rangeID) - if hs, err := sl.LoadHardState(ctx, s.Engine()); err != nil { - return nil, false, err - } else if hs.Commit != 0 { - log.Fatalf(ctx, "found non-zero HardState.Commit on uninitialized replica r%d/%d. HS=%+v", - rangeID, replicaID, hs) - } // Write the RaftReplicaID for this replica. This is the only place in the // CockroachDB code that we are creating a new *uninitialized* replica. // Note that it is possible that we have already created the HardState for @@ -258,20 +248,23 @@ func (s *Store) tryGetOrCreateReplica( // HardState but no RaftReplicaID, see kvstorage.LoadAndReconcileReplicas. // So after first call to this method we have the invariant that all replicas // have a RaftReplicaID persisted. + sl := stateloader.Make(rangeID) if err := sl.SetRaftReplicaID(ctx, s.Engine(), replicaID); err != nil { return nil, false, err } + // Make sure that storage invariants for this uninitialized replica hold. 
+ uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID} + state, err := loadReplicaState(ctx, s.engine, &uninitDesc, replicaID) + if err != nil { + return nil, false, err + } else if err := state.check(s.StoreID()); err != nil { + return nil, false, err + } + // Create a new uninitialized replica and lock it for raft processing. - repl := newUnloadedReplica(ctx, rangeID, s, replicaID) + repl := newUninitializedReplica(s, rangeID, replicaID) repl.raftMu.Lock() // not unlocked - repl.mu.Lock() - // TODO(pavelkalinnikov): there is little benefit in this check, since loading - // ReplicaID is a no-op after the above write, and the ReplicaState load is - // only for making sure it's empty. Distill the useful IO and make its result - // the direct input into Replica creation, then this check won't be needed. - repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, s.Engine()) - repl.mu.Unlock() // Install the replica in the store's replica map. s.mu.Lock() diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index c19bb7d981b8..cef730713330 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -223,7 +223,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { const replicaID = 1 require.NoError(t, logstore.NewStateLoader(rg.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - replica, err := newReplica(ctx, &rg, store, replicaID) + replica, err := loadInitializedReplica(ctx, store, &rg, replicaID) if err != nil { t.Fatalf("make replica error : %+v", err) } diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index d787397674b0..7c6f115b6f0f 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -261,8 +261,11 @@ func prepareRightReplicaForSplit( } // Finish initialization of the RHS. 
- err := rightRepl.loadRaftMuLockedReplicaMuLocked(&split.RightDesc) - if err != nil { + if state, err := loadReplicaState( + ctx, r.store.engine, &split.RightDesc, rightRepl.replicaID, + ); err != nil { + log.Fatalf(ctx, "%v", err) + } else if err := rightRepl.initRaftMuLockedReplicaMuLocked(state); err != nil { log.Fatalf(ctx, "%v", err) } @@ -291,10 +294,9 @@ func prepareRightReplicaForSplit( // until it receives a Raft message addressed to the right-hand range. But // since new replicas start out quiesced, unless we explicitly awaken the // Raft group, there might not be any Raft traffic for quite a while. - err = rightRepl.withRaftGroupLocked(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { + if err := rightRepl.withRaftGroupLocked(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) { return true, nil - }) - if err != nil { + }); err != nil { log.Fatalf(ctx, "unable to create raft group for right-hand range in split: %+v", err) } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index bebeebef138f..b072da8a5a9c 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -546,7 +546,7 @@ func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) * ); err != nil { panic(err) } - r, err := newReplica(ctx, desc, s, replicaID) + r, err := loadInitializedReplica(ctx, s, desc, replicaID) if err != nil { panic(err) } @@ -835,17 +835,12 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) { } newRangeID := roachpb.RangeID(3) - desc := &roachpb.RangeDescriptor{ - RangeID: newRangeID, - } - const replicaID = 1 require.NoError(t, - logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - r, err := newReplica(ctx, desc, store, replicaID) - if err != nil { - t.Fatal(err) - } + logstore.NewStateLoader(newRangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) + + r := newUninitializedReplica(store, newRangeID, replicaID) + require.NoError(t, 
err) store.mu.Lock() defer store.mu.Unlock() @@ -861,7 +856,7 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) { }() // Initialize the range with start and end keys. - desc = protoutil.Clone(desc).(*roachpb.RangeDescriptor) + desc := protoutil.Clone(r.Desc()).(*roachpb.RangeDescriptor) desc.StartKey = roachpb.RKey("b") desc.EndKey = roachpb.RKey("d") desc.InternalReplicas = []roachpb.ReplicaDescriptor{{ diff --git a/pkg/kv/kvserver/stores_test.go b/pkg/kv/kvserver/stores_test.go index c7c90ad16e2a..6104c48f6d63 100644 --- a/pkg/kv/kvserver/stores_test.go +++ b/pkg/kv/kvserver/stores_test.go @@ -157,7 +157,7 @@ func TestStoresGetReplicaForRangeID(t *testing.T) { require.NoError(t, logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - replica, err := newReplica(ctx, desc, store, replicaID) + replica, err := loadInitializedReplica(ctx, store, desc, replicaID) if err != nil { t.Fatalf("unexpected error when creating replica: %+v", err) } From 17ab9d424a1ce19fa63954ec86db31d000d9a3a3 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 6 Feb 2023 20:44:50 +0000 Subject: [PATCH 2/5] kvserver,kvstorage: move replica state loading to kvstorage Release note: none Epic: none --- pkg/kv/kvserver/kvstorage/BUILD.bazel | 3 + pkg/kv/kvserver/kvstorage/replica_state.go | 97 ++++++++++++++++++++++ pkg/kv/kvserver/replica_init.go | 94 ++------------------- pkg/kv/kvserver/store_create_replica.go | 5 +- pkg/kv/kvserver/store_split.go | 3 +- 5 files changed, 114 insertions(+), 88 deletions(-) create mode 100644 pkg/kv/kvserver/kvstorage/replica_state.go diff --git a/pkg/kv/kvserver/kvstorage/BUILD.bazel b/pkg/kv/kvserver/kvstorage/BUILD.bazel index 1974aafe25b1..5887d87140f9 100644 --- a/pkg/kv/kvserver/kvstorage/BUILD.bazel +++ b/pkg/kv/kvserver/kvstorage/BUILD.bazel @@ -7,13 +7,16 @@ go_library( "cluster_version.go", "doc.go", "init.go", + "replica_state.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage", 
visibility = ["//visibility:public"], deps = [ "//pkg/clusterversion", "//pkg/keys", + "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/logstore", + "//pkg/kv/kvserver/stateloader", "//pkg/roachpb", "//pkg/storage", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/kvstorage/replica_state.go b/pkg/kv/kvserver/kvstorage/replica_state.go new file mode 100644 index 000000000000..74080e2f9fa2 --- /dev/null +++ b/pkg/kv/kvserver/kvstorage/replica_state.go @@ -0,0 +1,97 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvstorage + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/errors" + "go.etcd.io/raft/v3/raftpb" +) + +// LoadedReplicaState represents the state of a Replica loaded from storage, and +// is used to initialize the in-memory Replica instance. +// TODO(pavelkalinnikov): integrate with kvstorage.Replica. +type LoadedReplicaState struct { + ReplicaID roachpb.ReplicaID + LastIndex uint64 + ReplState kvserverpb.ReplicaState + + hardState raftpb.HardState +} + +// LoadReplicaState loads the state necessary to create a Replica with the +// specified range descriptor, which can be either initialized or uninitialized. +// TODO(pavelkalinnikov): integrate with stateloader. 
+func LoadReplicaState( + ctx context.Context, + eng storage.Reader, + desc *roachpb.RangeDescriptor, + replicaID roachpb.ReplicaID, +) (LoadedReplicaState, error) { + sl := stateloader.Make(desc.RangeID) + id, found, err := sl.LoadRaftReplicaID(ctx, eng) + if err != nil { + return LoadedReplicaState{}, err + } else if !found { + return LoadedReplicaState{}, errors.AssertionFailedf( + "r%d: RaftReplicaID not found", desc.RangeID) + } else if loaded := id.ReplicaID; loaded != replicaID { + return LoadedReplicaState{}, errors.AssertionFailedf( + "r%d: loaded RaftReplicaID %d does not match %d", desc.RangeID, loaded, replicaID) + } + + ls := LoadedReplicaState{ReplicaID: replicaID} + if ls.hardState, err = sl.LoadHardState(ctx, eng); err != nil { + return LoadedReplicaState{}, err + } + if ls.LastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil { + return LoadedReplicaState{}, err + } + if ls.ReplState, err = sl.Load(ctx, eng, desc); err != nil { + return LoadedReplicaState{}, err + } + return ls, nil +} + +// Check makes sure that the replica invariants hold for the loaded state. +func (r LoadedReplicaState) Check(storeID roachpb.StoreID) error { + desc := r.ReplState.Desc + if r.ReplicaID == 0 { + return errors.AssertionFailedf("r%d: replicaID is 0", desc.RangeID) + } + + if !desc.IsInitialized() { + // An uninitialized replica must have an empty HardState.Commit at all + // times. Failure to maintain this invariant indicates corruption. And yet, + // we have observed this in the wild. See #40213. + if hs := r.hardState; hs.Commit != 0 { + return errors.AssertionFailedf( + "r%d/%d: non-zero HardState.Commit on uninitialized replica: %+v", desc.RangeID, r.ReplicaID, hs) + } + // TODO(pavelkalinnikov): assert r.lastIndex == 0? + return nil + } + // desc.IsInitialized() == true + + // INVARIANT: a replica's RangeDescriptor always contains the local Store. 
+ if replDesc, ok := desc.GetReplicaDescriptor(storeID); !ok { + return errors.AssertionFailedf("%+v does not contain local store s%d", desc, storeID) + } else if replDesc.ReplicaID != r.ReplicaID { + return errors.AssertionFailedf( + "%+v does not contain replicaID %d for local store s%d", desc, r.ReplicaID, storeID) + } + return nil +} diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 7a3e4fd0e773..54fccc3d135a 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -20,7 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/split" @@ -28,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -36,7 +35,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "go.etcd.io/raft/v3" - "go.etcd.io/raft/v3/raftpb" ) const ( @@ -44,80 +42,6 @@ const ( mergeQueueThrottleDuration = 5 * time.Second ) -// loadedReplicaState represents the state of a Replica loaded from storage, and -// is used to initialize the in-memory Replica instance. -// TODO(pavelkalinnikov): move to kvstorage, integrate with kvstorage.Replica. 
-type loadedReplicaState struct { - replicaID roachpb.ReplicaID - hardState raftpb.HardState - lastIndex uint64 - replState kvserverpb.ReplicaState -} - -// loadReplicaState loads the state necessary to create a Replica with the -// specified range descriptor, which can be either initialized or uninitialized. -// TODO(pavelkalinnikov): integrate with stateloader. -func loadReplicaState( - ctx context.Context, - eng storage.Reader, - desc *roachpb.RangeDescriptor, - replicaID roachpb.ReplicaID, -) (loadedReplicaState, error) { - sl := stateloader.Make(desc.RangeID) - id, found, err := sl.LoadRaftReplicaID(ctx, eng) - if err != nil { - return loadedReplicaState{}, err - } else if !found { - return loadedReplicaState{}, errors.AssertionFailedf( - "r%d: RaftReplicaID not found", desc.RangeID) - } else if loaded := id.ReplicaID; loaded != replicaID { - return loadedReplicaState{}, errors.AssertionFailedf( - "r%d: loaded RaftReplicaID %d does not match %d", desc.RangeID, loaded, replicaID) - } - - ls := loadedReplicaState{replicaID: replicaID} - if ls.hardState, err = sl.LoadHardState(ctx, eng); err != nil { - return loadedReplicaState{}, err - } - if ls.lastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil { - return loadedReplicaState{}, err - } - if ls.replState, err = sl.Load(ctx, eng, desc); err != nil { - return loadedReplicaState{}, err - } - return ls, nil -} - -// check makes sure that the replica invariants hold for the loaded state. -func (r loadedReplicaState) check(storeID roachpb.StoreID) error { - desc := r.replState.Desc - if r.replicaID == 0 { - return errors.AssertionFailedf("r%d: replicaID is 0", desc.RangeID) - } - - if !desc.IsInitialized() { - // An uninitialized replica must have an empty HardState.Commit at all - // times. Failure to maintain this invariant indicates corruption. And yet, - // we have observed this in the wild. See #40213. 
- if hs := r.hardState; hs.Commit != 0 { - return errors.AssertionFailedf( - "r%d/%d: non-zero HardState.Commit on uninitialized replica: %+v", desc.RangeID, r.replicaID, hs) - } - // TODO(pavelkalinnikov): assert r.lastIndex == 0? - return nil - } - // desc.IsInitialized() == true - - // INVARIANT: a replica's RangeDescriptor always contains the local Store. - if replDesc, ok := desc.GetReplicaDescriptor(storeID); !ok { - return errors.AssertionFailedf("%+v does not contain local store s%d", desc, storeID) - } else if replDesc.ReplicaID != r.replicaID { - return errors.AssertionFailedf( - "%+v does not contain replicaID %d for local store s%d", desc, r.replicaID, storeID) - } - return nil -} - // loadInitializedReplica loads and constructs an initialized Replica, after // checking its invariants. func loadInitializedReplica( @@ -126,7 +50,7 @@ func loadInitializedReplica( if !desc.IsInitialized() { return nil, errors.AssertionFailedf("can not load with uninitialized descriptor: %s", desc) } - state, err := loadReplicaState(ctx, store.engine, desc, replicaID) + state, err := kvstorage.LoadReplicaState(ctx, store.engine, desc, replicaID) if err != nil { return nil, err } @@ -253,15 +177,15 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) { // This method is called in: // - loadInitializedReplica, to finalize creating an initialized replica; // - splitPostApply, to initialize a previously uninitialized replica. -func (r *Replica) initRaftMuLockedReplicaMuLocked(s loadedReplicaState) error { - if err := s.check(r.StoreID()); err != nil { +func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState) error { + if err := s.Check(r.StoreID()); err != nil { return err } - desc := s.replState.Desc + desc := s.ReplState.Desc // Ensure that the loaded state corresponds to the same replica. 
- if desc.RangeID != r.RangeID || s.replicaID != r.replicaID { + if desc.RangeID != r.RangeID || s.ReplicaID != r.replicaID { return errors.AssertionFailedf( - "%s: trying to init with other replica's state r%d/%d", r, desc.RangeID, s.replicaID) + "%s: trying to init with other replica's state r%d/%d", r, desc.RangeID, s.ReplicaID) } // Ensure that we transition to initialized replica, and do it only once. if !desc.IsInitialized() { @@ -277,8 +201,8 @@ func (r *Replica) initRaftMuLockedReplicaMuLocked(s loadedReplicaState) error { // group. r.mu.internalRaftGroup = nil - r.mu.state = s.replState - r.mu.lastIndexNotDurable = s.lastIndex + r.mu.state = s.ReplState + r.mu.lastIndexNotDurable = s.LastIndex r.mu.lastTermNotDurable = invalidLastTerm r.setDescLockedRaftMuLocked(r.AnnotateCtx(context.TODO()), desc) diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 10b4d5efc848..79a408ffd1f0 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -15,6 +15,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -255,10 +256,10 @@ func (s *Store) tryGetOrCreateReplica( // Make sure that storage invariants for this uninitialized replica hold. 
uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID} - state, err := loadReplicaState(ctx, s.engine, &uninitDesc, replicaID) + state, err := kvstorage.LoadReplicaState(ctx, s.engine, &uninitDesc, replicaID) if err != nil { return nil, false, err - } else if err := state.check(s.StoreID()); err != nil { + } else if err := state.Check(s.StoreID()); err != nil { return nil, false, err } diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 7c6f115b6f0f..64036b1bc1ad 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -14,6 +14,7 @@ import ( "bytes" "context" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/load" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -261,7 +262,7 @@ func prepareRightReplicaForSplit( } // Finish initialization of the RHS. - if state, err := loadReplicaState( + if state, err := kvstorage.LoadReplicaState( ctx, r.store.engine, &split.RightDesc, rightRepl.replicaID, ); err != nil { log.Fatalf(ctx, "%v", err) From 57ee90bb7314fd05b3387c07da75cd6594757fc1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 6 Feb 2023 20:32:47 +0000 Subject: [PATCH 3/5] kvserver: avoid duplicate loading in Store.Start Release note: none Epic: none --- pkg/kv/kvserver/kvstorage/init.go | 18 ++++++++++++++++++ pkg/kv/kvserver/replica_init.go | 9 +++++++-- pkg/kv/kvserver/store.go | 10 ++++++---- 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/init.go b/pkg/kv/kvserver/kvstorage/init.go index 00d76d452460..f0c1356102d2 100644 --- a/pkg/kv/kvserver/kvstorage/init.go +++ b/pkg/kv/kvserver/kvstorage/init.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" 
"github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -312,6 +313,23 @@ func (r Replica) ID() storage.FullReplicaID { } } +// Load loads the state necessary to instantiate a replica in memory. +func (r Replica) Load(ctx context.Context, eng storage.Reader) (LoadedReplicaState, error) { + ls := LoadedReplicaState{ + ReplicaID: r.ReplicaID, + hardState: r.hardState, + } + sl := stateloader.Make(r.Desc.RangeID) + var err error + if ls.LastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil { + return LoadedReplicaState{}, err + } + if ls.ReplState, err = sl.Load(ctx, eng, r.Desc); err != nil { + return LoadedReplicaState{}, err + } + return ls, nil +} + // A replicaMap organizes a set of Replicas with unique RangeIDs. type replicaMap map[roachpb.RangeID]Replica diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 54fccc3d135a..2b355ad9ae17 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -54,12 +54,17 @@ func loadInitializedReplica( if err != nil { return nil, err } - r := newUninitializedReplica(store, desc.RangeID, replicaID) + return newInitializedReplica(store, state) +} + +// newInitializedReplica creates an initialized Replica from its loaded state. 
+func newInitializedReplica(store *Store, loaded kvstorage.LoadedReplicaState) (*Replica, error) { + r := newUninitializedReplica(store, loaded.ReplState.Desc.RangeID, loaded.ReplicaID) r.raftMu.Lock() defer r.raftMu.Unlock() r.mu.Lock() defer r.mu.Unlock() - if err := r.initRaftMuLockedReplicaMuLocked(state); err != nil { + if err := r.initRaftMuLockedReplicaMuLocked(loaded); err != nil { return nil, err } return r, nil diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index c39bf59708a2..19de32acc82b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1861,10 +1861,12 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { // Uninitialized Replicas are not currently instantiated at store start. continue } - // TODO(sep-raft-log): construct the loaded Replica state directly in - // LoadAndReconcileReplicas, which loads and checks most of it already, then - // feed it to Replica creation, to avoid double-loading. - rep, err := loadInitializedReplica(ctx, s, repl.Desc, repl.ReplicaID) + // TODO(pavelkalinnikov): integrate into kvstorage.LoadAndReconcileReplicas. 
+ state, err := repl.Load(ctx, s.Engine()) + if err != nil { + return err + } + rep, err := newInitializedReplica(s, state) if err != nil { return err } From 5e241b456d591c4f4b1e67a51d18f7594c51bb53 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 6 Feb 2023 20:35:42 +0000 Subject: [PATCH 4/5] kvserver: rename the test-only replica loading func Release note: none Epic: none --- pkg/kv/kvserver/replica_init.go | 8 ++++---- pkg/kv/kvserver/split_queue_test.go | 2 +- pkg/kv/kvserver/store_pool_test.go | 2 +- pkg/kv/kvserver/store_test.go | 2 +- pkg/kv/kvserver/stores_test.go | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 2b355ad9ae17..f6bc6f2afa85 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -42,9 +42,9 @@ const ( mergeQueueThrottleDuration = 5 * time.Second ) -// loadInitializedReplica loads and constructs an initialized Replica, after -// checking its invariants. -func loadInitializedReplica( +// loadInitializedReplicaForTesting loads and constructs an initialized Replica, +// after checking its invariants. +func loadInitializedReplicaForTesting( ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, ) (*Replica, error) { if !desc.IsInitialized() { @@ -180,7 +180,7 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) { // loaded from storage. Must not be called more than once on a Replica. // // This method is called in: -// - loadInitializedReplica, to finalize creating an initialized replica; +// - loadInitializedReplicaForTesting, to finalize creating an initialized replica; // - splitPostApply, to initialize a previously uninitialized replica. 
func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState) error { if err := s.Check(r.StoreID()); err != nil { diff --git a/pkg/kv/kvserver/split_queue_test.go b/pkg/kv/kvserver/split_queue_test.go index 53f9503dcdeb..9f3fe38cdf92 100644 --- a/pkg/kv/kvserver/split_queue_test.go +++ b/pkg/kv/kvserver/split_queue_test.go @@ -100,7 +100,7 @@ func TestSplitQueueShouldQueue(t *testing.T) { replicaID := cpy.Replicas().VoterDescriptors()[0].ReplicaID require.NoError(t, logstore.NewStateLoader(cpy.RangeID).SetRaftReplicaID(ctx, tc.store.engine, replicaID)) - repl, err := loadInitializedReplica(ctx, tc.store, &cpy, replicaID) + repl, err := loadInitializedReplicaForTesting(ctx, tc.store, &cpy, replicaID) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/store_pool_test.go b/pkg/kv/kvserver/store_pool_test.go index cef730713330..8a05320d9d54 100644 --- a/pkg/kv/kvserver/store_pool_test.go +++ b/pkg/kv/kvserver/store_pool_test.go @@ -223,7 +223,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) { const replicaID = 1 require.NoError(t, logstore.NewStateLoader(rg.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - replica, err := loadInitializedReplica(ctx, store, &rg, replicaID) + replica, err := loadInitializedReplicaForTesting(ctx, store, &rg, replicaID) if err != nil { t.Fatalf("make replica error : %+v", err) } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index b072da8a5a9c..9cd597593ac4 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -546,7 +546,7 @@ func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) * ); err != nil { panic(err) } - r, err := loadInitializedReplica(ctx, s, desc, replicaID) + r, err := loadInitializedReplicaForTesting(ctx, s, desc, replicaID) if err != nil { panic(err) } diff --git a/pkg/kv/kvserver/stores_test.go b/pkg/kv/kvserver/stores_test.go index 6104c48f6d63..429b0fbc83cd 100644 --- 
a/pkg/kv/kvserver/stores_test.go +++ b/pkg/kv/kvserver/stores_test.go @@ -157,7 +157,7 @@ func TestStoresGetReplicaForRangeID(t *testing.T) { require.NoError(t, logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID)) - replica, err := loadInitializedReplica(ctx, store, desc, replicaID) + replica, err := loadInitializedReplicaForTesting(ctx, store, desc, replicaID) if err != nil { t.Fatalf("unexpected error when creating replica: %+v", err) } From 574a4d7d752add850f8fcaec9bc9df77f8ec7381 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 7 Feb 2023 12:26:21 +0000 Subject: [PATCH 5/5] kvstorage: check replica invariants in Load Release note: none Epic: none --- pkg/kv/kvserver/kvstorage/init.go | 8 +++++++- pkg/kv/kvserver/kvstorage/replica_state.go | 10 ++++++++-- pkg/kv/kvserver/replica_init.go | 5 +---- pkg/kv/kvserver/store.go | 2 +- pkg/kv/kvserver/store_create_replica.go | 4 +--- pkg/kv/kvserver/store_split.go | 2 +- 6 files changed, 19 insertions(+), 12 deletions(-) diff --git a/pkg/kv/kvserver/kvstorage/init.go b/pkg/kv/kvserver/kvstorage/init.go index f0c1356102d2..f791d8d99217 100644 --- a/pkg/kv/kvserver/kvstorage/init.go +++ b/pkg/kv/kvserver/kvstorage/init.go @@ -314,7 +314,9 @@ func (r Replica) ID() storage.FullReplicaID { } // Load loads the state necessary to instantiate a replica in memory. 
-func (r Replica) Load(ctx context.Context, eng storage.Reader) (LoadedReplicaState, error) { +func (r Replica) Load( + ctx context.Context, eng storage.Reader, storeID roachpb.StoreID, +) (LoadedReplicaState, error) { ls := LoadedReplicaState{ ReplicaID: r.ReplicaID, hardState: r.hardState, @@ -327,6 +329,10 @@ func (r Replica) Load(ctx context.Context, eng storage.Reader) (LoadedReplicaSta if ls.ReplState, err = sl.Load(ctx, eng, r.Desc); err != nil { return LoadedReplicaState{}, err } + + if err := ls.check(storeID); err != nil { + return LoadedReplicaState{}, err + } return ls, nil } diff --git a/pkg/kv/kvserver/kvstorage/replica_state.go b/pkg/kv/kvserver/kvstorage/replica_state.go index 74080e2f9fa2..245edb789edb 100644 --- a/pkg/kv/kvserver/kvstorage/replica_state.go +++ b/pkg/kv/kvserver/kvstorage/replica_state.go @@ -34,10 +34,12 @@ type LoadedReplicaState struct { // LoadReplicaState loads the state necessary to create a Replica with the // specified range descriptor, which can be either initialized or uninitialized. +// It also verifies replica state invariants. // TODO(pavelkalinnikov): integrate with stateloader. func LoadReplicaState( ctx context.Context, eng storage.Reader, + storeID roachpb.StoreID, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID, ) (LoadedReplicaState, error) { @@ -63,11 +65,15 @@ func LoadReplicaState( if ls.ReplState, err = sl.Load(ctx, eng, desc); err != nil { return LoadedReplicaState{}, err } + + if err := ls.check(storeID); err != nil { + return LoadedReplicaState{}, err + } return ls, nil } -// Check makes sure that the replica invariants hold for the loaded state. -func (r LoadedReplicaState) Check(storeID roachpb.StoreID) error { +// check makes sure that the replica invariants hold for the loaded state. 
+func (r LoadedReplicaState) check(storeID roachpb.StoreID) error { desc := r.ReplState.Desc if r.ReplicaID == 0 { return errors.AssertionFailedf("r%d: replicaID is 0", desc.RangeID) diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index f6bc6f2afa85..4b8caf4aa77c 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -50,7 +50,7 @@ func loadInitializedReplicaForTesting( if !desc.IsInitialized() { return nil, errors.AssertionFailedf("can not load with uninitialized descriptor: %s", desc) } - state, err := kvstorage.LoadReplicaState(ctx, store.engine, desc, replicaID) + state, err := kvstorage.LoadReplicaState(ctx, store.engine, store.StoreID(), desc, replicaID) if err != nil { return nil, err } @@ -183,9 +183,6 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) { // - loadInitializedReplicaForTesting, to finalize creating an initialized replica; // - splitPostApply, to initialize a previously uninitialized replica. func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState) error { - if err := s.Check(r.StoreID()); err != nil { - return err - } desc := s.ReplState.Desc // Ensure that the loaded state corresponds to the same replica. if desc.RangeID != r.RangeID || s.ReplicaID != r.replicaID { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 19de32acc82b..6bccb85a8b63 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1862,7 +1862,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { continue } // TODO(pavelkalinnikov): integrate into kvstorage.LoadAndReconcileReplicas. 
- state, err := repl.Load(ctx, s.Engine()) + state, err := repl.Load(ctx, s.Engine(), s.StoreID()) if err != nil { return err } diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index 79a408ffd1f0..edd8d4de8313 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -256,11 +256,9 @@ func (s *Store) tryGetOrCreateReplica( // Make sure that storage invariants for this uninitialized replica hold. uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID} - state, err := kvstorage.LoadReplicaState(ctx, s.engine, &uninitDesc, replicaID) + _, err := kvstorage.LoadReplicaState(ctx, s.Engine(), s.StoreID(), &uninitDesc, replicaID) if err != nil { return nil, false, err - } else if err := state.Check(s.StoreID()); err != nil { - return nil, false, err } // Create a new uninitialized replica and lock it for raft processing. diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 64036b1bc1ad..7f71f16f277a 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -263,7 +263,7 @@ func prepareRightReplicaForSplit( // Finish initialization of the RHS. if state, err := kvstorage.LoadReplicaState( - ctx, r.store.engine, &split.RightDesc, rightRepl.replicaID, + ctx, r.Engine(), r.StoreID(), &split.RightDesc, rightRepl.replicaID, ); err != nil { log.Fatalf(ctx, "%v", err) } else if err := rightRepl.initRaftMuLockedReplicaMuLocked(state); err != nil {