diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 4043399b974b..60be58efe729 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -863,6 +863,11 @@ func (r *Replica) ReplicaID() roachpb.ReplicaID {
 	return r.replicaID
 }
 
+// ID returns the FullReplicaID for the Replica.
+func (r *Replica) ID() storage.FullReplicaID {
+	return storage.FullReplicaID{RangeID: r.RangeID, ReplicaID: r.replicaID}
+}
+
 // cleanupFailedProposal cleans up after a proposal that has failed. It
 // clears any references to the proposal and releases associated quota.
 // It requires that Replica.mu is exclusively held.
diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index 5a28563740e8..9507093fca78 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -482,59 +482,21 @@ func (r *Replica) applySnapshot(
 		log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
 	}(timeutil.Now())
 
-	unreplicatedSSTFile := &storage.MemFile{}
-	unreplicatedSST := storage.MakeIngestionSSTWriter(
-		ctx, r.ClusterSettings(), unreplicatedSSTFile,
+	unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST(
+		ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader,
 	)
-	defer unreplicatedSST.Close()
-
-	// Clearing the unreplicated state.
-	//
-	// NB: We do not expect to see range keys in the unreplicated state, so
-	// we don't drop a range tombstone across the range key space.
-	unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
-	unreplicatedStart := unreplicatedPrefixKey
-	unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
-	if err = unreplicatedSST.ClearRawRange(
-		unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
-	); err != nil {
-		return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
-	}
-
-	// Update HardState.
-	if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
-		return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
-	}
-	// We've cleared all the raft state above, so we are forced to write the
-	// RaftReplicaID again here.
-	if err := r.raftMu.stateLoader.SetRaftReplicaID(
-		ctx, &unreplicatedSST, r.replicaID); err != nil {
-		return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
-	}
-
-	// Update Raft entries.
-	r.store.raftEntryCache.Drop(r.RangeID)
-
-	if err := r.raftMu.stateLoader.SetRaftTruncatedState(
-		ctx, &unreplicatedSST,
-		&roachpb.RaftTruncatedState{
-			Index: nonemptySnap.Metadata.Index,
-			Term:  nonemptySnap.Metadata.Term,
-		},
-	); err != nil {
-		return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
-	}
-
-	if err := unreplicatedSST.Finish(); err != nil {
+	if err != nil {
 		return err
 	}
-	if unreplicatedSST.DataSize > 0 {
+	if nonempty {
 		// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
 		// buffering in a MemFile first.
 		if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
 			return err
 		}
 	}
+	// Update Raft entries.
+	r.store.raftEntryCache.Drop(r.RangeID)
 
 	// If we're subsuming a replica below, we don't have its last NextReplicaID,
 	// nor can we obtain it. That's OK: we can just be conservative and use the
@@ -699,6 +661,67 @@ func (r *Replica) applySnapshot(
 	return nil
 }
 
+// writeUnreplicatedSST creates an SST for snapshot application that
+// covers the RangeID-unreplicated keyspace. A range tombstone is
+// laid down and the Raft state provided by the arguments is overlaid
+// onto it.
+//
+// TODO(sep-raft-log): when is `nonempty` ever false? We always
+// perform a number of writes to this SST.
+func writeUnreplicatedSST(
+	ctx context.Context,
+	id storage.FullReplicaID,
+	st *cluster.Settings,
+	meta raftpb.SnapshotMetadata,
+	hs raftpb.HardState,
+	sl *logstore.StateLoader,
+) (_ *storage.MemFile, nonempty bool, _ error) {
+	unreplicatedSSTFile := &storage.MemFile{}
+	unreplicatedSST := storage.MakeIngestionSSTWriter(
+		ctx, st, unreplicatedSSTFile,
+	)
+	defer unreplicatedSST.Close()
+
+	// Clearing the unreplicated state.
+	//
+	// NB: We do not expect to see range keys in the unreplicated state, so
+	// we don't drop a range tombstone across the range key space.
+	unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
+	unreplicatedStart := unreplicatedPrefixKey
+	unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
+	if err := unreplicatedSST.ClearRawRange(
+		unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
+	); err != nil {
+		return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
+	}
+
+	// Update HardState.
+	if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
+		return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
+	}
+	// We've cleared all the raft state above, so we are forced to write the
+	// RaftReplicaID again here.
+	if err := sl.SetRaftReplicaID(
+		ctx, &unreplicatedSST, id.ReplicaID); err != nil {
+		return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
+	}
+
+	if err := sl.SetRaftTruncatedState(
+		ctx, &unreplicatedSST,
+		&roachpb.RaftTruncatedState{
+			Index: meta.Index,
+			Term:  meta.Term,
+		},
+	); err != nil {
+		return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
+	}
+
+	if err := unreplicatedSST.Finish(); err != nil {
+		return nil, false, err
+	}
+	return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
+}
+
 // clearSubsumedReplicaDiskData clears the on disk data of the subsumed
 // replicas by creating SSTs with range deletion tombstones. We have to be
 // careful here not to have overlapping ranges with the SSTs we have already