From 02a4b7ecef39ed9e5c4ae4511a0089ec86d836a0 Mon Sep 17 00:00:00 2001
From: Bilal Akhtar
Date: Wed, 14 Jun 2023 20:57:44 -0400
Subject: [PATCH] kvserver,storage: Update snapshot strategy to use shared
 storage

If the sender node was created with a SharedStorage instance, switch to
fast ingestion, where we ScanInternal() the keys not in shared levels and
share only the metadata for files in shared levels. The sender of the
snapshot specifies in the Header that it is using this ability, and the
receiver rejects the snapshot if it cannot accept shared snapshots.

If ScanInternal() returns an `ErrInvalidSkipSharedIteration`, we switch
back to old-style snapshots, where the entirety of the range is sent over
the stream as SnapshotRequests.

Future changes will add better support for detecting when different nodes
point to different blob storage buckets / shared storage locations, and
will incorporate that into rebalancing.

Fixes #103028.

Release note (general change): Takes advantage of the new CLI option
`--experimental-shared-storage` to rebalance faster from node to node.
---
 pkg/kv/kvserver/BUILD.bazel                 |   1 +
 pkg/kv/kvserver/kvserverpb/raft.proto       |  58 +++
 pkg/kv/kvserver/rditer/replica_data_iter.go |  13 +-
 .../r1/replicatedOnly/user-only/output      |  20 +-
 .../r2/replicatedOnly/user-only/output      |  20 +-
 .../r3/replicatedOnly/user-only/output      |  20 +-
 pkg/kv/kvserver/replica_command.go          |   7 +
 pkg/kv/kvserver/replica_raftstorage.go      |  40 +-
 .../replica_sst_snapshot_storage_test.go    |  82 ++++
 pkg/kv/kvserver/store.go                    |   3 +
 pkg/kv/kvserver/store_snapshot.go           | 460 +++++++++++++++---
 pkg/server/server.go                        |   1 +
 12 files changed, 602 insertions(+), 123 deletions(-)

diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index dc9c923f9ba5..913e647fecd7 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -224,6 +224,7 @@ go_library(
         "@com_github_cockroachdb_pebble//:pebble",
         "@com_github_cockroachdb_pebble//objstorage",
         "@com_github_cockroachdb_pebble//objstorage/remote",
+        "@com_github_cockroachdb_pebble//rangekey",
         "@com_github_cockroachdb_pebble//vfs",
         "@com_github_cockroachdb_redact//:redact",
         "@com_github_gogo_protobuf//proto",
diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto
index eccb862fa541..fa69180cd8fe 100644
--- a/pkg/kv/kvserver/kvserverpb/raft.proto
+++ b/pkg/kv/kvserver/kvserverpb/raft.proto
@@ -249,9 +249,57 @@ message SnapshotRequest {
     // from a particular sending source.
     double sender_queue_priority = 11;

+    // If true, the snapshot could contain shared files present in a
+    // pre-configured or explicitly specified shared.Storage instance. Such
+    // files will have their metadata present in the snapshot, but not their
+    // file contents.
+    bool shared_replicate = 12;
+
     reserved 1, 4;
   }

+  // SharedTable represents one shared SSTable present in shared storage.
+  // Intended to be the protobuf version of pebble.SharedSSTMeta.
+  message SharedTable {
+    // InternalKey represents a Pebble-internal key. See pebble.InternalKey
+    // for details on how these keys are used.
+    message InternalKey {
+      // User key portion of the internal key.
+      bytes user_key = 1;
+      // Trailer portion of the internal key (the sequence number and key
+      // kind packed into a uint64), as defined by Pebble.
+      uint64 trailer = 2;
+    }
+
+    // Used by the Pebble objstorage package to resolve a reference to a
+    // shared object.
+    bytes backing = 1;
+
+    // Used by the Pebble objstorage package to generate new blob storage
+    // drivers. Reserved for future use.
+    bytes locator = 2;
+
+    // Smallest internal key in the sstable.
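+    // (This is the overall lower bound: the smaller of the point key and
+    // range key lower bounds below, mirroring pebble.SharedSSTMeta.)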
+    InternalKey smallest = 3;
+    // Largest internal key in the sstable.
+    InternalKey largest = 4;
+    // Smallest range key in the sstable. Zero value if no range keys are
+    // present.
+    InternalKey smallest_range_key = 5;
+    // Largest range key in the sstable. Zero value if no range keys are
+    // present.
+    InternalKey largest_range_key = 6;
+    // Smallest point key in the sstable. Zero value if no point keys are
+    // present.
+    InternalKey smallest_point_key = 7;
+    // Largest point key in the sstable. Zero value if no point keys are
+    // present.
+    InternalKey largest_point_key = 8;
+
+    // LSM level of the original sstable. This sstable will go into the same
+    // level in the destination LSM.
+    int32 level = 9;
+    // Physical size of the sstable in bytes.
+    uint64 size = 10;
+  }
+
   Header header = 1;

   // A BatchRepr. Multiple kv_batches may be sent across multiple request messages.
@@ -259,6 +307,16 @@

   bool final = 4;

+  repeated SharedTable shared_tables = 5 [(gogoproto.nullable) = false];
+
+  // If true, signals the receiver that the sender can no longer replicate
+  // using shared files, even though the Header initially contained
+  // shared_replicate = true. All contents of this range will be streamed as
+  // usual beyond this point. This flag must be set to true in a request
+  // before the end of the snapshot (i.e. before the final = true request),
+  // and before any user keys are streamed.
+  bool transition_from_shared_to_regular_replicate = 6;
+
   reserved 3;
 }

diff --git a/pkg/kv/kvserver/rditer/replica_data_iter.go b/pkg/kv/kvserver/rditer/replica_data_iter.go
index 6bf86136e838..55da433f215a 100644
--- a/pkg/kv/kvserver/rditer/replica_data_iter.go
+++ b/pkg/kv/kvserver/rditer/replica_data_iter.go
@@ -304,9 +304,10 @@ func (ri *ReplicaMVCCDataIterator) HasPointAndRange() (bool, bool) {

 // IterateReplicaKeySpans iterates over each of a range's key spans, and calls
 // the given visitor with an iterator over its data. Specifically, it iterates
-// over the spans returned by either makeAllKeySpans or MakeReplicatedKeySpans,
-// and for each one provides first a point key iterator and then a range key
-// iterator. This is the expected order for Raft snapshots.
+// over the spans returned by Select(), covering either all spans or only
+// replicated spans (with replicatedSpansFilter applied to the latter), and
+// for each one provides first a point key iterator and then a range key
+// iterator. This is the expected order for Raft snapshots.
 //
 // The iterator will be pre-seeked to the span, and is provided along with the
 // key span and key type (point or range). Iterators that have no data are
@@ -328,9 +329,11 @@ func IterateReplicaKeySpans(
 	var spans []roachpb.Span
 	if replicatedOnly {
 		spans = Select(desc.RangeID, SelectOpts{
-			ReplicatedSpansFilter: replicatedSpansFilter,
 			ReplicatedBySpan:      desc.RSpan(),
-			ReplicatedByRangeID:   true,
+			ReplicatedSpansFilter: replicatedSpansFilter,
+			// NB: We exclude ReplicatedByRangeID if replicatedSpansFilter is
+			// ReplicatedSpansUserOnly.
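+			// Range-ID-local keys are replicated system metadata rather than
+			// user keys, so a user-only selection must leave them out.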
+ ReplicatedByRangeID: replicatedSpansFilter != ReplicatedSpansUserOnly, }) } else { spans = Select(desc.RangeID, SelectOpts{ diff --git a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output index 0d6abd2a4650..ffd95d052634 100644 --- a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output +++ b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output @@ -1,15 +1,9 @@ echo ---- -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| /Local/RangeID/1/{r""-s""} | 016989726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/1/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" | -| /Local/RangeID/1/{r""-s""} | 016989726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/1/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" | -| /Local/RangeID/1/{r""-s""} | 016989726c67632d | | | /Local/RangeID/1/r/RangeGCThreshold | -| /Local/RangeID/1/{r""-s""} | 016989727261736b | | | /Local/RangeID/1/r/RangeAppliedState | -| /Local/RangeID/1/{r""-s""} | 01698972726c6c2d | | | /Local/RangeID/1/r/RangeLease | -| /Local/RangeID/1/{r""-s""} | 016989723a61 | 016989723a78 | 000000000000000109 | /Local/RangeID/1/r":{a"-x"}/0.000000001,0 | -| {a-b} | 61 | | 0000000000000001 | "a"/0.000000001,0 | -| {a-b} | 61ffffffff | | 0000000000000001 | "a\xff\xff\xff\xff"/0.000000001,0 | -| {a-b} | 61 | 62 | 000000000000000109 | {a-b}/0.000000001,0 | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ ++-------+------------+------------+--------------------+-----------------------------------+ +| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | ++-------+------------+------------+--------------------+-----------------------------------+ +| {a-b} | 61 | | 0000000000000001 | "a"/0.000000001,0 | +| {a-b} | 61ffffffff | | 0000000000000001 | "a\xff\xff\xff\xff"/0.000000001,0 | +| {a-b} | 61 | 62 | 000000000000000109 | {a-b}/0.000000001,0 | ++-------+------------+------------+--------------------+-----------------------------------+ diff --git a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output index 704f2b962503..c70d703437d1 100644 --- a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output +++ b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output @@ -1,15 +1,9 @@ echo ---- -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | 
-+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| /Local/RangeID/2/{r""-s""} | 01698a726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/2/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" | -| /Local/RangeID/2/{r""-s""} | 01698a726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/2/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" | -| /Local/RangeID/2/{r""-s""} | 01698a726c67632d | | | /Local/RangeID/2/r/RangeGCThreshold | -| /Local/RangeID/2/{r""-s""} | 01698a727261736b | | | /Local/RangeID/2/r/RangeAppliedState | -| /Local/RangeID/2/{r""-s""} | 01698a72726c6c2d | | | /Local/RangeID/2/r/RangeLease | -| /Local/RangeID/2/{r""-s""} | 01698a723a61 | 01698a723a78 | 000000000000000109 | /Local/RangeID/2/r":{a"-x"}/0.000000001,0 | -| {b-c} | 62 | | 0000000000000001 | "b"/0.000000001,0 | -| {b-c} | 62ffffffff | | 0000000000000001 | "b\xff\xff\xff\xff"/0.000000001,0 | -| {b-c} | 62 | 63 | 000000000000000109 | {b-c}/0.000000001,0 | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ ++-------+------------+------------+--------------------+-----------------------------------+ +| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | ++-------+------------+------------+--------------------+-----------------------------------+ +| {b-c} | 62 | | 0000000000000001 | "b"/0.000000001,0 | +| {b-c} | 62ffffffff | | 0000000000000001 | "b\xff\xff\xff\xff"/0.000000001,0 | +| {b-c} | 62 | 63 | 000000000000000109 | {b-c}/0.000000001,0 | ++-------+------------+------------+--------------------+-----------------------------------+ diff --git a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output index b5130d147176..9949d61859e3 100644 --- a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output +++ b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output @@ -1,15 +1,9 @@ echo ---- -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| /Local/RangeID/3/{r""-s""} | 01698b726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/3/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" | -| /Local/RangeID/3/{r""-s""} | 01698b726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/3/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" | -| /Local/RangeID/3/{r""-s""} | 01698b726c67632d | | | /Local/RangeID/3/r/RangeGCThreshold | -| /Local/RangeID/3/{r""-s""} | 01698b727261736b | | | /Local/RangeID/3/r/RangeAppliedState | -| /Local/RangeID/3/{r""-s""} | 01698b72726c6c2d | | | /Local/RangeID/3/r/RangeLease | -| /Local/RangeID/3/{r""-s""} | 01698b723a61 | 01698b723a78 | 000000000000000109 | /Local/RangeID/3/r":{a"-x"}/0.000000001,0 | -| {c-d} | 63 | | 0000000000000001 | "c"/0.000000001,0 | -| 
{c-d} | 63ffffffff | | 0000000000000001 | "c\xff\xff\xff\xff"/0.000000001,0 | -| {c-d} | 63 | 64 | 000000000000000109 | {c-d}/0.000000001,0 | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ ++-------+------------+------------+--------------------+-----------------------------------+ +| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | ++-------+------------+------------+--------------------+-----------------------------------+ +| {c-d} | 63 | | 0000000000000001 | "c"/0.000000001,0 | +| {c-d} | 63ffffffff | | 0000000000000001 | "c\xff\xff\xff\xff"/0.000000001,0 | +| {c-d} | 63 | 64 | 000000000000000109 | {c-d}/0.000000001,0 | ++-------+------------+------------+--------------------+-----------------------------------+ diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 2daefe8e8d0f..5bd28858c209 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3138,6 +3138,12 @@ func (r *Replica) followerSendSnapshot( // explicitly for snapshots going out to followers. snap.State.DeprecatedUsingAppliedStateKey = true + // Use shared replication if shared storage is enabled and we're sending + // a snapshot for a non-system range. This allows us to send metadata of + // sstables in shared storage as opposed to streaming their contents. Keys + // in higher levels of the LSM are still streamed in the snapshot. + sharedReplicate := r.store.cfg.SharedStorageEnabled && snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0 + // Create new snapshot request header using the delegate snapshot request. header := kvserverpb.SnapshotRequest_Header{ State: snap.State, @@ -3160,6 +3166,7 @@ func (r *Replica) followerSendSnapshot( SenderQueuePriority: req.SenderQueuePriority, Strategy: kvserverpb.SnapshotRequest_KV_BATCH, Type: req.Type, + SharedReplicate: sharedReplicate, } newBatchFn := func() storage.WriteBatch { return r.store.TODOEngine().NewWriteBatch() diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index bd269d450ac4..d53fd1c58d93 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/redact" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -279,9 +280,10 @@ type OutgoingSnapshot struct { // The Pebble snapshot that will be streamed from. EngineSnap storage.Reader // The replica state within the snapshot. - State kvserverpb.ReplicaState - snapType kvserverpb.SnapshotRequest_Type - onClose func() + State kvserverpb.ReplicaState + snapType kvserverpb.SnapshotRequest_Type + sharedBackings []objstorage.RemoteObjectBackingHandle + onClose func() } func (s OutgoingSnapshot) String() string { @@ -297,6 +299,9 @@ func (s OutgoingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) { // Close releases the resources associated with the snapshot. func (s *OutgoingSnapshot) Close() { s.EngineSnap.Close() + for i := range s.sharedBackings { + s.sharedBackings[i].Close() + } if s.onClose != nil { s.onClose() } @@ -311,10 +316,13 @@ type IncomingSnapshot struct { // The descriptor in the snapshot, never nil. 
 	Desc *roachpb.RangeDescriptor
 	DataSize int64
+	SharedSize int64
 	snapType kvserverpb.SnapshotRequest_Type
 	placeholder *ReplicaPlaceholder
 	raftAppliedIndex kvpb.RaftIndex // logging only
 	msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied
+	sharedSSTs []pebble.SharedSSTMeta
+	doExcise bool
 }

 func (s IncomingSnapshot) String() string {
@@ -510,6 +518,12 @@ func (r *Replica) applySnapshot(
 		logDetails.Printf(" subsumedReplicas=%d@%0.0fms",
 			len(subsumedRepls), stats.subsumedReplicas.Sub(start).Seconds()*1000)
 	}
+	if len(inSnap.sharedSSTs) > 0 {
+		logDetails.Printf(" shared=%d sharedSize=%s", len(inSnap.sharedSSTs), humanizeutil.IBytes(inSnap.SharedSize))
+	}
+	if inSnap.doExcise {
+		logDetails.Printf(" excise=true")
+	}
 	logDetails.Printf(" ingestion=%d@%0.0fms", len(inSnap.SSTStorageScratch.SSTs()),
 		stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000)
 	log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
@@ -574,12 +588,20 @@
 		}
 	}
 	var ingestStats pebble.IngestOperationStats
-	if ingestStats, err =
-		// TODO: separate ingestions for log and statemachine engine. See:
-		//
-		// https://github.com/cockroachdb/cockroach/issues/93251
-		r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
-		return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
+	// TODO: separate ingestions for log and statemachine engine. See:
+	//
+	// https://github.com/cockroachdb/cockroach/issues/93251
+	if inSnap.doExcise {
+		exciseSpan := desc.KeySpan().AsRawSpanWithNoLocals()
+		if ingestStats, err =
+			r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, exciseSpan); err != nil {
+			return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
+		}
+	} else {
+		if ingestStats, err =
+			r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
+			return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
+		}
 	}
 	if r.store.cfg.KVAdmissionController != nil {
 		r.store.cfg.KVAdmissionController.SnapshotIngested(r.store.StoreID(), ingestStats)
diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
index caa8174cad7b..c84666678912 100644
--- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
+++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go
@@ -12,6 +12,7 @@
 import (
 	"context"
+	"fmt"
 	io "io"
 	"path/filepath"
 	"strconv"
@@ -24,6 +25,7 @@
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/fs"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -277,6 +279,7 @@ func TestMultiSSTWriterInitSST(t *testing.T) {
 	msstw, err := newMultiSSTWriter(
 		ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0,
+		false, /* skipRangeDelForLastSpan */
 	)
 	require.NoError(t, err)
 	_, err = msstw.Finish(ctx)
@@ -312,6 +315,85 @@
 	}
 }

+// TestMultiSSTWriterAddLastSpan tests that multiSSTWriter initializes each of
+// the SST files associated with the replicated key ranges by writing a range
+// deletion tombstone that covers each one's entire key range, 
except +// for the last span which only gets a rangedel when explicitly added. +func TestMultiSSTWriterAddLastSpan(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + addRangeDels := []bool{false, true} + for _, addRangeDel := range addRangeDels { + t.Run(fmt.Sprintf("addRangeDel=%v", addRangeDel), func(t *testing.T) { + ctx := context.Background() + testRangeID := roachpb.RangeID(1) + testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) + testLimiter := rate.NewLimiter(rate.Inf, 0) + + cleanup, eng := newOnDiskEngine(ctx, t) + defer cleanup() + defer eng.Close() + + sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter) + scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID) + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey("d"), + EndKey: roachpb.RKeyMax, + } + keySpans := rditer.MakeReplicatedKeySpans(&desc) + + msstw, err := newMultiSSTWriter( + ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, + true, /* skipRangeDelForLastSpan */ + ) + require.NoError(t, err) + if addRangeDel { + require.NoError(t, msstw.addRangeDelForLastSpan()) + } + testKey := storage.MVCCKey{Key: roachpb.RKey("d1").AsRawKey(), Timestamp: hlc.Timestamp{WallTime: 1}} + testEngineKey, _ := storage.DecodeEngineKey(storage.EncodeMVCCKey(testKey)) + require.NoError(t, msstw.Put(ctx, testEngineKey, []byte("foo"))) + _, err = msstw.Finish(ctx) + require.NoError(t, err) + + var actualSSTs [][]byte + fileNames := msstw.scratch.SSTs() + for _, file := range fileNames { + sst, err := fs.ReadFile(eng, file) + require.NoError(t, err) + actualSSTs = append(actualSSTs, sst) + } + + // Construct an SST file for each of the key ranges and write a rangedel + // tombstone that spans from Start to End. + var expectedSSTs [][]byte + for i, s := range keySpans { + func() { + sstFile := &storage.MemObject{} + sst := storage.MakeIngestionSSTWriter(ctx, cluster.MakeTestingClusterSettings(), sstFile) + defer sst.Close() + if i < len(keySpans)-1 || addRangeDel { + err := sst.ClearRawRange(s.Key, s.EndKey, true, true) + require.NoError(t, err) + } + if i == len(keySpans)-1 { + require.NoError(t, sst.PutEngineKey(testEngineKey, []byte("foo"))) + } + err = sst.Finish() + require.NoError(t, err) + expectedSSTs = append(expectedSSTs, sstFile.Data()) + }() + } + + require.Equal(t, len(actualSSTs), len(expectedSSTs)) + for i := range fileNames { + require.Equal(t, actualSSTs[i], expectedSSTs[i]) + } + }) + } +} + func newOnDiskEngine(ctx context.Context, t *testing.T) (func(), storage.Engine) { dir, cleanup := testutils.TempDir(t) eng, err := storage.Open( diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 5239e1e0af05..4fb367179aea 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1198,6 +1198,9 @@ type StoreConfig struct { // data structure useful for retrieving span configs. Only available if // SpanConfigsDisabled is unset. SpanConfigSubscriber spanconfig.KVSubscriber + // SharedStorageEnabled stores whether this store is configured with a + // shared.Storage instance and can accept shared snapshots. + SharedStorageEnabled bool // KVAdmissionController is used for admission control. 
 	KVAdmissionController kvadmission.Controller

diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 54bb08e74f81..44333c0a998a 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -39,6 +39,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble"
+	"github.com/cockroachdb/pebble/objstorage"
+	"github.com/cockroachdb/pebble/rangekey"
 	"github.com/cockroachdb/redact"
 	"go.etcd.io/raft/v3/raftpb"
 	"go.opentelemetry.io/otel/attribute"
@@ -182,6 +184,10 @@ type multiSSTWriter struct {
 	sstChunkSize int64
 	// The total size of SST data. Updated on SST finalization.
 	dataSize int64
+	// If skipRangeDelForLastSpan is true, the last span is not ClearRanged in
+	// its own sstable. We rely on the caller to take care of clearing this
+	// span through a different process (e.g. IngestAndExcise on Pebble).
+	skipRangeDelForLastSpan bool
 }

 func newMultiSSTWriter(
@@ -190,12 +196,14 @@
 	scratch *SSTSnapshotStorageScratch,
 	keySpans []roachpb.Span,
 	sstChunkSize int64,
+	skipRangeDelForLastSpan bool,
 ) (multiSSTWriter, error) {
 	msstw := multiSSTWriter{
-		st:           st,
-		scratch:      scratch,
-		keySpans:     keySpans,
-		sstChunkSize: sstChunkSize,
+		st:                      st,
+		scratch:                 scratch,
+		keySpans:                keySpans,
+		sstChunkSize:            sstChunkSize,
+		skipRangeDelForLastSpan: skipRangeDelForLastSpan,
 	}
 	if err := msstw.initSST(ctx); err != nil {
 		return msstw, err
@@ -210,6 +218,11 @@ func (msstw *multiSSTWriter) initSST(ctx context.Context) error {
 	}
 	newSST := storage.MakeIngestionSSTWriter(ctx, msstw.st, newSSTFile)
 	msstw.currSST = newSST
+	if msstw.skipRangeDelForLastSpan && msstw.currSpan == len(msstw.keySpans)-1 {
+		// Skip this ClearRange, as it will be excised at ingestion time in the
+		// engine instead.
+		return nil
+	}
 	if err := msstw.currSST.ClearRawRange(
 		msstw.keySpans[msstw.currSpan].Key, msstw.keySpans[msstw.currSpan].EndKey,
 		true /* pointKeys */, true, /* rangeKeys */
@@ -231,8 +244,33 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error {
 	return nil
 }

-func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, value []byte) error {
-	for msstw.keySpans[msstw.currSpan].EndKey.Compare(key.Key) <= 0 {
+// addRangeDelForLastSpan allows us to explicitly add a deletion tombstone
+// for the last span in the msstw, if the writer was instantiated with the
+// expectation that no tombstone would be necessary.
+func (msstw *multiSSTWriter) addRangeDelForLastSpan() error {
+	if !msstw.skipRangeDelForLastSpan {
+		// Nothing to do.
+		return nil
+	}
+	if msstw.currSpan < len(msstw.keySpans)-1 {
+		// When we switch to the last key span, we will just add a rangedel for it.
+		// Set skipRangeDelForLastSpan to false.
+		msstw.skipRangeDelForLastSpan = false
+		return nil
+	}
+	if msstw.currSpan > len(msstw.keySpans)-1 {
+		panic("cannot addRangeDel if sst writer has moved past user keys")
+	}
+	panic("multiSSTWriter already added keys to sstable that cannot be deleted by a rangedel/rangekeydel within it")
+}
+
+// rolloverSST rolls over to the SST writer appropriate for writing a
+// point/range key at key. For point keys, endKey and key must equal each
+// other.
+func (msstw *multiSSTWriter) rolloverSST(
+	ctx context.Context, key roachpb.Key, endKey roachpb.Key,
+) error {
+	for msstw.keySpans[msstw.currSpan].EndKey.Compare(key) <= 0 {
 		// Finish the current SST, write to the file, and move to the next key
 		// range.
 		if err := msstw.finalizeSST(ctx); err != nil {
@@ -242,8 +280,20 @@
 			return err
 		}
 	}
-	if msstw.keySpans[msstw.currSpan].Key.Compare(key.Key) > 0 {
-		return errors.AssertionFailedf("client error: expected %s to fall in one of %s", key.Key, msstw.keySpans)
+	if msstw.keySpans[msstw.currSpan].Key.Compare(key) > 0 ||
+		msstw.keySpans[msstw.currSpan].EndKey.Compare(endKey) < 0 {
+		if !key.Equal(endKey) {
+			return errors.AssertionFailedf("client error: expected %s to fall in one of %s",
+				roachpb.Span{Key: key, EndKey: endKey}, msstw.keySpans)
+		}
+		return errors.AssertionFailedf("client error: expected %s to fall in one of %s", key, msstw.keySpans)
+	}
+	return nil
+}
+
+func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, value []byte) error {
+	if err := msstw.rolloverSST(ctx, key.Key, key.Key); err != nil {
+		return err
 	}
 	if err := msstw.currSST.PutEngineKey(key, value); err != nil {
 		return errors.Wrap(err, "failed to put in sst")
@@ -251,26 +301,87 @@ func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, val
 	return nil
 }

+func (msstw *multiSSTWriter) PutInternalPointKey(
+	ctx context.Context, key []byte, kind pebble.InternalKeyKind, val []byte,
+) error {
+	decodedKey, ok := storage.DecodeEngineKey(key)
+	if !ok {
+		return errors.New("cannot decode engine key")
+	}
+	if err := msstw.rolloverSST(ctx, decodedKey.Key, decodedKey.Key); err != nil {
+		return err
+	}
+	var err error
+	switch kind {
+	case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete:
+		err = msstw.currSST.PutEngineKey(decodedKey, val)
+	case pebble.InternalKeyKindDelete, pebble.InternalKeyKindDeleteSized:
+		err = msstw.currSST.ClearEngineKey(decodedKey, storage.ClearOptions{ValueSizeKnown: false})
+	default:
+		err = errors.New("unexpected key kind")
+	}
+	if err != nil {
+		return errors.Wrap(err, "failed to put in sst")
+	}
+	return nil
+}
+
+func decodeRangeStartEnd(
+	start, end []byte,
+) (decodedStart, decodedEnd storage.EngineKey, err error) {
+	var emptyKey storage.EngineKey
+	decodedStart, ok := storage.DecodeEngineKey(start)
+	if !ok {
+		return emptyKey, emptyKey, errors.New("cannot decode start engine key")
+	}
+	decodedEnd, ok = storage.DecodeEngineKey(end)
+	if !ok {
+		return emptyKey, emptyKey, errors.New("cannot decode end engine key")
+	}
+	if decodedStart.Key.Compare(decodedEnd.Key) >= 0 {
+		return emptyKey, emptyKey, errors.AssertionFailedf("start key %s must be before end key %s", start, end)
+	}
+	return decodedStart, decodedEnd, nil
+}
+
+func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, end []byte) error {
+	decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end)
+	if err != nil {
+		return err
+	}
+	if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil {
+		return err
+	}
+	if err := msstw.currSST.ClearRawEncodedRange(start, end); err != nil {
+		return errors.Wrap(err, "failed to put range delete in sst")
+	}
+	return nil
+}
+
+func (msstw *multiSSTWriter) PutInternalRangeKey(
+	ctx context.Context, start, end []byte, key rangekey.Key,
+) error {
+	decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end)
+	if err != nil {
+		return err
+	}
+	if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil {
+		return err
+	}
+	if err := msstw.currSST.PutInternalRangeKey(start, end, key); err != nil {
+		return errors.Wrap(err, "failed to put range key in sst")
+	}
+	return nil
+}
+
 func 
(msstw *multiSSTWriter) PutRangeKey(
 	ctx context.Context, start, end roachpb.Key, suffix []byte, value []byte,
 ) error {
 	if start.Compare(end) >= 0 {
 		return errors.AssertionFailedf("start key %s must be before end key %s", end, start)
 	}
-	for msstw.keySpans[msstw.currSpan].EndKey.Compare(start) <= 0 {
-		// Finish the current SST, write to the file, and move to the next key
-		// range.
-		if err := msstw.finalizeSST(ctx); err != nil {
-			return err
-		}
-		if err := msstw.initSST(ctx); err != nil {
-			return err
-		}
-	}
-	if msstw.keySpans[msstw.currSpan].Key.Compare(start) > 0 ||
-		msstw.keySpans[msstw.currSpan].EndKey.Compare(end) < 0 {
-		return errors.AssertionFailedf("client error: expected %s to fall in one of %s",
-			roachpb.Span{Key: start, EndKey: end}, msstw.keySpans)
+	if err := msstw.rolloverSST(ctx, start, end); err != nil {
+		return err
 	}
 	if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil {
 		return errors.Wrap(err, "failed to put range key in sst")
@@ -358,6 +469,26 @@ func (tag *snapshotTimingTag) Render() []attribute.KeyValue {
 	return tags
 }

+// stubBackingHandle is a stub implementation of RemoteObjectBackingHandle
+// that just wraps a RemoteObjectBacking. It is used by a snapshot receiver,
+// which is on a different node than the one that created the original
+// RemoteObjectBackingHandle; the Close() function is therefore a no-op.
+type stubBackingHandle struct {
+	backing objstorage.RemoteObjectBacking
+}
+
+// Get implements the RemoteObjectBackingHandle interface.
+func (s stubBackingHandle) Get() (objstorage.RemoteObjectBacking, error) {
+	return s.backing, nil
+}
+
+// Close implements the RemoteObjectBackingHandle interface.
+func (s stubBackingHandle) Close() {
+	// No-op.
+}
+
+var _ objstorage.RemoteObjectBackingHandle = &stubBackingHandle{}
+
 // Receive implements the snapshotStrategy interface.
 //
 // NOTE: This function assumes that the point and range (e.g. MVCC range
@@ -410,7 +541,25 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 	// At the moment we'll write at most five SSTs.
 	// TODO(jeffreyxiao): Re-evaluate as the default range size grows.
 	keyRanges := rditer.MakeReplicatedKeySpans(header.State.Desc)
-	msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize)
+
+	doExcise := header.SharedReplicate
+	if doExcise && !s.cfg.SharedStorageEnabled {
+		return noSnap, sendSnapshotError(ctx, s, stream, errors.New("cannot accept shared sstables"))
+	}
+
+	// We rely on the last keyRange passed into multiSSTWriter being the user
+	// key span. If the sender signals that it can no longer do shared
+	// replication (with TransitionFromSharedToRegularReplicate = true), we
+	// will have to switch to adding a rangedel for that span. Since
+	// multiSSTWriter acts on an opaque slice of keyRanges, we just tell it to
+	// add a rangedel for the last span. To avoid bugs, assert that the last
+	// span in keyRanges is indeed the user key span.
+ if doExcise { + if !keyRanges[len(keyRanges)-1].Equal(header.State.Desc.KeySpan().AsRawSpanWithNoLocals()) { + return noSnap, errors.AssertionFailedf("last span in multiSSTWriter did not equal the user key span: %s", keyRanges[len(keyRanges)-1].String()) + } + } + msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize, doExcise) if err != nil { return noSnap, err } @@ -418,6 +567,8 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( log.Event(ctx, "waiting for snapshot batches to begin") + var sharedSSTs []pebble.SharedSSTMeta + for { timingTag.start("recv") req, err := stream.Recv() @@ -429,6 +580,13 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( err := errors.New("client error: provided a header mid-stream") return noSnap, sendSnapshotError(snapshotCtx, s, stream, err) } + if req.TransitionFromSharedToRegularReplicate { + doExcise = false + sharedSSTs = nil + if err := msstw.addRangeDelForLastSpan(); err != nil { + return noSnap, errors.Wrap(err, "adding tombstone for last span") + } + } if req.KVBatch != nil { recordBytesReceived(int64(len(req.KVBatch))) @@ -449,7 +607,45 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( if err := msstw.Put(ctx, key, batchReader.Value()); err != nil { return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") } + case pebble.InternalKeyKindDelete, pebble.InternalKeyKindDeleteSized: + if !doExcise { + return noSnap, errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind()) + } + if err := msstw.PutInternalPointKey(ctx, batchReader.Key(), batchReader.KeyKind(), nil); err != nil { + return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") + } + case pebble.InternalKeyKindRangeDelete: + if !doExcise { + return noSnap, errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind()) + } + start := batchReader.Key() + end, err := batchReader.EndKey() + if err != nil { + return noSnap, err + } + if err := msstw.PutInternalRangeDelete(ctx, start, end); err != nil { + return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") + } + case pebble.InternalKeyKindRangeKeyUnset, pebble.InternalKeyKindRangeKeyDelete: + if !doExcise { + return noSnap, errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind()) + } + start := batchReader.Key() + end, err := batchReader.EndKey() + if err != nil { + return noSnap, err + } + rangeKeys, err := batchReader.RawRangeKeys() + if err != nil { + return noSnap, err + } + for _, rkv := range rangeKeys { + err := msstw.PutInternalRangeKey(ctx, start, end, rkv) + if err != nil { + return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") + } + } case pebble.InternalKeyKindRangeKeySet: start, err := batchReader.EngineKey() if err != nil { @@ -476,6 +672,22 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } timingTag.stop("sst") } + if len(req.SharedTables) > 0 && doExcise { + for i := range req.SharedTables { + sst := req.SharedTables[i] + sharedSSTs = append(sharedSSTs, pebble.SharedSSTMeta{ + Backing: stubBackingHandle{sst.Backing}, + Smallest: pebble.InternalKey{UserKey: sst.Smallest.UserKey, Trailer: sst.Smallest.Trailer}, + Largest: pebble.InternalKey{UserKey: sst.Largest.UserKey, Trailer: sst.Largest.Trailer}, + SmallestRangeKey: pebble.InternalKey{UserKey: sst.SmallestRangeKey.UserKey, Trailer: sst.SmallestRangeKey.Trailer}, + LargestRangeKey: pebble.InternalKey{UserKey: sst.LargestRangeKey.UserKey, Trailer: sst.LargestRangeKey.Trailer}, + SmallestPointKey: pebble.InternalKey{UserKey: 
sst.SmallestPointKey.UserKey, Trailer: sst.SmallestPointKey.Trailer}, + LargestPointKey: pebble.InternalKey{UserKey: sst.LargestPointKey.UserKey, Trailer: sst.LargestPointKey.Trailer}, + Level: uint8(sst.Level), + Size: sst.Size_, + }) + } + } if req.Final { // We finished receiving all batches and log entries. It's possible that // we did not receive any key-value pairs for some of the key spans, but @@ -489,6 +701,10 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( msstw.Close() timingTag.stop("sst") log.Eventf(ctx, "all data received from snapshot and all SSTs were finalized") + var sharedSize int64 + for i := range sharedSSTs { + sharedSize += int64(sharedSSTs[i].Size) + } snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { @@ -502,14 +718,17 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( FromReplica: header.RaftMessageRequest.FromReplica, Desc: header.State.Desc, DataSize: dataSize, + SharedSize: sharedSize, snapType: header.Type, raftAppliedIndex: header.State.RaftAppliedIndex, msgAppRespCh: make(chan raftpb.Message, 1), + sharedSSTs: sharedSSTs, + doExcise: doExcise, } timingTag.stop("totalTime") - kvSS.status = redact.Sprintf("ssts: %d", len(kvSS.scratch.SSTs())) + kvSS.status = redact.Sprintf("local ssts: %d, shared ssts: %d", len(kvSS.scratch.SSTs()), len(sharedSSTs)) return inSnap, nil } } @@ -528,7 +747,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // not reflect the log entries sent (which are never sent in newer versions of // CRDB, as of VersionUnreplicatedTruncatedState). var bytesSent int64 - var kvs, rangeKVs int + var kvs, rangeKVs, sharedSSTs int // These stopwatches allow us to time the various components of Send(). // - totalTimeStopwatch measures the total time spent within this function. @@ -554,6 +773,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // Iterate over all keys (point keys and range keys) and stream out batches of // key-values. var b storage.WriteBatch + var ssts []kvserverpb.SnapshotRequest_SharedTable + var transitionFromSharedToRegularReplicate bool defer func() { if b != nil { b.Close() @@ -561,7 +782,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( }() flushBatch := func() error { - if err := kvSS.sendBatch(ctx, stream, b, timingTag); err != nil { + if err := kvSS.sendBatch(ctx, stream, b, ssts, transitionFromSharedToRegularReplicate, timingTag); err != nil { return err } bLen := int64(b.Len()) @@ -569,6 +790,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send( recordBytesSent(bLen) b.Close() b = nil + ssts = ssts[:0] + transitionFromSharedToRegularReplicate = false return nil } @@ -579,63 +802,154 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return nil } - err := rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true /* replicatedOnly */, rditer.ReplicatedSpansAll, - func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { - timingTag.start("iter") - defer timingTag.stop("iter") + // If snapshots containing shared files are allowed, and this range is a + // non-system range, take advantage of shared storage to minimize the amount + // of data we're iterating on and sending over the network. 
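+	// (At the time of this patch, only the bottommost Pebble levels, L5 and
+	// L6, can reside on shared storage; keys above those levels are still
+	// streamed in full.)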
+ sharedReplicate := header.SharedReplicate + replicatedFilter := rditer.ReplicatedSpansAll + if sharedReplicate { + replicatedFilter = rditer.ReplicatedSpansExcludeUser + } - var err error - switch keyType { - case storage.IterKeyTypePointsOnly: - for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { - kvs++ + iterateRKSpansVisitor := func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + timingTag.start("iter") + defer timingTag.stop("iter") + + var err error + switch keyType { + case storage.IterKeyTypePointsOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + kvs++ + if b == nil { + b = kvSS.newWriteBatch() + } + key, err := iter.UnsafeEngineKey() + if err != nil { + return err + } + v, err := iter.UnsafeValue() + if err != nil { + return err + } + if err = b.PutEngineKey(key, v); err != nil { + return err + } + if err = maybeFlushBatch(); err != nil { + return err + } + } + + case storage.IterKeyTypeRangesOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + bounds, err := iter.EngineRangeBounds() + if err != nil { + return err + } + for _, rkv := range iter.EngineRangeKeys() { + rangeKVs++ if b == nil { b = kvSS.newWriteBatch() } - key, err := iter.UnsafeEngineKey() + err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) if err != nil { return err } - v, err := iter.UnsafeValue() - if err != nil { - return err - } - if err = b.PutEngineKey(key, v); err != nil { - return err - } if err = maybeFlushBatch(); err != nil { return err } } + } - case storage.IterKeyTypeRangesOnly: - for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { - bounds, err := iter.EngineRangeBounds() - if err != nil { - return err - } - for _, rkv := range iter.EngineRangeKeys() { - rangeKVs++ - if b == nil { - b = kvSS.newWriteBatch() - } - err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) - if err != nil { - return err - } - if err = maybeFlushBatch(); err != nil { - return err - } - } - } + default: + return errors.AssertionFailedf("unexpected key type %v", keyType) + } + return err + } + err := rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + replicatedFilter, iterateRKSpansVisitor) + if err != nil { + return 0, err + } - default: - return errors.AssertionFailedf("unexpected key type %v", keyType) + var valBuf []byte + if sharedReplicate { + err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error { + kvs++ + if b == nil { + b = kvSS.newWriteBatch() } - return err + var val []byte + switch key.Kind() { + case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete, pebble.InternalKeyKindMerge: + var callerOwned bool + var err error + val, callerOwned, err = value.Value(valBuf) + if err != nil { + return err + } + if callerOwned && val != nil { + valBuf = val[:0] + } + } + if err := b.PutInternalPointKey(key, val); err != nil { + return err + } + return maybeFlushBatch() + }, func(start, end []byte, seqNum uint64) error { + kvs++ + if b == nil { + b = kvSS.newWriteBatch() + } + if err := b.ClearRawEncodedRange(start, end); err != nil { + return err + } + return maybeFlushBatch() + }, func(start, end []byte, keys []rangekey.Key) error { + if b == nil { + b = kvSS.newWriteBatch() + } + for i := range keys { + rangeKVs++ + err := b.PutInternalRangeKey(start, end, keys[i]) + if err != nil { + return 
err + } + } + return maybeFlushBatch() + }, func(sst *pebble.SharedSSTMeta) error { + sharedSSTs++ + snap.sharedBackings = append(snap.sharedBackings, sst.Backing) + backing, err := sst.Backing.Get() + if err != nil { + return err + } + ikeyToPb := func(ik pebble.InternalKey) *kvserverpb.SnapshotRequest_SharedTable_InternalKey { + return &kvserverpb.SnapshotRequest_SharedTable_InternalKey{ + UserKey: ik.UserKey, + Trailer: ik.Trailer, + } + } + ssts = append(ssts, kvserverpb.SnapshotRequest_SharedTable{ + Backing: backing, + Smallest: ikeyToPb(sst.Smallest), + Largest: ikeyToPb(sst.Largest), + SmallestRangeKey: ikeyToPb(sst.SmallestRangeKey), + LargestRangeKey: ikeyToPb(sst.LargestRangeKey), + SmallestPointKey: ikeyToPb(sst.SmallestPointKey), + LargestPointKey: ikeyToPb(sst.LargestPointKey), + Level: int32(sst.Level), + Size_: sst.Size, + }) + return nil }) - if err != nil { - return 0, err + if err != nil && errors.Is(err, pebble.ErrInvalidSkipSharedIteration) { + transitionFromSharedToRegularReplicate = true + err = rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + rditer.ReplicatedSpansUserOnly, iterateRKSpansVisitor) + } + if err != nil { + return 0, err + } } if b != nil { if err = flushBatch(); err != nil { @@ -646,7 +960,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( timingTag.stop("totalTime") log.Eventf(ctx, "finished sending snapshot batches, sent a total of %d bytes", bytesSent) - kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d", kvs, rangeKVs) + kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d sharedSSTs=%d", kvs, rangeKVs, sharedSSTs) return bytesSent, nil } @@ -654,6 +968,8 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( ctx context.Context, stream outgoingSnapshotStream, batch storage.WriteBatch, + ssts []kvserverpb.SnapshotRequest_SharedTable, + transitionToRegularReplicate bool, timerTag *snapshotTimingTag, ) error { timerTag.start("rateLimit") @@ -663,7 +979,11 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( return err } timerTag.start("send") - res := stream.Send(&kvserverpb.SnapshotRequest{KVBatch: batch.Repr()}) + res := stream.Send(&kvserverpb.SnapshotRequest{ + KVBatch: batch.Repr(), + SharedTables: ssts, + TransitionFromSharedToRegularReplicate: transitionToRegularReplicate, + }) timerTag.stop("send") return res } diff --git a/pkg/server/server.go b/pkg/server/server.go index 31a465e4998e..3e0722eaa9ec 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -842,6 +842,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf EagerLeaseAcquisitionLimiter: eagerLeaseAcquisitionLimiter, KVMemoryMonitor: kvMemoryMonitor, RangefeedBudgetFactory: rangeReedBudgetFactory, + SharedStorageEnabled: cfg.SharedStorage != "", SystemConfigProvider: systemConfigWatcher, SpanConfigSubscriber: spanConfig.subscriber, SpanConfigsDisabled: cfg.SpanConfigsDisabled,
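
Note: the sketch below is a minimal, runnable Go illustration of the sender-side protocol described in the commit message: shared replication is attempted first, and if Pebble reports that the shared levels cannot be skipped, the sender transitions to a regular snapshot mid-stream. Every identifier in it is a toy stand-in rather than a CockroachDB or Pebble API; the real logic lives in kvBatchSnapshotStrategy.Send, and the real error is pebble.ErrInvalidSkipSharedIteration.

package main

import (
	"errors"
	"fmt"
)

// errInvalidSkipSharedIteration stands in for pebble.ErrInvalidSkipSharedIteration.
var errInvalidSkipSharedIteration = errors.New("cannot skip shared levels")

// streamSharedUserKeys mimics ScanInternal() over the user key span with
// shared-level skipping: it emits the user keys above the shared levels plus
// metadata (SharedTables) for the files in shared levels, and fails when the
// shared levels cannot be skipped cleanly.
func streamSharedUserKeys(send func(string), skippable bool) error {
	if !skippable {
		return errInvalidSkipSharedIteration
	}
	send("KVBatch: user keys above shared levels")
	send("SharedTables: backing, bounds, level, size")
	return nil
}

// sendSnapshot mirrors the send-side ordering in the patch: non-user spans
// are always streamed as regular KV batches; user spans take the shared fast
// path when possible and transition mid-stream to regular streaming if not.
func sendSnapshot(sharedReplicate, skippable bool, send func(string)) error {
	if !sharedReplicate {
		send("KVBatch: all replicated spans")
		send("final = true")
		return nil
	}
	send("KVBatch: non-user replicated spans")
	err := streamSharedUserKeys(send, skippable)
	if errors.Is(err, errInvalidSkipSharedIteration) {
		// Per the raft.proto contract, the transition must be signalled
		// before any user keys are streamed; the receiver reacts by adding
		// a rangedel for the user key span (addRangeDelForLastSpan).
		send("transition_from_shared_to_regular_replicate = true")
		send("KVBatch: user spans, streamed in full")
	} else if err != nil {
		return err
	}
	send("final = true")
	return nil
}

func main() {
	send := func(msg string) { fmt.Println("->", msg) }
	fmt.Println("shared fast path:")
	_ = sendSnapshot(true, true, send)
	fmt.Println("mid-stream fallback:")
	_ = sendSnapshot(true, false, send)
}

Running it prints the requests each path would emit, in the order the receiver expects them under the raft.proto contract.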