kvserver: factor out replica loading phase
This commit factors out Replica state loading and invariant checking so that it
is consolidated rather than interleaved with Replica creation. Previously, the
Replica loaded its state only to assert, after the fact, that the in-memory
state matched the state in storage. Loading is now a prerequisite for, and an
input into, creating the Replica and turning it from uninitialized to
initialized.

This change also eliminates the concept of an "unloaded" Replica. The Replica
is now always created in a valid state (either initialized or uninitialized).

Release note: none
Epic: none
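
For orientation, here is a minimal, hypothetical sketch of the load → check → construct
flow this commit introduces. None of the types or helpers below are the real CockroachDB
types; they are simplified stand-ins that only mirror the shape of loadedReplicaState,
loadedReplicaState.check, and loadReplica in the diff.

```go
// Hypothetical sketch of the "load, check, construct" phases. The types here
// are stand-ins, not the actual kvserver types.
package main

import (
	"errors"
	"fmt"
)

// storedState stands in for what lives in storage for one replica.
type storedState struct {
	replicaID int
	commit    uint64 // stand-in for raft HardState.Commit
	startKey  string // non-empty means the descriptor is "initialized"
}

// loadedState mirrors loadedReplicaState: everything read from storage,
// gathered before the in-memory replica is constructed.
type loadedState struct {
	storedState
}

// loadState mirrors loadReplicaState: read the on-disk state, failing if the
// stored replica ID does not match the expected one.
func loadState(storage map[int64]storedState, rangeID int64, replicaID int) (loadedState, error) {
	s, ok := storage[rangeID]
	if !ok {
		return loadedState{}, fmt.Errorf("r%d: state not found", rangeID)
	}
	if s.replicaID != replicaID {
		return loadedState{}, fmt.Errorf("r%d: loaded replicaID %d does not match %d",
			rangeID, s.replicaID, replicaID)
	}
	return loadedState{storedState: s}, nil
}

// check mirrors loadedReplicaState.check: validate invariants before the
// loaded state is allowed to become an in-memory replica.
func (s loadedState) check() error {
	if s.replicaID == 0 {
		return errors.New("replicaID is 0")
	}
	if s.startKey == "" && s.commit != 0 {
		// An uninitialized replica must have a zero commit index.
		return errors.New("non-zero commit on uninitialized replica")
	}
	return nil
}

// replica is the in-memory object; it is only ever built from validated state.
type replica struct {
	state loadedState
}

// loadReplica ties the phases together: load from storage, check invariants,
// then construct the in-memory object.
func loadReplica(storage map[int64]storedState, rangeID int64, replicaID int) (*replica, error) {
	s, err := loadState(storage, rangeID, replicaID)
	if err != nil {
		return nil, err
	}
	if err := s.check(); err != nil {
		return nil, err
	}
	return &replica{state: s}, nil
}

func main() {
	storage := map[int64]storedState{
		1: {replicaID: 3, commit: 10, startKey: "a"},
	}
	r, err := loadReplica(storage, 1, 3)
	if err != nil {
		panic(err)
	}
	fmt.Printf("loaded r%d/%d over start key %q\n", 1, r.state.replicaID, r.state.startKey)
}
```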
pav-kv committed Feb 2, 2023
1 parent 10ef5d9 commit 9fc6141
Showing 8 changed files with 141 additions and 104 deletions.
183 changes: 116 additions & 67 deletions pkg/kv/kvserver/replica_init.go
@@ -20,77 +20,131 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/load"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

const (
splitQueueThrottleDuration = 5 * time.Second
mergeQueueThrottleDuration = 5 * time.Second
)

// newReplica constructs a new Replica. If the desc is initialized, the store
// must be present in it and the corresponding replica descriptor must have
// replicaID as its ReplicaID.
func newReplica(
ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID,
) (*Replica, error) {
repl := newUnloadedReplica(ctx, desc.RangeID, store, replicaID)
repl.raftMu.Lock()
defer repl.raftMu.Unlock()
repl.mu.Lock()
defer repl.mu.Unlock()

// TODO(pavelkalinnikov): this path is taken only in tests. Remove it and
// assert desc.IsInitialized().
// loadedReplicaState represents the state of a Replica loaded from storage, and
// is used to initialize the in-memory Replica instance.
type loadedReplicaState struct {
replicaID roachpb.ReplicaID
hardState raftpb.HardState
lastIndex uint64
replState kvserverpb.ReplicaState
}

// loadReplicaState loads the state necessary to create a Replica from storage.
func loadReplicaState(
ctx context.Context,
eng storage.Reader,
desc *roachpb.RangeDescriptor,
replicaID roachpb.ReplicaID,
) (loadedReplicaState, error) {
sl := stateloader.Make(desc.RangeID)
id, found, err := sl.LoadRaftReplicaID(ctx, eng)
if err != nil {
return loadedReplicaState{}, err
} else if !found {
return loadedReplicaState{}, errors.AssertionFailedf(
"r%d: RaftReplicaID not found", desc.RangeID)
} else if loaded := id.ReplicaID; loaded != replicaID {
return loadedReplicaState{}, errors.AssertionFailedf(
"r%d: loaded RaftReplicaID %d does not match %d", desc.RangeID, loaded, replicaID)
}

ls := loadedReplicaState{replicaID: replicaID}
if ls.hardState, err = sl.LoadHardState(ctx, eng); err != nil {
return loadedReplicaState{}, err
}
if ls.lastIndex, err = sl.LoadLastIndex(ctx, eng); err != nil {
return loadedReplicaState{}, err
}
if ls.replState, err = sl.Load(ctx, eng, desc); err != nil {
return loadedReplicaState{}, err
}
return ls, nil
}

// check makes sure that the replica invariants hold for the loaded state.
func (r loadedReplicaState) check(storeID roachpb.StoreID) error {
desc := r.replState.Desc
if id := r.replicaID; id == 0 {
return errors.AssertionFailedf("r%d: replicaID is 0", desc.RangeID)
}

if !desc.IsInitialized() {
repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, store.Engine())
return repl, nil
// An uninitialized replica must have an empty HardState.Commit at all
// times. Failure to maintain this invariant indicates corruption. And yet,
// we have observed this in the wild. See #40213.
if hs := r.hardState; hs.Commit != 0 {
return errors.AssertionFailedf(
"r%d/%d: non-zero HardState.Commit on uninitialized replica: %+v", desc.RangeID, r.replicaID, hs)
}
return nil
}
// desc.IsInitialized() == true

if err := repl.loadRaftMuLockedReplicaMuLocked(desc); err != nil {
return nil, err
if replDesc, ok := desc.GetReplicaDescriptor(storeID); !ok {
return errors.AssertionFailedf("%+v does not contain local store s%d", desc, storeID)
} else if replDesc.ReplicaID != r.replicaID {
return errors.AssertionFailedf(
"%+v does not contain replicaID %d for local store s%d", desc, r.replicaID, storeID)
}
return repl, nil
return nil
}

// newUnloadedReplica partially constructs a Replica. The returned replica is
// assumed to be uninitialized, until Replica.loadRaftMuLockedReplicaMuLocked()
// is called with the correct descriptor. The primary reason this function
// exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is to avoid
// attempting to fully construct a Replica and load it from storage prior to
// proving that it can exist during the delicate synchronization dance in
// Store.tryGetOrCreateReplica(). A Replica returned from this function must not
// be used in any way until the load method has been called.
func newUnloadedReplica(
ctx context.Context, rangeID roachpb.RangeID, store *Store, replicaID roachpb.ReplicaID,
) *Replica {
if replicaID == 0 {
log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", rangeID)
// loadReplica loads and constructs a Replica, after checking its invariants.
func loadReplica(
ctx context.Context, store *Store, desc *roachpb.RangeDescriptor, replicaID roachpb.ReplicaID,
) (*Replica, error) {
state, err := loadReplicaState(ctx, store.engine, desc, replicaID)
if err != nil {
return nil, err
}
if err := state.check(store.StoreID()); err != nil {
return nil, err
}
uninitState := stateloader.UninitializedReplicaState(rangeID)
return newReplica(ctx, store, state), nil
}

// newReplica constructs a Replica using its state loaded from storage. It may
// return either an initialized, or an uninitialized Replica, depending on the
// loaded state. If the returned replica is uninitialized, it remains such until
// Replica.initRaftMuLockedReplicaMuLocked() is called.
func newReplica(ctx context.Context, store *Store, loaded loadedReplicaState) *Replica {
state := loaded.replState
rangeID := state.Desc.RangeID
r := &Replica{
AmbientContext: store.cfg.AmbientCtx,
RangeID: rangeID,
replicaID: replicaID,
replicaID: loaded.replicaID,
startKey: state.Desc.StartKey,
creationTime: timeutil.Now(),
store: store,
abortSpan: abortspan.New(rangeID),
concMgr: concurrency.NewManager(concurrency.Config{
NodeDesc: store.nodeDesc,
RangeDesc: uninitState.Desc,
RangeDesc: state.Desc,
Settings: store.ClusterSettings(),
DB: store.DB(),
Clock: store.Clock(),
@@ -127,9 +181,23 @@ func newUnloadedReplica(
r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString)
}

// NB: the state will be loaded when the replica gets initialized.
r.mu.state = uninitState
r.rangeStr.store(replicaID, uninitState.Desc)
r.mu.state = state
// Only do this if there was a previous lease. This shouldn't be important
// to do but consider that the first lease which is obtained is back-dated
// to a zero start timestamp (and this de-flakes some tests). If we set the
// min proposed TS here, this lease could not be renewed (by the semantics
// of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits,
// this problem would multiply to a number of replicas at cluster bootstrap.
// Instead, we make the first lease special (which is OK) and the problem
// disappears.
if r.mu.state.Lease.Sequence > 0 {
r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
}
r.mu.lastIndex = loaded.lastIndex
r.mu.lastTerm = invalidLastTerm
// NB: mutexes don't need to be locked on Replica struct initialization.
r.setDescLockedRaftMuLocked(r.AnnotateCtx(ctx), state.Desc)

// Add replica log tag - the value is rangeStr.String().
r.AmbientContext.AddLogTag("r", &r.rangeStr)
r.raftCtx = logtags.AddTag(r.AnnotateCtx(context.Background()), "raft", nil /* value */)
@@ -160,6 +228,7 @@ func newUnloadedReplica(
r.breaker = newReplicaCircuitBreaker(
store.cfg.Settings, store.stopper, r.AmbientContext, r, onTrip, onReset,
)

return r
}

@@ -177,16 +246,15 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) {
r.startKey = startKey
}

// loadRaftMuLockedReplicaMuLocked loads the state of the initialized replica
// from storage. After this method returns, Replica is initialized, and can not
// be loaded again.
// initRaftMuLockedReplicaMuLocked initializes the Replica using the state
// loaded from storage. Must not be called more than once on a Replica.
//
// This method is called in two places:
//
// 1. newReplica - used when the store is initializing and during testing
// 2. splitPostApply - this call initializes a previously uninitialized Replica.
func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) error {
// This method is called only in splitPostApply to initialize a previously
// uninitialized Replica.
func (r *Replica) initRaftMuLockedReplicaMuLocked(s loadedReplicaState) error {
ctx := r.AnnotateCtx(context.TODO())
desc := s.replState.Desc

if !desc.IsInitialized() {
return errors.AssertionFailedf("r%d: cannot load an uninitialized replica", desc.RangeID)
}
@@ -204,14 +272,8 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
// group.
r.mu.internalRaftGroup = nil

var err error
if r.mu.state, err = r.mu.stateLoader.Load(ctx, r.Engine(), desc); err != nil {
return err
}
r.mu.lastIndex, err = r.mu.stateLoader.LoadLastIndex(ctx, r.Engine())
if err != nil {
return err
}
r.mu.state = s.replState
r.mu.lastIndex = s.lastIndex
r.mu.lastTerm = invalidLastTerm

// Ensure that we're not trying to load a replica with a different ID than
@@ -229,19 +291,6 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor)
}

r.setDescLockedRaftMuLocked(ctx, desc)

// Only do this if there was a previous lease. This shouldn't be important
// to do but consider that the first lease which is obtained is back-dated
// to a zero start timestamp (and this de-flakes some tests). If we set the
// min proposed TS here, this lease could not be renewed (by the semantics
// of minLeaseProposedTS); and since minLeaseProposedTS is copied on splits,
// this problem would multiply to a number of replicas at cluster bootstrap.
// Instead, we make the first lease special (which is OK) and the problem
// disappears.
if r.mu.state.Lease.Sequence > 0 {
r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp()
}

r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine())

return nil
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/split_queue_test.go
@@ -100,7 +100,7 @@ func TestSplitQueueShouldQueue(t *testing.T) {
replicaID := cpy.Replicas().VoterDescriptors()[0].ReplicaID
require.NoError(t,
logstore.NewStateLoader(cpy.RangeID).SetRaftReplicaID(ctx, tc.store.engine, replicaID))
repl, err := newReplica(ctx, &cpy, tc.store, replicaID)
repl, err := loadReplica(ctx, tc.store, &cpy, replicaID)
if err != nil {
t.Fatal(err)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -1852,7 +1852,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
return err
}
for fullID, desc := range engRepls.Initialized {
rep, err := newReplica(ctx, desc, s, fullID.ReplicaID)
rep, err := loadReplica(ctx, s, desc, fullID.ReplicaID)
if err != nil {
return err
}
24 changes: 6 additions & 18 deletions pkg/kv/kvserver/store_create_replica.go
@@ -222,16 +222,6 @@ func (s *Store) tryGetOrCreateReplica(
return nil, false, &roachpb.RaftGroupDeletedError{}
}

// An uninitialized replica must have an empty HardState.Commit at all times.
// Failure to maintain this invariant indicates corruption. And yet, we have
// observed this in the wild. See #40213.
sl := stateloader.Make(rangeID)
if hs, err := sl.LoadHardState(ctx, s.Engine()); err != nil {
return nil, false, err
} else if hs.Commit != 0 {
log.Fatalf(ctx, "found non-zero HardState.Commit on uninitialized replica r%d/%d. HS=%+v",
rangeID, replicaID, hs)
}
// Write the RaftReplicaID for this replica. This is the only place in the
// CockroachDB code that we are creating a new *uninitialized* replica.
// Note that it is possible that we have already created the HardState for
@@ -281,20 +271,18 @@ func (s *Store) tryGetOrCreateReplica(
// power-cycled, losing the snapshot) the fire-and-forget way in which
// raft votes are requested (in the same raft cycle) makes it extremely
// unlikely that the restarted node would then receive it.
sl := stateloader.Make(rangeID)
if err := sl.SetRaftReplicaID(ctx, s.Engine(), replicaID); err != nil {
return nil, false, err
}

// Create a new uninitialized replica and lock it for raft processing.
repl := newUnloadedReplica(ctx, rangeID, s, replicaID)
uninitDesc := roachpb.RangeDescriptor{RangeID: rangeID}
repl, err := loadReplica(ctx, s, &uninitDesc, replicaID)
if err != nil {
return nil, false, err
}
repl.raftMu.Lock() // not unlocked
repl.mu.Lock()
// TODO(pavelkalinnikov): there is little benefit in this check, since loading
// ReplicaID is a no-op after the above write, and the ReplicaState load is
// only for making sure it's empty. Distill the useful IO and make its result
// the direct input into Replica creation, then this check won't be needed.
repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, s.Engine())
repl.mu.Unlock()

// Install the replica in the store's replica map.
s.mu.Lock()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_pool_test.go
@@ -223,7 +223,7 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) {
const replicaID = 1
require.NoError(t,
logstore.NewStateLoader(rg.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
replica, err := newReplica(ctx, &rg, store, replicaID)
replica, err := loadReplica(ctx, store, &rg, replicaID)
if err != nil {
t.Fatalf("make replica error : %+v", err)
}
14 changes: 9 additions & 5 deletions pkg/kv/kvserver/store_split.go
@@ -261,8 +261,13 @@ func prepareRightReplicaForSplit(
}

// Finish initialization of the RHS.
err := rightRepl.loadRaftMuLockedReplicaMuLocked(&split.RightDesc)
if err != nil {
if state, err := loadReplicaState(
ctx, r.store.engine, &split.RightDesc, rightRepl.replicaID,
); err != nil {
log.Fatalf(ctx, "%v", err)
} else if err := state.check(r.store.StoreID()); err != nil {
log.Fatalf(ctx, "%v", err)
} else if err := rightRepl.initRaftMuLockedReplicaMuLocked(state); err != nil {
log.Fatalf(ctx, "%v", err)
}

@@ -291,10 +296,9 @@
// until it receives a Raft message addressed to the right-hand range. But
// since new replicas start out quiesced, unless we explicitly awaken the
// Raft group, there might not be any Raft traffic for quite a while.
err = rightRepl.withRaftGroupLocked(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
if err := rightRepl.withRaftGroupLocked(true, func(r *raft.RawNode) (unquiesceAndWakeLeader bool, _ error) {
return true, nil
})
if err != nil {
}); err != nil {
log.Fatalf(ctx, "unable to create raft group for right-hand range in split: %+v", err)
}

16 changes: 6 additions & 10 deletions pkg/kv/kvserver/store_test.go
@@ -545,7 +545,7 @@ func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) *
); err != nil {
panic(err)
}
r, err := newReplica(ctx, desc, s, replicaID)
r, err := loadReplica(ctx, s, desc, replicaID)
if err != nil {
panic(err)
}
@@ -834,17 +834,13 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) {
}

newRangeID := roachpb.RangeID(3)
desc := &roachpb.RangeDescriptor{
RangeID: newRangeID,
}

const replicaID = 1
require.NoError(t,
logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
r, err := newReplica(ctx, desc, store, replicaID)
if err != nil {
t.Fatal(err)
}
logstore.NewStateLoader(newRangeID).SetRaftReplicaID(ctx, store.engine, replicaID))

state := stateloader.UninitializedReplicaState(newRangeID)
desc := state.Desc
r := newReplica(ctx, store, loadedReplicaState{replicaID: replicaID, replState: state})

store.mu.Lock()
defer store.mu.Unlock()
