Skip to content

Commit

Permalink
kvserver,storage: ingest small snapshot as writes
Browse files Browse the repository at this point in the history
Small snapshots cause LSM overload by resulting in many tiny memtable
flushes, which result in a high sub-level count, which then needs to be
compensated for by running many inefficient compactions from L0 to Lbase.
Despite some compaction scoring changes, we have not been able to
fully eliminate the impact of this on foreground traffic, as discussed in
cockroachdb/pebble#2832 (comment).

Fixes cockroachdb#109808

Epic: none

Release note (ops change): The cluster setting
kv.snapshot.ingest_as_write_threshold controls the size threshold below
which snapshots are converted to regular writes. It defaults to 100KiB.
  • Loading branch information
sumeerbhola committed Sep 24, 2023
1 parent d42b76a commit 4ef5f3c
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 35 deletions.
97 changes: 66 additions & 31 deletions pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
Expand All @@ -40,6 +41,16 @@ import (
"go.etcd.io/raft/v3/raftpb"
)

// snapshotIngestAsWriteThreshold is the system-only byte-size cluster setting
// "kv.snapshot.ingest_as_write_threshold". applySnapshot compares the incoming
// snapshot's DataSize against it: a snapshot strictly larger than the
// threshold is ingested as sstables, while a snapshot at or below the
// threshold is converted into a regular write batch and committed.
//
// The default is wrapped in util.ConstantWithMetamorphicTestValue; per the
// inline comments below, the production default is 100KiB and the metamorphic
// test value is 1GiB.
var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.snapshot.ingest_as_write_threshold",
"size below which a range snapshot ingestion will be performed as a normal write",
int64(util.ConstantWithMetamorphicTestValue(
"kv.snapshot.ingest_as_write_threshold",
100<<10, /* default value is 100KiB */
1<<30, /* metamorphic value is 1GiB */
)))

// replicaRaftStorage implements the raft.Storage interface.
//
// It is a distinct defined type whose underlying type is Replica: it has the
// same fields as Replica but its own method set, so the raft.Storage methods
// are declared on replicaRaftStorage rather than on Replica itself.
type replicaRaftStorage Replica

Expand Down Expand Up @@ -347,6 +358,10 @@ type IncomingSnapshot struct {
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
sharedSSTs []pebble.SharedSSTMeta
doExcise bool
// clearedSpans represents the key spans in the existing store that will be
// cleared by doing the Ingest*. This is tracked so that we can convert the
// ssts into a WriteBatch if the total size of the ssts is small.
clearedSpans []roachpb.Span
}

