Merge #93311
93311: kvserver: introduce loadAndReconcileReplicas r=pavelkalinnikov a=tbg

These are the beginnings of #93310 and #93244.
We need to inspect and possibly reconcile the persisted state at startup
time. This PR refactors what we have into a shape where new code can be
added more easily.

Epic: CRDB-220
Release note: None

Co-authored-by: Tobias Grieger <[email protected]>
craig[bot] and tbg committed Dec 19, 2022
2 parents a34ab92 + 597b4f9 commit c88b6ea
Showing 1 changed file with 95 additions and 73 deletions.
168 changes: 95 additions & 73 deletions pkg/kv/kvserver/store.go
@@ -1833,9 +1833,10 @@ func IterateIDPrefixKeys(
}
}

// IterateRangeDescriptorsFromDisk calls the provided function with each
// descriptor from the provided Engine. The return values of this method and fn
// have semantics similar to storage.MVCCIterate.
// IterateRangeDescriptorsFromDisk discovers the initialized replicas and calls
// the provided function with each such descriptor from the provided Engine. The
// return values of this method and fn have semantics similar to
// storage.MVCCIterate.
func IterateRangeDescriptorsFromDisk(
ctx context.Context, reader storage.Reader, fn func(desc roachpb.RangeDescriptor) error,
) error {
@@ -1850,7 +1851,7 @@ func IterateRangeDescriptorsFromDisk(
kvToDesc := func(kv roachpb.KeyValue) error {
allCount++
// Only consider range metadata entries; ignore others.
_, suffix, _, err := keys.DecodeRangeKey(kv.Key)
startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key)
if err != nil {
return err
}
@@ -1862,6 +1863,14 @@
if err := kv.Value.GetProto(&desc); err != nil {
return err
}
// Descriptor for range `[a,z)` must be found at `/rdsc/a`.
if !startKey.Equal(desc.StartKey.AsRawKey()) {
return errors.AssertionFailedf("descriptor stored at %s but has StartKey %s",
kv.Key, desc.StartKey)
}
if !desc.IsInitialized() {
return errors.AssertionFailedf("uninitialized descriptor: %s", desc)
}
matchCount++
err = fn(desc)
if err == nil {
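(Aside: a minimal, self-contained sketch of the placement invariant that the new assertions in this hunk enforce — a descriptor read at the range-local descriptor key for start key `a` must itself report StartKey `a`, and must be initialized. All names below are illustrative stand-ins, not CockroachDB's actual roachpb/keys types.)

// Illustrative sketch only; rangeDescriptor and checkDescriptorPlacement are
// hypothetical stand-ins, not CockroachDB types.
package main

import (
	"bytes"
	"fmt"
)

type rangeDescriptor struct {
	StartKey, EndKey []byte
}

// isInitialized mirrors the idea that an initialized descriptor has an EndKey.
func (d rangeDescriptor) isInitialized() bool { return len(d.EndKey) != 0 }

// checkDescriptorPlacement expresses the two assertions added above: the
// descriptor must be stored under its own start key, and it must be initialized.
func checkDescriptorPlacement(keyStart []byte, desc rangeDescriptor) error {
	if !bytes.Equal(keyStart, desc.StartKey) {
		return fmt.Errorf("descriptor stored at %q but has StartKey %q", keyStart, desc.StartKey)
	}
	if !desc.isInitialized() {
		return fmt.Errorf("uninitialized descriptor: %+v", desc)
	}
	return nil
}

func main() {
	desc := rangeDescriptor{StartKey: []byte("a"), EndKey: []byte("z")}
	fmt.Println(checkDescriptorPlacement([]byte("a"), desc)) // <nil>
	fmt.Println(checkDescriptorPlacement([]byte("b"), desc)) // placement mismatch
}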
@@ -1895,6 +1904,52 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent
return ident, err
}

// loadAndReconcileReplicas loads the Replicas present on this
// store. It reconciles inconsistent state and runs validation checks.
//
// TODO(sep-raft-log): also load *uninitialized* Replicas.
func loadAndReconcileReplicas(
ctx context.Context, eng storage.Engine,
) (map[storage.FullReplicaID]*roachpb.RangeDescriptor, error) {
ident, err := ReadStoreIdent(ctx, eng)
if err != nil {
return nil, err
}

m := map[storage.FullReplicaID]*roachpb.RangeDescriptor{}
// INVARIANT: the latest visible committed version of the RangeDescriptor
// (which is what IterateRangeDescriptorsFromDisk returns) is the one reflecting
// the state of the Replica.
// INVARIANT: the descriptor for range [a,z) is located at RangeDescriptorKey(a).
// This is checked in IterateRangeDescriptorsFromDisk.
if err := IterateRangeDescriptorsFromDisk(
ctx, eng, func(desc roachpb.RangeDescriptor) error {
// INVARIANT: a Replica's RangeDescriptor always contains the local Store,
// i.e. a Store is a member of all of its local Replicas.
repDesc, found := desc.GetReplicaDescriptor(ident.StoreID)
if !found {
return errors.AssertionFailedf(
"RangeDescriptor does not contain local s%d: %s",
ident.StoreID, desc)
}

// INVARIANT: every Replica has a persisted ReplicaID. For initialized
// Replicas, it matches that of the descriptor.
//
// TODO(sep-raft-log): actually load the persisted ReplicaID and validate
// the above invariant.
m[storage.FullReplicaID{
RangeID: desc.RangeID,
ReplicaID: repDesc.ReplicaID,
}] = &desc
return nil
}); err != nil {
return nil, err
}

return m, nil
}

// Start the engine, set the GC and read the StoreIdent.
func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.stopper = stopper
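(Aside: a hedged sketch of the invariants loadAndReconcileReplicas asserts — the local store must appear as a member in every descriptor found on its engine — plus the check the TODO defers: a persisted ReplicaID, once loaded, must agree with the descriptor's entry for that store. All types and helpers below are illustrative stand-ins, not CockroachDB's actual API.)

// Illustrative sketch only; these types approximate, but are not,
// roachpb.RangeDescriptor and friends.
package main

import "fmt"

type storeID int
type replicaID int

type replicaDescriptor struct {
	StoreID   storeID
	ReplicaID replicaID
}

type rangeDescriptor struct {
	RangeID  int
	Replicas []replicaDescriptor
}

// getReplicaDescriptor mirrors desc.GetReplicaDescriptor(storeID): find the
// replica entry for the given store, if any.
func (d rangeDescriptor) getReplicaDescriptor(s storeID) (replicaDescriptor, bool) {
	for _, r := range d.Replicas {
		if r.StoreID == s {
			return r, true
		}
	}
	return replicaDescriptor{}, false
}

// reconcile checks store membership and, where a persisted ReplicaID is
// known for the range, its agreement with the descriptor.
func reconcile(local storeID, desc rangeDescriptor, persisted map[int]replicaID) error {
	repDesc, ok := desc.getReplicaDescriptor(local)
	if !ok {
		return fmt.Errorf("r%d does not contain local s%d", desc.RangeID, local)
	}
	if id, ok := persisted[desc.RangeID]; ok && id != repDesc.ReplicaID {
		return fmt.Errorf("r%d: persisted ReplicaID %d != descriptor ReplicaID %d",
			desc.RangeID, id, repDesc.ReplicaID)
	}
	return nil
}

func main() {
	desc := rangeDescriptor{RangeID: 7, Replicas: []replicaDescriptor{{StoreID: 1, ReplicaID: 3}}}
	fmt.Println(reconcile(1, desc, map[int]replicaID{7: 3})) // <nil>
	fmt.Println(reconcile(2, desc, nil))                     // membership error
}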
@@ -1995,83 +2050,50 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
// (consistent=false). Uncommitted intents which have been abandoned
// due to a split crashing halfway will simply be resolved on the
// next split attempt. They can otherwise be ignored.

//
// Note that we do not create raft groups at this time; they will be created
// on-demand the first time they are needed. This helps reduce the amount of
// election-related traffic in a cold start.
// Raft initialization occurs when we propose a command on this range or
// receive a raft message addressed to it.
// TODO(bdarnell): Also initialize raft groups when read leases are needed.
// TODO(bdarnell): Scan all ranges at startup for unapplied log entries
// and initialize those groups.
//
// TODO(peter): While we have to iterate to find the replica descriptors
// serially, we can perform the migrations and replica creation
// concurrently. Note that while we can perform this initialization
// concurrently, all of the initialization must be performed before we start
// listening for Raft messages and starting the process Raft loop.
err = IterateRangeDescriptorsFromDisk(ctx, s.engine,
func(desc roachpb.RangeDescriptor) error {
if !desc.IsInitialized() {
return errors.Errorf("found uninitialized RangeDescriptor: %+v", desc)
}
replicaDesc, found := desc.GetReplicaDescriptor(s.StoreID())
if !found {
// This is a pre-emptive snapshot. It's also possible that this is a
// range which has processed a raft command to remove itself (which is
// possible prior to 19.2 or if the DisableEagerReplicaRemoval is
// enabled) and has not yet been removed by the replica gc queue.
// We treat both cases the same way. These should no longer exist in
// 20.2 or after as there was a migration in 20.1 to remove them and
// no pre-emptive snapshot should have been sent since 19.2 was
// finalized.
return errors.AssertionFailedf(
"found RangeDescriptor for range %d at generation %d which does not"+
" contain this store %d",
redact.Safe(desc.RangeID),
redact.Safe(desc.Generation),
redact.Safe(s.StoreID()))
}

rep, err := newReplica(ctx, &desc, s, replicaDesc.ReplicaID)
if err != nil {
return err
}

// We can't lock s.mu across NewReplica due to the lock ordering
// constraint (*Replica).raftMu < (*Store).mu. See the comment on
// (Store).mu.
s.mu.Lock()
err = s.addReplicaInternalLocked(rep)
s.mu.Unlock()
if err != nil {
return err
}

// Add this range and its stats to our counter.
s.metrics.ReplicaCount.Inc(1)
if _, ok := rep.TenantID(); ok {
// TODO(tbg): why the check? We're definitely an initialized range so
// we have a tenantID.
s.metrics.addMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats())
} else {
return errors.AssertionFailedf("found newly constructed replica"+
" for range %d at generation %d with an invalid tenant ID in store %d",
redact.Safe(desc.RangeID),
redact.Safe(desc.Generation),
redact.Safe(s.StoreID()))
}

if _, ok := desc.GetReplicaDescriptor(s.StoreID()); !ok {
// We are no longer a member of the range, but we didn't GC the replica
// before shutting down. Add the replica to the GC queue.
s.replicaGCQueue.AddAsync(ctx, rep, replicaGCPriorityRemoved)
}

// Note that we do not create raft groups at this time; they will be created
// on-demand the first time they are needed. This helps reduce the amount of
// election-related traffic in a cold start.
// Raft initialization occurs when we propose a command on this range or
// receive a raft message addressed to it.
// TODO(bdarnell): Also initialize raft groups when read leases are needed.
// TODO(bdarnell): Scan all ranges at startup for unapplied log entries
// and initialize those groups.
return nil
})
descs, err := loadAndReconcileReplicas(ctx, s.engine)
if err != nil {
return err
}
for fullID, desc := range descs {
rep, err := newReplica(ctx, desc, s, fullID.ReplicaID)
if err != nil {
return err
}

// We can't lock s.mu across NewReplica due to the lock ordering
// constraint (*Replica).raftMu < (*Store).mu. See the comment on
// (Store).mu.
s.mu.Lock()
err = s.addReplicaInternalLocked(rep)
s.mu.Unlock()
if err != nil {
return err
}

// Add this range and its stats to our counter.
s.metrics.ReplicaCount.Inc(1)
// INVARIANT: each initialized Replica is associated with a tenant.
if _, ok := rep.TenantID(); ok {
s.metrics.addMVCCStats(ctx, rep.tenantMetricsRef, rep.GetMVCCStats())
} else {
return errors.AssertionFailedf("no tenantID for initialized replica %s", rep)
}
}

// Start Raft processing goroutines.
s.cfg.Transport.Listen(s.StoreID(), s)