Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: also load uninitialized replicas, verify replicaID #93317

Merged
merged 3 commits into from
Dec 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,16 @@ func (r *Replica) assertStateRaftMuLockedReplicaMuRLocked(
log.Fatalf(ctx, "replica's replicaID %d diverges from descriptor %+v", r.replicaID, r.mu.state.Desc)
}
}
diskReplID, found, err := r.mu.stateLoader.LoadRaftReplicaID(ctx, reader)
if err != nil {
log.Fatalf(ctx, "%s", err)
}
if !found {
log.Fatalf(ctx, "no replicaID persisted")
}
if diskReplID.ReplicaID != r.replicaID {
log.Fatalf(ctx, "disk replicaID %d does not match in-mem %d", diskReplID, r.replicaID)
}
}

// TODO(nvanbenschoten): move the following 5 methods to replica_send.go.
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/split_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -96,7 +97,10 @@ func TestSplitQueueShouldQueue(t *testing.T) {
cpy := *tc.repl.Desc()
cpy.StartKey = test.start
cpy.EndKey = test.end
repl, err := newReplica(ctx, &cpy, tc.store, cpy.Replicas().VoterDescriptors()[0].ReplicaID)
replicaID := cpy.Replicas().VoterDescriptors()[0].ReplicaID
require.NoError(t,
logstore.NewStateLoader(cpy.RangeID).SetRaftReplicaID(ctx, tc.store.engine, replicaID))
repl, err := newReplica(ctx, &cpy, tc.store, replicaID)
if err != nil {
t.Fatal(err)
}
Expand Down
82 changes: 69 additions & 13 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
Expand All @@ -64,6 +65,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
Expand Down Expand Up @@ -1904,19 +1906,47 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent
return ident, err
}

type engineReplicas struct {
uninitialized map[storage.FullReplicaID]struct{}
initialized map[storage.FullReplicaID]*roachpb.RangeDescriptor
}

// loadFullReplicaIDsFromDisk discovers all Replicas on this Store.
// There will only be one entry in the map for a given RangeID.
//
// TODO(sep-raft-log): the reader here is for the log engine.
func loadFullReplicaIDsFromDisk(
ctx context.Context, reader storage.Reader,
) (map[storage.FullReplicaID]struct{}, error) {
m := map[storage.FullReplicaID]struct{}{}
var msg roachpb.RaftReplicaID
if err := IterateIDPrefixKeys(ctx, reader, func(rangeID roachpb.RangeID) roachpb.Key {
return keys.RaftReplicaIDKey(rangeID)
}, &msg, func(rangeID roachpb.RangeID) error {
m[storage.FullReplicaID{RangeID: rangeID, ReplicaID: msg.ReplicaID}] = struct{}{}
return nil
}); err != nil {
return nil, err
}

// TODO(sep-raft-log): if there is any other data that we mandate is present here
// (like a HardState), validate that here.

return m, nil
}

// loadAndReconcileReplicas loads the Replicas present on this
// store. It reconciles inconsistent state and runs validation checks.
//
// TODO(sep-raft-log): also load *uninitialized* Replicas.
func loadAndReconcileReplicas(
ctx context.Context, eng storage.Engine,
) (map[storage.FullReplicaID]*roachpb.RangeDescriptor, error) {
// TOOD(sep-raft-log): consider a callback-visitor pattern here.
func loadAndReconcileReplicas(ctx context.Context, eng storage.Engine) (*engineReplicas, error) {
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return nil, err
}

m := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
initM := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
// INVARIANT: the latest visible committed version of the RangeDescriptor
// (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting
// the state of the Replica.
Expand All @@ -1933,12 +1963,7 @@ func loadAndReconcileReplicas(
ident.StoreID, desc)
}

