diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index fad033662d11..51d36ba78015 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -757,7 +757,7 @@ func mergeCheckingTimestampCaches(
 		// Install a filter to capture the Raft snapshot.
 		snapshotFilter = func(inSnap kvserver.IncomingSnapshot) {
-			if inSnap.State.Desc.RangeID == lhsDesc.RangeID {
+			if inSnap.Desc.RangeID == lhsDesc.RangeID {
 				snapChan <- inSnap
 			}
 		}
@@ -809,7 +809,7 @@ func mergeCheckingTimestampCaches(
 		case <-time.After(45 * time.Second):
 			t.Fatal("timed out waiting for snapChan")
 		}
-		inSnapDesc := inSnap.State.Desc
+		inSnapDesc := inSnap.Desc
 		require.Equal(t, lhsDesc.StartKey, inSnapDesc.StartKey)
 		require.Equal(t, rhsDesc.EndKey, inSnapDesc.EndKey)
@@ -3731,7 +3731,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		// on in the test. This function verifies that the subsumed replicas have
 		// been handled properly.
 		if snapType != kvserver.SnapshotRequest_VIA_SNAPSHOT_QUEUE ||
-			inSnap.State.Desc.RangeID != rangeIds[string(keyA)] {
+			inSnap.Desc.RangeID != rangeIds[string(keyA)] {
 			return nil
 		}
@@ -3779,8 +3779,8 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
 		// Construct SSTs for the the first 4 bullets as numbered above, but only
 		// ultimately keep the last one.
-		keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.State.Desc)
-		it := rditer.NewReplicaEngineDataIterator(inSnap.State.Desc, sendingEng, true /* replicatedOnly */)
+		keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.Desc)
+		it := rditer.NewReplicaEngineDataIterator(inSnap.Desc, sendingEng, true /* replicatedOnly */)
 		defer it.Close()
 		// Write a range deletion tombstone to each of the SSTs then put in the
 		// kv entries from the sender of the snapshot.
diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go
index 4fbd07c678c0..2a265fbb3acc 100644
--- a/pkg/kv/kvserver/client_replica_test.go
+++ b/pkg/kv/kvserver/client_replica_test.go
@@ -3648,7 +3648,7 @@ func TestTenantID(t *testing.T) {
 				request_type kvserver.SnapshotRequest_Type,
 				strings []string,
 			) error {
-				if snapshot.State.Desc.RangeID == repl.RangeID {
+				if snapshot.Desc.RangeID == repl.RangeID {
 					select {
 					case sawSnapshot <- struct{}{}:
 					default:
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 771f6315c3cc..51e37ba1c68d 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -507,7 +507,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 	ctx context.Context, inSnap IncomingSnapshot,
 ) (_ handleRaftReadyStats, _ string, foo error) {
 	var stats handleRaftReadyStats
-	if inSnap.State != nil {
+	if inSnap.Desc != nil {
 		stats.snap.offered = true
 	}
@@ -584,7 +584,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
 		leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
 	}
-	if inSnap.State != nil {
+	if inSnap.Desc != nil {
 		if !raft.IsEmptySnap(rd.Snapshot) {
 			snapUUID, err := uuid.FromBytes(rd.Snapshot.Data)
 			if err != nil {
@@ -1719,7 +1719,7 @@ func (r *Replica) maybeAcquireSnapshotMergeLock(
 		// installed a placeholder for snapshot's keyspace. No merge lock needed.
 		return nil, func() {}
 	}
-	for endKey.Less(inSnap.State.Desc.EndKey) {
+	for endKey.Less(inSnap.Desc.EndKey) {
 		sRepl := r.store.LookupReplica(endKey)
 		if sRepl == nil || !endKey.Equal(sRepl.Desc().StartKey) {
 			log.Fatalf(ctx, "snapshot widens existing replica, but no replica exists for subsumed key %s", endKey)
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index aaa0e477aea8..a4817278bb12 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -515,10 +515,11 @@ type IncomingSnapshot struct {
 	SnapUUID uuid.UUID
 	// The storage interface for the underlying SSTs.
 	SSTStorageScratch *SSTSnapshotStorageScratch
-	// The replica state at the time the snapshot was generated (never nil).
-	State *kvserverpb.ReplicaState
-	snapType SnapshotRequest_Type
-	placeholder *ReplicaPlaceholder
+	// The descriptor in the snapshot, never nil.
+	Desc *roachpb.RangeDescriptor
+	snapType SnapshotRequest_Type
+	placeholder *ReplicaPlaceholder
+	raftAppliedIndex uint64 // logging only
 }
 
 func (s *IncomingSnapshot) String() string {
@@ -527,10 +528,10 @@
 
 // SafeFormat implements the redact.SafeFormatter interface.
 func (s *IncomingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) {
-	w.Printf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.State.RaftAppliedIndex)
+	w.Printf("%s snapshot %s at applied index %d", s.snapType, s.SnapUUID.Short(), s.raftAppliedIndex)
 }
 
-// snapshot creates an OutgoingSnapshot containing a rocksdb snapshot for the
+// snapshot creates an OutgoingSnapshot containing a pebble snapshot for the
 // given range. Note that snapshot() is called without Replica.raftMu held.
 func snapshot(
 	ctx context.Context,
@@ -758,9 +759,9 @@ func (r *Replica) applySnapshot(
 	hs raftpb.HardState, subsumedRepls []*Replica,
 ) (err error) {
-	s := *inSnap.State
-	if s.Desc.RangeID != r.RangeID {
-		log.Fatalf(ctx, "unexpected range ID %d", s.Desc.RangeID)
+	desc := inSnap.Desc
+	if desc.RangeID != r.RangeID {
+		log.Fatalf(ctx, "unexpected range ID %d", desc.RangeID)
 	}
 	isInitialSnap := !r.IsInitialized()
@@ -852,7 +853,11 @@ func (r *Replica) applySnapshot(
 	r.store.raftEntryCache.Drop(r.RangeID)
 	if err := r.raftMu.stateLoader.SetRaftTruncatedState(
-		ctx, &unreplicatedSST, s.TruncatedState,
+		ctx, &unreplicatedSST,
+		&roachpb.RaftTruncatedState{
+			Index: nonemptySnap.Metadata.Index,
+			Term:  nonemptySnap.Metadata.Term,
+		},
 	); err != nil {
 		return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
 	}
@@ -868,11 +873,6 @@ func (r *Replica) applySnapshot(
 		}
 	}
-	if s.RaftAppliedIndex != nonemptySnap.Metadata.Index {
-		log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
-			s.RaftAppliedIndex, nonemptySnap.Metadata.Index)
-	}
-
 	// If we're subsuming a replica below, we don't have its last NextReplicaID,
 	// nor can we obtain it. That's OK: we can just be conservative and use the
 	// maximum possible replica ID. preDestroyRaftMuLocked will write a replica
@@ -880,7 +880,7 @@
 	// problematic, as it would prevent this store from ever having a new replica
 	// of the removed range. In this case, however, it's copacetic, as subsumed
 	// ranges _can't_ have new replicas.
-	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
+	if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
 		return err
 	}
 	stats.subsumedReplicas = timeutil.Now()
@@ -896,6 +896,16 @@ func (r *Replica) applySnapshot(
 	}
 	stats.ingestion = timeutil.Now()
+	state, err := stateloader.Make(desc.RangeID).Load(ctx, r.store.engine, desc)
+	if err != nil {
+		log.Fatalf(ctx, "unable to load replica state: %s", err)
+	}
+
+	if state.RaftAppliedIndex != nonemptySnap.Metadata.Index {
+		log.Fatalf(ctx, "snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
+			state.RaftAppliedIndex, nonemptySnap.Metadata.Index)
+	}
+
 	// The on-disk state is now committed, but the corresponding in-memory state
 	// has not yet been updated. Any errors past this point must therefore be
 	// treated as fatal.
@@ -926,7 +936,7 @@ func (r *Replica) applySnapshot(
 			log.Fatalf(ctx, "unable to remove placeholder: %s", err)
 		}
 	}
-	r.setDescLockedRaftMuLocked(ctx, s.Desc)
+	r.setDescLockedRaftMuLocked(ctx, desc)
 	if err := r.store.maybeMarkReplicaInitializedLockedReplLocked(ctx, r); err != nil {
 		log.Fatalf(ctx, "unable to mark replica initialized while applying snapshot: %+v", err)
 	}
@@ -942,18 +952,18 @@ func (r *Replica) applySnapshot(
 	// performance implications are not likely to be drastic. If our
 	// feelings about this ever change, we can add a LastIndex field to
 	// raftpb.SnapshotMetadata.
-	r.mu.lastIndex = s.RaftAppliedIndex
+	r.mu.lastIndex = state.RaftAppliedIndex
 	r.mu.lastTerm = invalidLastTerm
 	r.mu.raftLogSize = 0
 	// Update the store stats for the data in the snapshot.
 	r.store.metrics.subtractMVCCStats(ctx, r.mu.tenantID, *r.mu.state.Stats)
-	r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *s.Stats)
+	r.store.metrics.addMVCCStats(ctx, r.mu.tenantID, *state.Stats)
 	lastKnownLease := r.mu.state.Lease
 	// Update the rest of the Raft state. Changes to r.mu.state.Desc must be
 	// managed by r.setDescRaftMuLocked and changes to r.mu.state.Lease must be handled
 	// by r.leasePostApply, but we called those above, so now it's safe to
 	// wholesale replace r.mu.state.
-	r.mu.state = s
+	r.mu.state = state
 	// Snapshots typically have fewer log entries than the leaseholder. The next
 	// time we hold the lease, recompute the log size before making decisions.
 	r.mu.raftLogSizeTrusted = false
@@ -962,13 +972,13 @@ func (r *Replica) applySnapshot(
 	// replica according to whether it holds the lease. We allow jumps in the
 	// lease sequence because there may be multiple lease changes accounted for
 	// in the snapshot.
-	r.leasePostApplyLocked(ctx, lastKnownLease, s.Lease /* newLease */, prioReadSum, allowLeaseJump)
+	r.leasePostApplyLocked(ctx, lastKnownLease, state.Lease /* newLease */, prioReadSum, allowLeaseJump)
 	// Similarly, if we subsumed any replicas through the snapshot (meaning that
 	// we missed the application of a merge) and we are the new leaseholder, we
 	// make sure to update the timestamp cache using the prior read summary to
 	// account for any reads that were served on the right-hand side range(s).
-	if len(subsumedRepls) > 0 && s.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
+	if len(subsumedRepls) > 0 && state.Lease.Replica.ReplicaID == r.mu.replicaID && prioReadSum != nil {
 		applyReadSummaryToTimestampCache(r.store.tsCache, r.descRLocked(), *prioReadSum)
 	}
@@ -995,7 +1005,7 @@ func (r *Replica) applySnapshot(
 	// Update the replica's cached byte thresholds. This is a no-op if the system
 	// config is not available, in which case we rely on the next gossip update
 	// to perform the update.
-	if err := r.updateRangeInfo(ctx, s.Desc); err != nil {
+	if err := r.updateRangeInfo(ctx, desc); err != nil {
 		log.Fatalf(ctx, "unable to update range info while applying snapshot: %+v", err)
 	}
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 035a3251424d..d5bf8264c207 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -290,8 +290,9 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 			inSnap := IncomingSnapshot{
 				SnapUUID:          snapUUID,
 				SSTStorageScratch: kvSS.scratch,
-				State:             &header.State,
+				Desc:              header.State.Desc,
 				snapType:          header.Type,
+				raftAppliedIndex:  header.State.RaftAppliedIndex,
 			}
 			kvSS.status = fmt.Sprintf("ssts: %d", len(kvSS.scratch.SSTs()))
diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go
index 65aa6cbf7062..586721e522c8 100644
--- a/pkg/kv/kvserver/store_test.go
+++ b/pkg/kv/kvserver/store_test.go
@@ -2817,7 +2817,7 @@ func TestStoreRemovePlaceholderOnRaftIgnored(t *testing.T) {
 	if err := s.processRaftSnapshotRequest(ctx, req,
 		IncomingSnapshot{
 			SnapUUID:    uuid.MakeV4(),
-			State:       &kvserverpb.ReplicaState{Desc: repl1.Desc()},
+			Desc:        repl1.Desc(),
 			placeholder: placeholder,
 		},
 	); err != nil {
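Note: the sketch below is a minimal, self-contained illustration of the flow this patch moves to, using hypothetical stand-in types rather than the real kvserver API. The receiver now records only the snapshot's RangeDescriptor (plus the applied index for logging), the Raft truncated state is derived from the snapshot metadata, and the authoritative ReplicaState is loaded back from the engine after the SSTs are ingested, then cross-checked against the metadata index.

```go
// Toy model of the patched snapshot-application flow. All types and helpers
// here are illustrative stand-ins, not the real kvserver API.
package main

import (
	"errors"
	"fmt"
)

// rangeDescriptor stands in for roachpb.RangeDescriptor.
type rangeDescriptor struct {
	RangeID int64
}

// incomingSnapshot mirrors the trimmed-down IncomingSnapshot.
type incomingSnapshot struct {
	desc             *rangeDescriptor // never nil
	raftAppliedIndex uint64           // logging only
}

// snapshotMetadata stands in for raftpb.SnapshotMetadata.
type snapshotMetadata struct {
	Index, Term uint64
}

// raftTruncatedState stands in for roachpb.RaftTruncatedState; after this
// change it is derived from the snapshot metadata rather than a shipped state.
type raftTruncatedState struct {
	Index, Term uint64
}

func truncatedStateFromSnap(meta snapshotMetadata) raftTruncatedState {
	return raftTruncatedState{Index: meta.Index, Term: meta.Term}
}

// replicaState stands in for the state that is now loaded from the ingested
// on-disk data (stateloader.Make(...).Load in the real code).
type replicaState struct {
	RaftAppliedIndex uint64
}

func loadStateFromEngine(engine map[int64]replicaState, desc *rangeDescriptor) (replicaState, error) {
	s, ok := engine[desc.RangeID]
	if !ok {
		return replicaState{}, errors.New("no state found for range")
	}
	return s, nil
}

// applySnapshot sketches the reordered flow: derive the truncated state from
// the snapshot metadata, ingest the SSTs (elided), then load the state from
// the engine and cross-check it against the metadata index.
func applySnapshot(engine map[int64]replicaState, inSnap incomingSnapshot, meta snapshotMetadata) error {
	_ = truncatedStateFromSnap(meta) // would be written to the unreplicated SST
	// ... SST ingestion elided ...
	state, err := loadStateFromEngine(engine, inSnap.desc)
	if err != nil {
		return err
	}
	if state.RaftAppliedIndex != meta.Index {
		return fmt.Errorf("snapshot RaftAppliedIndex %d doesn't match its metadata index %d",
			state.RaftAppliedIndex, meta.Index)
	}
	return nil
}

func main() {
	// Pretend the ingested SSTs left this state in the engine for range 7.
	engine := map[int64]replicaState{7: {RaftAppliedIndex: 42}}
	inSnap := incomingSnapshot{desc: &rangeDescriptor{RangeID: 7}, raftAppliedIndex: 42}
	if err := applySnapshot(engine, inSnap, snapshotMetadata{Index: 42, Term: 5}); err != nil {
		fmt.Println("apply failed:", err)
		return
	}
	fmt.Println("snapshot applied; replica state came from the engine, not the header")
}
```

As the diff suggests, nothing applied from the snapshot depends on the ReplicaState shipped in the header beyond the descriptor, so the in-memory state installed afterwards cannot drift from what was actually ingested.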