Skip to content

Commit

Permalink
kvserver,storage: Update snapshot strategy to use shared storage
Browse files Browse the repository at this point in the history
If the sender node was created with a SharedStorage, switch to
fast ingestion where we ScanInternal() the keys not in shared
levels, and just share the metadata for files in shared levels.
The sender of the snapshot specifies in the Header that it
is using this ability, and the receiver rejects the snapshot if
it cannot accept shared snapshots.

If ScanInternal() returns an `ErrInvalidSkipSharedIteration`,
we switch back to old-style snapshots where the entirety
of the range is sent over the stream as SnapshotRequests.

Future changes will add better support for detection of when
different nodes point to different blob storage buckets / shared
storage locations, and incorporate that in rebalancing.

Fixes #103028.

Release note (general change): Takes advantage of new CLI option,
`--experimental-shared-storage` to rebalance faster from node to node.
  • Loading branch information
itsbilal committed Jul 5, 2023
1 parent ab24188 commit 1cd6602
Show file tree
Hide file tree
Showing 41 changed files with 1,062 additions and 82 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error {
defer snapshot.Close()

var results int
return rditer.IterateReplicaKeySpans(&desc, snapshot, debugCtx.replicated,
return rditer.IterateReplicaKeySpans(&desc, snapshot, debugCtx.replicated, rditer.TableKeysInclude, /* skipTableKeys */
func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error {
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
switch keyType {
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//objstorage",
"@com_github_cockroachdb_pebble//rangekey",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3813,7 +3813,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
}
}

err := rditer.IterateReplicaKeySpans(inSnap.Desc, sendingEngSnapshot, true, /* replicatedOnly */
err := rditer.IterateReplicaKeySpans(inSnap.Desc, sendingEngSnapshot, true /* replicatedOnly */, rditer.TableKeysInclude,
func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error {
fw, ok := sstFileWriters[string(span.Key)]
if !ok || !fw.span.Equal(span) {
Expand Down
55 changes: 55 additions & 0 deletions pkg/kv/kvserver/kvserverpb/raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,71 @@ message SnapshotRequest {
// from a particular sending source.
double sender_queue_priority = 11;

// If true, the snapshot could contain shared files present in a pre-configured
// or explicitly specified shared.Storage instance. Such files will have their
// metadata present in the snapshot, but not file contents.
bool fast_replicate = 12;

reserved 1, 4;
}

// SharedTable represents one shared SSTable present in shared storage.
message SharedTable {
// Internal key represents a Pebble-internal key.
message InternalKey {
// User key portion of the internal key.
bytes user_key = 1;
// Trailer portion of the internal key, as defined by Pebble.
uint64 trailer = 2;
}

// Used by the Pebble objstorage package to resolve a reference to a shared object.
bytes backing = 1;

// Used by the Pebble objstorage package to generate new blob storage drivers.
// Reserved for future use.
bytes locator = 2;

// Smallest internal key in the sstable.
InternalKey smallest = 3;
// Largest internal key in the sstable.
InternalKey largest = 4;
// Smallest range key in the sstable. Zero value if no range keys are
// present.
InternalKey smallest_range_key = 5;
// Largest range key in the sstable. Zero value if no range keys are
// present.
InternalKey largest_range_key = 6;
// Smallest point key in the sstable. Zero value if no point keys are
// present.
InternalKey smallest_point_key = 7;
// Largest point key in the sstable. Zero value if no point keys are
// present.
InternalKey largest_point_key = 8;

// LSM level of the original sstable. This sstable will go into the same
// level in the destination LSM.
int32 level = 9;
// Physical size of the sstable in bytes.
uint64 size = 10;
}

Header header = 1;

// A BatchRepr. Multiple kv_batches may be sent across multiple request messages.
bytes kv_batch = 2 [(gogoproto.customname) = "KVBatch"];

bool final = 4;

repeated SharedTable shared_tables = 5 [(gogoproto.nullable) = false];

// If true, signals the receiver that the sender can no longer fast replicate
// using shared files, even though the Header initially contained
// fast_replicate = true. All contents of this range will be streamed as
// usual beyond this point. This bool must be set to true in a request before
// the end of the snapshot (i.e. before the final = true request).
bool stop_fast_replicate = 6;

reserved 3;
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rditer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/iterutil",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//rangekey",
],
)

Expand Down
56 changes: 56 additions & 0 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
package rditer

import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
)

// ReplicaDataIteratorOptions defines ReplicaMVCCDataIterator creation options.
Expand Down Expand Up @@ -298,6 +302,21 @@ func (ri *ReplicaMVCCDataIterator) HasPointAndRange() (bool, bool) {
return ri.it.HasPointAndRange()
}

// IterateTableKeys is for use with IterateReplicaKeySpans. It specifies whether
// table keys (aka replicated user keys) are iterated on alongside other spans
// in the range (tableKeysInclude), exclusively iterated on (tableKeysOnly),
// or skipped (tableKeysSkip).
type IterateTableKeys int

const (
// TableKeysInclude includes table keys in iteration.
TableKeysInclude IterateTableKeys = iota
// TableKeysSkip skips table keys in iteration.
TableKeysSkip
// TableKeysOnly exclusively iterates on table keys.
TableKeysOnly
)

// IterateReplicaKeySpans iterates over each of a range's key spans, and calls
// the given visitor with an iterator over its data. Specifically, it iterates
// over the spans returned by either makeAllKeySpans or MakeReplicatedKeySpans,
Expand All @@ -315,6 +334,7 @@ func IterateReplicaKeySpans(
desc *roachpb.RangeDescriptor,
reader storage.Reader,
replicatedOnly bool,
includeTableKeys IterateTableKeys,
visitor func(storage.EngineIterator, roachpb.Span, storage.IterKeyType) error,
) error {
if !reader.ConsistentIterators() {
Expand All @@ -326,6 +346,17 @@ func IterateReplicaKeySpans(
} else {
spans = makeAllKeySpans(desc)
}
switch includeTableKeys {
case TableKeysInclude:
// Do nothing.
case TableKeysOnly:
// Only iterate on table keys, which should be the last span in spans.
spans = spans[len(spans)-1:]
case TableKeysSkip:
// We expect the last span to be the span of replicated table keys. Skip it
// if requested as such.
spans = spans[:len(spans)-1]
}
keyTypes := []storage.IterKeyType{storage.IterKeyTypePointsOnly, storage.IterKeyTypeRangesOnly}
for _, span := range spans {
for _, keyType := range keyTypes {
Expand All @@ -350,6 +381,31 @@ func IterateReplicaKeySpans(
return nil
}

// IterateReplicaKeySpansShared iterates over the range's user key span,
// skipping any keys present in shared files. It calls the appropriate visitor
// function for the type of key visited, namely, point keys, range deletes and
// range keys. Shared files that are skipped during this iteration are also
// surfaced through a dedicated visitor. Note that this method only iterates
// over a range's user key span; IterateReplicaKeySpans must be called to
// iterate over the other key spans.
//
// Must use a reader with consistent iterators.
func IterateReplicaKeySpansShared(
ctx context.Context,
desc *roachpb.RangeDescriptor,
reader storage.Reader,
visitPoint func(key *pebble.InternalKey, val pebble.LazyValue) error,
visitRangeDel func(start, end []byte, seqNum uint64) error,
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
if !reader.ConsistentIterators() {
panic("reader must provide consistent iterators")
}
span := desc.KeySpan().AsRawSpanWithNoLocals()
return reader.ScanInternal(ctx, span.Key, span.EndKey, visitPoint, visitRangeDel, visitRangeKey, visitSharedFile)
}

// IterateOptions instructs how points and ranges should be presented to visitor
// and if iterators should be visited in forward or reverse order.
// Reverse iterator are also positioned at the end of the range prior to being
Expand Down
54 changes: 38 additions & 16 deletions pkg/kv/kvserver/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func verifyIterateReplicaKeySpans(
desc *roachpb.RangeDescriptor,
eng storage.Engine,
replicatedOnly bool,
iterateTableKeys IterateTableKeys,
) {
readWriter := eng.NewSnapshot()
defer readWriter.Close()
Expand All @@ -161,7 +162,7 @@ func verifyIterateReplicaKeySpans(
"pretty",
})

require.NoError(t, IterateReplicaKeySpans(desc, readWriter, replicatedOnly,
require.NoError(t, IterateReplicaKeySpans(desc, readWriter, replicatedOnly, iterateTableKeys,
func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error {
var err error
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
Expand All @@ -180,12 +181,21 @@ func verifyIterateReplicaKeySpans(
var err error
mvccKey, err = key.ToMVCCKey()
require.NoError(t, err)
if iterateTableKeys == TableKeysSkip && desc.KeySpan().AsRawSpanWithNoLocals().ContainsKey(key.Key) {
t.Fatalf("unexpected table key when table key are expected to be skipped: %s", mvccKey)
}
if iterateTableKeys == TableKeysOnly && !desc.KeySpan().AsRawSpanWithNoLocals().ContainsKey(key.Key) {
t.Fatalf("unexpected non-table key when only table keys requested: %s", mvccKey)
}
} else { // lock key
ltk, err := key.ToLockTableKey()
require.NoError(t, err)
mvccKey = storage.MVCCKey{
Key: ltk.Key,
}
if iterateTableKeys == TableKeysOnly {
t.Fatalf("unexpected lock table key when only table keys requested: %s", ltk.Key)
}
}
tbl.Append([]string{
span.String(),
Expand Down Expand Up @@ -271,21 +281,33 @@ func TestReplicaDataIterator(t *testing.T) {
parName := fmt.Sprintf("r%d", tc.desc.RangeID)
t.Run(parName, func(t *testing.T) {
testutils.RunTrueAndFalse(t, "replicatedOnly", func(t *testing.T, replicatedOnly bool) {
name := "all"
if replicatedOnly {
name = "replicatedOnly"
}
w := echotest.NewWalker(t, filepath.Join(path, parName, name))
tableInclude := []IterateTableKeys{TableKeysInclude, TableKeysSkip, TableKeysOnly}
for i := range tableInclude {
tableKeysName := "include"
switch tableInclude[i] {
case TableKeysOnly:
tableKeysName = "only"
case TableKeysSkip:
tableKeysName = "skip"
}
t.Run(fmt.Sprintf("iterateTableKeys=%v", tableKeysName), func(t *testing.T) {
name := "all"
if replicatedOnly {
name = "replicatedOnly"
}
w := echotest.NewWalker(t, filepath.Join(path, parName, name, tableKeysName))

w.Run(t, "output", func(t *testing.T) string {
var innerBuf strings.Builder
tbl := tablewriter.NewWriter(&innerBuf)
// Print contents of the Replica according to the iterator.
verifyIterateReplicaKeySpans(t, tbl, &tc.desc, eng, replicatedOnly)
w.Run(t, "output", func(t *testing.T) string {
var innerBuf strings.Builder
tbl := tablewriter.NewWriter(&innerBuf)
// Print contents of the Replica according to the iterator.
verifyIterateReplicaKeySpans(t, tbl, &tc.desc, eng, replicatedOnly, tableInclude[i])

tbl.Render()
return innerBuf.String()
})(t)
tbl.Render()
return innerBuf.String()
})(t)
})
}
})
})
}
Expand Down Expand Up @@ -449,7 +471,7 @@ func TestReplicaDataIteratorGlobalRangeKey(t *testing.T) {
}

var actualSpans []roachpb.Span
require.NoError(t, IterateReplicaKeySpans(&desc, snapshot, replicatedOnly,
require.NoError(t, IterateReplicaKeySpans(&desc, snapshot, replicatedOnly, TableKeysInclude,
func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error {
// We should never see any point keys.
require.Equal(t, storage.IterKeyTypeRangesOnly, keyType)
Expand Down Expand Up @@ -556,7 +578,7 @@ func benchReplicaEngineDataIterator(b *testing.B, numRanges, numKeysPerRange, va

for i := 0; i < b.N; i++ {
for _, desc := range descs {
err := IterateReplicaKeySpans(&desc, snapshot, false, /* replicatedOnly */
err := IterateReplicaKeySpans(&desc, snapshot, false /* replicatedOnly */, TableKeysInclude,
func(iter storage.EngineIterator, _ roachpb.Span, _ storage.IterKeyType) error {
var err error
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
echo
----
+-------+------------+------------+--------------------+-----------------------------------+
| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY |
+-------+------------+------------+--------------------+-----------------------------------+
| {a-b} | 61 | | 0000000000000001 | "a"/0.000000001,0 |
| {a-b} | 61ffffffff | | 0000000000000001 | "a\xff\xff\xff\xff"/0.000000001,0 |
| {a-b} | 61 | 62 | 000000000000000109 | {a-b}/0.000000001,0 |
+-------+------------+------------+--------------------+-----------------------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
echo
----
+----------------------------------------+--------------------------------------------------------------+--------------+------------------------------------+------------------------------------------------------------------------------------+
| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY |
+----------------------------------------+--------------------------------------------------------------+--------------+------------------------------------+------------------------------------------------------------------------------------+
| /Local/RangeID/1/{r""-s""} | 016989726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/1/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" |
| /Local/RangeID/1/{r""-s""} | 016989726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/1/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" |
| /Local/RangeID/1/{r""-s""} | 016989726c67632d | | | /Local/RangeID/1/r/RangeGCThreshold |
| /Local/RangeID/1/{r""-s""} | 016989727261736b | | | /Local/RangeID/1/r/RangeAppliedState |
| /Local/RangeID/1/{r""-s""} | 01698972726c6c2d | | | /Local/RangeID/1/r/RangeLease |
| /Local/RangeID/1/{r""-s""} | 016989723a61 | 016989723a78 | 000000000000000109 | /Local/RangeID/1/r":{a"-x"}/0.000000001,0 |
| /Local/RangeID/1/{u""-v""} | 0169897572667462 | | | /Local/RangeID/1/u/RangeTombstone |
| /Local/RangeID/1/{u""-v""} | 0169897572667468 | | | /Local/RangeID/1/u/RaftHardState |
| /Local/RangeID/1/{u""-v""} | 016989757266746c0000000000000001 | | | /Local/RangeID/1/u/RaftLog/logIndex:1 |
| /Local/RangeID/1/{u""-v""} | 016989757266746c0000000000000002 | | | /Local/RangeID/1/u/RaftLog/logIndex:2 |
| /Local/RangeID/1/{u""-v""} | 01698975726c7274 | | | /Local/RangeID/1/u/RangeLastReplicaGCTimestamp |
| /Local/RangeID/1/{u""-v""} | 016989753a61 | 016989753a78 | 000000000000000109 | /Local/RangeID/1/u":{a"-x"}/0.000000001,0 |
| /Local/Range"{a"-b"} | 016b1261000172647363 | | 0000000000000001 | /Local/Range"a"/RangeDescriptor/0.000000001,0 |
| /Local/Range"{a"-b"} | 016b1261000174786e2d0ce61c175eb445878c36dcf4062ada4c | | | /Local/Range"a"/Transaction/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" |
| /Local/Range"{a"-b"} | 016b126100ff000174786e2d9855a1ef8eb94c06a106cab1dda78a2b | | | /Local/Range"a\x00"/Transaction/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" |
| /Local/Range"{a"-b"} | 016b1261ffffffff000174786e2d295e727c8ca9437cbb5e8e2ebbad996f | | | /Local/Range"a\xff\xff\xff\xff"/Transaction/"295e727c-8ca9-437c-bb5e-8e2ebbad996f" |
| /Local/Lock/Intent/Local/Range"{a"-b"} | 017a6b12016b126100ff01726473630001 | | 030ce61c175eb445878c36dcf4062ada4c | /Local/Range"a"/RangeDescriptor |
| /Local/Lock/Intent"{a"-b"} | 017a6b12610001 | | 030ce61c175eb445878c36dcf4062ada4c | "a" |
+----------------------------------------+--------------------------------------------------------------+--------------+------------------------------------+------------------------------------------------------------------------------------+
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
echo
----
+-------+------------+------------+--------------------+-----------------------------------+
| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY |
+-------+------------+------------+--------------------+-----------------------------------+
| {a-b} | 61 | | 0000000000000001 | "a"/0.000000001,0 |
| {a-b} | 61ffffffff | | 0000000000000001 | "a\xff\xff\xff\xff"/0.000000001,0 |
| {a-b} | 61 | 62 | 000000000000000109 | {a-b}/0.000000001,0 |
+-------+------------+------------+--------------------+-----------------------------------+
Loading

0 comments on commit 1cd6602

Please sign in to comment.