diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index f19f79353f70..24c11e1c27f5 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -251,7 +251,8 @@ type Replica struct { // Note that there are two StateLoaders, in raftMu and mu, // depending on which lock is being held. stateLoader stateloader.StateLoader - // on-disk storage for sideloaded SSTables. nil when there's no ReplicaID. + // on-disk storage for sideloaded SSTables. Always non-nil. + // TODO(pavelkalinnikov): remove sideloaded == nil checks. sideloaded logstore.SideloadStorage // stateMachine is used to apply committed raft entries. stateMachine replicaStateMachine diff --git a/pkg/kv/kvserver/replica_init.go b/pkg/kv/kvserver/replica_init.go index 41f2fd877c47..b5964c917b3b 100644 --- a/pkg/kv/kvserver/replica_init.go +++ b/pkg/kv/kvserver/replica_init.go @@ -47,39 +47,50 @@ const ( func newReplica( ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID, ) (*Replica, error) { - repl := newUnloadedReplica(ctx, desc, store, replicaID) + repl := newUnloadedReplica(ctx, desc.RangeID, store, replicaID) repl.raftMu.Lock() defer repl.raftMu.Unlock() repl.mu.Lock() defer repl.mu.Unlock() + + // TODO(pavelkalinnikov): this path is taken only in tests. Remove it and + // assert desc.IsInitialized(). + if !desc.IsInitialized() { + repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, store.Engine()) + return repl, nil + } + if err := repl.loadRaftMuLockedReplicaMuLocked(desc); err != nil { return nil, err } return repl, nil } -// newUnloadedReplica partially constructs a replica. The primary reason this -// function exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is -// to avoid attempting to fully constructing a Replica prior to proving that it -// can exist during the delicate synchronization dance that occurs in +// newUnloadedReplica partially constructs a Replica. The returned replica is +// assumed to be uninitialized, until Replica.loadRaftMuLockedReplicaMuLocked() +// is called with the correct descriptor. The primary reason this function +// exists separately from Replica.loadRaftMuLockedReplicaMuLocked() is to avoid +// attempting to fully construct a Replica and load it from storage prior to +// proving that it can exist during the delicate synchronization dance in // Store.tryGetOrCreateReplica(). A Replica returned from this function must not -// be used in any way until it's load() method has been called. +// be used in any way until the load method has been called. func newUnloadedReplica( - ctx context.Context, desc *roachpb.RangeDescriptor, store *Store, replicaID roachpb.ReplicaID, + ctx context.Context, rangeID roachpb.RangeID, store *Store, replicaID roachpb.ReplicaID, ) *Replica { if replicaID == 0 { - log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", desc.RangeID) + log.Fatalf(ctx, "cannot construct a replica for range %d with a 0 replica ID", rangeID) } + uninitState := stateloader.UninitializedReplicaState(rangeID) r := &Replica{ AmbientContext: store.cfg.AmbientCtx, - RangeID: desc.RangeID, + RangeID: rangeID, replicaID: replicaID, creationTime: timeutil.Now(), store: store, - abortSpan: abortspan.New(desc.RangeID), + abortSpan: abortspan.New(rangeID), concMgr: concurrency.NewManager(concurrency.Config{ NodeDesc: store.nodeDesc, - RangeDesc: desc, + RangeDesc: uninitState.Desc, Settings: store.ClusterSettings(), DB: store.DB(), Clock: store.Clock(), @@ -91,8 +102,10 @@ func newUnloadedReplica( TxnWaitKnobs: store.TestingKnobs().TxnWaitKnobs, }), } + r.sideTransportClosedTimestamp.init(store.cfg.ClosedTimestampReceiver, rangeID) + r.mu.pendingLeaseRequest = makePendingLeaseRequest(r) - r.mu.stateLoader = stateloader.Make(desc.RangeID) + r.mu.stateLoader = stateloader.Make(rangeID) r.mu.quiescent = true r.mu.conf = store.cfg.DefaultSpanConfig split.Init(&r.loadBasedSplitter, store.cfg.Settings, split.GlobalRandSource(), func() float64 { @@ -114,15 +127,24 @@ func newUnloadedReplica( r.loadStats = load.NewReplicaLoad(store.Clock(), store.cfg.StorePool.GetNodeLocalityString) } - // Init rangeStr with the range ID. - r.rangeStr.store(replicaID, &roachpb.RangeDescriptor{RangeID: desc.RangeID}) + // NB: the state will be loaded when the replica gets initialized. + r.mu.state = uninitState + r.rangeStr.store(replicaID, uninitState.Desc) // Add replica log tag - the value is rangeStr.String(). r.AmbientContext.AddLogTag("r", &r.rangeStr) r.raftCtx = logtags.AddTag(r.AnnotateCtx(context.Background()), "raft", nil /* value */) // Add replica pointer value. NB: this was historically useful for debugging // replica GC issues, but is a distraction at the moment. // r.AmbientContext.AddLogTag("@", fmt.Sprintf("%x", unsafe.Pointer(r))) - r.raftMu.stateLoader = stateloader.Make(desc.RangeID) + + r.raftMu.stateLoader = stateloader.Make(rangeID) + r.raftMu.sideloaded = logstore.NewDiskSideloadStorage( + store.cfg.Settings, + rangeID, + store.engine.GetAuxiliaryDir(), + store.limiters.BulkIOWriteRate, + store.engine, + ) r.splitQueueThrottle = util.Every(splitQueueThrottleDuration) r.mergeQueueThrottle = util.Every(mergeQueueThrottleDuration) @@ -155,28 +177,27 @@ func (r *Replica) setStartKeyLocked(startKey roachpb.RKey) { r.startKey = startKey } -// loadRaftMuLockedReplicaMuLocked will load the state of the replica from disk. -// If desc is initialized, the Replica will be initialized when this method -// returns. An initialized Replica may not be reloaded. If this method is called -// with an uninitialized desc it may be called again later with an initialized -// desc. +// loadRaftMuLockedReplicaMuLocked loads the state of the initialized replica +// from storage. After this method returns, Replica is initialized, and can not +// be loaded again. // -// This method is called in three places: +// This method is called in two places: // // 1. newReplica - used when the store is initializing and during testing -// 2. tryGetOrCreateReplica - see newUnloadedReplica -// 3. splitPostApply - this call initializes a previously uninitialized Replica. +// 2. splitPostApply - this call initializes a previously uninitialized Replica. func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) error { ctx := r.AnnotateCtx(context.TODO()) - if r.mu.state.Desc != nil && r.IsInitialized() { - log.Fatalf(ctx, "r%d: cannot reinitialize an initialized replica", desc.RangeID) + if !desc.IsInitialized() { + return errors.AssertionFailedf("r%d: cannot load an uninitialized replica", desc.RangeID) + } + if r.IsInitialized() { + return errors.AssertionFailedf("r%d: cannot reinitialize an initialized replica", desc.RangeID) } else if r.replicaID == 0 { // NB: This is just a defensive check as r.mu.replicaID should never be 0. - log.Fatalf(ctx, "r%d: cannot initialize replica without a replicaID", desc.RangeID) - } - if desc.IsInitialized() { - r.setStartKeyLocked(desc.StartKey) + return errors.AssertionFailedf("r%d: cannot initialize replica without a replicaID", + desc.RangeID) } + r.setStartKeyLocked(desc.StartKey) // Clear the internal raft group in case we're being reset. Since we're // reloading the raft state below, it isn't safe to use the existing raft @@ -195,14 +216,15 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) // Ensure that we're not trying to load a replica with a different ID than // was used to construct this Replica. - replicaID := r.replicaID + var replicaID roachpb.ReplicaID if replicaDesc, found := r.mu.state.Desc.GetReplicaDescriptor(r.StoreID()); found { replicaID = replicaDesc.ReplicaID - } else if desc.IsInitialized() { - log.Fatalf(ctx, "r%d: cannot initialize replica which is not in descriptor %v", desc.RangeID, desc) + } else { + return errors.AssertionFailedf("r%d: cannot initialize replica which is not in descriptor %v", + desc.RangeID, desc) } if r.replicaID != replicaID { - log.Fatalf(ctx, "attempting to initialize a replica which has ID %d with ID %d", + return errors.AssertionFailedf("attempting to initialize a replica which has ID %d with ID %d", r.replicaID, replicaID) } @@ -220,17 +242,8 @@ func (r *Replica) loadRaftMuLockedReplicaMuLocked(desc *roachpb.RangeDescriptor) r.mu.minLeaseProposedTS = r.Clock().NowAsClockTimestamp() } - r.raftMu.sideloaded = logstore.NewDiskSideloadStorage( - r.store.cfg.Settings, - desc.RangeID, - r.Engine().GetAuxiliaryDir(), - r.store.limiters.BulkIOWriteRate, - r.store.engine, - ) r.assertStateRaftMuLockedReplicaMuRLocked(ctx, r.store.Engine()) - r.sideTransportClosedTimestamp.init(r.store.cfg.ClosedTimestampReceiver, desc.RangeID) - return nil } diff --git a/pkg/kv/kvserver/stateloader/BUILD.bazel b/pkg/kv/kvserver/stateloader/BUILD.bazel index 533574756750..19b9627a766d 100644 --- a/pkg/kv/kvserver/stateloader/BUILD.bazel +++ b/pkg/kv/kvserver/stateloader/BUILD.bazel @@ -25,7 +25,10 @@ go_library( go_test( name = "stateloader_test", size = "small", - srcs = ["initial_test.go"], + srcs = [ + "initial_test.go", + "stateloader_test.go", + ], args = ["-test.timeout=55s"], embed = [":stateloader"], deps = [ @@ -34,6 +37,7 @@ go_test( "//pkg/testutils", "//pkg/util/leaktest", "//pkg/util/stop", + "@com_github_stretchr_testify//require", "@io_etcd_go_raft_v3//raftpb", ], ) diff --git a/pkg/kv/kvserver/stateloader/stateloader.go b/pkg/kv/kvserver/stateloader/stateloader.go index 578037e0183f..93c18759228d 100644 --- a/pkg/kv/kvserver/stateloader/stateloader.go +++ b/pkg/kv/kvserver/stateloader/stateloader.go @@ -337,6 +337,20 @@ func (rsl StateLoader) SetVersion( hlc.Timestamp{}, hlc.ClockTimestamp{}, nil, version) } +// UninitializedReplicaState returns the ReplicaState of an uninitialized +// Replica with the given range ID. It is equivalent to StateLoader.Load from an +// empty storage. +func UninitializedReplicaState(rangeID roachpb.RangeID) kvserverpb.ReplicaState { + return kvserverpb.ReplicaState{ + Desc: &roachpb.RangeDescriptor{RangeID: rangeID}, + Lease: &roachpb.Lease{}, + TruncatedState: &roachpb.RaftTruncatedState{}, + GCThreshold: &hlc.Timestamp{}, + Stats: &enginepb.MVCCStats{}, + GCHint: &roachpb.GCHint{}, + } +} + // The rest is not technically part of ReplicaState. // SynthesizeRaftState creates a Raft state which synthesizes both a HardState diff --git a/pkg/kv/kvserver/stateloader/stateloader_test.go b/pkg/kv/kvserver/stateloader/stateloader_test.go new file mode 100644 index 000000000000..9e05a848adc6 --- /dev/null +++ b/pkg/kv/kvserver/stateloader/stateloader_test.go @@ -0,0 +1,30 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package stateloader + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/stretchr/testify/require" +) + +func TestUninitializedReplicaState(t *testing.T) { + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + desc := roachpb.RangeDescriptor{RangeID: 123} + exp, err := Make(desc.RangeID).Load(context.Background(), eng, &desc) + require.NoError(t, err) + act := UninitializedReplicaState(desc.RangeID) + require.Equal(t, exp, act) +} diff --git a/pkg/kv/kvserver/store_create_replica.go b/pkg/kv/kvserver/store_create_replica.go index ce0866f60680..a5cede75588f 100644 --- a/pkg/kv/kvserver/store_create_replica.go +++ b/pkg/kv/kvserver/store_create_replica.go @@ -222,14 +222,7 @@ func (s *Store) tryGetOrCreateReplica( } // Create a new uninitialized replica and lock it for raft processing. - // TODO(pavelkalinnikov): consolidate an uninitialized Replica creation into a - // single function, now that it is sequential. - uninitializedDesc := &roachpb.RangeDescriptor{ - RangeID: rangeID, - // NB: other fields are unknown; need to populate them from - // snapshot. - } - repl := newUnloadedReplica(ctx, uninitializedDesc, s, replicaID) + repl := newUnloadedReplica(ctx, rangeID, s, replicaID) repl.raftMu.Lock() // not unlocked // Take out read-only lock. Not strictly necessary here, but follows the // normal lock protocol for destroyStatus.Set(). @@ -237,19 +230,6 @@ func (s *Store) tryGetOrCreateReplica( // Grab the internal Replica state lock to ensure nobody mucks with our // replica even outside of raft processing. repl.mu.Lock() - - // NB: A Replica should never be in the store's replicas map with a nil - // descriptor. Assign it directly here. In the case that the Replica should - // exist (which we confirm with another check of the Tombstone below), we'll - // re-initialize the replica with the same uninitializedDesc. - // - // During short window between here and call to s.unlinkReplicaByRangeIDLocked() - // in the failure branch below, the Replica used to have a nil descriptor and - // was present in the map. While it was the case that the destroy status had - // been set, not every code path which inspects the descriptor checks the - // destroy status. - repl.mu.state.Desc = uninitializedDesc - // Initialize the Replica with the replicaID. if err := func() error { // An uninitialized replica should have an empty HardState.Commit at @@ -314,7 +294,8 @@ func (s *Store) tryGetOrCreateReplica( return err } - return repl.loadRaftMuLockedReplicaMuLocked(uninitializedDesc) + repl.assertStateRaftMuLockedReplicaMuRLocked(ctx, s.Engine()) + return nil }(); err != nil { // Mark the replica as destroyed and remove it from the replicas maps to // ensure nobody tries to use it.