kvserver,storage: ingest small snapshot as writes
Small snapshots cause LSM overload: each one results in a tiny memtable
flush, the flushes drive up the sub-level count, and that in turn has to
be compensated for by running many inefficient compactions from L0 to
Lbase. Despite some compaction scoring changes, we have not been able to
fully eliminate the impact of this on foreground traffic, as discussed
in cockroachdb/pebble#2832 (comment).

Fixes cockroachdb#109808

Epic: none

Release note (ops change): The cluster setting
kv.snapshot.ingest_as_write_threshold controls the size threshold below
which snapshots are converted to regular writes. It defaults to 100KiB.
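A sketch of how a kvserver package test could pin the new threshold (a
sketch only: it assumes the standard cluster-settings Override/Get
helpers and the usual test imports, none of which are part of this
commit's diff):

func TestIngestAsWriteThresholdOverride(t *testing.T) {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	// Convert every snapshot to a normal write, regardless of size.
	snapshotIngestAsWriteThreshold.Override(ctx, &st.SV, 1<<30)
	require.Equal(t, int64(1<<30), snapshotIngestAsWriteThreshold.Get(&st.SV))
}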
sumeerbhola committed Oct 3, 2023
1 parent f157ff5 commit d00178b
Showing 6 changed files with 316 additions and 39 deletions.
119 changes: 85 additions & 34 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
@@ -40,6 +41,19 @@ import (
"go.etcd.io/raft/v3/raftpb"
)

var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.snapshot.ingest_as_write_threshold",
"size below which a range snapshot ingestion will be performed as a normal write",
func() int64 {
return int64(util.ConstantWithMetamorphicTestChoice(
"kv.snapshot.ingest_as_write_threshold",
100<<10, /* default value is 100KiB */
1<<30, /* 1GiB causes everything to be a normal write */
0, /* 0B causes everything to be an ingest */
).(int))
}())

// replicaRaftStorage implements the raft.Storage interface.
type replicaRaftStorage Replica
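Taken together with the apply-path change further down, the new setting
reduces to a simple gate. A minimal sketch (assuming in-package access
to the unexported setting; the helper below is hypothetical, for
illustration, and does not appear in this commit):

func ingestAsWrite(st *cluster.Settings, sstSize int64) bool {
	// Mirrors the comparison in applySnapshot: at or below the
	// threshold the snapshot's SSTs are replayed as a single synced
	// WriteBatch; above it they are ingested as SSTs.
	return sstSize <= snapshotIngestAsWriteThreshold.Get(&st.SV)
}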

@@ -337,14 +351,21 @@ type IncomingSnapshot struct {
SSTStorageScratch *SSTSnapshotStorageScratch
FromReplica roachpb.ReplicaDescriptor
// The descriptor in the snapshot, never nil.
Desc *roachpb.RangeDescriptor
DataSize int64
Desc *roachpb.RangeDescriptor
// Size of the key-value pairs.
DataSize int64
// Size of the ssts containing these key-value pairs.
SSTSize int64
SharedSize int64
placeholder *ReplicaPlaceholder
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
sharedSSTs []pebble.SharedSSTMeta
doExcise bool
// clearedSpans represents the key spans in the existing store that will be
// cleared by doing the Ingest*. This is tracked so that we can convert the
// ssts into a WriteBatch if the total size of the ssts is small.
clearedSpans []roachpb.Span
}

func (s IncomingSnapshot) String() string {
@@ -534,6 +555,7 @@ func (r *Replica) applySnapshot(
ingestion time.Time
}
log.KvDistribution.Infof(ctx, "applying %s", inSnap)
appliedAsWrite := false
defer func(start time.Time) {
var logDetails redact.StringBuilder
logDetails.Printf("total=%0.0fms", timeutil.Since(start).Seconds()*1000)
@@ -550,21 +572,25 @@
}
logDetails.Printf(" ingestion=%d@%0.0fms", len(inSnap.SSTStorageScratch.SSTs()),
stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000)
log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
var appliedAsWriteStr string
if appliedAsWrite {
appliedAsWriteStr = "as write "
}
log.Infof(ctx, "applied %s %s(%s)", inSnap, appliedAsWriteStr, logDetails)
}(timeutil.Now())

unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST(
clearedSpans := inSnap.clearedSpans
unreplicatedSSTFile, clearedSpan, err := writeUnreplicatedSST(
ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader,
)
if err != nil {
return err
}
if nonempty {
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemObject first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
clearedSpans = append(clearedSpans, clearedSpan)
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemObject first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)
@@ -596,13 +622,15 @@
// problematic, as it would prevent this store from ever having a new replica
// of the removed range. In this case, however, it's copacetic, as subsumed
// ranges _can't_ have new replicas.
if err := clearSubsumedReplicaDiskData(
clearedSubsumedSpans, err := clearSubsumedReplicaDiskData(
// TODO(sep-raft-log): needs access to both engines.
ctx, r.store.ClusterSettings(), r.store.TODOEngine(), inSnap.SSTStorageScratch.WriteSST,
desc, subsumedDescs, mergedTombstoneReplicaID,
); err != nil {
)
if err != nil {
return err
}
clearedSpans = append(clearedSpans, clearedSubsumedSpans...)
stats.subsumedReplicas = timeutil.Now()

// Ingest all SSTs atomically.
@@ -612,6 +640,7 @@
}
}
var ingestStats pebble.IngestOperationStats
var writeBytes uint64
// TODO: separate ingestions for log and statemachine engine. See:
//
// https://github.com/cockroachdb/cockroach/issues/93251
@@ -622,13 +651,30 @@
return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
}
} else {
if ingestStats, err =
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
if inSnap.SSTSize > snapshotIngestAsWriteThreshold.Get(&r.ClusterSettings().SV) {
if ingestStats, err =
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
} else {
appliedAsWrite = true
err := r.store.TODOEngine().ConvertFilesToBatchAndCommit(
ctx, inSnap.SSTStorageScratch.SSTs(), clearedSpans)
if err != nil {
return errors.Wrapf(err, "while applying as batch %s", inSnap.SSTStorageScratch.SSTs())
}
// Admission control wants the writeBytes to be roughly equivalent to
// the bytes in the SST when these writes are eventually flushed. We use
// the SST size of the incoming snapshot as that approximation. We've
// written additional SSTs to clear some data earlier in this method,
// but we ignore those since the bulk of the data is in the incoming
// snapshot.
writeBytes = uint64(inSnap.SSTSize)
}
}
if r.store.cfg.KVAdmissionController != nil {
r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(r.store.StoreID(), ingestStats, 0)
r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(
r.store.StoreID(), ingestStats, writeBytes)
}
stats.ingestion = timeutil.Now()

@@ -779,17 +825,14 @@ func (r *Replica) applySnapshot(
// covers the RangeID-unreplicated keyspace. A range tombstone is
// laid down and the Raft state provided by the arguments is overlaid
// onto it.
//
// TODO(sep-raft-log): when is `nonempty` ever false? We always
// perform a number of writes to this SST.
func writeUnreplicatedSST(
ctx context.Context,
id storage.FullReplicaID,
st *cluster.Settings,
meta raftpb.SnapshotMetadata,
hs raftpb.HardState,
sl *logstore.StateLoader,
) (_ *storage.MemObject, nonempty bool, _ error) {
) (_ *storage.MemObject, clearedSpan roachpb.Span, _ error) {
unreplicatedSSTFile := &storage.MemObject{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, st, unreplicatedSSTFile,
@@ -803,21 +846,22 @@
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
clearedSpan = roachpb.Span{Key: unreplicatedStart, EndKey: unreplicatedEnd}
if err := unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := sl.SetRaftReplicaID(
ctx, &unreplicatedSST, id.ReplicaID); err != nil {
return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

if err := sl.SetRaftTruncatedState(
@@ -827,13 +871,13 @@
Term: kvpb.RaftTerm(meta.Term),
},
); err != nil {
return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
return nil, false, err
return nil, roachpb.Span{}, err
}
return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
return unreplicatedSSTFile, clearedSpan, nil
}

// clearSubsumedReplicaDiskData clears the on disk data of the subsumed
@@ -851,7 +895,7 @@ func clearSubsumedReplicaDiskData(
desc *roachpb.RangeDescriptor,
subsumedDescs []*roachpb.RangeDescriptor,
subsumedNextReplicaID roachpb.ReplicaID,
) error {
) (clearedSpans []roachpb.Span, _ error) {
// NB: we don't clear RangeID local key spans here. That happens
// via the call to preDestroyRaftMuLocked.
getKeySpans := func(d *roachpb.RangeDescriptor) []roachpb.Span {
@@ -876,18 +920,23 @@
ClearUnreplicatedByRangeID: true,
MustUseClearRange: true,
}
subsumedClearedSpans := rditer.Select(subDesc.RangeID, rditer.SelectOpts{
ReplicatedByRangeID: opts.ClearReplicatedByRangeID,
UnreplicatedByRangeID: opts.ClearUnreplicatedByRangeID,
})
clearedSpans = append(clearedSpans, subsumedClearedSpans...)
if err := kvstorage.DestroyReplica(ctx, subDesc.RangeID, reader, &subsumedReplSST, subsumedNextReplicaID, opts); err != nil {
subsumedReplSST.Close()
return err
return nil, err
}
if err := subsumedReplSST.Finish(); err != nil {
return err
return nil, err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemObject first.
if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
return nil, err
}
}

@@ -933,16 +982,18 @@
kvstorage.ClearRangeThresholdRangeKeys,
); err != nil {
subsumedReplSST.Close()
return err
return nil, err
}
clearedSpans = append(clearedSpans,
roachpb.Span{Key: keySpans[i].EndKey, EndKey: totalKeySpans[i].EndKey})
if err := subsumedReplSST.Finish(); err != nil {
return err
return nil, err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemObject first.
if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
return nil, err
}
}
}
@@ -957,7 +1008,7 @@
keySpans[i], totalKeySpans[i])
}
}
return nil
return clearedSpans, nil
}

// clearSubsumedReplicaInMemoryData clears the in-memory data of the subsumed
9 changes: 8 additions & 1 deletion pkg/kv/kvserver/store_snapshot.go
@@ -141,8 +141,11 @@ type multiSSTWriter struct {
// The approximate size of the SST chunk to buffer in memory on the receiver
// before flushing to disk.
sstChunkSize int64
// The total size of SST data. Updated on SST finalization.
// The total size of the key and value pairs (not the total size of the
// SSTs). Updated on SST finalization.
dataSize int64
// The total size of the SSTs.
sstSize int64
// if skipRangeDelForLastSpan is true, the last span is not ClearRanged in the
// same sstable. We rely on the caller to take care of clearing this span
through a different process (e.g. IngestAndExcise on pebble).
@@ -198,6 +201,7 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error {
return errors.Wrap(err, "failed to finish sst")
}
msstw.dataSize += msstw.currSST.DataSize
msstw.sstSize += int64(msstw.currSST.Meta.Size)
msstw.currSpan++
msstw.currSST.Close()
return nil
@@ -654,6 +658,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
// the data.
timingTag.start("sst")
dataSize, err := msstw.Finish(ctx)
sstSize := msstw.sstSize
if err != nil {
return noSnap, errors.Wrapf(err, "finishing sst for raft snapshot")
}
@@ -677,11 +682,13 @@
FromReplica: header.RaftMessageRequest.FromReplica,
Desc: header.State.Desc,
DataSize: dataSize,
SSTSize: sstSize,
SharedSize: sharedSize,
raftAppliedIndex: header.State.RaftAppliedIndex,
msgAppRespCh: make(chan raftpb.Message, 1),
sharedSSTs: sharedSSTs,
doExcise: doExcise,
clearedSpans: keyRanges,
}

timingTag.stop("totalTime")
17 changes: 16 additions & 1 deletion pkg/storage/engine.go
@@ -1020,6 +1020,7 @@ type Engine interface {
// files. These files can be referred to by multiple stores, but are not
// modified or deleted by the Engine doing the ingestion.
IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error)

// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
@@ -1028,7 +1029,21 @@
// counts for the given key span, along with how many of those bytes are on
// remote, as well as specifically external remote, storage.
ApproximateDiskBytes(from, to roachpb.Key) (total, remote, external uint64, _ error)

// ConvertFilesToBatchAndCommit converts local files with the given paths to
// a WriteBatch and commits the batch with sync=true. The files represented
// in paths must not be overlapping -- this is the same contract as
// IngestLocalFiles*. Additionally, clearedSpans represents the spans which
// must be deleted before writing the data contained in these paths.
//
// This method is expected to be used instead of IngestLocalFiles* or
// IngestAndExciseFiles when the sum of the file sizes is small.
//
// TODO(sumeer): support this as an alternative to IngestAndExciseFiles.
// This should be easy since we use NewSSTEngineIterator to read the ssts,
// which supports multiple levels.
ConvertFilesToBatchAndCommit(
ctx context.Context, paths []string, clearedSpans []roachpb.Span,
) error
// CompactRange ensures that the specified range of key value pairs is
// optimized for space efficiency.
CompactRange(start, end roachpb.Key) error
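To make the contract concrete, here is a caller-side sketch stitching
together the kvserver logic above (the function name and wiring are
assumptions for illustration, not part of this commit):

func applySnapshotFiles(
	ctx context.Context,
	eng storage.Engine,
	st *cluster.Settings,
	paths []string,
	sstSize int64,
	clearedSpans []roachpb.Span,
) error {
	if sstSize > snapshotIngestAsWriteThreshold.Get(&st.SV) {
		// Large snapshots: ingest the files directly as SSTs.
		_, err := eng.IngestLocalFilesWithStats(ctx, paths)
		return err
	}
	// Small snapshots: clear the covered spans and replay the file
	// contents as one synced WriteBatch, avoiding tiny memtable flushes.
	return eng.ConvertFilesToBatchAndCommit(ctx, paths, clearedSpans)
}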
(Diffs for the remaining 3 changed files are not shown.)
