Skip to content

Commit

Permalink
kvserver: extract writeUnreplicatedSST
Browse files Browse the repository at this point in the history
CTRL-X CTRL-V

Epic: CRDB-220
Release note: None
  • Loading branch information
tbg committed Feb 9, 2023
1 parent 47ef7fd commit 9d18c63
Showing 1 changed file with 54 additions and 46 deletions.
100 changes: 54 additions & 46 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,52 +482,6 @@ func (r *Replica) applySnapshot(
log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
}(timeutil.Now())

writeUnreplicatedSST := func(ctx context.Context, id storage.FullReplicaID, st *cluster.Settings, meta raftpb.SnapshotMetadata, hs raftpb.HardState, sl *logstore.StateLoader) (*storage.MemFile, bool, error) {
unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, st, unreplicatedSSTFile,
)
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
//
// NB: We do not expect to see range keys in the unreplicated state, so
// we don't drop a range tombstone across the range key space.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err := unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := sl.SetRaftReplicaID(
ctx, &unreplicatedSST, id.ReplicaID); err != nil {
return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

if err := sl.SetRaftTruncatedState(
ctx, &unreplicatedSST,
&roachpb.RaftTruncatedState{
Index: meta.Index,
Term: meta.Term,
},
); err != nil {
return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
return nil, false, err
}
return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
}
unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST(
ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader,
)
Expand Down Expand Up @@ -704,6 +658,60 @@ func (r *Replica) applySnapshot(
return nil
}

func writeUnreplicatedSST(
ctx context.Context,
id storage.FullReplicaID,
st *cluster.Settings,
meta raftpb.SnapshotMetadata,
hs raftpb.HardState,
sl *logstore.StateLoader,
) (*storage.MemFile, bool, error) {
unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, st, unreplicatedSSTFile,
)
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
//
// NB: We do not expect to see range keys in the unreplicated state, so
// we don't drop a range tombstone across the range key space.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err := unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := sl.SetRaftReplicaID(
ctx, &unreplicatedSST, id.ReplicaID); err != nil {
return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

if err := sl.SetRaftTruncatedState(
ctx, &unreplicatedSST,
&roachpb.RaftTruncatedState{
Index: meta.Index,
Term: meta.Term,
},
); err != nil {
return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
return nil, false, err
}
return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
}

// clearSubsumedReplicaDiskData clears the on disk data of the subsumed
// replicas by creating SSTs with range deletion tombstones. We have to be
// careful here not to have overlapping ranges with the SSTs we have already
Expand Down

0 comments on commit 9d18c63

Please sign in to comment.