Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: factor out replica loading phase #96424

Merged
merged 5 commits into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/kvstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ go_library(
"cluster_version.go",
"doc.go",
"init.go",
"replica_state.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/logstore",
"//pkg/kv/kvserver/stateloader",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/util/hlc",
Expand Down
24 changes: 24 additions & 0 deletions pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -312,6 +313,29 @@ func (r Replica) ID() storage.FullReplicaID {
}
}

// Load loads the state necessary to instantiate a replica in memory.
func (r Replica) Load(
ctx context.Context, eng storage.Reader, storeID roachpb.StoreID,
) (LoadedReplicaState, error) {
ls := LoadedReplicaState{
ReplicaID: r.ReplicaID,
hardState: r.hardState,
}
sl := stateloader.Make(r.Desc.RangeID)
var err error
if ls.LastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil {
return LoadedReplicaState{}, err
}
if ls.ReplState, err = sl.Load(ctx, eng, r.Desc); err != nil {
return LoadedReplicaState{}, err
}

if err := ls.check(storeID); err != nil {
return LoadedReplicaState{}, err
}
return ls, nil
}

// A replicaMap organizes a set of Replicas with unique RangeIDs.
type replicaMap map[roachpb.RangeID]Replica

Expand Down
103 changes: 103 additions & 0 deletions pkg/kv/kvserver/kvstorage/replica_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvstorage

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/errors"
"go.etcd.io/raft/v3/raftpb"
)

// LoadedReplicaState represents the state of a Replica loaded from storage, and
// is used to initialize the in-memory Replica instance.
// TODO(pavelkalinnikov): integrate with kvstorage.Replica.
type LoadedReplicaState struct {
ReplicaID roachpb.ReplicaID
LastIndex uint64
ReplState kvserverpb.ReplicaState

hardState raftpb.HardState
}

// LoadReplicaState loads the state necessary to create a Replica with the
// specified range descriptor, which can be either initialized or uninitialized.
// It also verifies replica state invariants.
// TODO(pavelkalinnikov): integrate with stateloader.
func LoadReplicaState(
ctx context.Context,
eng storage.Reader,
storeID roachpb.StoreID,
desc *roachpb.RangeDescriptor,
replicaID roachpb.ReplicaID,
) (LoadedReplicaState, error) {
sl := stateloader.Make(desc.RangeID)
id, found, err := sl.LoadRaftReplicaID(ctx, eng)
if err != nil {
return LoadedReplicaState{}, err
} else if !found {
return LoadedReplicaState{}, errors.AssertionFailedf(
"r%d: RaftReplicaID not found", desc.RangeID)
} else if loaded := id.ReplicaID; loaded != replicaID {
return LoadedReplicaState{}, errors.AssertionFailedf(
"r%d: loaded RaftReplicaID %d does not match %d", desc.RangeID, loaded, replicaID)
}

ls := LoadedReplicaState{ReplicaID: replicaID}
if ls.hardState, err = sl.LoadHardState(ctx, eng); err != nil {
return LoadedReplicaState{}, err
}
if ls.LastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil {
return LoadedReplicaState{}, err
}
if ls.ReplState, err = sl.Load(ctx, eng, desc); err != nil {
return LoadedReplicaState{}, err
}

if err := ls.check(storeID); err != nil {
return LoadedReplicaState{}, err
}
return ls, nil
}

