diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index dc9c923f9ba5..913e647fecd7 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -224,6 +224,7 @@ go_library( "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//objstorage", "@com_github_cockroachdb_pebble//objstorage/remote", + "@com_github_cockroachdb_pebble//rangekey", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index eccb862fa541..fa69180cd8fe 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -249,9 +249,57 @@ message SnapshotRequest { // from a particular sending source. double sender_queue_priority = 11; + // If true, the snapshot could contain shared files present in a pre-configured + // or explicitly specified shared.Storage instance. Such files will have their + // metadata present in the snapshot, but not file contents. + bool shared_replicate = 12; + reserved 1, 4; } + // SharedTable represents one shared SSTable present in shared storage. + // Intended to be the protobuf version of pebble.SharedSSTMeta. + message SharedTable { + // Internal key represents a Pebble-internal key. See pebble.InternalKey + // for details on how these keys are used. + message InternalKey { + // User key portion of the internal key. + bytes user_key = 1; + // Trailer portion of the internal key, as defined by Pebble. + uint64 trailer = 2; + } + + // Used by the Pebble objstorage package to resolve a reference to a shared object. + bytes backing = 1; + + // Used by the Pebble objstorage package to generate new blob storage drivers. + // Reserved for future use. + bytes locator = 2; + + // Smallest internal key in the sstable. + InternalKey smallest = 3; + // Largest internal key in the sstable. + InternalKey largest = 4; + // Smallest range key in the sstable. Zero value if no range keys are + // present. + InternalKey smallest_range_key = 5; + // Largest range key in the sstable. Zero value if no range keys are + // present. + InternalKey largest_range_key = 6; + // Smallest point key in the sstable. Zero value if no point keys are + // present. + InternalKey smallest_point_key = 7; + // Largest point key in the sstable. Zero value if no point keys are + // present. + InternalKey largest_point_key = 8; + + // LSM level of the original sstable. This sstable will go into the same + // level in the destination LSM. + int32 level = 9; + // Physical size of the sstable in bytes. + uint64 size = 10; + } + Header header = 1; // A BatchRepr. Multiple kv_batches may be sent across multiple request messages. @@ -259,6 +307,16 @@ message SnapshotRequest { bool final = 4; + repeated SharedTable shared_tables = 5 [(gogoproto.nullable) = false]; + + // If true, signals the receiver that the sender can no longer replicate + // using shared files, even though the Header initially contained + // shared_replicate = true. All contents of this range will be streamed as + // usual beyond this point. This bool must be set to true in a request before + // the end of the snapshot (i.e. before the final = true request), and this + // flag must be set to true before any user keys are streamed. + bool transition_from_shared_to_regular_replicate = 6; + reserved 3; } diff --git a/pkg/kv/kvserver/rditer/replica_data_iter.go b/pkg/kv/kvserver/rditer/replica_data_iter.go index 6bf86136e838..55da433f215a 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter.go @@ -304,9 +304,10 @@ func (ri *ReplicaMVCCDataIterator) HasPointAndRange() (bool, bool) { // IterateReplicaKeySpans iterates over each of a range's key spans, and calls // the given visitor with an iterator over its data. Specifically, it iterates -// over the spans returned by either makeAllKeySpans or MakeReplicatedKeySpans, -// and for each one provides first a point key iterator and then a range key -// iterator. This is the expected order for Raft snapshots. +// over the spans returned by a Select() over all spans or replicated only spans +// (with replicatedSpansFilter applied on replicated spans), and for each one +// provides first a point key iterator and then a range key iterator. This is the +// expected order for Raft snapshots. // // The iterator will be pre-seeked to the span, and is provided along with the // key span and key type (point or range). Iterators that have no data are @@ -328,9 +329,11 @@ func IterateReplicaKeySpans( var spans []roachpb.Span if replicatedOnly { spans = Select(desc.RangeID, SelectOpts{ - ReplicatedSpansFilter: replicatedSpansFilter, ReplicatedBySpan: desc.RSpan(), - ReplicatedByRangeID: true, + ReplicatedSpansFilter: replicatedSpansFilter, + // NB: We exclude ReplicatedByRangeID if replicatedSpansFilter is + // ReplicatedSpansUserOnly. + ReplicatedByRangeID: replicatedSpansFilter != ReplicatedSpansUserOnly, }) } else { spans = Select(desc.RangeID, SelectOpts{ diff --git a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output index 0d6abd2a4650..ffd95d052634 100644 --- a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output +++ b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r1/replicatedOnly/user-only/output @@ -1,15 +1,9 @@ echo ---- -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| /Local/RangeID/1/{r""-s""} | 016989726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/1/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" | -| /Local/RangeID/1/{r""-s""} | 016989726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/1/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" | -| /Local/RangeID/1/{r""-s""} | 016989726c67632d | | | /Local/RangeID/1/r/RangeGCThreshold | -| /Local/RangeID/1/{r""-s""} | 016989727261736b | | | /Local/RangeID/1/r/RangeAppliedState | -| /Local/RangeID/1/{r""-s""} | 01698972726c6c2d | | | /Local/RangeID/1/r/RangeLease | -| /Local/RangeID/1/{r""-s""} | 016989723a61 | 016989723a78 | 000000000000000109 | /Local/RangeID/1/r":{a"-x"}/0.000000001,0 | -| {a-b} | 61 | | 0000000000000001 | "a"/0.000000001,0 | -| {a-b} | 61ffffffff | | 0000000000000001 | "a\xff\xff\xff\xff"/0.000000001,0 | -| {a-b} | 61 | 62 | 000000000000000109 | {a-b}/0.000000001,0 | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ ++-------+------------+------------+--------------------+-----------------------------------+ +| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | ++-------+------------+------------+--------------------+-----------------------------------+ +| {a-b} | 61 | | 0000000000000001 | "a"/0.000000001,0 | +| {a-b} | 61ffffffff | | 0000000000000001 | "a\xff\xff\xff\xff"/0.000000001,0 | +| {a-b} | 61 | 62 | 000000000000000109 | {a-b}/0.000000001,0 | ++-------+------------+------------+--------------------+-----------------------------------+ diff --git a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output index 704f2b962503..c70d703437d1 100644 --- a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output +++ b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r2/replicatedOnly/user-only/output @@ -1,15 +1,9 @@ echo ---- -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| /Local/RangeID/2/{r""-s""} | 01698a726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/2/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" | -| /Local/RangeID/2/{r""-s""} | 01698a726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/2/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" | -| /Local/RangeID/2/{r""-s""} | 01698a726c67632d | | | /Local/RangeID/2/r/RangeGCThreshold | -| /Local/RangeID/2/{r""-s""} | 01698a727261736b | | | /Local/RangeID/2/r/RangeAppliedState | -| /Local/RangeID/2/{r""-s""} | 01698a72726c6c2d | | | /Local/RangeID/2/r/RangeLease | -| /Local/RangeID/2/{r""-s""} | 01698a723a61 | 01698a723a78 | 000000000000000109 | /Local/RangeID/2/r":{a"-x"}/0.000000001,0 | -| {b-c} | 62 | | 0000000000000001 | "b"/0.000000001,0 | -| {b-c} | 62ffffffff | | 0000000000000001 | "b\xff\xff\xff\xff"/0.000000001,0 | -| {b-c} | 62 | 63 | 000000000000000109 | {b-c}/0.000000001,0 | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ ++-------+------------+------------+--------------------+-----------------------------------+ +| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | ++-------+------------+------------+--------------------+-----------------------------------+ +| {b-c} | 62 | | 0000000000000001 | "b"/0.000000001,0 | +| {b-c} | 62ffffffff | | 0000000000000001 | "b\xff\xff\xff\xff"/0.000000001,0 | +| {b-c} | 62 | 63 | 000000000000000109 | {b-c}/0.000000001,0 | ++-------+------------+------------+--------------------+-----------------------------------+ diff --git a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output index b5130d147176..9949d61859e3 100644 --- a/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output +++ b/pkg/kv/kvserver/rditer/testdata/TestReplicaDataIterator/r3/replicatedOnly/user-only/output @@ -1,15 +1,9 @@ echo ---- -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ -| /Local/RangeID/3/{r""-s""} | 01698b726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/3/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" | -| /Local/RangeID/3/{r""-s""} | 01698b726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/3/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" | -| /Local/RangeID/3/{r""-s""} | 01698b726c67632d | | | /Local/RangeID/3/r/RangeGCThreshold | -| /Local/RangeID/3/{r""-s""} | 01698b727261736b | | | /Local/RangeID/3/r/RangeAppliedState | -| /Local/RangeID/3/{r""-s""} | 01698b72726c6c2d | | | /Local/RangeID/3/r/RangeLease | -| /Local/RangeID/3/{r""-s""} | 01698b723a61 | 01698b723a78 | 000000000000000109 | /Local/RangeID/3/r":{a"-x"}/0.000000001,0 | -| {c-d} | 63 | | 0000000000000001 | "c"/0.000000001,0 | -| {c-d} | 63ffffffff | | 0000000000000001 | "c\xff\xff\xff\xff"/0.000000001,0 | -| {c-d} | 63 | 64 | 000000000000000109 | {c-d}/0.000000001,0 | -+----------------------------+--------------------------------------------------------+--------------+--------------------+---------------------------------------------------------------------+ ++-------+------------+------------+--------------------+-----------------------------------+ +| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY | ++-------+------------+------------+--------------------+-----------------------------------+ +| {c-d} | 63 | | 0000000000000001 | "c"/0.000000001,0 | +| {c-d} | 63ffffffff | | 0000000000000001 | "c\xff\xff\xff\xff"/0.000000001,0 | +| {c-d} | 63 | 64 | 000000000000000109 | {c-d}/0.000000001,0 | ++-------+------------+------------+--------------------+-----------------------------------+ diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 2daefe8e8d0f..5bd28858c209 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3138,6 +3138,12 @@ func (r *Replica) followerSendSnapshot( // explicitly for snapshots going out to followers. snap.State.DeprecatedUsingAppliedStateKey = true + // Use shared replication if shared storage is enabled and we're sending + // a snapshot for a non-system range. This allows us to send metadata of + // sstables in shared storage as opposed to streaming their contents. Keys + // in higher levels of the LSM are still streamed in the snapshot. + sharedReplicate := r.store.cfg.SharedStorageEnabled && snap.State.Desc.StartKey.AsRawKey().Compare(keys.TableDataMin) >= 0 + // Create new snapshot request header using the delegate snapshot request. header := kvserverpb.SnapshotRequest_Header{ State: snap.State, @@ -3160,6 +3166,7 @@ func (r *Replica) followerSendSnapshot( SenderQueuePriority: req.SenderQueuePriority, Strategy: kvserverpb.SnapshotRequest_KV_BATCH, Type: req.Type, + SharedReplicate: sharedReplicate, } newBatchFn := func() storage.WriteBatch { return r.store.TODOEngine().NewWriteBatch() diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index bd269d450ac4..d53fd1c58d93 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/redact" "go.etcd.io/raft/v3" "go.etcd.io/raft/v3/raftpb" @@ -279,9 +280,10 @@ type OutgoingSnapshot struct { // The Pebble snapshot that will be streamed from. EngineSnap storage.Reader // The replica state within the snapshot. - State kvserverpb.ReplicaState - snapType kvserverpb.SnapshotRequest_Type - onClose func() + State kvserverpb.ReplicaState + snapType kvserverpb.SnapshotRequest_Type + sharedBackings []objstorage.RemoteObjectBackingHandle + onClose func() } func (s OutgoingSnapshot) String() string { @@ -297,6 +299,9 @@ func (s OutgoingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) { // Close releases the resources associated with the snapshot. func (s *OutgoingSnapshot) Close() { s.EngineSnap.Close() + for i := range s.sharedBackings { + s.sharedBackings[i].Close() + } if s.onClose != nil { s.onClose() } @@ -311,10 +316,13 @@ type IncomingSnapshot struct { // The descriptor in the snapshot, never nil. Desc *roachpb.RangeDescriptor DataSize int64 + SharedSize int64 snapType kvserverpb.SnapshotRequest_Type placeholder *ReplicaPlaceholder raftAppliedIndex kvpb.RaftIndex // logging only msgAppRespCh chan raftpb.Message // receives MsgAppResp if/when snap is applied + sharedSSTs []pebble.SharedSSTMeta + doExcise bool } func (s IncomingSnapshot) String() string { @@ -510,6 +518,12 @@ func (r *Replica) applySnapshot( logDetails.Printf(" subsumedReplicas=%d@%0.0fms", len(subsumedRepls), stats.subsumedReplicas.Sub(start).Seconds()*1000) } + if len(inSnap.sharedSSTs) > 0 { + logDetails.Printf(" shared=%d sharedSize=%s", len(inSnap.sharedSSTs), humanizeutil.IBytes(inSnap.SharedSize)) + } + if inSnap.doExcise { + logDetails.Printf(" excise=true") + } logDetails.Printf(" ingestion=%d@%0.0fms", len(inSnap.SSTStorageScratch.SSTs()), stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000) log.Infof(ctx, "applied %s (%s)", inSnap, logDetails) @@ -574,12 +588,20 @@ func (r *Replica) applySnapshot( } } var ingestStats pebble.IngestOperationStats - if ingestStats, err = - // TODO: separate ingestions for log and statemachine engine. See: - // - // https://github.com/cockroachdb/cockroach/issues/93251 - r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { - return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs()) + // TODO: separate ingestions for log and statemachine engine. See: + // + // https://github.com/cockroachdb/cockroach/issues/93251 + if inSnap.doExcise { + exciseSpan := desc.KeySpan().AsRawSpanWithNoLocals() + if ingestStats, err = + r.store.TODOEngine().IngestAndExciseFiles(ctx, inSnap.SSTStorageScratch.SSTs(), inSnap.sharedSSTs, exciseSpan); err != nil { + return errors.Wrapf(err, "while ingesting %s and excising %s-%s", inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey) + } + } else { + if ingestStats, err = + r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil { + return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs()) + } } if r.store.cfg.KVAdmissionController != nil { r.store.cfg.KVAdmissionController.SnapshotIngested(r.store.StoreID(), ingestStats) diff --git a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go index caa8174cad7b..c84666678912 100644 --- a/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go +++ b/pkg/kv/kvserver/replica_sst_snapshot_storage_test.go @@ -12,6 +12,7 @@ package kvserver import ( "context" + "fmt" io "io" "path/filepath" "strconv" @@ -24,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -277,6 +279,7 @@ func TestMultiSSTWriterInitSST(t *testing.T) { msstw, err := newMultiSSTWriter( ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, + false, /* skipRangeDelForLastSpan */ ) require.NoError(t, err) _, err = msstw.Finish(ctx) @@ -312,6 +315,85 @@ func TestMultiSSTWriterInitSST(t *testing.T) { } } +// TestMultiSSTWriterAddLastSpan tests that multiSSTWriter initializes each of +// the SST files associated with the replicated key ranges by writing a range +// deletion tombstone that spans the entire range of each respectively, except +// for the last span which only gets a rangedel when explicitly added. +func TestMultiSSTWriterAddLastSpan(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + addRangeDels := []bool{false, true} + for _, addRangeDel := range addRangeDels { + t.Run(fmt.Sprintf("addRangeDel=%v", addRangeDel), func(t *testing.T) { + ctx := context.Background() + testRangeID := roachpb.RangeID(1) + testSnapUUID := uuid.Must(uuid.FromBytes([]byte("foobar1234567890"))) + testLimiter := rate.NewLimiter(rate.Inf, 0) + + cleanup, eng := newOnDiskEngine(ctx, t) + defer cleanup() + defer eng.Close() + + sstSnapshotStorage := NewSSTSnapshotStorage(eng, testLimiter) + scratch := sstSnapshotStorage.NewScratchSpace(testRangeID, testSnapUUID) + desc := roachpb.RangeDescriptor{ + StartKey: roachpb.RKey("d"), + EndKey: roachpb.RKeyMax, + } + keySpans := rditer.MakeReplicatedKeySpans(&desc) + + msstw, err := newMultiSSTWriter( + ctx, cluster.MakeTestingClusterSettings(), scratch, keySpans, 0, + true, /* skipRangeDelForLastSpan */ + ) + require.NoError(t, err) + if addRangeDel { + require.NoError(t, msstw.addRangeDelForLastSpan()) + } + testKey := storage.MVCCKey{Key: roachpb.RKey("d1").AsRawKey(), Timestamp: hlc.Timestamp{WallTime: 1}} + testEngineKey, _ := storage.DecodeEngineKey(storage.EncodeMVCCKey(testKey)) + require.NoError(t, msstw.Put(ctx, testEngineKey, []byte("foo"))) + _, err = msstw.Finish(ctx) + require.NoError(t, err) + + var actualSSTs [][]byte + fileNames := msstw.scratch.SSTs() + for _, file := range fileNames { + sst, err := fs.ReadFile(eng, file) + require.NoError(t, err) + actualSSTs = append(actualSSTs, sst) + } + + // Construct an SST file for each of the key ranges and write a rangedel + // tombstone that spans from Start to End. + var expectedSSTs [][]byte + for i, s := range keySpans { + func() { + sstFile := &storage.MemObject{} + sst := storage.MakeIngestionSSTWriter(ctx, cluster.MakeTestingClusterSettings(), sstFile) + defer sst.Close() + if i < len(keySpans)-1 || addRangeDel { + err := sst.ClearRawRange(s.Key, s.EndKey, true, true) + require.NoError(t, err) + } + if i == len(keySpans)-1 { + require.NoError(t, sst.PutEngineKey(testEngineKey, []byte("foo"))) + } + err = sst.Finish() + require.NoError(t, err) + expectedSSTs = append(expectedSSTs, sstFile.Data()) + }() + } + + require.Equal(t, len(actualSSTs), len(expectedSSTs)) + for i := range fileNames { + require.Equal(t, actualSSTs[i], expectedSSTs[i]) + } + }) + } +} + func newOnDiskEngine(ctx context.Context, t *testing.T) (func(), storage.Engine) { dir, cleanup := testutils.TempDir(t) eng, err := storage.Open( diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 5239e1e0af05..4fb367179aea 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -1198,6 +1198,9 @@ type StoreConfig struct { // data structure useful for retrieving span configs. Only available if // SpanConfigsDisabled is unset. SpanConfigSubscriber spanconfig.KVSubscriber + // SharedStorageEnabled stores whether this store is configured with a + // shared.Storage instance and can accept shared snapshots. + SharedStorageEnabled bool // KVAdmissionController is used for admission control. KVAdmissionController kvadmission.Controller diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 54bb08e74f81..44333c0a998a 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -39,6 +39,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/objstorage" + "github.com/cockroachdb/pebble/rangekey" "github.com/cockroachdb/redact" "go.etcd.io/raft/v3/raftpb" "go.opentelemetry.io/otel/attribute" @@ -182,6 +184,10 @@ type multiSSTWriter struct { sstChunkSize int64 // The total size of SST data. Updated on SST finalization. dataSize int64 + // if skipRangeDelForLastSpan is true, the last span is not ClearRanged in the + // same sstable. We rely on the caller to take care of clearing this span + // through a different process (eg. IngestAndExcise on pebble). + skipRangeDelForLastSpan bool } func newMultiSSTWriter( @@ -190,12 +196,14 @@ func newMultiSSTWriter( scratch *SSTSnapshotStorageScratch, keySpans []roachpb.Span, sstChunkSize int64, + skipRangeDelForLastSpan bool, ) (multiSSTWriter, error) { msstw := multiSSTWriter{ - st: st, - scratch: scratch, - keySpans: keySpans, - sstChunkSize: sstChunkSize, + st: st, + scratch: scratch, + keySpans: keySpans, + sstChunkSize: sstChunkSize, + skipRangeDelForLastSpan: skipRangeDelForLastSpan, } if err := msstw.initSST(ctx); err != nil { return msstw, err @@ -210,6 +218,11 @@ func (msstw *multiSSTWriter) initSST(ctx context.Context) error { } newSST := storage.MakeIngestionSSTWriter(ctx, msstw.st, newSSTFile) msstw.currSST = newSST + if msstw.skipRangeDelForLastSpan && msstw.currSpan == len(msstw.keySpans)-1 { + // Skip this ClearRange, as it will be excised at ingestion time in the + // engine instead. + return nil + } if err := msstw.currSST.ClearRawRange( msstw.keySpans[msstw.currSpan].Key, msstw.keySpans[msstw.currSpan].EndKey, true /* pointKeys */, true, /* rangeKeys */ @@ -231,8 +244,33 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error { return nil } -func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, value []byte) error { - for msstw.keySpans[msstw.currSpan].EndKey.Compare(key.Key) <= 0 { +// addRangeDelForLastSpan allows us to explicitly add a deletion tombstone +// for the last span in the msstw, if it was instantiated with the expectation +// that no tombstone was necessary. +func (msstw *multiSSTWriter) addRangeDelForLastSpan() error { + if !msstw.skipRangeDelForLastSpan { + // Nothing to do. + return nil + } + if msstw.currSpan < len(msstw.keySpans)-1 { + // When we switch to the last key span, we will just add a rangedel for it. + // Set skipRangeDelForLastSpan to false. + msstw.skipRangeDelForLastSpan = false + return nil + } + if msstw.currSpan > len(msstw.keySpans)-1 { + panic("cannot addRangeDel if sst writer has moved past user keys") + } + panic("multiSSTWriter already added keys to sstable that cannot be deleted by a rangedel/rangekeydel within it") +} + +// rolloverSST rolls the underlying SST writer over to the appropriate SST +// writer for writing a point/range key at key. For point keys, endKey and key +// must equal each other. +func (msstw *multiSSTWriter) rolloverSST( + ctx context.Context, key roachpb.Key, endKey roachpb.Key, +) error { + for msstw.keySpans[msstw.currSpan].EndKey.Compare(key) <= 0 { // Finish the current SST, write to the file, and move to the next key // range. if err := msstw.finalizeSST(ctx); err != nil { @@ -242,8 +280,20 @@ func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, val return err } } - if msstw.keySpans[msstw.currSpan].Key.Compare(key.Key) > 0 { - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", key.Key, msstw.keySpans) + if msstw.keySpans[msstw.currSpan].Key.Compare(key) > 0 || + msstw.keySpans[msstw.currSpan].EndKey.Compare(endKey) < 0 { + if !key.Equal(endKey) { + return errors.AssertionFailedf("client error: expected %s to fall in one of %s", + roachpb.Span{Key: key, EndKey: endKey}, msstw.keySpans) + } + return errors.AssertionFailedf("client error: expected %s to fall in one of %s", key, msstw.keySpans) + } + return nil +} + +func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, value []byte) error { + if err := msstw.rolloverSST(ctx, key.Key, key.Key); err != nil { + return err } if err := msstw.currSST.PutEngineKey(key, value); err != nil { return errors.Wrap(err, "failed to put in sst") @@ -251,26 +301,87 @@ func (msstw *multiSSTWriter) Put(ctx context.Context, key storage.EngineKey, val return nil } +func (msstw *multiSSTWriter) PutInternalPointKey( + ctx context.Context, key []byte, kind pebble.InternalKeyKind, val []byte, +) error { + decodedKey, ok := storage.DecodeEngineKey(key) + if !ok { + return errors.New("cannot decode engine key") + } + if err := msstw.rolloverSST(ctx, decodedKey.Key, decodedKey.Key); err != nil { + return err + } + var err error + switch kind { + case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete: + err = msstw.currSST.PutEngineKey(decodedKey, val) + case pebble.InternalKeyKindDelete, pebble.InternalKeyKindDeleteSized: + err = msstw.currSST.ClearEngineKey(decodedKey, storage.ClearOptions{ValueSizeKnown: false}) + default: + err = errors.New("unexpected key kind") + } + if err != nil { + return errors.Wrap(err, "failed to put in sst") + } + return nil +} + +func decodeRangeStartEnd( + start, end []byte, +) (decodedStart, decodedEnd storage.EngineKey, err error) { + var emptyKey storage.EngineKey + decodedStart, ok := storage.DecodeEngineKey(start) + if !ok { + return emptyKey, emptyKey, errors.New("cannot decode start engine key") + } + decodedEnd, ok = storage.DecodeEngineKey(end) + if !ok { + return emptyKey, emptyKey, errors.New("cannot decode end engine key") + } + if decodedStart.Key.Compare(decodedEnd.Key) >= 0 { + return emptyKey, emptyKey, errors.AssertionFailedf("start key %s must be before end key %s", end, start) + } + return decodedStart, decodedEnd, nil +} + +func (msstw *multiSSTWriter) PutInternalRangeDelete(ctx context.Context, start, end []byte) error { + decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) + if err != nil { + return err + } + if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil { + return err + } + if err := msstw.currSST.ClearRawEncodedRange(start, end); err != nil { + return errors.Wrap(err, "failed to put range delete in sst") + } + return nil +} + +func (msstw *multiSSTWriter) PutInternalRangeKey( + ctx context.Context, start, end []byte, key rangekey.Key, +) error { + decodedStart, decodedEnd, err := decodeRangeStartEnd(start, end) + if err != nil { + return err + } + if err := msstw.rolloverSST(ctx, decodedStart.Key, decodedEnd.Key); err != nil { + return err + } + if err := msstw.currSST.PutInternalRangeKey(start, end, key); err != nil { + return errors.Wrap(err, "failed to put range key in sst") + } + return nil +} + func (msstw *multiSSTWriter) PutRangeKey( ctx context.Context, start, end roachpb.Key, suffix []byte, value []byte, ) error { if start.Compare(end) >= 0 { return errors.AssertionFailedf("start key %s must be before end key %s", end, start) } - for msstw.keySpans[msstw.currSpan].EndKey.Compare(start) <= 0 { - // Finish the current SST, write to the file, and move to the next key - // range. - if err := msstw.finalizeSST(ctx); err != nil { - return err - } - if err := msstw.initSST(ctx); err != nil { - return err - } - } - if msstw.keySpans[msstw.currSpan].Key.Compare(start) > 0 || - msstw.keySpans[msstw.currSpan].EndKey.Compare(end) < 0 { - return errors.AssertionFailedf("client error: expected %s to fall in one of %s", - roachpb.Span{Key: start, EndKey: end}, msstw.keySpans) + if err := msstw.rolloverSST(ctx, start, end); err != nil { + return err } if err := msstw.currSST.PutEngineRangeKey(start, end, suffix, value); err != nil { return errors.Wrap(err, "failed to put range key in sst") @@ -358,6 +469,26 @@ func (tag *snapshotTimingTag) Render() []attribute.KeyValue { return tags } +// stubBackingHandle is a stub implementation of RemoteObjectBackingHandle +// that just wraps a RemoteObjectBacking. This is used by a snapshot receiver as +// it is on a different node than the one that created the original +// RemoteObjectBackingHandle, so the Close() function is a no-op. +type stubBackingHandle struct { + backing objstorage.RemoteObjectBacking +} + +// Get implements the RemoteObjectBackingHandle interface. +func (s stubBackingHandle) Get() (objstorage.RemoteObjectBacking, error) { + return s.backing, nil +} + +// Close implements the RemoteObjectBackingHandle interface. +func (s stubBackingHandle) Close() { + // No-op. +} + +var _ objstorage.RemoteObjectBackingHandle = &stubBackingHandle{} + // Receive implements the snapshotStrategy interface. // // NOTE: This function assumes that the point and range (e.g. MVCC range @@ -410,7 +541,25 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( // At the moment we'll write at most five SSTs. // TODO(jeffreyxiao): Re-evaluate as the default range size grows. keyRanges := rditer.MakeReplicatedKeySpans(header.State.Desc) - msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize) + + doExcise := header.SharedReplicate + if doExcise && !s.cfg.SharedStorageEnabled { + return noSnap, sendSnapshotError(ctx, s, stream, errors.New("cannot accept shared sstables")) + } + + // We rely on the last keyRange passed into multiSSTWriter being the user key + // span. If the sender signals that it can no longer do shared replication + // (with a TransitionFromSharedToRegularReplicate = true), we will have to + // switch to adding a rangedel for that span. Since multiSSTWriter acts on an + // opaque slice of keyRanges, we just tell it to add a rangedel for the last + // span. To avoid bugs, assert on the last span in keyRanges actually being + // equal to the user key span. + if doExcise { + if !keyRanges[len(keyRanges)-1].Equal(header.State.Desc.KeySpan().AsRawSpanWithNoLocals()) { + return noSnap, errors.AssertionFailedf("last span in multiSSTWriter did not equal the user key span: %s", keyRanges[len(keyRanges)-1].String()) + } + } + msstw, err := newMultiSSTWriter(ctx, kvSS.st, kvSS.scratch, keyRanges, kvSS.sstChunkSize, doExcise) if err != nil { return noSnap, err } @@ -418,6 +567,8 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( log.Event(ctx, "waiting for snapshot batches to begin") + var sharedSSTs []pebble.SharedSSTMeta + for { timingTag.start("recv") req, err := stream.Recv() @@ -429,6 +580,13 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( err := errors.New("client error: provided a header mid-stream") return noSnap, sendSnapshotError(snapshotCtx, s, stream, err) } + if req.TransitionFromSharedToRegularReplicate { + doExcise = false + sharedSSTs = nil + if err := msstw.addRangeDelForLastSpan(); err != nil { + return noSnap, errors.Wrap(err, "adding tombstone for last span") + } + } if req.KVBatch != nil { recordBytesReceived(int64(len(req.KVBatch))) @@ -449,7 +607,45 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( if err := msstw.Put(ctx, key, batchReader.Value()); err != nil { return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") } + case pebble.InternalKeyKindDelete, pebble.InternalKeyKindDeleteSized: + if !doExcise { + return noSnap, errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind()) + } + if err := msstw.PutInternalPointKey(ctx, batchReader.Key(), batchReader.KeyKind(), nil); err != nil { + return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") + } + case pebble.InternalKeyKindRangeDelete: + if !doExcise { + return noSnap, errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind()) + } + start := batchReader.Key() + end, err := batchReader.EndKey() + if err != nil { + return noSnap, err + } + if err := msstw.PutInternalRangeDelete(ctx, start, end); err != nil { + return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") + } + case pebble.InternalKeyKindRangeKeyUnset, pebble.InternalKeyKindRangeKeyDelete: + if !doExcise { + return noSnap, errors.AssertionFailedf("unexpected batch entry key kind %d", batchReader.KeyKind()) + } + start := batchReader.Key() + end, err := batchReader.EndKey() + if err != nil { + return noSnap, err + } + rangeKeys, err := batchReader.RawRangeKeys() + if err != nil { + return noSnap, err + } + for _, rkv := range rangeKeys { + err := msstw.PutInternalRangeKey(ctx, start, end, rkv) + if err != nil { + return noSnap, errors.Wrapf(err, "writing sst for raft snapshot") + } + } case pebble.InternalKeyKindRangeKeySet: start, err := batchReader.EngineKey() if err != nil { @@ -476,6 +672,22 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( } timingTag.stop("sst") } + if len(req.SharedTables) > 0 && doExcise { + for i := range req.SharedTables { + sst := req.SharedTables[i] + sharedSSTs = append(sharedSSTs, pebble.SharedSSTMeta{ + Backing: stubBackingHandle{sst.Backing}, + Smallest: pebble.InternalKey{UserKey: sst.Smallest.UserKey, Trailer: sst.Smallest.Trailer}, + Largest: pebble.InternalKey{UserKey: sst.Largest.UserKey, Trailer: sst.Largest.Trailer}, + SmallestRangeKey: pebble.InternalKey{UserKey: sst.SmallestRangeKey.UserKey, Trailer: sst.SmallestRangeKey.Trailer}, + LargestRangeKey: pebble.InternalKey{UserKey: sst.LargestRangeKey.UserKey, Trailer: sst.LargestRangeKey.Trailer}, + SmallestPointKey: pebble.InternalKey{UserKey: sst.SmallestPointKey.UserKey, Trailer: sst.SmallestPointKey.Trailer}, + LargestPointKey: pebble.InternalKey{UserKey: sst.LargestPointKey.UserKey, Trailer: sst.LargestPointKey.Trailer}, + Level: uint8(sst.Level), + Size: sst.Size_, + }) + } + } if req.Final { // We finished receiving all batches and log entries. It's possible that // we did not receive any key-value pairs for some of the key spans, but @@ -489,6 +701,10 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( msstw.Close() timingTag.stop("sst") log.Eventf(ctx, "all data received from snapshot and all SSTs were finalized") + var sharedSize int64 + for i := range sharedSSTs { + sharedSize += int64(sharedSSTs[i].Size) + } snapUUID, err := uuid.FromBytes(header.RaftMessageRequest.Message.Snapshot.Data) if err != nil { @@ -502,14 +718,17 @@ func (kvSS *kvBatchSnapshotStrategy) Receive( FromReplica: header.RaftMessageRequest.FromReplica, Desc: header.State.Desc, DataSize: dataSize, + SharedSize: sharedSize, snapType: header.Type, raftAppliedIndex: header.State.RaftAppliedIndex, msgAppRespCh: make(chan raftpb.Message, 1), + sharedSSTs: sharedSSTs, + doExcise: doExcise, } timingTag.stop("totalTime") - kvSS.status = redact.Sprintf("ssts: %d", len(kvSS.scratch.SSTs())) + kvSS.status = redact.Sprintf("local ssts: %d, shared ssts: %d", len(kvSS.scratch.SSTs()), len(sharedSSTs)) return inSnap, nil } } @@ -528,7 +747,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // not reflect the log entries sent (which are never sent in newer versions of // CRDB, as of VersionUnreplicatedTruncatedState). var bytesSent int64 - var kvs, rangeKVs int + var kvs, rangeKVs, sharedSSTs int // These stopwatches allow us to time the various components of Send(). // - totalTimeStopwatch measures the total time spent within this function. @@ -554,6 +773,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // Iterate over all keys (point keys and range keys) and stream out batches of // key-values. var b storage.WriteBatch + var ssts []kvserverpb.SnapshotRequest_SharedTable + var transitionFromSharedToRegularReplicate bool defer func() { if b != nil { b.Close() @@ -561,7 +782,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( }() flushBatch := func() error { - if err := kvSS.sendBatch(ctx, stream, b, timingTag); err != nil { + if err := kvSS.sendBatch(ctx, stream, b, ssts, transitionFromSharedToRegularReplicate, timingTag); err != nil { return err } bLen := int64(b.Len()) @@ -569,6 +790,8 @@ func (kvSS *kvBatchSnapshotStrategy) Send( recordBytesSent(bLen) b.Close() b = nil + ssts = ssts[:0] + transitionFromSharedToRegularReplicate = false return nil } @@ -579,63 +802,154 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return nil } - err := rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true /* replicatedOnly */, rditer.ReplicatedSpansAll, - func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { - timingTag.start("iter") - defer timingTag.stop("iter") + // If snapshots containing shared files are allowed, and this range is a + // non-system range, take advantage of shared storage to minimize the amount + // of data we're iterating on and sending over the network. + sharedReplicate := header.SharedReplicate + replicatedFilter := rditer.ReplicatedSpansAll + if sharedReplicate { + replicatedFilter = rditer.ReplicatedSpansExcludeUser + } - var err error - switch keyType { - case storage.IterKeyTypePointsOnly: - for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { - kvs++ + iterateRKSpansVisitor := func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + timingTag.start("iter") + defer timingTag.stop("iter") + + var err error + switch keyType { + case storage.IterKeyTypePointsOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + kvs++ + if b == nil { + b = kvSS.newWriteBatch() + } + key, err := iter.UnsafeEngineKey() + if err != nil { + return err + } + v, err := iter.UnsafeValue() + if err != nil { + return err + } + if err = b.PutEngineKey(key, v); err != nil { + return err + } + if err = maybeFlushBatch(); err != nil { + return err + } + } + + case storage.IterKeyTypeRangesOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + bounds, err := iter.EngineRangeBounds() + if err != nil { + return err + } + for _, rkv := range iter.EngineRangeKeys() { + rangeKVs++ if b == nil { b = kvSS.newWriteBatch() } - key, err := iter.UnsafeEngineKey() + err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) if err != nil { return err } - v, err := iter.UnsafeValue() - if err != nil { - return err - } - if err = b.PutEngineKey(key, v); err != nil { - return err - } if err = maybeFlushBatch(); err != nil { return err } } + } - case storage.IterKeyTypeRangesOnly: - for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { - bounds, err := iter.EngineRangeBounds() - if err != nil { - return err - } - for _, rkv := range iter.EngineRangeKeys() { - rangeKVs++ - if b == nil { - b = kvSS.newWriteBatch() - } - err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) - if err != nil { - return err - } - if err = maybeFlushBatch(); err != nil { - return err - } - } - } + default: + return errors.AssertionFailedf("unexpected key type %v", keyType) + } + return err + } + err := rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + replicatedFilter, iterateRKSpansVisitor) + if err != nil { + return 0, err + } - default: - return errors.AssertionFailedf("unexpected key type %v", keyType) + var valBuf []byte + if sharedReplicate { + err := rditer.IterateReplicaKeySpansShared(ctx, snap.State.Desc, snap.EngineSnap, func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error { + kvs++ + if b == nil { + b = kvSS.newWriteBatch() } - return err + var val []byte + switch key.Kind() { + case pebble.InternalKeyKindSet, pebble.InternalKeyKindSetWithDelete, pebble.InternalKeyKindMerge: + var callerOwned bool + var err error + val, callerOwned, err = value.Value(valBuf) + if err != nil { + return err + } + if callerOwned && val != nil { + valBuf = val[:0] + } + } + if err := b.PutInternalPointKey(key, val); err != nil { + return err + } + return maybeFlushBatch() + }, func(start, end []byte, seqNum uint64) error { + kvs++ + if b == nil { + b = kvSS.newWriteBatch() + } + if err := b.ClearRawEncodedRange(start, end); err != nil { + return err + } + return maybeFlushBatch() + }, func(start, end []byte, keys []rangekey.Key) error { + if b == nil { + b = kvSS.newWriteBatch() + } + for i := range keys { + rangeKVs++ + err := b.PutInternalRangeKey(start, end, keys[i]) + if err != nil { + return err + } + } + return maybeFlushBatch() + }, func(sst *pebble.SharedSSTMeta) error { + sharedSSTs++ + snap.sharedBackings = append(snap.sharedBackings, sst.Backing) + backing, err := sst.Backing.Get() + if err != nil { + return err + } + ikeyToPb := func(ik pebble.InternalKey) *kvserverpb.SnapshotRequest_SharedTable_InternalKey { + return &kvserverpb.SnapshotRequest_SharedTable_InternalKey{ + UserKey: ik.UserKey, + Trailer: ik.Trailer, + } + } + ssts = append(ssts, kvserverpb.SnapshotRequest_SharedTable{ + Backing: backing, + Smallest: ikeyToPb(sst.Smallest), + Largest: ikeyToPb(sst.Largest), + SmallestRangeKey: ikeyToPb(sst.SmallestRangeKey), + LargestRangeKey: ikeyToPb(sst.LargestRangeKey), + SmallestPointKey: ikeyToPb(sst.SmallestPointKey), + LargestPointKey: ikeyToPb(sst.LargestPointKey), + Level: int32(sst.Level), + Size_: sst.Size, + }) + return nil }) - if err != nil { - return 0, err + if err != nil && errors.Is(err, pebble.ErrInvalidSkipSharedIteration) { + transitionFromSharedToRegularReplicate = true + err = rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + rditer.ReplicatedSpansUserOnly, iterateRKSpansVisitor) + } + if err != nil { + return 0, err + } } if b != nil { if err = flushBatch(); err != nil { @@ -646,7 +960,7 @@ func (kvSS *kvBatchSnapshotStrategy) Send( timingTag.stop("totalTime") log.Eventf(ctx, "finished sending snapshot batches, sent a total of %d bytes", bytesSent) - kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d", kvs, rangeKVs) + kvSS.status = redact.Sprintf("kvs=%d rangeKVs=%d sharedSSTs=%d", kvs, rangeKVs, sharedSSTs) return bytesSent, nil } @@ -654,6 +968,8 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( ctx context.Context, stream outgoingSnapshotStream, batch storage.WriteBatch, + ssts []kvserverpb.SnapshotRequest_SharedTable, + transitionToRegularReplicate bool, timerTag *snapshotTimingTag, ) error { timerTag.start("rateLimit") @@ -663,7 +979,11 @@ func (kvSS *kvBatchSnapshotStrategy) sendBatch( return err } timerTag.start("send") - res := stream.Send(&kvserverpb.SnapshotRequest{KVBatch: batch.Repr()}) + res := stream.Send(&kvserverpb.SnapshotRequest{ + KVBatch: batch.Repr(), + SharedTables: ssts, + TransitionFromSharedToRegularReplicate: transitionToRegularReplicate, + }) timerTag.stop("send") return res } diff --git a/pkg/server/server.go b/pkg/server/server.go index 31a465e4998e..3e0722eaa9ec 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -842,6 +842,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf EagerLeaseAcquisitionLimiter: eagerLeaseAcquisitionLimiter, KVMemoryMonitor: kvMemoryMonitor, RangefeedBudgetFactory: rangeReedBudgetFactory, + SharedStorageEnabled: cfg.SharedStorage != "", SystemConfigProvider: systemConfigWatcher, SpanConfigSubscriber: spanConfig.subscriber, SpanConfigsDisabled: cfg.SpanConfigsDisabled,