func (s IncomingSnapshot) String() string {
Expand Down Expand Up @@ -538,6 +553,7 @@ func (r *Replica) applySnapshot(
ingestion time.Time
}
log.KvDistribution.Infof(ctx, "applying %s", inSnap)
appliedAsWrite := false
defer func(start time.Time) {
var logDetails redact.StringBuilder
logDetails.Printf("total=%0.0fms", timeutil.Since(start).Seconds()*1000)
Expand All @@ -554,21 +570,24 @@ func (r *Replica) applySnapshot(
}
logDetails.Printf(" ingestion=%d@%0.0fms", len(inSnap.SSTStorageScratch.SSTs()),
stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000)
log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
var appliedAsWriteStr string
if appliedAsWrite {
appliedAsWriteStr = "as write "
}
log.Infof(ctx, "applied %s %s(%s)", inSnap, appliedAsWriteStr, logDetails)
}(timeutil.Now())

unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST(
unreplicatedSSTFile, clearedSpan, err := writeUnreplicatedSST(
ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs, &r.raftMu.stateLoader.StateLoader,
)
if err != nil {
return err
}
if nonempty {
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemObject first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
inSnap.clearedSpans = append(inSnap.clearedSpans, clearedSpan)
// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
// buffering in a MemObject first.
if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
return err
}
// Update Raft entries.
r.store.raftEntryCache.Drop(r.RangeID)
Expand Down Expand Up @@ -600,13 +619,15 @@ func (r *Replica) applySnapshot(
// problematic, as it would prevent this store from ever having a new replica
// of the removed range. In this case, however, it's copacetic, as subsumed
// ranges _can't_ have new replicas.
if err := clearSubsumedReplicaDiskData(
clearedSpans, err := clearSubsumedReplicaDiskData(
// TODO(sep-raft-log): needs access to both engines.
ctx, r.store.ClusterSettings(), r.store.TODOEngine(), inSnap.SSTStorageScratch.WriteSST,
desc, subsumedDescs, mergedTombstoneReplicaID,
); err != nil {
)
if err != nil {
return err
}
inSnap.clearedSpans = append(inSnap.clearedSpans, clearedSpans...)
stats.subsumedReplicas = timeutil.Now()

// Ingest all SSTs atomically.
Expand All @@ -626,9 +647,18 @@ func (r *Replica) applySnapshot(
return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
}
} else {
if ingestStats, err =
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
if inSnap.DataSize > snapshotIngestAsWriteThreshold.Get(&r.ClusterSettings().SV) {
if ingestStats, err =
r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
}
} else {
appliedAsWrite = true
err := r.store.TODOEngine().ConvertFilesToBatchAndCommit(
ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.clearedSpans)
if err != nil {
return errors.Wrapf(err, "while applying as batch %s", inSnap.SSTStorageScratch.SSTs())
}
}
}
if r.store.cfg.KVAdmissionController != nil {
Expand Down Expand Up @@ -783,17 +813,14 @@ func (r *Replica) applySnapshot(
// covers the RangeID-unreplicated keyspace. A range tombstone is
// laid down and the Raft state provided by the arguments is overlaid
// onto it.
//
// TODO(sep-raft-log): when is `nonempty` ever false? We always
// perform a number of writes to this SST.
func writeUnreplicatedSST(
ctx context.Context,
id storage.FullReplicaID,
st *cluster.Settings,
meta raftpb.SnapshotMetadata,
hs raftpb.HardState,
sl *logstore.StateLoader,
) (_ *storage.MemObject, nonempty bool, _ error) {
) (_ *storage.MemObject, clearedSpan roachpb.Span, _ error) {
unreplicatedSSTFile := &storage.MemObject{}
unreplicatedSST := storage.MakeIngestionSSTWriter(
ctx, st, unreplicatedSSTFile,
Expand All @@ -807,21 +834,22 @@ func writeUnreplicatedSST(
unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
unreplicatedStart := unreplicatedPrefixKey
unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
clearedSpan = roachpb.Span{Key: unreplicatedStart, EndKey: unreplicatedEnd}
if err := unreplicatedSST.ClearRawRange(
unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
); err != nil {
return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
}

// Update HardState.
if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
}
// We've cleared all the raft state above, so we are forced to write the
// RaftReplicaID again here.
if err := sl.SetRaftReplicaID(
ctx, &unreplicatedSST, id.ReplicaID); err != nil {
return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
}

if err := sl.SetRaftTruncatedState(
Expand All @@ -831,13 +859,13 @@ func writeUnreplicatedSST(
Term: kvpb.RaftTerm(meta.Term),
},
); err != nil {
return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
}

if err := unreplicatedSST.Finish(); err != nil {
return nil, false, err
return nil, roachpb.Span{}, err
}
return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
return unreplicatedSSTFile, clearedSpan, nil
}

// clearSubsumedReplicaDiskData clears the on disk data of the subsumed
Expand All @@ -855,7 +883,7 @@ func clearSubsumedReplicaDiskData(
desc *roachpb.RangeDescriptor,
subsumedDescs []*roachpb.RangeDescriptor,
subsumedNextReplicaID roachpb.ReplicaID,
) error {
) (clearedSpans []roachpb.Span, _ error) {
// NB: we don't clear RangeID local key spans here. That happens
// via the call to preDestroyRaftMuLocked.
getKeySpans := func(d *roachpb.RangeDescriptor) []roachpb.Span {
Expand All @@ -880,18 +908,23 @@ func clearSubsumedReplicaDiskData(
ClearUnreplicatedByRangeID: true,
MustUseClearRange: true,
}
subsumedClearedSpans := rditer.Select(subDesc.RangeID, rditer.SelectOpts{
ReplicatedByRangeID: opts.ClearReplicatedByRangeID,
UnreplicatedByRangeID: opts.ClearUnreplicatedByRangeID,
})
clearedSpans = append(clearedSpans, subsumedClearedSpans...)
if err := kvstorage.DestroyReplica(ctx, subDesc.RangeID, reader, &subsumedReplSST, subsumedNextReplicaID, opts); err != nil {
subsumedReplSST.Close()
return err
return nil, err
}
if err := subsumedReplSST.Finish(); err != nil {
return err
return nil, err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemObject first.
if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
return nil, err
}
}

Expand Down Expand Up @@ -937,16 +970,18 @@ func clearSubsumedReplicaDiskData(
kvstorage.ClearRangeThresholdRangeKeys,
); err != nil {
subsumedReplSST.Close()
return err
return nil, err
}
clearedSpans = append(clearedSpans,
roachpb.Span{Key: keySpans[i].EndKey, EndKey: totalKeySpans[i].EndKey})
if err := subsumedReplSST.Finish(); err != nil {
return err
return nil, err
}
if subsumedReplSST.DataSize > 0 {
// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
// buffering in a MemObject first.
if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
return err
return nil, err
}
}
}
Expand All @@ -961,7 +996,7 @@ func clearSubsumedReplicaDiskData(
keySpans[i], totalKeySpans[i])
}
}
return nil
return clearedSpans, nil
}

// clearSubsumedReplicaInMemoryData clears the in-memory data of the subsumed
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
msgAppRespCh: make(chan raftpb.Message, 1),
sharedSSTs: sharedSSTs,
doExcise: doExcise,
clearedSpans: keyRanges,
}

timingTag.stop("totalTime")
Expand Down
17 changes: 16 additions & 1 deletion pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,7 @@ type Engine interface {
// files. These files can be referred to by multiple stores, but are not
// modified or deleted by the Engine doing the ingestion.
IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error)

