Skip to content

Commit

Permalink
kvserver: opt-in "fast" external file snapshotting
Browse files Browse the repository at this point in the history
Like SharedSSTs, we may want to send a pebble.ExternalFile's metadata
rather than its content during a snapshot.

This is opt-in via a cluster setting and is only attempted when the
store appears to actually have external files.

Epic: none
Release note: None
  • Loading branch information
stevendanna committed Mar 20, 2024
1 parent d047ae3 commit 85386fd
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 39 deletions.
55 changes: 55 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ message SnapshotRequest {
// metadata present in the snapshot, but not file contents.
bool shared_replicate = 12;

// If true, the snapshot could contain external files. Such files
// will have their metadata present in the snapshot but not the
// file contents.
bool external_replicate = 13;

reserved 1, 4, 6, 7, 8, 9;
}

Expand Down Expand Up @@ -242,6 +247,54 @@ message SnapshotRequest {
uint64 size = 10;
}

// ExternalTable represents one SSTable present in external storage.
// Intended to be the protobuf version of pebble.ExternalFile.
message ExternalTable {
// Used by the Pebble objstorage package to generate new blob storage drivers.
// Reserved for future use.
bytes locator = 1;

// ObjName is the unique name of this sstable on Locator.
string object_name = 2;

// Physical size of the sstable in bytes.
uint64 size = 3;

// StartKey is the loose, inclusive start key of the sstable.
bytes StartKey = 4;
// EndKey is the loose, end key of the sstable. Whether it is
// inclusive or exclusive is controlled by EndKeyIsInclusive.
bytes EndKey = 5;

// EndKeyIsInclusive is true if the EndKey is inclusive.
bool EndKeyIsInclusive = 6;

// HasPointKey denote whether this file contains point keys.
bool has_point_key = 7;

// HasRangeKey denote whether this file contains range keys.
bool has_range_key = 8;

// SyntheticPrefix will prepend this suffix to all keys in the file during
// iteration. Note that the backing file itself is not modified.
bytes synthetic_prefix = 9;

// SyntheticSuffix will replace the suffix of every key in the file during
// iteration. Note that the file itself is not modified, rather, every key
// returned by an iterator will have the synthetic suffix.
//
// SyntheticSuffix can only be used under the following conditions:
// - the synthetic suffix must sort before any non-empty suffixes in the
// backing sst (the entire sst, not just the part restricted to Bounds).
// - the backing sst must not contain multiple keys with the same prefix.
bytes synthetic_suffix = 10;

// LSM level of the original sstable. This sstable will go into the same
// level in the destination LSM.
int32 level = 11;

}

Header header = 1;

// A BatchRepr. Multiple kv_batches may be sent across multiple request messages.
Expand All @@ -259,6 +312,8 @@ message SnapshotRequest {
// flag must be set to true before any user keys are streamed.
bool transition_from_shared_to_regular_replicate = 6;

repeated ExternalTable external_tables = 7 [(gogoproto.nullable) = false];

reserved 3;
}

Expand Down
30 changes: 29 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,6 +3078,12 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting(
"trace logged (set to 0 to disable);", 0,
)

var externalFileSnapshotting = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.snapshot.external_files.enabled",
"enables sending external files as metadata during a snapshot",
false)

// followerSendSnapshot receives a delegate snapshot request and generates the
// snapshot from this replica. The entire process of generating and transmitting
// the snapshot is handled, and errors are propagated back to the leaseholder.
Expand Down Expand Up @@ -3157,7 +3163,28 @@ func (r *Replica) followerSendSnapshot(
// a snapshot for a non-system range. This allows us to send metadata of
// sstables in shared storage as opposed to streaming their contents. Keys
// in higher levels of the LSM are still streamed in the snapshot.
sharedReplicate := r.store.cfg.SharedStorageEnabled && snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0
nonSystemRange := snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0
sharedReplicate := r.store.cfg.SharedStorageEnabled && nonSystemRange

// Use external replication if we aren't using shared
// replication, are dealing with a non-system range, are on at
// least 24.1, and our store has external files.
externalReplicate := !sharedReplicate && nonSystemRange &&
r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1) &&
externalFileSnapshotting.Get(&r.store.ClusterSettings().SV)
if externalReplicate {
start := snap.State.Desc.StartKey.AsRawKey()
end := snap.State.Desc.EndKey.AsRawKey()
total, _, external, err := r.store.StateEngine().ApproximateDiskBytes(start, end)
if err != nil {
log.Warningf(ctx, "could not determine if store has external bytes: %v", err)
externalReplicate = false
}

// Enable ExternalReplicate only if we have more
// external bytes than local bytes.
externalReplicate = external > (total - external)
}