// check makes sure that the replica invariants hold for the loaded state.
func (r LoadedReplicaState) check(storeID roachpb.StoreID) error {
desc := r.ReplState.Desc
if r.ReplicaID == 0 {
return errors.AssertionFailedf("r%d: replicaID is 0", desc.RangeID)
}

if !desc.IsInitialized() {
// An uninitialized replica must have an empty HardState.Commit at all
// times. Failure to maintain this invariant indicates corruption. And yet,
// we have observed this in the wild. See #40213.
if hs := r.hardState; hs.Commit != 0 {
return errors.AssertionFailedf(
"r%d/%d: non-zero HardState.Commit on uninitialized replica: %+v", desc.RangeID, r.ReplicaID, hs)
}
// TODO(pavelkalinnikov): assert r.lastIndex == 0?
return nil
}
// desc.IsInitialized() == true

// INVARIANT: a replica's RangeDescriptor always contains the local Store.
if replDesc, ok := desc.GetReplicaDescriptor(storeID); !ok {
return errors.AssertionFailedf("%+v does not contain local store s%d", desc, storeID)
} else if replDesc.ReplicaID != r.ReplicaID {
return errors.AssertionFailedf(
"%+v does not contain replicaID %d for local store s%d", desc, r.ReplicaID, storeID)
}
return nil
}
121 changes: 49 additions & 72 deletions pkg/kv/kvserver/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
Expand All @@ -41,45 +42,43 @@ const (
mergeQueueThrottleDuration = 5 * time.Second
)

// newReplica constructs a new Replica. If the desc is initialized, the store
// must be present in it and the corresponding replica descriptor must have
// replicaID as its ReplicaID.
func newReplica(
ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID,
// loadInitializedReplicaForTesting loads and constructs an initialized Replica,
// after checking its invariants.
func loadInitializedReplicaForTesting(
ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID,
) (*Replica, error) {
repl := newUnloadedReplica(ctx, desc.RangeID, store, replicaID)
repl.raftMu.Lock()
defer repl.raftMu.Unlock()
repl.mu.Lock()
defer repl.mu.Unlock()

// TODO(pavelkalinnikov): this path is taken only in tests. Remove it and
// assert desc.IsInitialized().
if !desc.IsInitialized() {
repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, store.Engine())
return repl, nil
return nil, errors.AssertionFailedf("can not load with uninitialized descriptor: %s", desc)
}
state, err := kvstorage.LoadReplicaState(ctx, store.engine, store.StoreID(), desc, replicaID)
if err != nil {
return nil, err
}
return newInitializedReplica(store, state)
}