// PreIngestDelay offers an engine the chance to backpressure ingestions.
// When called, it may choose to block if the engine determines that it is in
// or approaching a state where further ingestions may risk its health.
Expand All @@ -1051,7 +1052,21 @@ type Engine interface {
// counts for the given key span, along with how many of those bytes are on
// remote, as well as specifically external remote, storage.
ApproximateDiskBytes(from, to roachpb.Key) (total, remote, external uint64, _ error)

// ConvertFilesToBatchAndCommit converts local files with the given paths to
// a WriteBatch and commits the batch with sync=true. The files represented
// in paths must not be overlapping -- this is the same contract as
// IngestLocalFiles*. Additionally, clearedSpans represents the spans which
// must be deleted before writing the data contained in these paths.
//
// This method is expected to be used instead of IngestLocalFiles* or
// IngestAndExciseFiles when the sum of the file sizes is small.
//
// TODO(sumeer): support this as an alternative to IngestAndExciseFiles.
// This should be easy since we use NewSSTEngineIterator to read the ssts,
// which supports multiple levels.
ConvertFilesToBatchAndCommit(
ctx context.Context, paths []string, clearedSpans []roachpb.Span,
) error
// CompactRange ensures that the specified range of key value pairs is
// optimized for space efficiency.
CompactRange(start, end roachpb.Key) error
Expand Down
81 changes: 81 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -2377,6 +2377,87 @@ func (p *Pebble) BufferedSize() int {
return 0
}

// ConvertFilesToBatchAndCommit implements the Engine interface.
//
// It opens every sstable named in paths, creates a new write batch, stages a
// raw range clear (point keys and range keys) for each span in clearedSpans,
// copies every point key and every range key found in the sstables into the
// batch, and finally commits the batch with sync=true.
//
// The first error encountered is returned. If the failure happens after the
// batch has been created, the batch is closed without being committed.
func (p *Pebble) ConvertFilesToBatchAndCommit(
	_ context.Context, paths []string, clearedSpans []roachpb.Span,
) error {
	files := make([]sstable.ReadableFile, len(paths))
	// closeFiles closes whichever files have been opened so far; it is only
	// used when opening a later file fails.
	closeFiles := func() {
		for i := range files {
			if files[i] != nil {
				files[i].Close()
			}
		}
	}
	for i, fileName := range paths {
		f, err := p.FS.Open(fileName)
		if err != nil {
			closeFiles()
			return err
		}
		files[i] = f
	}
	// All files are handed to the iterator as a single level.
	iter, err := NewSSTEngineIterator(
		[][]sstable.ReadableFile{files},
		IterOptions{
			KeyTypes:   IterKeyTypePointsAndRanges,
			LowerBound: roachpb.KeyMin,
			UpperBound: roachpb.KeyMax,
		}, true)
	if err != nil {
		// NOTE(review): the opened files are not closed here; presumably
		// NewSSTEngineIterator takes ownership of them even on failure --
		// confirm against its contract before adding a closeFiles() call.
		return err
	}
	defer iter.Close()

	batch := p.NewWriteBatch()
	// Stage the clears first so that the subsequently written keys land on
	// top of them within the same batch.
	for i := range clearedSpans {
		err :=
			batch.ClearRawRange(clearedSpans[i].Key, clearedSpans[i].EndKey, true, true)
		if err != nil {
			// Release the batch on this early-exit path too, matching the
			// iteration-error path below.
			batch.Close()
			return err
		}
	}
	valid, err := iter.SeekEngineKeyGE(EngineKey{Key: roachpb.KeyMin})
	for valid {
		hasPoint, hasRange := iter.HasPointAndRange()
		if hasPoint {
			var k EngineKey
			if k, err = iter.UnsafeEngineKey(); err != nil {
				break
			}
			var v []byte
			if v, err = iter.UnsafeValue(); err != nil {
				break
			}
			if err = batch.PutEngineKey(k, v); err != nil {
				break
			}
		}
		// Range keys are only written when the iterator reports that they
		// changed at this position, so each set of range keys is emitted once.
		if hasRange && iter.RangeKeyChanged() {
			var rangeBounds roachpb.Span
			if rangeBounds, err = iter.EngineRangeBounds(); err != nil {
				break
			}
			rangeKeys := iter.EngineRangeKeys()
			for i := range rangeKeys {
				if err = batch.PutEngineRangeKey(rangeBounds.Key, rangeBounds.EndKey, rangeKeys[i].Version,
					rangeKeys[i].Value); err != nil {
					break
				}
			}
			// The inner break only leaves the range-key loop; propagate the
			// failure out of the iteration loop as well.
			if err != nil {
				break
			}
		}
		valid, err = iter.NextEngineKey()
	}
	if err != nil {
		batch.Close()
		return err
	}
	return batch.Commit(true)
}

type pebbleReadOnly struct {
parent *Pebble
// The iterator reuse optimization in pebbleReadOnly is for servicing a
Expand Down
Loading

0 comments on commit 4ef5f3c

Please sign in to comment.