kvserver,storage: ingest small snapshot as writes #110943

Merged 1 commit on Oct 3, 2023
119 changes: 85 additions & 34 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -24,6 +24,7 @@
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -40,6 +41,19 @@
"go.etcd.io/raft/v3/raftpb"
)

var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.snapshot.ingest_as_write_threshold",
"size below which a range snapshot ingestion will be performed as a normal write",
func() int64 {
return int64(util.ConstantWithMetamorphicTestChoice(
"kv.snapshot.ingest_as_write_threshold",
100<<10, /* default value is 100KiB */
1<<30, /* 1GiB causes everything to be a normal write */
0, /* 0B causes everything to be an ingest */
).(int))
}())
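
A note on the default above: it is derived from a metamorphic test constant, so tests randomly exercise both the all-ingest and all-write extremes. A minimal sketch of the assumed semantics (the real helper lives in pkg/util and is driven by metamorphic build flags; every name below is invented for illustration):

package util

import "math/rand"

// metamorphicBuild stands in for the real build-flag-driven switch; this is
// an assumption, not the actual pkg/util implementation.
var metamorphicBuild = false

// constantWithMetamorphicTestChoice sketches the assumed behavior of
// util.ConstantWithMetamorphicTestChoice: production builds always get the
// first (default) value; metamorphic test builds may pick any listed value.
func constantWithMetamorphicTestChoice(name string, choices ...interface{}) interface{} {
	_ = name // the real helper records the chosen value under this name
	if !metamorphicBuild {
		return choices[0]
	}
	return choices[rand.Intn(len(choices))]
}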

// replicaRaftStorage implements the raft.Storage interface.
type replicaRaftStorage Replica

@@ -337,14 +351,21 @@
SSTStorageScratch *SSTSnapshotStorageScratch
FromReplica roachpb.ReplicaDescriptor
// The descriptor in the snapshot, never nil.
Desc *roachpb.RangeDescriptor
DataSize int64
Desc *roachpb.RangeDescriptor
// Size of the key-value pairs.
DataSize int64
// Size of the ssts containing these key-value pairs.
SSTSize int64
SharedSize int64
placeholder *ReplicaPlaceholder
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
sharedSSTs []pebble.SharedSSTMeta
doExcise bool
// clearedSpans represents the key spans in the existing store that will be
// cleared by doing the Ingest*. This is tracked so that we can convert the
// ssts into a WriteBatch if the total size of the ssts is small.
clearedSpans []roachpb.Span
}

func (s IncomingSnapshot) String() string {
@@ -534,6 +555,7 @@
ingestion time.Time
}
log.KvDistribution.Infof(ctx, "applying %s", inSnap)
appliedAsWrite := false
defer func(start time.Time) {
var logDetails redact.StringBuilder
logDetails.Printf("total=%0.0fms", timeutil.Since(start).Seconds()*1000)
@@ -550,21 +572,25 @@
}
logDetails.Printf(" ingestion=%d@%0.0fms", len(inSnap.SSTStorageScratch.SSTs()),
stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000)
log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
var appliedAsWriteStr string
if appliedAsWrite {
appliedAsWriteStr = "as write "
}
log.Infof(ctx, "applied %s %s(%s)", inSnap, appliedAsWriteStr, logDetails)
}(timeutil.Now())

unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST(
clearedSpans := inSnap.clearedSpans
unreplicatedSSTFile, clearedSpan, err := writeUnreplicatedSST(
ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader,
)
if err != nil {
return err
}
if nonempty {
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemObject first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
clearedSpans = append(clearedSpans, clearedSpan)
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemObject first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)
@@ -596,13 +622,15 @@
// problematic, as it would prevent this store from ever having a new replica
// of the removed range. In this case, however, it's copacetic, as subsumed
// ranges _can't_ have new replicas.
if err := clearSubsumedReplicaDiskData(
clearedSubsumedSpans, err := clearSubsumedReplicaDiskData(
// TODO(sep-raft-log): needs access to both engines.
ctx, r.store.ClusterSettings(), r.store.TODOEngine(), inSnap.SSTStorageScratch.WriteSST,
desc, subsumedDescs, mergedTombstoneReplicaID,
); err != nil {
)
if err != nil {
return err
}
clearedSpans = append(clearedSpans, clearedSubsumedSpans...)
stats.subsumedReplicas = timeutil.Now()

// Ingest all SSTs atomically.
@@ -612,6 +640,7 @@
}
}
var ingestStats pebble.IngestOperationStats
var writeBytes uint64
// TODO: separate ingestions for log and statemachine engine. See:
//
// https://github.com/cockroachdb/cockroach/issues/93251
@@ -622,13 +651,30 @@
return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
}
} else {
if ingestStats, err =
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
if inSnap.SSTSize > snapshotIngestAsWriteThreshold.Get(&r.ClusterSettings().SV) {
if ingestStats, err =
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
} else {
appliedAsWrite = true
err := r.store.TODOEngine().ConvertFilesToBatchAndCommit(
ctx, inSnap.SSTStorageScratch.SSTs(), clearedSpans)
if err != nil {
return errors.Wrapf(err, "while applying as batch %s", inSnap.SSTStorageScratch.SSTs())
}
// Admission control wants the writeBytes to be roughly equivalent to
// the bytes in the SST when these writes are eventually flushed. We use
// the SST size of the incoming snapshot as that approximation. We've
// written additional SSTs to clear some data earlier in this method,
// but we ignore those since the bulk of the data is in the incoming
// snapshot.
writeBytes = uint64(inSnap.SSTSize)
}
}
if r.store.cfg.KVAdmissionController != nil {
r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(r.store.StoreID(), ingestStats, 0)
r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(
r.store.StoreID(), ingestStats, writeBytes)
}
stats.ingestion = timeutil.Now()

@@ -779,17 +825,14 @@
// covers the RangeID-unreplicated keyspace. A range tombstone is
// laid down and the Raft state provided by the arguments is overlaid
// onto it.
//
// TODO(sep-raft-log): when is `nonempty` ever false? We always
// perform a number of writes to this SST.
func writeUnreplicatedSST(
ctx context.Context,
id storage.FullReplicaID,
st *cluster.Settings,
meta raftpb.SnapshotMetadata,
hs raftpb.HardState,
sl *logstore.StateLoader,
) (_ *storage.MemObject, nonempty bool, _ error) {
) (_ *storage.MemObject, clearedSpan roachpb.Span, _ error) {
unreplicatedSSTFile := &storage.MemObject{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, st, unreplicatedSSTFile,
@@ -803,21 +846,22 @@
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
clearedSpan = roachpb.Span{Key: unreplicatedStart, EndKey: unreplicatedEnd}
if err := unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := sl.SetRaftReplicaID(
ctx, &unreplicatedSST, id.ReplicaID); err != nil {
return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

if err := sl.SetRaftTruncatedState(
@@ -827,13 +871,13 @@
Term: kvpb.RaftTerm(meta.Term),
},
); err != nil {
return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
return nil, false, err
return nil, roachpb.Span{}, err
}
return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
return unreplicatedSSTFile, clearedSpan, nil
}

// clearSubsumedReplicaDiskData clears the on disk data of the subsumed
@@ -851,7 +895,7 @@
desc *roachpb.RangeDescriptor,
subsumedDescs []*roachpb.RangeDescriptor,
subsumedNextReplicaID roachpb.ReplicaID,
) error {
) (clearedSpans []roachpb.Span, _ error) {
// NB: we don't clear RangeID local key spans here. That happens
// via the call to preDestroyRaftMuLocked.
getKeySpans := func(d *roachpb.RangeDescriptor) []roachpb.Span {
@@ -876,18 +920,23 @@
ClearUnreplicatedByRangeID: true,
MustUseClearRange: true,
}
subsumedClearedSpans := rditer.Select(subDesc.RangeID, rditer.SelectOpts{
ReplicatedByRangeID: opts.ClearReplicatedByRangeID,
UnreplicatedByRangeID: opts.ClearUnreplicatedByRangeID,
})
clearedSpans = append(clearedSpans, subsumedClearedSpans...)
if err := kvstorage.DestroyReplica(ctx, subDesc.RangeID, reader, &subsumedReplSST, subsumedNextReplicaID, opts); err != nil {
subsumedReplSST.Close()
return err
return nil, err
}
if err := subsumedReplSST.Finish(); err != nil {
return err
return nil, err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemObject first.
if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
return nil, err
}
}

@@ -933,16 +982,18 @@
kvstorage.ClearRangeThresholdRangeKeys,
); err != nil {
subsumedReplSST.Close()
return err
return nil, err
}
clearedSpans = append(clearedSpans,
roachpb.Span{Key: keySpans[i].EndKey, EndKey: totalKeySpans[i].EndKey})
if err := subsumedReplSST.Finish(); err != nil {
return err
return nil, err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemObject first.
if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
return nil, err
}
}
}
Expand All @@ -957,7 +1008,7 @@ func clearSubsumedReplicaDiskData(
keySpans[i], totalKeySpans[i])
}
}
return nil
return clearedSpans, nil
}

// clearSubsumedReplicaInMemoryData clears the in-memory data of the subsumed
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/store_snapshot.go
@@ -141,8 +141,11 @@
// The approximate size of the SST chunk to buffer in memory on the receiver
// before flushing to disk.
sstChunkSize int64
// The total size of SST data. Updated on SST finalization.
// The total size of the key and value pairs (not the total size of the
// SSTs). Updated on SST finalization.
dataSize int64
// The total size of the SSTs.
sstSize int64
// if skipRangeDelForLastSpan is true, the last span is not ClearRanged in the
// same sstable. We rely on the caller to take care of clearing this span
// through a different process (eg. IngestAndExcise on pebble).
@@ -198,6 +201,7 @@
return errors.Wrap(err, "failed to finish sst")
}
msstw.dataSize += msstw.currSST.DataSize
msstw.sstSize += int64(msstw.currSST.Meta.Size)
msstw.currSpan++
msstw.currSST.Close()
return nil
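
The two counters deliberately measure different things: DataSize accumulates the logical key and value bytes written, while Meta.Size is the physical size of the finished file (compressed blocks plus index and footer). Illustrative only, with invented numbers:

// After finalizing one SST (invented values, not from the PR):
//   msstw.currSST.DataSize  == 100 << 10 // logical key+value bytes
//   msstw.currSST.Meta.Size ==  62 << 10 // physical file bytes on disk
// The physical total (sstSize) is what snapshotIngestAsWriteThreshold is
// compared against, since it approximates the bytes an ingestion would add
// to the LSM.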
@@ -654,6 +658,7 @@
// the data.
timingTag.start("sst")
dataSize, err := msstw.Finish(ctx)
sstSize := msstw.sstSize
if err != nil {
return noSnap, errors.Wrapf(err, "finishing sst for raft snapshot")
}
@@ -677,11 +682,13 @@
FromReplica: header.RaftMessageRequest.FromReplica,
Desc: header.State.Desc,
DataSize: dataSize,
SSTSize: sstSize,
SharedSize: sharedSize,
raftAppliedIndex: header.State.RaftAppliedIndex,
msgAppRespCh: make(chan raftpb.Message, 1),
sharedSSTs: sharedSSTs,
doExcise: doExcise,
clearedSpans: keyRanges,
}

timingTag.stop("totalTime")
17 changes: 16 additions & 1 deletion pkg/storage/engine.go
@@ -1020,6 +1020,7 @@
// files. These files can be referred to by multiple stores, but are not
// modified or deleted by the Engine doing the ingestion.
IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error)

// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
@@ -1028,7 +1029,21 @@
// counts for the given key span, along with how many of those bytes are on
// remote, as well as specifically external remote, storage.
ApproximateDiskBytes(from, to roachpb.Key) (total, remote, external uint64, _ error)

// ConvertFilesToBatchAndCommit converts local files with the given paths to
// a WriteBatch and commits the batch with sync=true. The files represented
// in paths must not be overlapping -- this is the same contract as
// IngestLocalFiles*. Additionally, clearedSpans represents the spans which
// must be deleted before writing the data contained in these paths.
//
// This method is expected to be used instead of IngestLocalFiles* or
// IngestAndExciseFiles when the sum of the file sizes is small.
//
// TODO(sumeer): support this as an alternative to IngestAndExciseFiles.
// This should be easy since we use NewSSTEngineIterator to read the ssts,
// which supports multiple levels.
ConvertFilesToBatchAndCommit(
ctx context.Context, paths []string, clearedSpans []roachpb.Span,
) error
// CompactRange ensures that the specified range of key value pairs is
// optimized for space efficiency.
CompactRange(start, end roachpb.Key) error
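
A sketch of what an implementation of this contract might look like, using invented stand-in types (the real method operates on a pebble batch and reads the files through NewSSTEngineIterator; nothing below is the actual CockroachDB API): delete the cleared spans first, replay the file contents as ordinary point writes, then commit with sync so the result is as durable as a file ingestion.

package storage

// Stand-ins for the sketch only.
type span struct{ key, endKey []byte }
type kv struct{ key, value []byte }

type batch interface {
	ClearRange(start, end []byte) error // range deletion over a cleared span
	Put(key, value []byte) error
	CommitSync() error // commit with sync=true
}

// convertFilesToBatchAndCommit: clearedSpans are deleted first, then every
// key-value pair from the non-overlapping input files is re-applied as a
// normal write, turning a would-be ingestion into one committed batch.
func convertFilesToBatchAndCommit(b batch, fileKVs []kv, clearedSpans []span) error {
	for _, sp := range clearedSpans {
		if err := b.ClearRange(sp.key, sp.endKey); err != nil {
			return err
		}
	}
	for _, e := range fileKVs {
		if err := b.Put(e.key, e.value); err != nil {
			return err
		}
	}
	return b.CommitSync() // durable, like an ingest
}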