[DNM] kv: use write batch for small Raft snapshots #63013
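This draft routes small Raft snapshots through an ordinary write batch instead of building and ingesting SSTs, keeping the SST path for everything else. As a rough self-contained sketch of that routing decision (the `Engine` and `Batch` interfaces and the size cutoff below are invented stand-ins for illustration, not CockroachDB's actual storage APIs or the PR's threshold):

```go
package main

import "fmt"

// Batch and Engine are invented stand-ins; CockroachDB's storage.Batch
// and storage.Engine are much richer.
type Batch interface {
	Put(key, value []byte) error
	Commit(sync bool) error
}

type Engine interface {
	NewBatch() Batch
	IngestExternalFiles(paths []string) error
}

// smallSnapshotThreshold is a made-up cutoff for illustration only.
const smallSnapshotThreshold = 1 << 20 // 1 MiB

// applySnapshotData routes snapshot data either through a write batch
// (small snapshots) or through SST ingestion (everything else).
func applySnapshotData(eng Engine, totalBytes int64, kvs map[string][]byte, ssts []string) error {
	if totalBytes >= smallSnapshotThreshold {
		// Large snapshot: keep the existing path and ingest prebuilt SSTs.
		return eng.IngestExternalFiles(ssts)
	}
	// Small snapshot: stage every key in one batch and commit it once,
	// synced, so the applied state is durable before we acknowledge.
	b := eng.NewBatch()
	for k, v := range kvs {
		if err := b.Put([]byte(k), v); err != nil {
			return err
		}
	}
	return b.Commit(true /* sync */)
}

func main() {
	fmt.Println("illustrative sketch; see the diff below for the real wiring")
}
```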

Draft: wants to merge 1 commit into master
pkg/kv/kvserver/replica_raftstorage.go (194 additions, 105 deletions)

@@ -501,6 +501,8 @@ type IncomingSnapshot struct {
SnapUUID uuid.UUID
// The storage interface for the underlying SSTs.
SSTStorageScratch *SSTSnapshotStorageScratch
// Batch is non-nil for small snapshots, which are applied via this
// write batch rather than by ingesting SSTs.
Batch storage.Batch
// The Raft log entries for this snapshot.
LogEntries [][]byte
// The replica state at the time the snapshot was generated (never nil).
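Judging from the apply path below, the new field is mutually exclusive with SSTStorageScratch: a snapshot arrives either as SSTs in scratch storage or as a populated write batch, never both. A hypothetical guard for that invariant (the helper is invented, not part of the PR):

```go
package main

import (
	"errors"
	"fmt"
)

// checkSnapshotDelivery is a hypothetical guard, not in the PR: exactly
// one of the two delivery mechanisms must be set on an IncomingSnapshot.
func checkSnapshotDelivery(hasScratch, hasBatch bool) error {
	if hasScratch == hasBatch {
		return errors.New("exactly one of SSTStorageScratch and Batch must be non-nil")
	}
	return nil
}

func main() {
	fmt.Println(checkSnapshotDelivery(true, false)) // <nil>: SST-backed snapshot
	fmt.Println(checkSnapshotDelivery(false, true)) // <nil>: batch-backed snapshot
	fmt.Println(checkSnapshotDelivery(true, true))  // error: ambiguous
}
```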
@@ -835,77 +837,128 @@ func (r *Replica) applySnapshot(
stats.subsumedReplicas.Sub(start).Seconds()*1000,
)
}
ingestionLog := fmt.Sprintf(
"ingestion=%d@%0.0fms ",
len(inSnap.SSTStorageScratch.SSTs()),
stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000,
)
var ingestionLog string
if inSnap.SSTStorageScratch != nil {
ingestionLog = fmt.Sprintf(
"ingestion=%d@%0.0fms ",
len(inSnap.SSTStorageScratch.SSTs()),
stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000,
)
}
log.Infof(
ctx, "applied snapshot of type %s [%s%s%sid=%s index=%d]", inSnap.snapType, totalLog,
subsumedReplicasLog, ingestionLog, inSnap.SnapUUID.Short(), snap.Metadata.Index,
)
}(timeutil.Now())

unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(unreplicatedSSTFile)
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err = unreplicatedSST.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil {
return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}

// Update Raft entries.
var lastTerm uint64
var raftLogSize int64
if len(inSnap.LogEntries) > 0 {
logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
for i, bytes := range inSnap.LogEntries {
if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
if inSnap.SSTStorageScratch != nil {
unreplicatedSSTFile := &storage.MemFile{}
unreplicatedSST := storage.MakeIngestionSSTWriter(unreplicatedSSTFile)
defer unreplicatedSST.Close()

// Clearing the unreplicated state.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err = unreplicatedSST.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil {
return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := r.raftMu.stateLoader.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}

// Update Raft entries.
if len(inSnap.LogEntries) > 0 {
logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
for i, bytes := range inSnap.LogEntries {
if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
return err
}
}
var sideloadedEntriesSize int64
var err error
logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
if err != nil {
return err
}
raftLogSize += sideloadedEntriesSize
_, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries)
if err != nil {
return err
}
} else {
lastTerm = invalidLastTerm
}
var sideloadedEntriesSize int64
var err error
logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
if err != nil {
return err
r.store.raftEntryCache.Drop(r.RangeID)

// Update TruncatedState if it is unreplicated.
if inSnap.UsesUnreplicatedTruncatedState {
if err := r.raftMu.stateLoader.SetRaftTruncatedState(
ctx, &unreplicatedSST, s.TruncatedState,
); err != nil {
return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer")
}
}
raftLogSize += sideloadedEntriesSize
_, lastTerm, raftLogSize, err = r.append(ctx, &unreplicatedSST, 0, invalidLastTerm, raftLogSize, logEntries)
if err != nil {

if err := unreplicatedSST.Finish(); err != nil {
return err
}
if unreplicatedSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemFile first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
}
} else {
lastTerm = invalidLastTerm
}
r.store.raftEntryCache.Drop(r.RangeID)

// Update TruncatedState if it is unreplicated.
if inSnap.UsesUnreplicatedTruncatedState {
if err := r.raftMu.stateLoader.SetRaftTruncatedState(
ctx, &unreplicatedSST, s.TruncatedState,
); err != nil {
return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer")
// Clearing the unreplicated state.
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(r.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
if err = inSnap.Batch.ClearRawRange(unreplicatedStart, unreplicatedEnd); err != nil {
return errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}
}

if err := unreplicatedSST.Finish(); err != nil {
return err
}
if unreplicatedSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemFile first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
// Update HardState.
if err := r.raftMu.stateLoader.SetHardState(ctx, inSnap.Batch, hs); err != nil {
return errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}

// Update Raft entries.
if len(inSnap.LogEntries) > 0 {
logEntries := make([]raftpb.Entry, len(inSnap.LogEntries))
for i, bytes := range inSnap.LogEntries {
if err := protoutil.Unmarshal(bytes, &logEntries[i]); err != nil {
return err
}
}
var sideloadedEntriesSize int64
var err error
logEntries, sideloadedEntriesSize, err = r.maybeSideloadEntriesRaftMuLocked(ctx, logEntries)
if err != nil {
return err
}
raftLogSize += sideloadedEntriesSize
_, lastTerm, raftLogSize, err = r.append(ctx, inSnap.Batch, 0, invalidLastTerm, raftLogSize, logEntries)
if err != nil {
return err
}
} else {
lastTerm = invalidLastTerm
}
r.store.raftEntryCache.Drop(r.RangeID)

// Update TruncatedState if it is unreplicated.
if inSnap.UsesUnreplicatedTruncatedState {
if err := r.raftMu.stateLoader.SetRaftTruncatedState(
ctx, inSnap.Batch, s.TruncatedState,
); err != nil {
return errors.Wrapf(err, "unable to write UnreplicatedTruncatedState to unreplicated SST writer")
}
}
}
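Both branches above issue the same logical sequence of writes (clear the unreplicated range-ID keyspace, set HardState, append the log entries, maybe set the unreplicated TruncatedState); only the destination differs. This works because the state loader and log-append helpers write through a common writer interface, so an SST writer and a write batch are interchangeable targets. A minimal self-contained sketch of that pattern, with invented types:

```go
package main

import "fmt"

// Writer is an invented stand-in for the shared write interface that
// both an SST writer and a write batch satisfy in the real code.
type Writer interface {
	ClearRawRange(start, end []byte) error
	Put(key, value []byte) error
}

// recordingWriter plays the role of either destination and records
// the operations it receives.
type recordingWriter struct {
	name string
	ops  []string
}

func (w *recordingWriter) ClearRawRange(start, end []byte) error {
	w.ops = append(w.ops, fmt.Sprintf("clear [%s, %s)", start, end))
	return nil
}

func (w *recordingWriter) Put(key, value []byte) error {
	w.ops = append(w.ops, fmt.Sprintf("put %s", key))
	return nil
}

// setHardState mirrors the shape of stateLoader.SetHardState: it needs
// only a Writer, so the caller chooses an SST writer or a batch.
func setHardState(w Writer, encodedHS []byte) error {
	return w.Put([]byte("hardstate"), encodedHS)
}

func main() {
	for _, w := range []*recordingWriter{{name: "sst"}, {name: "batch"}} {
		_ = w.ClearRawRange([]byte("a"), []byte("z"))
		_ = setHardState(w, []byte("hs"))
		fmt.Println(w.name, w.ops)
	}
}
```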

@@ -941,19 +994,30 @@ func (r *Replica) applySnapshot(
// problematic, as it would prevent this store from ever having a new replica
// of the removed range. In this case, however, it's copacetic, as subsumed
// ranges _can't_ have new replicas.
if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
if err := r.clearSubsumedReplicaDiskData(ctx, inSnap.SSTStorageScratch, inSnap.Batch, s.Desc, subsumedRepls, mergedTombstoneReplicaID); err != nil {
return err
}
stats.subsumedReplicas = timeutil.Now()

// Ingest all SSTs atomically.
if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil {
return err
if inSnap.SSTStorageScratch != nil {
if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
if err := fn(inSnap, inSnap.snapType, inSnap.SSTStorageScratch.SSTs()); err != nil {
return err
}
}
if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
} else {
if fn := r.store.cfg.TestingKnobs.BeforeSnapshotSSTIngestion; fn != nil {
if err := fn(inSnap, inSnap.snapType, nil); err != nil {
return err
}
}
if err := inSnap.Batch.Commit(true /* sync */); err != nil {
return errors.Wrapf(err, "while ingesting batch")
}
}
if err := r.store.engine.IngestExternalFiles(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
stats.ingestion = timeutil.Now()
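The two apply paths bottom out in different engine operations: the batch path commits with sync so the snapshot is durable before it is acknowledged, while the SST path ingests prebuilt files. For context, a hedged standalone sketch of the batch side against Pebble's public API (CockroachDB's storage engine), assuming a throwaway local store:

```go
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	// A throwaway local store; a real setup would tune pebble.Options.
	db, err := pebble.Open("/tmp/pebble-batch-demo", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Small-snapshot path: stage writes in a batch, then commit with
	// pebble.Sync so the data is durable before the snapshot is acked,
	// mirroring inSnap.Batch.Commit(true /* sync */) above.
	b := db.NewBatch()
	if err := b.Set([]byte("key"), []byte("value"), nil); err != nil {
		log.Fatal(err)
	}
	if err := b.Commit(pebble.Sync); err != nil {
		log.Fatal(err)
	}

	// Large-snapshot path (not exercised here): build SSTs on disk and
	// hand them to the engine, e.g. via Pebble's ingestion API.
}
```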

@@ -1068,6 +1132,7 @@ func (r *Replica) applySnapshot(
func (r *Replica) clearSubsumedReplicaDiskData(
ctx context.Context,
scratch *SSTSnapshotStorageScratch,
batch storage.Batch,
desc *roachpb.RangeDescriptor,
subsumedRepls []*Replica,
subsumedNextReplicaID roachpb.ReplicaID,
@@ -1087,31 +1152,44 @@ func (r *Replica) clearSubsumedReplicaDiskData(
destroyReasonRemoved)
sr.mu.Unlock()

// We have to create an SST for the subsumed replica's range-id local keys.
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
defer subsumedReplSST.Close()
// NOTE: We set mustClearRange to true because we are setting
// RangeTombstoneKey. Since Clears and Puts need to be done in increasing
// order of keys, it is not safe to use ClearRangeIter.
if err := sr.preDestroyRaftMuLocked(
ctx,
r.store.Engine(),
&subsumedReplSST,
subsumedNextReplicaID,
true, /* clearRangeIDLocalOnly */
true, /* mustClearRange */
); err != nil {
subsumedReplSST.Close()
return err
}
if err := subsumedReplSST.Finish(); err != nil {
return err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemFile first.
if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
if scratch != nil {
// We have to create an SST for the subsumed replica's range-id local keys.
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
defer subsumedReplSST.Close()
// NOTE: We set mustClearRange to true because we are setting
// RangeTombstoneKey. Since Clears and Puts need to be done in increasing
// order of keys, it is not safe to use ClearRangeIter.
if err := sr.preDestroyRaftMuLocked(
ctx,
r.store.Engine(),
&subsumedReplSST,
subsumedNextReplicaID,
true, /* clearRangeIDLocalOnly */
true, /* mustClearRange */
); err != nil {
subsumedReplSST.Close()
return err
}
if err := subsumedReplSST.Finish(); err != nil {
return err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemFile first.
if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
}
}
} else {
if err := sr.preDestroyRaftMuLocked(
ctx,
r.store.Engine(),
batch,
subsumedNextReplicaID,
true, /* clearRangeIDLocalOnly */
true, /* mustClearRange */
); err != nil {
return err
}
}
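The mustClearRange=true comment above rests on a property of SST writers: keys must be added in strictly increasing order, so after writing the RangeTombstoneKey it is not safe to fall back to iterating and clearing individual keys that may sort lower. A self-contained toy writer demonstrating the ordering contract (invented types, not Pebble's actual writer):

```go
package main

import (
	"bytes"
	"fmt"
)

// orderedSSTWriter mimics the key-ordering contract of an SST writer:
// every key must sort strictly after the previous one.
type orderedSSTWriter struct {
	lastKey []byte
}

func (w *orderedSSTWriter) add(key []byte) error {
	if w.lastKey != nil && bytes.Compare(key, w.lastKey) <= 0 {
		return fmt.Errorf("key %q not strictly after %q", key, w.lastKey)
	}
	w.lastKey = append(w.lastKey[:0], key...)
	return nil
}

func main() {
	w := &orderedSSTWriter{}
	fmt.Println(w.add([]byte("a"))) // <nil>
	fmt.Println(w.add([]byte("c"))) // <nil>
	fmt.Println(w.add([]byte("b"))) // error: out of order
}
```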
@@ -1144,25 +1222,36 @@ func (r *Replica) clearSubsumedReplicaDiskData(
// subsume both r1 and r2 in S1.
for i := range keyRanges {
if totalKeyRanges[i].End.Key.Compare(keyRanges[i].End.Key) > 0 {
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
defer subsumedReplSST.Close()
if err := storage.ClearRangeWithHeuristic(
r.store.Engine(),
&subsumedReplSST,
keyRanges[i].End.Key,
totalKeyRanges[i].End.Key,
); err != nil {
subsumedReplSST.Close()
return err
}
if err := subsumedReplSST.Finish(); err != nil {
return err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemFile first.
if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
if scratch != nil {
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
defer subsumedReplSST.Close()
if err := storage.ClearRangeWithHeuristic(
r.store.Engine(),
&subsumedReplSST,
keyRanges[i].End.Key,
totalKeyRanges[i].End.Key,
); err != nil {
subsumedReplSST.Close()
return err
}
if err := subsumedReplSST.Finish(); err != nil {
return err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemFile first.
if err := scratch.WriteSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
}
}
} else {
if err := storage.ClearRangeWithHeuristic(
r.store.Engine(),
batch,
keyRanges[i].End.Key,
totalKeyRanges[i].End.Key,
); err != nil {
return err
}
}
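ClearRangeWithHeuristic, used by both branches above, applies a common engine trick one level down: for a span with few live keys it issues point deletions, while for a large span it writes a single range tombstone, which is cheap to write but adds iteration overhead later. A self-contained toy version of the heuristic (the threshold and store are invented):

```go
package main

import (
	"fmt"
	"sort"
)

// clearRangeThreshold is illustrative; the real heuristic picks its own cutoff.
const clearRangeThreshold = 64

type store struct {
	kvs        map[string]string
	tombstones []string // "start..end" range tombstones, for illustration
}

// clearRangeWithHeuristic deletes [start, end): point deletes when the
// span holds few keys, one range tombstone when it holds many.
func (s *store) clearRangeWithHeuristic(start, end string) {
	var inSpan []string
	for k := range s.kvs {
		if k >= start && k < end {
			inSpan = append(inSpan, k)
		}
	}
	if len(inSpan) <= clearRangeThreshold {
		sort.Strings(inSpan)
		for _, k := range inSpan {
			delete(s.kvs, k) // one point deletion per key
		}
		return
	}
	// Large span: a single range tombstone covers everything at once.
	s.tombstones = append(s.tombstones, start+".."+end)
	for _, k := range inSpan {
		delete(s.kvs, k)
	}
}

func main() {
	s := &store{kvs: map[string]string{"a": "1", "b": "2", "z": "3"}}
	s.clearRangeWithHeuristic("a", "c")
	fmt.Println(len(s.kvs), s.tombstones) // 1 []
}
```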