// INVARIANT: every Replica has a persisted ReplicaID. For initialized
// Replicas, it matches that of the descriptor.
//
// TODO(sep-raft-log): actually load the persisted ReplicaID and validate
// the above invariant.
m[storage.FullReplicaID{
initM[storage.FullReplicaID{
RangeID: desc.RangeID,
ReplicaID: repDesc.ReplicaID,
}] = &desc
Expand All @@ -1947,7 +1972,38 @@ func loadAndReconcileReplicas(
return nil, err
}

return m, nil
allM, err := loadFullReplicaIDsFromDisk(ctx, eng)
if err != nil {
return nil, err
}

for id := range initM {
if _, ok := allM[id]; !ok {
// INVARIANT: all replicas have a persisted full replicaID (i.e. a "replicaID from disk").
//
// This invariant is true for replicas created in 22.2, but no migration
// was ever written. So we backfill the replicaID here (as of 23.1) and
// remove this code in the future (the follow-up release, assuming it is
// forced to migrate through 23.1, otherwise later).
if buildutil.CrdbTestBuild {
return nil, errors.AssertionFailedf("%s has no persisted replicaID", initM[id])
}
if err := logstore.NewStateLoader(id.RangeID).SetRaftReplicaID(ctx, eng, id.ReplicaID); err != nil {
return nil, errors.Wrapf(err, "backfilling replicaID for r%d", id.RangeID)
}
log.Eventf(ctx, "backfilled replicaID for %s", id)
}
// `allM` will be our map of uninitialized replicas.
//
// A replica is "uninitialized" if it's not in initM (i.e. is at log position
// zero and has no visible RangeDescriptor).
delete(allM, id)
}

return &engineReplicas{
initialized: initM,
uninitialized: allM, // NB: init'ed ones were deleted earlier
}, nil
}

// Start the engine, set the GC and read the StoreIdent.
Expand Down Expand Up @@ -2065,11 +2121,11 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// concurrently. Note that while we can perform this initialization
// concurrently, all of the initialization must be performed before we start
// listening for Raft messages and starting the process Raft loop.
descs, err := loadAndReconcileReplicas(ctx, s.engine)
engRepls, err := loadAndReconcileReplicas(ctx, s.engine)
if err != nil {
return err
}
for fullID, desc := range descs {
for fullID, desc := range engRepls.initialized {
rep, err := newReplica(ctx, desc, s, fullID.ReplicaID)
if err != nil {
return err
Expand Down
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/store_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -216,7 +217,11 @@ func TestStorePoolUpdateLocalStoreBeforeGossip(t *testing.T) {
NextReplicaID: 1,
}
rg.AddReplica(1, 1, roachpb.VOTER_FULL)
replica, err := newReplica(ctx, &rg, store, 1)

const replicaID = 1
require.NoError(t,
logstore.NewStateLoader(rg.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
replica, err := newReplica(ctx, &rg, store, replicaID)
if err != nil {
t.Fatalf("make replica error : %+v", err)
}
Expand Down
17 changes: 14 additions & 3 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
Expand Down Expand Up @@ -521,6 +522,7 @@ func TestInitializeEngineErrors(t *testing.T) {
// deprecated; new tests should create replicas by splitting from a
// properly-bootstrapped initial range.
func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) *Replica {
ctx := context.Background()
desc := &roachpb.RangeDescriptor{
RangeID: rangeID,
StartKey: start,
Expand All @@ -532,9 +534,15 @@ func createReplica(s *Store, rangeID roachpb.RangeID, start, end roachpb.RKey) *
}},
NextReplicaID: 2,
}
r, err := newReplica(context.Background(), desc, s, 1)
const replicaID = 1
if err := stateloader.WriteInitialRangeState(
ctx, s.engine, *desc, replicaID, clusterversion.TestingClusterVersion.Version,
); err != nil {
panic(err)
}
r, err := newReplica(ctx, desc, s, replicaID)
if err != nil {
log.Fatalf(context.Background(), "%v", err)
panic(err)
}
return r
}
Expand Down Expand Up @@ -825,7 +833,10 @@ func TestMaybeMarkReplicaInitialized(t *testing.T) {
RangeID: newRangeID,
}

r, err := newReplica(ctx, desc, store, 1)
const replicaID = 1
require.NoError(t,
logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
r, err := newReplica(ctx, desc, store, replicaID)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func newStores(ambientCtx log.AmbientContext, clock *hlc.Clock) *Stores {
Expand Down Expand Up @@ -153,6 +155,8 @@ func TestStoresGetReplicaForRangeID(t *testing.T) {
},
}

require.NoError(t,
logstore.NewStateLoader(desc.RangeID).SetRaftReplicaID(ctx, store.engine, replicaID))
replica, err := newReplica(ctx, desc, store, replicaID)
if err != nil {
t.Fatalf("unexpected error when creating replica: %+v", err)
Expand Down