Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: extract writeUnreplicatedSST #96882

Merged
merged 4 commits into from
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,11 @@ func (r *Replica) ReplicaID() roachpb.ReplicaID {
return r.replicaID
}

// ID returns the FullReplicaID for the Replica.
func (r *Replica) ID() storage.FullReplicaID {
return storage.FullReplicaID{RangeID: r.RangeID, ReplicaID: r.replicaID}
}

// cleanupFailedProposal cleans up after a proposal that has failed. It
// clears any references to the proposal and releases associated quota.
// It requires that Replica.mu is exclusively held.
Expand Down
105 changes: 59 additions & 46 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,59 +482,18 @@ func (r *Replica) applySnapshot(
log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
}(timeutil.Now())

unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, r.ClusterSettings(), unreplicatedSSTFile,
unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST(
ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader,
)
tbg marked this conversation as resolved.
Show resolved Hide resolved
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
//
// NB: We do not expect to see range keys in the unreplicated state, so
// we don't drop a range tombstone across the range key space.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err = unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := r.raftMu.stateLoader.SetRaftReplicaID(
ctx, &unreplicatedSST, r.replicaID); err != nil {
return errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)

if err := r.raftMu.stateLoader.SetRaftTruncatedState(
ctx, &unreplicatedSST,
&roachpb.RaftTruncatedState{
Index: nonemptySnap.Metadata.Index,
Term: nonemptySnap.Metadata.Term,
},
); err != nil {
return errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
return err
}
if unreplicatedSST.DataSize > 0 {
if nonempty {
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemFile first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
}
// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)

// If we're subsuming a replica below, we don't have its last NextReplicaID,
// nor can we obtain it. That's OK: we can just be conservative and use the
Expand Down Expand Up @@ -699,6 +658,60 @@ func (r *Replica) applySnapshot(
return nil
}

func writeUnreplicatedSST(
ctx context.Context,
id storage.FullReplicaID,
st *cluster.Settings,
meta raftpb.SnapshotMetadata,
hs raftpb.HardState,
sl *logstore.StateLoader,
) (*storage.MemFile, bool, error) {
tbg marked this conversation as resolved.
Show resolved Hide resolved
unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, st, unreplicatedSSTFile,
)
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
//
// NB: We do not expect to see range keys in the unreplicated state, so
// we don't drop a range tombstone across the range key space.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err := unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := sl.SetRaftReplicaID(
ctx, &unreplicatedSST, id.ReplicaID); err != nil {
return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

if err := sl.SetRaftTruncatedState(
ctx, &unreplicatedSST,
&roachpb.RaftTruncatedState{
Index: meta.Index,
Term: meta.Term,
},
); err != nil {
return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
return nil, false, err
}
return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
}

// clearSubsumedReplicaDiskData clears the on disk data of the subsumed
// replicas by creating SSTs with range deletion tombstones. We have to be
// careful here not to have overlapping ranges with the SSTs we have already
Expand Down