Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvserver: opt-in "fast" external file snapshotting #120030

Merged
merged 2 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/ccl/storageccl/engineccl/shared_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func iterateReplicaKeySpansShared(
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error {
if !reader.ConsistentIterators() {
panic("reader must provide consistent iterators")
Expand All @@ -65,7 +66,7 @@ func iterateReplicaKeySpansShared(
ReplicatedBySpan: desc.RSpan(),
})
span := spans[0]
return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile)
return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile)
}

func init() {
Expand Down
55 changes: 55 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ message SnapshotRequest {
// metadata present in the snapshot, but not file contents.
bool shared_replicate = 12;

// If true, the snapshot could contain external files. Such files
// will have their metadata present in the snapshot but not the
// file contents.
bool external_replicate = 13;

reserved 1, 4, 6, 7, 8, 9;
}

Expand Down Expand Up @@ -242,6 +247,54 @@ message SnapshotRequest {
uint64 size = 10;
}

// ExternalTable represents one SSTable present in external storage.
// Intended to be the protobuf version of pebble.ExternalFile.
message ExternalTable {
  // Used by the Pebble objstorage package to generate new blob storage drivers.
  // Reserved for future use.
  bytes locator = 1;

  // ObjName is the unique name of this sstable on Locator.
  string object_name = 2;

  // Physical size of the sstable in bytes.
  uint64 size = 3;

  // StartKey is the loose, inclusive start key of the sstable.
  bytes StartKey = 4;
  // EndKey is the loose end key of the sstable. Whether it is
  // inclusive or exclusive is controlled by EndKeyIsInclusive.
  bytes EndKey = 5;

  // EndKeyIsInclusive is true if the EndKey is inclusive.
  bool EndKeyIsInclusive = 6;

  // HasPointKey denotes whether this file contains point keys.
  bool has_point_key = 7;

  // HasRangeKey denotes whether this file contains range keys.
  bool has_range_key = 8;

  // SyntheticPrefix will prepend this prefix to all keys in the file during
  // iteration. Note that the backing file itself is not modified.
  bytes synthetic_prefix = 9;

  // SyntheticSuffix will replace the suffix of every key in the file during
  // iteration. Note that the file itself is not modified, rather, every key
  // returned by an iterator will have the synthetic suffix.
  //
  // SyntheticSuffix can only be used under the following conditions:
  //  - the synthetic suffix must sort before any non-empty suffixes in the
  //    backing sst (the entire sst, not just the part restricted to Bounds).
  //  - the backing sst must not contain multiple keys with the same prefix.
  bytes synthetic_suffix = 10;

  // LSM level of the original sstable. This sstable will go into the same
  // level in the destination LSM.
  int32 level = 11;
}

Header header = 1;

// A BatchRepr. Multiple kv_batches may be sent across multiple request messages.
Expand All @@ -259,6 +312,8 @@ message SnapshotRequest {
// flag must be set to true before any user keys are streamed.
bool transition_from_shared_to_regular_replicate = 6;

repeated ExternalTable external_tables = 7 [(gogoproto.nullable) = false];

reserved 3;
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ var IterateReplicaKeySpansShared func(
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error

// IterateOptions instructs how points and ranges should be presented to visitor
Expand Down
30 changes: 29 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,6 +3078,12 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting(
"trace logged (set to 0 to disable);", 0,
)

// externalFileSnapshotting is the system-only boolean cluster setting
// "kv.snapshot.external_files.enabled". When it is on, snapshots may send
// external files as metadata rather than streaming them. It is registered
// with a default of false, so the behavior is opt-in.
var externalFileSnapshotting = settings.RegisterBoolSetting(
	settings.SystemOnly,
	"kv.snapshot.external_files.enabled",
	"enables sending external files as metadata during a snapshot",
	false, // off unless explicitly enabled
)

// followerSendSnapshot receives a delegate snapshot request and generates the
// snapshot from this replica. The entire process of generating and transmitting
// the snapshot is handled, and errors are propagated back to the leaseholder.
Expand Down Expand Up @@ -3157,7 +3163,28 @@ func (r *Replica) followerSendSnapshot(
// a snapshot for a non-system range. This allows us to send metadata of
// sstables in shared storage as opposed to streaming their contents. Keys
// in higher levels of the LSM are still streamed in the snapshot.
sharedReplicate := r.store.cfg.SharedStorageEnabled && snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0
nonSystemRange := snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0
sharedReplicate := r.store.cfg.SharedStorageEnabled && nonSystemRange

// Use external replication only if we aren't using shared replication, are
// dealing with a non-system range, are on at least 24.1, and the
// externalFileSnapshotting cluster setting is enabled. The check below
// additionally requires that the range has more external bytes than local
// bytes.
externalReplicate := !sharedReplicate && nonSystemRange &&
	r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1) &&
	externalFileSnapshotting.Get(&r.store.ClusterSettings().SV)
if externalReplicate {
	start := snap.State.Desc.StartKey.AsRawKey()
	end := snap.State.Desc.EndKey.AsRawKey()
	total, _, external, err := r.store.StateEngine().ApproximateDiskBytes(start, end)
	if err != nil {
		// The byte counts are not trustworthy when the estimate failed, so
		// fall back to a regular snapshot instead of deciding based on them.
		log.Warningf(ctx, "could not determine if store has external bytes: %v", err)
		externalReplicate = false
	} else {
		// Enable ExternalReplicate only if we have more
		// external bytes than local bytes.
		externalReplicate = external > (total - external)
	}
}

// Create new snapshot request header using the delegate snapshot request.
header := kvserverpb.SnapshotRequest_Header{
Expand All @@ -3178,6 +3205,7 @@ func (r *Replica) followerSendSnapshot(
SenderQueueName: req.SenderQueueName,
SenderQueuePriority: req.SenderQueuePriority,
SharedReplicate: sharedReplicate,
ExternalReplicate: externalReplicate,
}
newBatchFn := func() storage.WriteBatch {
return r.store.TODOEngine().NewWriteBatch()
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ type IncomingSnapshot struct {
raftAppliedIndex kvpb.RaftIndex // logging only
msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
sharedSSTs []pebble.SharedSSTMeta
externalSSTs []pebble.ExternalFile
doExcise bool
// clearedSpans represents the key spans in the existing store that will be
// cleared by doing the Ingest*. This is tracked so that we can convert the
Expand Down Expand Up @@ -673,7 +674,7 @@ func (r *Replica) applySnapshot(
if inSnap.doExcise {
exciseSpan := desc.KeySpan().AsRawSpanWithNoLocals()
if ingestStats, err =
r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, exciseSpan); err != nil {
r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, inSnap.externalSSTs, exciseSpan); err != nil {
return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
}
} else {
Expand Down
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,9 @@ func (s spanSetReader) ScanInternal(
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error {
return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile)
return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile)
}

func (s spanSetReader) Close() {
Expand Down Expand Up @@ -788,6 +789,7 @@ func (s spanSetBatch) ScanInternal(
visitRangeDel func(start []byte, end []byte, seqNum uint64) error,
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
visitExternalFile func(sst *pebble.ExternalFile) error,
) error {
// Only used on Engine.
panic("unimplemented")
Expand Down
Loading
Loading