From d047ae30205dae2326e3ae51e1cd3deac8a17237 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 7 Mar 2024 00:07:09 +0000 Subject: [PATCH 1/2] storage: add external file arguments to ScanInternal and IngestAndExcise Epic: none Release note: None --- .../storageccl/engineccl/shared_storage.go | 3 ++- pkg/kv/kvserver/rditer/replica_data_iter.go | 1 + pkg/kv/kvserver/replica_raftstorage.go | 2 +- pkg/kv/kvserver/spanset/batch.go | 4 +++- pkg/kv/kvserver/store_snapshot.go | 2 +- pkg/storage/engine.go | 3 ++- pkg/storage/pebble.go | 20 +++++++++++++------ pkg/storage/pebble_batch.go | 1 + 8 files changed, 25 insertions(+), 11 deletions(-) diff --git a/pkg/ccl/storageccl/engineccl/shared_storage.go b/pkg/ccl/storageccl/engineccl/shared_storage.go index 00034fcbc12f..ae5d414496e0 100644 --- a/pkg/ccl/storageccl/engineccl/shared_storage.go +++ b/pkg/ccl/storageccl/engineccl/shared_storage.go @@ -50,6 +50,7 @@ func iterateReplicaKeySpansShared( visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { if !reader.ConsistentIterators() { panic("reader must provide consistent iterators") @@ -65,7 +66,7 @@ func iterateReplicaKeySpansShared( ReplicatedBySpan: desc.RSpan(), }) span := spans[0] - return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile) + return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile) } func init() { diff --git a/pkg/kv/kvserver/rditer/replica_data_iter.go b/pkg/kv/kvserver/rditer/replica_data_iter.go index aafbafc028cd..6522da7f8201 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter.go @@ -446,6 +446,7 @@ var IterateReplicaKeySpansShared func( visitRangeDel func(start, end 
[]byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error // IterateOptions instructs how points and ranges should be presented to visitor diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 702b0951dc61..d5146bd1b4fb 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -673,7 +673,7 @@ func (r *Replica) applySnapshot( if inSnap.doExcise { exciseSpan := desc.KeySpan().AsRawSpanWithNoLocals() if ingestStats, err = - r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, exciseSpan); err != nil { + r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, nil /* external */, exciseSpan); err != nil { return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey) } } else { diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 552ccc6a4acd..41e2891e52fa 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -452,8 +452,9 @@ func (s spanSetReader) ScanInternal( visitRangeDel func(start []byte, end []byte, seqNum uint64) error, visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { - return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) + return s.r.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile) } func (s spanSetReader) Close() { @@ -788,6 +789,7 @@ func (s spanSetBatch) ScanInternal( visitRangeDel func(start []byte, end []byte, seqNum uint64) error, 
visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { // Only used on Engine. panic("unimplemented") diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index b771412d9b33..e56a971dee95 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -898,7 +898,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( Size_: sst.Size, }) return nil - }) + }, nil /* visitExternalFile */) if err != nil && errors.Is(err, pebble.ErrInvalidSkipSharedIteration) { transitionFromSharedToRegularReplicate = true err = rditer.IterateReplicaKeySpans(ctx, snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index a47200d1fc15..f55a03bde6eb 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -604,6 +604,7 @@ type Reader interface { visitRangeDel func(start, end []byte, seqNum uint64) error, visitRangeKey func(start, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error // ConsistentIterators returns true if the Reader implementation guarantees // that the different iterators constructed by this Reader will see the same @@ -1036,7 +1037,7 @@ type Engine interface { // that excises an ExciseSpan, and ingests either local or shared sstables or // both. IngestAndExciseFiles( - ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error) + ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, external []pebble.ExternalFile, exciseSpan roachpb.Span) (pebble.IngestOperationStats, error) // IngestExternalFiles is a variant of IngestLocalFiles that takes external // files. 
These files can be referred to by multiple stores, but are not // modified or deleted by the Engine doing the ingestion. diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 9f3bb5fade74..9230e433574f 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1552,12 +1552,13 @@ func (p *Pebble) ScanInternal( visitRangeDel func(start []byte, end []byte, seqNum uint64) error, visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { rawLower := EngineKey{Key: lower}.Encode() rawUpper := EngineKey{Key: upper}.Encode() // TODO(sumeer): set CategoryAndQoS. return p.db.ScanInternal(ctx, sstable.CategoryAndQoS{}, rawLower, rawUpper, visitPointKey, - visitRangeDel, visitRangeKey, visitSharedFile, nil /* visitExternalFile */) + visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile) } // ConsistentIterators implements the Engine interface. @@ -2132,13 +2133,17 @@ func (p *Pebble) IngestLocalFilesWithStats( // IngestAndExciseFiles implements the Engine interface. func (p *Pebble) IngestAndExciseFiles( - ctx context.Context, paths []string, shared []pebble.SharedSSTMeta, exciseSpan roachpb.Span, + ctx context.Context, + paths []string, + shared []pebble.SharedSSTMeta, + external []pebble.ExternalFile, + exciseSpan roachpb.Span, ) (pebble.IngestOperationStats, error) { rawSpan := pebble.KeyRange{ Start: EngineKey{Key: exciseSpan.Key}.Encode(), End: EngineKey{Key: exciseSpan.EndKey}.Encode(), } - return p.db.IngestAndExcise(paths, shared, nil /* external */, rawSpan) + return p.db.IngestAndExcise(paths, shared, external, rawSpan) } // IngestExternalFiles implements the Engine interface. 
@@ -2718,8 +2723,9 @@ func (p *pebbleReadOnly) ScanInternal( visitRangeDel func(start []byte, end []byte, seqNum uint64) error, visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { - return p.parent.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile) + return p.parent.ScanInternal(ctx, lower, upper, visitPointKey, visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile) } // Writer methods are not implemented for pebbleReadOnly. Ideally, the code @@ -2908,12 +2914,13 @@ func (p *pebbleSnapshot) ScanInternal( visitRangeDel func(start []byte, end []byte, seqNum uint64) error, visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { rawLower := EngineKey{Key: lower}.Encode() rawUpper := EngineKey{Key: upper}.Encode() // TODO(sumeer): set CategoryAndQoS. return p.snapshot.ScanInternal(ctx, sstable.CategoryAndQoS{}, rawLower, rawUpper, visitPointKey, - visitRangeDel, visitRangeKey, visitSharedFile, nil /* visitExternalFile */) + visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile) } // pebbleEFOS represents an eventually file-only snapshot created using @@ -3031,12 +3038,13 @@ func (p *pebbleEFOS) ScanInternal( visitRangeDel func(start []byte, end []byte, seqNum uint64) error, visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { rawLower := EngineKey{Key: lower}.Encode() rawUpper := EngineKey{Key: upper}.Encode() // TODO(sumeer): set CategoryAndQoS. 
return p.efos.ScanInternal(ctx, sstable.CategoryAndQoS{}, rawLower, rawUpper, visitPointKey, - visitRangeDel, visitRangeKey, visitSharedFile, nil /* visitExternalFile */) + visitRangeDel, visitRangeKey, visitSharedFile, visitExternalFile) } // ExceedMaxSizeError is the error returned when an export request diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 5b515272ab02..08f0cfe59ba6 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -678,6 +678,7 @@ func (p *pebbleBatch) ScanInternal( visitRangeDel func(start []byte, end []byte, seqNum uint64) error, visitRangeKey func(start []byte, end []byte, keys []rangekey.Key) error, visitSharedFile func(sst *pebble.SharedSSTMeta) error, + visitExternalFile func(sst *pebble.ExternalFile) error, ) error { panic("ScanInternal only supported on Engine and Snapshot.") } From 85386fd45c69d60cc8c890ea233d6fe94e60731c Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 7 Mar 2024 03:10:04 +0000 Subject: [PATCH 2/2] kvserver: opt-in "fast" external file snapshotting Like SharedSSTs, we may want to send a pebble.ExternalFile's metadata rather than its content during a snapshot. This is opt-in via a cluster setting and is only attempted when the store appears to actually have external files. Epic: none Release note: None --- pkg/kv/kvserver/kvserverpb/raft.proto | 55 +++++++++++ pkg/kv/kvserver/replica_command.go | 30 +++++- pkg/kv/kvserver/replica_raftstorage.go | 3 +- pkg/kv/kvserver/store_snapshot.go | 125 +++++++++++++++++-------- 4 files changed, 174 insertions(+), 39 deletions(-) diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 527dab2ccf96..fb2eb61e8cd6 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -196,6 +196,11 @@ message SnapshotRequest { // metadata present in the snapshot, but not file contents. 
bool shared_replicate = 12; + // If true, the snapshot could contain external files. Such files + // will have their metadata present in the snapshot but not the + // file contents. + bool external_replicate = 13; + reserved 1, 4, 6, 7, 8, 9; } @@ -242,6 +247,54 @@ message SnapshotRequest { uint64 size = 10; } + // ExternalTable represents one SSTable present in external storage. + // Intended to be the protobuf version of pebble.ExternalFile. + message ExternalTable { + // Used by the Pebble objstorage package to generate new blob storage drivers. + // Reserved for future use. + bytes locator = 1; + + // ObjName is the unique name of this sstable on Locator. + string object_name = 2; + + // Physical size of the sstable in bytes. + uint64 size = 3; + + // StartKey is the loose, inclusive start key of the sstable. + bytes StartKey = 4; + // EndKey is the loose end key of the sstable. Whether it is + // inclusive or exclusive is controlled by EndKeyIsInclusive. + bytes EndKey = 5; + + // EndKeyIsInclusive is true if the EndKey is inclusive. + bool EndKeyIsInclusive = 6; + + // HasPointKey denotes whether this file contains point keys. + bool has_point_key = 7; + + // HasRangeKey denotes whether this file contains range keys. + bool has_range_key = 8; + + // SyntheticPrefix is prepended to all keys in the file during + // iteration. Note that the backing file itself is not modified. + bytes synthetic_prefix = 9; + + // SyntheticSuffix will replace the suffix of every key in the file during + // iteration. Note that the file itself is not modified, rather, every key + // returned by an iterator will have the synthetic suffix. + // + // SyntheticSuffix can only be used under the following conditions: + // - the synthetic suffix must sort before any non-empty suffixes in the + // backing sst (the entire sst, not just the part restricted to Bounds). + // - the backing sst must not contain multiple keys with the same prefix.
+ bytes synthetic_suffix = 10; + + // LSM level of the original sstable. This sstable will go into the same + // level in the destination LSM. + int32 level = 11; + + } + Header header = 1; // A BatchRepr. Multiple kv_batches may be sent across multiple request messages. @@ -259,6 +312,8 @@ message SnapshotRequest { // flag must be set to true before any user keys are streamed. bool transition_from_shared_to_regular_replicate = 6; + repeated ExternalTable external_tables = 7 [(gogoproto.nullable) = false]; + reserved 3; } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 1ba7dee1b749..b51827108508 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3078,6 +3078,12 @@ var traceSnapshotThreshold = settings.RegisterDurationSetting( "trace logged (set to 0 to disable);", 0, ) +var externalFileSnapshotting = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.snapshot.external_files.enabled", + "enables sending external files as metadata during a snapshot", + false) + // followerSendSnapshot receives a delegate snapshot request and generates the // snapshot from this replica. The entire process of generating and transmitting // the snapshot is handled, and errors are propagated back to the leaseholder. @@ -3157,7 +3163,28 @@ func (r *Replica) followerSendSnapshot( // a snapshot for a non-system range. This allows us to send metadata of // sstables in shared storage as opposed to streaming their contents. Keys // in higher levels of the LSM are still streamed in the snapshot. 
- sharedReplicate := r.store.cfg.SharedStorageEnabled && snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0 + nonSystemRange := snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0 + sharedReplicate := r.store.cfg.SharedStorageEnabled && nonSystemRange + + // Use external replication if we aren't using shared + // replication, are dealing with a non-system range, are on at + // least 24.1, have opted in via kv.snapshot.external_files.enabled, and our store has external files. + externalReplicate := !sharedReplicate && nonSystemRange && + r.store.ClusterSettings().Version.IsActive(ctx, clusterversion.V24_1) && + externalFileSnapshotting.Get(&r.store.ClusterSettings().SV) + if externalReplicate { + start := snap.State.Desc.StartKey.AsRawKey() + end := snap.State.Desc.EndKey.AsRawKey() + total, _, external, err := r.store.StateEngine().ApproximateDiskBytes(start, end) + if err != nil { + log.Warningf(ctx, "could not determine if store has external bytes: %v", err) + externalReplicate = false + } else { + // Enable ExternalReplicate only if we have more + // external bytes than local bytes. + externalReplicate = external > (total - external) + } + } // Create new snapshot request header using the delegate snapshot request. 
header := kvserverpb.SnapshotRequest_Header{ @@ -3178,6 +3205,7 @@ func (r *Replica) followerSendSnapshot( SenderQueueName: req.SenderQueueName, SenderQueuePriority: req.SenderQueuePriority, SharedReplicate: sharedReplicate, + ExternalReplicate: externalReplicate, } newBatchFn := func() storage.WriteBatch { return r.store.TODOEngine().NewWriteBatch() diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index d5146bd1b4fb..e044de030bb2 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -386,6 +386,7 @@ type IncomingSnapshot struct { raftAppliedIndex kvpb.RaftIndex // logging only msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied sharedSSTs []pebble.SharedSSTMeta + externalSSTs []pebble.ExternalFile doExcise bool // clearedSpans represents the key spans in the existing store that will be // cleared by doing the Ingest*. This is tracked so that we can convert the @@ -673,7 +674,7 @@ func (r *Replica) applySnapshot( if inSnap.doExcise { exciseSpan := desc.KeySpan().AsRawSpanWithNoLocals() if ingestStats, err = - r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, nil /* external */, exciseSpan); err != nil { + r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, inSnap.externalSSTs, exciseSpan); err != nil { return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey) } } else { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index e56a971dee95..e51310780774 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/objstorage/remote" 
"github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/redact" "go.opentelemetry.io/otel/attribute" @@ -494,7 +495,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( // TODO(jeffreyxiao): Re-evaluate as the default range size grows. keyRanges := rditer.MakeReplicatedKeySpans(header.State.Desc) - doExcise := header.SharedReplicate || (storage.UseExciseForSnapshots.Get(&s.ClusterSettings().SV) && + doExcise := header.SharedReplicate || header.ExternalReplicate || (storage.UseExciseForSnapshots.Get(&s.ClusterSettings().SV) && s.cfg.Settings.Version.IsActive(ctx, clusterversion.V23_2_EnablePebbleFormatVirtualSSTables)) if header.SharedReplicate && !s.cfg.SharedStorageEnabled { return noSnap, sendSnapshotError(ctx, s, stream, errors.New("cannot accept shared sstables")) @@ -521,6 +522,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( log.Event(ctx, "waiting for snapshot batches to begin") var sharedSSTs []pebble.SharedSSTMeta + var externalSSTs []pebble.ExternalFile for { timingTag.start("recv") @@ -536,6 +538,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( if req.TransitionFromSharedToRegularReplicate { doExcise = false sharedSSTs = nil + externalSSTs = nil if err := msstw.addRangeDelForLastSpan(); err != nil { return noSnap, errors.Wrap(err, "adding tombstone for last span") } @@ -644,6 +647,24 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( }) } } + if len(req.ExternalTables) > 0 && doExcise { + for i := range req.ExternalTables { + sst := req.ExternalTables[i] + externalSSTs = append(externalSSTs, pebble.ExternalFile{ + Locator: remote.Locator(sst.Locator), + ObjName: sst.ObjectName, + StartKey: sst.StartKey, + EndKey: sst.EndKey, + EndKeyIsInclusive: sst.EndKeyIsInclusive, + HasPointKey: sst.HasPointKey, + HasRangeKey: sst.HasRangeKey, + SyntheticPrefix: sst.SyntheticPrefix, + SyntheticSuffix: sst.SyntheticSuffix, + Level: uint8(sst.Level), + Size: sst.Size_, + }) + } + } if req.Final { // We finished receiving all batches and log entries. 
It's possible that // we did not receive any key-value pairs for some of the key spans, but @@ -680,13 +701,14 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( raftAppliedIndex: header.State.RaftAppliedIndex, msgAppRespCh: make(chan raftpb.Message, 1), sharedSSTs: sharedSSTs, + externalSSTs: externalSSTs, doExcise: doExcise, clearedSpans: keyRanges, } timingTag.stop("totalTime") - kvSS.status = redact.Sprintf("local ssts: %d, shared ssts: %d", len(kvSS.scratch.SSTs()), len(sharedSSTs)) + kvSS.status = redact.Sprintf("local ssts: %d, shared ssts: %d, external ssts: %d", len(kvSS.scratch.SSTs()), len(sharedSSTs), len(externalSSTs)) return inSnap, nil } } @@ -704,7 +726,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // not reflect the log entries sent (which are never sent in newer versions of // CRDB, as of VersionUnreplicatedTruncatedState). var bytesSent int64 - var kvs, rangeKVs, sharedSSTs int + var kvs, rangeKVs, sharedSSTCount, externalSSTCount int // These stopwatches allow us to time the various components of Send(). // - totalTimeStopwatch measures the total time spent within this function. @@ -730,7 +752,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // Iterate over all keys (point keys and range keys) and stream out batches of // key-values. 
var b storage.WriteBatch - var ssts []kvserverpb.SnapshotRequest_SharedTable + var sharedSSTs []kvserverpb.SnapshotRequest_SharedTable + var externalSSTs []kvserverpb.SnapshotRequest_ExternalTable var transitionFromSharedToRegularReplicate bool defer func() { if b != nil { @@ -739,7 +762,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( }() flushBatch := func() error { - if err := kvSS.sendBatch(ctx, stream, b, ssts, transitionFromSharedToRegularReplicate, timingTag); err != nil { + if err := kvSS.sendBatch(ctx, stream, b, sharedSSTs, externalSSTs, transitionFromSharedToRegularReplicate, timingTag); err != nil { return err } bLen := int64(b.Len()) @@ -747,7 +770,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send( recordBytesSent(bLen) b.Close() b = nil - ssts = ssts[:0] + sharedSSTs = sharedSSTs[:0] + externalSSTs = externalSSTs[:0] transitionFromSharedToRegularReplicate = false return nil } @@ -763,8 +787,9 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // non-system range, take advantage of shared storage to minimize the amount // of data we're iterating on and sending over the network. 
sharedReplicate := header.SharedReplicate && rditer.IterateReplicaKeySpansShared != nil + externalReplicate := header.ExternalReplicate && rditer.IterateReplicaKeySpansShared != nil replicatedFilter := rditer.ReplicatedSpansAll - if sharedReplicate { + if sharedReplicate || externalReplicate { replicatedFilter = rditer.ReplicatedSpansExcludeUser } @@ -829,7 +854,56 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } var valBuf []byte - if sharedReplicate { + if sharedReplicate || externalReplicate { + var sharedVisitor func(sst *pebble.SharedSSTMeta) error + if sharedReplicate { + sharedVisitor = func(sst *pebble.SharedSSTMeta) error { + sharedSSTCount++ + snap.sharedBackings = append(snap.sharedBackings, sst.Backing) + backing, err := sst.Backing.Get() + if err != nil { + return err + } + ikeyToPb := func(ik pebble.InternalKey) *kvserverpb.SnapshotRequest_SharedTable_InternalKey { + return &kvserverpb.SnapshotRequest_SharedTable_InternalKey{ + UserKey: ik.UserKey, + Trailer: ik.Trailer, + } + } + sharedSSTs = append(sharedSSTs, kvserverpb.SnapshotRequest_SharedTable{ + Backing: backing, + Smallest: ikeyToPb(sst.Smallest), + Largest: ikeyToPb(sst.Largest), + SmallestRangeKey: ikeyToPb(sst.SmallestRangeKey), + LargestRangeKey: ikeyToPb(sst.LargestRangeKey), + SmallestPointKey: ikeyToPb(sst.SmallestPointKey), + LargestPointKey: ikeyToPb(sst.LargestPointKey), + Level: int32(sst.Level), + Size_: sst.Size, + }) + return nil + } + } + var externalVisitor func(sst *pebble.ExternalFile) error + if externalReplicate { + externalVisitor = func(sst *pebble.ExternalFile) error { + externalSSTCount++ + externalSSTs = append(externalSSTs, kvserverpb.SnapshotRequest_ExternalTable{ + Locator: []byte(sst.Locator), + ObjectName: sst.ObjName, + Size_: sst.Size, + StartKey: sst.StartKey, + EndKey: sst.EndKey, + EndKeyIsInclusive: sst.EndKeyIsInclusive, + HasPointKey: sst.HasPointKey, + HasRangeKey: sst.HasRangeKey, + SyntheticPrefix: sst.SyntheticPrefix, + SyntheticSuffix: 
sst.SyntheticSuffix, + Level: int32(sst.Level), + }) + return nil + } + } err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, kvSS.st, kvSS.clusterID, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error { kvs++ if b == nil { @@ -873,32 +947,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( } } return maybeFlushBatch() - }, func(sst *pebble.SharedSSTMeta) error { - sharedSSTs++ - snap.sharedBackings = append(snap.sharedBackings, sst.Backing) - backing, err := sst.Backing.Get() - if err != nil { - return err - } - ikeyToPb := func(ik pebble.InternalKey) *kvserverpb.SnapshotRequest_SharedTable_InternalKey { - return &kvserverpb.SnapshotRequest_SharedTable_InternalKey{ - UserKey: ik.UserKey, - Trailer: ik.Trailer, - } - } - ssts = append(ssts, kvserverpb.SnapshotRequest_SharedTable{ - Backing: backing, - Smallest: ikeyToPb(sst.Smallest), - Largest: ikeyToPb(sst.Largest), - SmallestRangeKey: ikeyToPb(sst.SmallestRangeKey), - LargestRangeKey: ikeyToPb(sst.LargestRangeKey), - SmallestPointKey: ikeyToPb(sst.SmallestPointKey), - LargestPointKey: ikeyToPb(sst.LargestPointKey), - Level: int32(sst.Level), - Size_: sst.Size, - }) - return nil - }, nil /* visitExternalFile */) + }, sharedVisitor, externalVisitor) if err != nil && errors.Is(err, pebble.ErrInvalidSkipSharedIteration) { transitionFromSharedToRegularReplicate = true err = rditer.IterateReplicaKeySpans(ctx, snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ @@ -917,7 +966,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( timingTag.stop("totalTime") log.Eventf(ctx, "finished sending snapshot batches, sent a total of %d bytes", bytesSent) - kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d sharedSSTs=%d", kvs, rangeKVs, sharedSSTs) + kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d sharedSSTs=%d, externalSSTs=%d", kvs, rangeKVs, sharedSSTCount, externalSSTCount) return bytesSent, nil } @@ -925,7 +974,8 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( 
ctx context.Context, stream outgoingSnapshotStream, batch storage.WriteBatch, - ssts []kvserverpb.SnapshotRequest_SharedTable, + sharedSSTs []kvserverpb.SnapshotRequest_SharedTable, + externalSSTs []kvserverpb.SnapshotRequest_ExternalTable, transitionToRegularReplicate bool, timerTag *snapshotTimingTag, ) error { @@ -938,7 +988,8 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( timerTag.start("send") res := stream.Send(&kvserverpb.SnapshotRequest{ KVBatch: batch.Repr(), - SharedTables: ssts, + SharedTables: sharedSSTs, + ExternalTables: externalSSTs, TransitionFromSharedToRegularReplicate: transitionToRegularReplicate, }) timerTag.stop("send")