if err := repl.loadRaftMuLockedReplicaMuLocked(desc); err != nil {
// newInitializedReplica creates an initialized Replica from its loaded state.
func newInitializedReplica(store *Store, loaded kvstorage.LoadedReplicaState) (*Replica, error) {
r := newUninitializedReplica(store, loaded.ReplState.Desc.RangeID, loaded.ReplicaID)
r.raftMu.Lock()
defer r.raftMu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
if err := r.initRaftMuLockedReplicaMuLocked(loaded); err != nil {
return nil, err
}
return repl, nil
return r, nil
}

// newUnloadedReplica partially constructs a Replica. The returned replica is
// assumed to be uninitialized, until Replica.loadRaftMuLockedReplicaMuLocked()
// is called with the correct descriptor. The primary reason this function
// exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is to avoid
// attempting to fully construct a Replica and load it from storage prior to
// proving that it can exist during the delicate synchronization dance in
// Store.tryGetOrCreateReplica(). A Replica returned from this function must not
// be used in any way until the load method has been called.
func newUnloadedReplica(
ctx context.Context, rangeID roachpb.RangeID, store *Store, replicaID roachpb.ReplicaID,
// newUninitializedReplica constructs an uninitialized Replica with the given
// range/replica ID. The returned replica remains uninitialized until
// Replica.loadRaftMuLockedReplicaMuLocked() is called.
//
// TODO(#94912): we actually have another initialization path which should be
// refactored: Store.maybeMarkReplicaInitializedLockedReplLocked().
func newUninitializedReplica(
store *Store, rangeID roachpb.RangeID, replicaID roachpb.ReplicaID,
) *Replica {
if replicaID == 0 {
log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", rangeID)
}
uninitState := stateloader.UninitializedReplicaState(rangeID)
r := &Replica{
AmbientContext: store.cfg.AmbientCtx,
Expand Down Expand Up @@ -177,58 +176,38 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) {
r.startKey = startKey
}

// loadRaftMuLockedReplicaMuLocked loads the state of the initialized replica
// from storage. After this method returns, Replica is initialized, and can not
// be loaded again.
//
// This method is called in two places:
// initRaftMuLockedReplicaMuLocked initializes the Replica using the state
// loaded from storage. Must not be called more than once on a Replica.
//
// 1. newReplica - used when the store is initializing and during testing
// 2. splitPostApply - this call initializes a previously uninitialized Replica.
func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) error {
ctx := r.AnnotateCtx(context.TODO())
if !desc.IsInitialized() {
return errors.AssertionFailedf("r%d: cannot load an uninitialized replica", desc.RangeID)
// This method is called in:
// - loadInitializedReplicaForTesting, to finalize creating an initialized replica;
// - splitPostApply, to initialize a previously uninitialized replica.
func (r *Replica) initRaftMuLockedReplicaMuLocked(s kvstorage.LoadedReplicaState) error {
desc := s.ReplState.Desc
// Ensure that the loaded state corresponds to the same replica.
if desc.RangeID != r.RangeID || s.ReplicaID != r.replicaID {
return errors.AssertionFailedf(
"%s: trying to init with other replica's state r%d/%d", r, desc.RangeID, s.ReplicaID)
}
if r.IsInitialized() {
return errors.AssertionFailedf("r%d: cannot reinitialize an initialized replica", desc.RangeID)
} else if r.replicaID == 0 {
// NB: This is just a defensive check as r.mu.replicaID should never be 0.
return errors.AssertionFailedf("r%d: cannot initialize replica without a replicaID",
desc.RangeID)
// Ensure that we transition to initialized replica, and do it only once.
if !desc.IsInitialized() {
return errors.AssertionFailedf("%s: cannot init replica with uninitialized descriptor", r)
} else if r.IsInitialized() {
return errors.AssertionFailedf("%s: cannot reinitialize an initialized replica", r)
}

r.setStartKeyLocked(desc.StartKey)

// Clear the internal raft group in case we're being reset. Since we're
// reloading the raft state below, it isn't safe to use the existing raft
// group.
r.mu.internalRaftGroup = nil

var err error
if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.Engine(), desc); err != nil {
return err
}
r.mu.lastIndexNotDurable, err = r.mu.stateLoader.LoadLastIndex(ctx, r.Engine())
if err != nil {
return err
}
r.mu.state = s.ReplState
r.mu.lastIndexNotDurable = s.LastIndex
r.mu.lastTermNotDurable = invalidLastTerm

// Ensure that we're not trying to load a replica with a different ID than
// was used to construct this Replica.
var replicaID roachpb.ReplicaID
if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found {
replicaID = replicaDesc.ReplicaID
} else {
return errors.AssertionFailedf("r%d: cannot initialize replica which is not in descriptor %v",
desc.RangeID, desc)
}
if r.replicaID != replicaID {
return errors.AssertionFailedf("attempting to initialize a replica which has ID %d with ID %d",
r.replicaID, replicaID)
}

r.setDescLockedRaftMuLocked(ctx, desc)
r.setDescLockedRaftMuLocked(r.AnnotateCtx(context.TODO()), desc)

// Only do this if there was a previous lease. This shouldn't be important
// to do but consider that the first lease which is obtained is back-dated
Expand All @@ -242,8 +221,6 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
}

r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/split_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestSplitQueueShouldQueue(t *testing.T) {
replicaID := cpy.Replicas().VoterDescriptors()[0].ReplicaID
require.NoError(t,
logstore.NewStateLoader(cpy.RangeID).SetRaftReplicaID(ctx, tc.store.engine, replicaID))
repl, err := newReplica(ctx, &cpy, tc.store, replicaID)
repl, err := loadInitializedReplicaForTesting(ctx, tc.store, &cpy, replicaID)
if err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,7 +1861,12 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// Uninitialized Replicas are not currently instantiated at store start.
continue
}
rep, err := newReplica(ctx, repl.Desc, s, repl.ReplicaID)
// TODO(pavelkalinnikov): integrate into kvstorage.LoadAndReconcileReplicas.
state, err := repl.Load(ctx, s.Engine(), s.StoreID())
if err != nil {
return err
}
rep, err := newInitializedReplica(s, state)
if err != nil {
return err
}
Expand Down
Loading