kvserver: trim state used from snapshots
Snapshots include a serialized copy of the `ReplicaState` in their header,
even though the snapshot data itself already contains that state. Having
two sources for the same data that may be interpreted differently can lead
to problems, as we saw in [cockroachdb#72239].

This commit deprecates use of the entire ReplicaState: we now pick out
the descriptor and ignore everything else. Rather than initializing the
recipient's in-memory state from the copy sent with the snapshot, we now
load it from the ingested data via a state loader.

In 22.2 we could send only the descriptor, but we may choose not to:
having everything present is convenient for logging and debugging.
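
Roughly, the recipient-side change boils down to the sketch below. This is a
minimal illustration using identifiers that appear in the diff; the wrapper
function and its name are hypothetical, but the stateloader call is the one
this commit adds to applySnapshot.

    package kvserver

    import (
        "context"

        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/storage"
    )

    // loadStateAfterIngest mirrors the new flow: the recipient takes only the
    // descriptor from the snapshot header and rebuilds its in-memory
    // ReplicaState from the data it just ingested, instead of trusting the
    // serialized copy sent by the sender. (Illustrative helper, not part of
    // this commit.)
    func loadStateAfterIngest(
        ctx context.Context, eng storage.Reader, desc *roachpb.RangeDescriptor,
    ) (kvserverpb.ReplicaState, error) {
        return stateloader.Make(desc.RangeID).Load(ctx, eng, desc)
    }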

Fixes cockroachdb#72222.

[cockroachdb#72239]: cockroachdb#72239

Release note: None
tbg committed Nov 2, 2021
1 parent c9882b5 commit 2d17daf
Showing 6 changed files with 45 additions and 34 deletions.
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/client_merge_test.go
@@ -757,7 +757,7 @@ func mergeCheckingTimestampCaches(

 	// Install a filter to capture the Raft snapshot.
 	snapshotFilter = func(inSnap kvserver.IncomingSnapshot) {
-		if inSnap.State.Desc.RangeID == lhsDesc.RangeID {
+		if inSnap.Desc.RangeID == lhsDesc.RangeID {
 			snapChan <- inSnap
 		}
 	}
@@ -809,7 +809,7 @@ func mergeCheckingTimestampCaches(
 	case <-time.After(45 * time.Second):
 		t.Fatal("timed out waiting for snapChan")
 	}
-	inSnapDesc := inSnap.State.Desc
+	inSnapDesc := inSnap.Desc
 	require.Equal(t, lhsDesc.StartKey, inSnapDesc.StartKey)
 	require.Equal(t, rhsDesc.EndKey, inSnapDesc.EndKey)

@@ -3731,7 +3731,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 	// on in the test. This function verifies that the subsumed replicas have
 	// been handled properly.
 	if snapType != kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
-		inSnap.State.Desc.RangeID != rangeIds[string(keyA)] {
+		inSnap.Desc.RangeID != rangeIds[string(keyA)] {
 		return nil
 	}

@@ -3779,8 +3779,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {

 	// Construct SSTs for the the first 4 bullets as numbered above, but only
 	// ultimately keep the last one.
-	keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.State.Desc)
-	it := rditer.NewReplicaEngineDataIterator(inSnap.State.Desc, sendingEng, true /* replicatedOnly */)
+	keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.Desc)
+	it := rditer.NewReplicaEngineDataIterator(inSnap.Desc, sendingEng, true /* replicatedOnly */)
 	defer it.Close()
 	// Write a range deletion tombstone to each of the SSTs then put in the
 	// kv entries from the sender of the snapshot.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
@@ -3648,7 +3648,7 @@ func TestTenantID(t *testing.T) {
 		request_type kvserver.SnapshotRequest_Type,
 		strings []string,
 	) error {
-		if snapshot.State.Desc.RangeID == repl.RangeID {
+		if snapshot.Desc.RangeID == repl.RangeID {
 			select {
 			case sawSnapshot <- struct{}{}:
 			default:
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_raft.go
@@ -507,7 +507,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	ctx context.Context, inSnap IncomingSnapshot,
 ) (_ handleRaftReadyStats, _ string, foo error) {
 	var stats handleRaftReadyStats
-	if inSnap.State != nil {
+	if inSnap.Desc != nil {
 		stats.snap.offered = true
 	}

@@ -584,7 +584,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
 	}

-	if inSnap.State != nil {
+	if inSnap.Desc != nil {
 		if !raft.IsEmptySnap(rd.Snapshot) {
 			snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
 			if err != nil {
@@ -1719,7 +1719,7 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
 		// installed a placeholder for snapshot's keyspace. No merge lock needed.
 		return nil, func() {}
 	}
-	for endKey.Less(inSnap.State.Desc.EndKey) {
+	for endKey.Less(inSnap.Desc.EndKey) {
 		sRepl := r.store.LookupReplica(endKey)
 		if sRepl == nil || !endKey.Equal(sRepl.Desc().StartKey) {
 			log.Fatalf(ctx, "snapshot widens existing replica, but no replica exists for subsumed key %s", endKey)
56 changes: 33 additions & 23 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -515,10 +515,11 @@ type IncomingSnapshot struct {
 	SnapUUID uuid.UUID
 	// The storage interface for the underlying SSTs.
 	SSTStorageScratch *SSTSnapshotStorageScratch
-	// The replica state at the time the snapshot was generated (never nil).
-	State *kvserverpb.ReplicaState
-	snapType SnapshotRequest_Type
-	placeholder *ReplicaPlaceholder
+	// The descriptor in the snapshot, never nil.
+	Desc *roachpb.RangeDescriptor
+	snapType SnapshotRequest_Type
+	placeholder *ReplicaPlaceholder
+	raftAppliedIndex uint64 // logging only
 }

 func (s *IncomingSnapshot) String() string {
@@ -527,10 +528,10 @@ func (s *IncomingSnapshot) String() string {
 }

 // SafeFormat implements the redact.SafeFormatter interface.
 func (s *IncomingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) {
-	w.Printf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.State.RaftAppliedIndex)
+	w.Printf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.raftAppliedIndex)
 }

-// snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
+// snapshot creates an OutgoingSnapshot containing a pebble snapshot for the
 // given range. Note that snapshot() is called without Replica.raftMu held.
 func snapshot(
 	ctx context.Context,
@@ -758,9 +759,9 @@ func (r *Replica) applySnapshot(
 	hs raftpb.HardState,
 	subsumedRepls []*Replica,
 ) (err error) {
-	s := *inSnap.State
-	if s.Desc.RangeID != r.RangeID {
-		log.Fatalf(ctx, "unexpected range ID %d", s.Desc.RangeID)
+	desc := inSnap.Desc
+	if desc.RangeID != r.RangeID {
+		log.Fatalf(ctx, "unexpected range ID %d", desc.RangeID)
 	}

 	isInitialSnap := !r.IsInitialized()
@@ -852,7 +853,11 @@ func (r *Replica) applySnapshot(
 	r.store.raftEntryCache.Drop(r.RangeID)

 	if err := r.raftMu.stateLoader.SetRaftTruncatedState(
-		ctx, &unreplicatedSST, s.TruncatedState,
+		ctx, &unreplicatedSST,
+		&roachpb.RaftTruncatedState{
+			Index: nonemptySnap.Metadata.Index,
+			Term: nonemptySnap.Metadata.Term,
+		},
 	); err != nil {
 		return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
 	}
@@ -868,19 +873,14 @@ func (r *Replica) applySnapshot(
 		}
 	}

-	if s.RaftAppliedIndex != nonemptySnap.Metadata.Index {
-		log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
-			s.RaftAppliedIndex, nonemptySnap.Metadata.Index)
-	}
-
 	// If we're subsuming a replica below, we don't have its last NextReplicaID,
 	// nor can we obtain it. That's OK: we can just be conservative and use the
 	// maximum possible replica ID. preDestroyRaftMuLocked will write a replica
 	// tombstone using this maximum possible replica ID, which would normally be
 	// problematic, as it would prevent this store from ever having a new replica
 	// of the removed range. In this case, however, it's copacetic, as subsumed
 	// ranges _can't_ have new replicas.
-	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
+	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
 		return err
 	}
 	stats.subsumedReplicas = timeutil.Now()
@@ -896,6 +896,16 @@ func (r *Replica) applySnapshot(
 	}
 	stats.ingestion = timeutil.Now()

+	state, err := stateloader.Make(desc.RangeID).Load(ctx, r.store.engine, desc)
+	if err != nil {
+		log.Fatalf(ctx, "unable to load replica state: %s", err)
+	}
+
+	if state.RaftAppliedIndex != nonemptySnap.Metadata.Index {
+		log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
+			state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
+	}
+
 	// The on-disk state is now committed, but the corresponding in-memory state
 	// has not yet been updated. Any errors past this point must therefore be
 	// treated as fatal.
@@ -926,7 +936,7 @@ func (r *Replica) applySnapshot(
 			log.Fatalf(ctx, "unable to remove placeholder: %s", err)
 		}
 	}
-	r.setDescLockedRaftMuLocked(ctx, s.Desc)
+	r.setDescLockedRaftMuLocked(ctx, desc)
 	if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
 		log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
 	}
@@ -942,18 +952,18 @@ func (r *Replica) applySnapshot(
 	// performance implications are not likely to be drastic. If our
 	// feelings about this ever change, we can add a LastIndex field to
 	// raftpb.SnapshotMetadata.
-	r.mu.lastIndex = s.RaftAppliedIndex
+	r.mu.lastIndex = state.RaftAppliedIndex
 	r.mu.lastTerm = invalidLastTerm
 	r.mu.raftLogSize = 0
 	// Update the store stats for the data in the snapshot.
 	r.store.metrics.subtractMVCCStats(ctx, r.mu.tenantID, *r.mu.state.Stats)
-	r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *s.Stats)
+	r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *state.Stats)
 	lastKnownLease := r.mu.state.Lease
 	// Update the rest of the Raft state. Changes to r.mu.state.Desc must be
 	// managed by r.setDescRaftMuLocked and changes to r.mu.state.Lease must be handled
 	// by r.leasePostApply, but we called those above, so now it's safe to
 	// wholesale replace r.mu.state.
-	r.mu.state = s
+	r.mu.state = state
 	// Snapshots typically have fewer log entries than the leaseholder. The next
 	// time we hold the lease, recompute the log size before making decisions.
 	r.mu.raftLogSizeTrusted = false
@@ -962,13 +972,13 @@ func (r *Replica) applySnapshot(
 	// replica according to whether it holds the lease. We allow jumps in the
 	// lease sequence because there may be multiple lease changes accounted for
 	// in the snapshot.
-	r.leasePostApplyLocked(ctx, lastKnownLease, s.Lease /* newLease */, prioReadSum, allowLeaseJump)
+	r.leasePostApplyLocked(ctx, lastKnownLease, state.Lease /* newLease */, prioReadSum, allowLeaseJump)

 	// Similarly, if we subsumed any replicas through the snapshot (meaning that
 	// we missed the application of a merge) and we are the new leaseholder, we
 	// make sure to update the timestamp cache using the prior read summary to
 	// account for any reads that were served on the right-hand side range(s).
-	if len(subsumedRepls) > 0 && s.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
+	if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
 		applyReadSummaryToTimestampCache(r.store.tsCache, r.descRLocked(), *prioReadSum)
 	}

@@ -995,7 +1005,7 @@ func (r *Replica) applySnapshot(
 	// Update the replica's cached byte thresholds. This is a no-op if the system
 	// config is not available, in which case we rely on the next gossip update
 	// to perform the update.
-	if err := r.updateRangeInfo(ctx, s.Desc); err != nil {
+	if err := r.updateRangeInfo(ctx, desc); err != nil {
 		log.Fatalf(ctx, "unable to update range info while applying snapshot: %+v", err)
 	}

3 changes: 2 additions & 1 deletion pkg/kv/kvserver/store_snapshot.go
@@ -290,8 +290,9 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 	inSnap := IncomingSnapshot{
 		SnapUUID: snapUUID,
 		SSTStorageScratch: kvSS.scratch,
-		State: &header.State,
+		Desc: header.State.Desc,
 		snapType: header.Type,
+		raftAppliedIndex: header.State.RaftAppliedIndex,
 	}

 	kvSS.status = fmt.Sprintf("ssts: %d", len(kvSS.scratch.SSTs()))
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
@@ -2817,7 +2817,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
 	if err := s.processRaftSnapshotRequest(ctx, req,
 		IncomingSnapshot{
 			SnapUUID: uuid.MakeV4(),
-			State: &kvserverpb.ReplicaState{Desc: repl1.Desc()},
+			Desc: repl1.Desc(),
 			placeholder: placeholder,
 		},
 	); err != nil {