// Create new snapshot request header using the delegate snapshot request.
header := kvserverpb.SnapshotRequest_Header{
Expand All @@ -3178,6 +3205,7 @@ func (r *Replica) followerSendSnapshot(
SenderQueueName: req.SenderQueueName,
SenderQueuePriority: req.SenderQueuePriority,
SharedReplicate: sharedReplicate,
ExternalReplicate: externalReplicate,
}
newBatchFn := func() storage.WriteBatch {
return r.store.TODOEngine().NewWriteBatch()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ type IncomingSnapshot struct {
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
sharedSSTs []pebble.SharedSSTMeta
externalSSTs []pebble.ExternalFile
doExcise bool
// clearedSpans represents the key spans in the existing store that will be
// cleared by doing the Ingest*. This is tracked so that we can convert the
Expand Down Expand Up @@ -673,7 +674,7 @@ func (r *Replica) applySnapshot(
if inSnap.doExcise {
exciseSpan := desc.KeySpan().AsRawSpanWithNoLocals()
if ingestStats, err =
r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, nil /* external */, exciseSpan); err != nil {
r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, inSnap.externalSSTs, exciseSpan); err != nil {
return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
}
} else {
Expand Down
125 changes: 88 additions & 37 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage"
"github.com/cockroachdb/pebble/objstorage/remote"
"github.com/cockroachdb/pebble/rangekey"
"github.com/cockroachdb/redact"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -494,7 +495,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
// TODO(jeffreyxiao): Re-evaluate as the default range size grows.
keyRanges := rditer.MakeReplicatedKeySpans(header.State.Desc)

doExcise := header.SharedReplicate || (storage.UseExciseForSnapshots.Get(&s.ClusterSettings().SV) &&
doExcise := header.SharedReplicate || header.ExternalReplicate || (storage.UseExciseForSnapshots.Get(&s.ClusterSettings().SV) &&
s.cfg.Settings.Version.IsActive(ctx, clusterversion.V23_2_EnablePebbleFormatVirtualSSTables))
if header.SharedReplicate && !s.cfg.SharedStorageEnabled {
return noSnap, sendSnapshotError(ctx, s, stream, errors.New("cannot accept shared sstables"))
Expand All @@ -521,6 +522,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
log.Event(ctx, "waiting for snapshot batches to begin")

var sharedSSTs []pebble.SharedSSTMeta
var externalSSTs []pebble.ExternalFile

for {
timingTag.start("recv")
Expand All @@ -536,6 +538,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
if req.TransitionFromSharedToRegularReplicate {
doExcise = false
sharedSSTs = nil
externalSSTs = nil
if err := msstw.addRangeDelForLastSpan(); err != nil {
return noSnap, errors.Wrap(err, "adding tombstone for last span")
}
Expand Down Expand Up @@ -644,6 +647,24 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
})
}
}
if len(req.ExternalTables) > 0 && doExcise {
for i := range req.ExternalTables {
sst := req.ExternalTables[i]
externalSSTs = append(externalSSTs, pebble.ExternalFile{
Locator: remote.Locator(sst.Locator),
ObjName: sst.ObjectName,
StartKey: sst.StartKey,
EndKey: sst.EndKey,
EndKeyIsInclusive: sst.EndKeyIsInclusive,
HasPointKey: sst.HasPointKey,
HasRangeKey: sst.HasRangeKey,
SyntheticPrefix: sst.SyntheticPrefix,
SyntheticSuffix: sst.SyntheticSuffix,
Level: uint8(sst.Level),
Size: sst.Size_,
})
}
}
if req.Final {
// We finished receiving all batches and log entries. It's possible that
// we did not receive any key-value pairs for some of the key spans, but
Expand Down Expand Up @@ -680,13 +701,14 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
raftAppliedIndex: header.State.RaftAppliedIndex,
msgAppRespCh: make(chan raftpb.Message, 1),
sharedSSTs: sharedSSTs,
externalSSTs: externalSSTs,
doExcise: doExcise,
clearedSpans: keyRanges,
}

timingTag.stop("totalTime")

kvSS.status = redact.Sprintf("local ssts: %d, shared ssts: %d", len(kvSS.scratch.SSTs()), len(sharedSSTs))
kvSS.status = redact.Sprintf("local ssts: %d, shared ssts: %d, external ssts: %d", len(kvSS.scratch.SSTs()), len(sharedSSTs), len(externalSSTs))
return inSnap, nil
}
}
Expand All @@ -704,7 +726,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
// not reflect the log entries sent (which are never sent in newer versions of
// CRDB, as of VersionUnreplicatedTruncatedState).
var bytesSent int64
var kvs, rangeKVs, sharedSSTs int
var kvs, rangeKVs, sharedSSTCount, externalSSTCount int

// These stopwatches allow us to time the various components of Send().
// - totalTimeStopwatch measures the total time spent within this function.
Expand All @@ -730,7 +752,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
// Iterate over all keys (point keys and range keys) and stream out batches of
// key-values.
var b storage.WriteBatch
var ssts []kvserverpb.SnapshotRequest_SharedTable
var sharedSSTs []kvserverpb.SnapshotRequest_SharedTable
var externalSSTs []kvserverpb.SnapshotRequest_ExternalTable
var transitionFromSharedToRegularReplicate bool
defer func() {
if b != nil {
Expand All @@ -739,15 +762,16 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
}()

flushBatch := func() error {
if err := kvSS.sendBatch(ctx, stream, b, ssts, transitionFromSharedToRegularReplicate, timingTag); err != nil {
if err := kvSS.sendBatch(ctx, stream, b, sharedSSTs, externalSSTs, transitionFromSharedToRegularReplicate, timingTag); err != nil {
return err
}
bLen := int64(b.Len())
bytesSent += bLen
recordBytesSent(bLen)
b.Close()
b = nil
ssts = ssts[:0]
sharedSSTs = sharedSSTs[:0]
externalSSTs = externalSSTs[:0]
transitionFromSharedToRegularReplicate = false
return nil
}
Expand All @@ -763,8 +787,9 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
// non-system range, take advantage of shared storage to minimize the amount
// of data we're iterating on and sending over the network.
sharedReplicate := header.SharedReplicate && rditer.IterateReplicaKeySpansShared != nil
externalReplicate := header.ExternalReplicate && rditer.IterateReplicaKeySpansShared != nil
replicatedFilter := rditer.ReplicatedSpansAll
if sharedReplicate {
if sharedReplicate || externalReplicate {
replicatedFilter = rditer.ReplicatedSpansExcludeUser
}

Expand Down Expand Up @@ -829,7 +854,56 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
}

var valBuf []byte
if sharedReplicate {
if sharedReplicate || externalReplicate {
var sharedVisitor func(sst *pebble.SharedSSTMeta) error
if sharedReplicate {
sharedVisitor = func(sst *pebble.SharedSSTMeta) error {
sharedSSTCount++
snap.sharedBackings = append(snap.sharedBackings, sst.Backing)
backing, err := sst.Backing.Get()
if err != nil {
return err
}
ikeyToPb := func(ik pebble.InternalKey) *kvserverpb.SnapshotRequest_SharedTable_InternalKey {
return &kvserverpb.SnapshotRequest_SharedTable_InternalKey{
UserKey: ik.UserKey,
Trailer: ik.Trailer,
}
}
sharedSSTs = append(sharedSSTs, kvserverpb.SnapshotRequest_SharedTable{
Backing: backing,
Smallest: ikeyToPb(sst.Smallest),
Largest: ikeyToPb(sst.Largest),
SmallestRangeKey: ikeyToPb(sst.SmallestRangeKey),
LargestRangeKey: ikeyToPb(sst.LargestRangeKey),
SmallestPointKey: ikeyToPb(sst.SmallestPointKey),
LargestPointKey: ikeyToPb(sst.LargestPointKey),
Level: int32(sst.Level),
Size_: sst.Size,
})
return nil
}
}
var externalVisitor func(sst *pebble.ExternalFile) error
if externalReplicate {
externalVisitor = func(sst *pebble.ExternalFile) error {
externalSSTCount++
externalSSTs = append(externalSSTs, kvserverpb.SnapshotRequest_ExternalTable{
Locator: []byte(sst.Locator),
ObjectName: sst.ObjName,
Size_: sst.Size,
StartKey: sst.StartKey,
EndKey: sst.EndKey,
EndKeyIsInclusive: sst.EndKeyIsInclusive,
HasPointKey: sst.HasPointKey,
HasRangeKey: sst.HasRangeKey,
SyntheticPrefix: sst.SyntheticPrefix,
SyntheticSuffix: sst.SyntheticSuffix,
Level: int32(sst.Level),
})
return nil
}
}
err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, kvSS.st, kvSS.clusterID, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
kvs++
if b == nil {
Expand Down Expand Up @@ -873,32 +947,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
}
}
return maybeFlushBatch()
}, func(sst *pebble.SharedSSTMeta) error {
sharedSSTs++
snap.sharedBackings = append(snap.sharedBackings, sst.Backing)
backing, err := sst.Backing.Get()
if err != nil {
return err
}
ikeyToPb := func(ik pebble.InternalKey) *kvserverpb.SnapshotRequest_SharedTable_InternalKey {
return &kvserverpb.SnapshotRequest_SharedTable_InternalKey{
UserKey: ik.UserKey,
Trailer: ik.Trailer,
}
}
ssts = append(ssts, kvserverpb.SnapshotRequest_SharedTable{
Backing: backing,
Smallest: ikeyToPb(sst.Smallest),
Largest: ikeyToPb(sst.Largest),
SmallestRangeKey: ikeyToPb(sst.SmallestRangeKey),
LargestRangeKey: ikeyToPb(sst.LargestRangeKey),
SmallestPointKey: ikeyToPb(sst.SmallestPointKey),
LargestPointKey: ikeyToPb(sst.LargestPointKey),
Level: int32(sst.Level),
Size_: sst.Size,
})
return nil
}, nil /* visitExternalFile */)
}, sharedVisitor, externalVisitor)
if err != nil && errors.Is(err, pebble.ErrInvalidSkipSharedIteration) {
transitionFromSharedToRegularReplicate = true
err = rditer.IterateReplicaKeySpans(ctx, snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */
Expand All @@ -917,15 +966,16 @@ func (kvSS *kvBatchSnapshotStrategy) Send(
timingTag.stop("totalTime")
log.Eventf(ctx, "finished sending snapshot batches, sent a total of %d bytes", bytesSent)

kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d sharedSSTs=%d", kvs, rangeKVs, sharedSSTs)
kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d sharedSSTs=%d, externalSSTs=%d", kvs, rangeKVs, sharedSSTCount, externalSSTCount)
return bytesSent, nil
}

func (kvSS *kvBatchSnapshotStrategy) sendBatch(
ctx context.Context,
stream outgoingSnapshotStream,
batch storage.WriteBatch,
ssts []kvserverpb.SnapshotRequest_SharedTable,
sharedSSTs []kvserverpb.SnapshotRequest_SharedTable,
externalSSTs []kvserverpb.SnapshotRequest_ExternalTable,
transitionToRegularReplicate bool,
timerTag *snapshotTimingTag,
) error {
Expand All @@ -938,7 +988,8 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch(
timerTag.start("send")
res := stream.Send(&kvserverpb.SnapshotRequest{
KVBatch: batch.Repr(),
SharedTables: ssts,
SharedTables: sharedSSTs,
ExternalTables: externalSSTs,
TransitionFromSharedToRegularReplicate: transitionToRegularReplicate,
})
timerTag.stop("send")
Expand Down

0 comments on commit 85386fd

Please sign in to comment.