Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage,kvserver: Foundational changes for disaggregated ingestions #107297

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error {
defer snapshot.Close()

var results int
return rditer.IterateReplicaKeySpans(&desc, snapshot, debugCtx.replicated,
return rditer.IterateReplicaKeySpans(&desc, snapshot, debugCtx.replicated, rditer.ReplicatedSpansAll,
func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error {
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
switch keyType {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3812,7 +3812,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
}
}

err := rditer.IterateReplicaKeySpans(inSnap.Desc, sendingEngSnapshot, true, /* replicatedOnly */
err := rditer.IterateReplicaKeySpans(inSnap.Desc, sendingEngSnapshot, true /* replicatedOnly */, rditer.ReplicatedSpansAll,
func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error {
fw, ok := sstFileWriters[string(span.Key)]
if !ok || !fw.span.Equal(span) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/rditer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ go_library(
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/iterutil",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//rangekey",
],
)

Expand Down
47 changes: 45 additions & 2 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
package rditer

import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/rangekey"
)

// ReplicaDataIteratorOptions defines ReplicaMVCCDataIterator creation options.
Expand Down Expand Up @@ -315,16 +319,26 @@ func IterateReplicaKeySpans(
desc *roachpb.RangeDescriptor,
reader storage.Reader,
replicatedOnly bool,
replicatedSpansFilter ReplicatedSpansFilter,
visitor func(storage.EngineIterator, roachpb.Span, storage.IterKeyType) error,
) error {
if !reader.ConsistentIterators() {
panic("reader must provide consistent iterators")
}
var spans []roachpb.Span
if replicatedOnly {
spans = MakeReplicatedKeySpans(desc)
spans = Select(desc.RangeID, SelectOpts{
ReplicatedSpansFilter: replicatedSpansFilter,
ReplicatedBySpan: desc.RSpan(),
ReplicatedByRangeID: true,
})
} else {
spans = makeAllKeySpans(desc)
spans = Select(desc.RangeID, SelectOpts{
ReplicatedBySpan: desc.RSpan(),
ReplicatedSpansFilter: replicatedSpansFilter,
ReplicatedByRangeID: true,
UnreplicatedByRangeID: true,
})
}
keyTypes := []storage.IterKeyType{storage.IterKeyTypePointsOnly, storage.IterKeyTypeRangesOnly}
for _, span := range spans {
Expand All @@ -350,6 +364,35 @@ func IterateReplicaKeySpans(
return nil
}

// IterateReplicaKeySpansShared walks only the user key span of the range
// described by desc, skipping any keys that live in shared files. Each kind of
// key encountered is handed to its own callback:
//
//   - visitPoint for point keys,
//   - visitRangeDel for range deletes,
//   - visitRangeKey for range keys,
//   - visitSharedFile for every shared file that was skipped over.
//
// The other (non-user) key spans of the range are not visited here; callers
// must use IterateReplicaKeySpans for those.
//
// The reader must provide consistent iterators; the function panics otherwise.
func IterateReplicaKeySpansShared(
	ctx context.Context,
	desc *roachpb.RangeDescriptor,
	reader storage.Reader,
	visitPoint func(key *pebble.InternalKey, val pebble.LazyValue, info pebble.IteratorLevel) error,
	visitRangeDel func(start, end []byte, seqNum uint64) error,
	visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
	visitSharedFile func(sst *pebble.SharedSSTMeta) error,
) error {
	if consistent := reader.ConsistentIterators(); !consistent {
		panic("reader must provide consistent iterators")
	}
	// Restrict the selection to the user keys; the first returned span is the
	// one handed to the internal scan.
	userOnly := SelectOpts{
		ReplicatedSpansFilter: ReplicatedSpansUserOnly,
		ReplicatedBySpan:      desc.RSpan(),
	}
	userSpan := Select(desc.RangeID, userOnly)[0]
	return reader.ScanInternal(
		ctx,
		userSpan.Key,
		userSpan.EndKey,
		visitPoint,
		visitRangeDel,
		visitRangeKey,
		visitSharedFile,
	)
}

// IterateOptions instructs how points and ranges should be presented to visitor
// and if iterators should be visited in forward or reverse order.
// Reverse iterators are also positioned at the end of the range prior to being
Expand Down
51 changes: 35 additions & 16 deletions pkg/kv/kvserver/rditer/replica_data_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func verifyIterateReplicaKeySpans(
desc *roachpb.RangeDescriptor,
eng storage.Engine,
replicatedOnly bool,
replicatedSpansFilter ReplicatedSpansFilter,
) {
readWriter := eng.NewSnapshot()
defer readWriter.Close()
Expand All @@ -161,7 +162,7 @@ func verifyIterateReplicaKeySpans(
"pretty",
})

require.NoError(t, IterateReplicaKeySpans(desc, readWriter, replicatedOnly,
require.NoError(t, IterateReplicaKeySpans(desc, readWriter, replicatedOnly, replicatedSpansFilter,
func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error {
var err error
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
Expand All @@ -180,12 +181,18 @@ func verifyIterateReplicaKeySpans(
var err error
mvccKey, err = key.ToMVCCKey()
require.NoError(t, err)
if replicatedSpansFilter == ReplicatedSpansExcludeUser && desc.KeySpan().AsRawSpanWithNoLocals().ContainsKey(key.Key) {
t.Fatalf("unexpected user key when user key are expected to be skipped: %s", mvccKey)
}
} else { // lock key
ltk, err := key.ToLockTableKey()
require.NoError(t, err)
mvccKey = storage.MVCCKey{
Key: ltk.Key,
}
if replicatedSpansFilter == ReplicatedSpansUserOnly {
t.Fatalf("unexpected lock table key when only table keys requested: %s", ltk.Key)
}
}
tbl.Append([]string{
span.String(),
Expand Down Expand Up @@ -271,21 +278,33 @@ func TestReplicaDataIterator(t *testing.T) {
parName := fmt.Sprintf("r%d", tc.desc.RangeID)
t.Run(parName, func(t *testing.T) {
testutils.RunTrueAndFalse(t, "replicatedOnly", func(t *testing.T, replicatedOnly bool) {
name := "all"
if replicatedOnly {
name = "replicatedOnly"
}
w := echotest.NewWalker(t, filepath.Join(path, parName, name))
replicatedSpans := []ReplicatedSpansFilter{ReplicatedSpansAll, ReplicatedSpansExcludeUser, ReplicatedSpansUserOnly}
for i := range replicatedSpans {
replicatedKeysName := "all"
switch replicatedSpans[i] {
case ReplicatedSpansExcludeUser:
replicatedKeysName = "exclude-user"
case ReplicatedSpansUserOnly:
replicatedKeysName = "user-only"
}
t.Run(fmt.Sprintf("replicatedSpans=%v", replicatedKeysName), func(t *testing.T) {
name := "all"
if replicatedOnly {
name = "replicatedOnly"
}
w := echotest.NewWalker(t, filepath.Join(path, parName, name, replicatedKeysName))

w.Run(t, "output", func(t *testing.T) string {
var innerBuf strings.Builder
tbl := tablewriter.NewWriter(&innerBuf)
// Print contents of the Replica according to the iterator.
verifyIterateReplicaKeySpans(t, tbl, &tc.desc, eng, replicatedOnly)
w.Run(t, "output", func(t *testing.T) string {
var innerBuf strings.Builder
tbl := tablewriter.NewWriter(&innerBuf)
// Print contents of the Replica according to the iterator.
verifyIterateReplicaKeySpans(t, tbl, &tc.desc, eng, replicatedOnly, replicatedSpans[i])

tbl.Render()
return innerBuf.String()
})(t)
tbl.Render()
return innerBuf.String()
})(t)
})
}
})
})
}
Expand Down Expand Up @@ -449,7 +468,7 @@ func TestReplicaDataIteratorGlobalRangeKey(t *testing.T) {
}

var actualSpans []roachpb.Span
require.NoError(t, IterateReplicaKeySpans(&desc, snapshot, replicatedOnly,
require.NoError(t, IterateReplicaKeySpans(&desc, snapshot, replicatedOnly, ReplicatedSpansAll,
func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error {
// We should never see any point keys.
require.Equal(t, storage.IterKeyTypeRangesOnly, keyType)
Expand Down Expand Up @@ -556,7 +575,7 @@ func benchReplicaEngineDataIterator(b *testing.B, numRanges, numKeysPerRange, va

for i := 0; i < b.N; i++ {
for _, desc := range descs {
err := IterateReplicaKeySpans(&desc, snapshot, false, /* replicatedOnly */
err := IterateReplicaKeySpans(&desc, snapshot, false /* replicatedOnly */, ReplicatedSpansAll,
func(iter storage.EngineIterator, _ roachpb.Span, _ storage.IterKeyType) error {
var err error
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
Expand Down
59 changes: 40 additions & 19 deletions pkg/kv/kvserver/rditer/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// ReplicatedSpansFilter narrows down which of the replicated spans selected
// via SelectOpts.ReplicatedBySpan are returned by Select. The zero value is
// ReplicatedSpansAll, i.e. no filtering.
type ReplicatedSpansFilter int

const (
// ReplicatedSpansAll includes all replicated spans, including user keys,
// range descriptors, and lock keys.
ReplicatedSpansAll ReplicatedSpansFilter = iota
// ReplicatedSpansExcludeUser includes all replicated spans except for user keys.
ReplicatedSpansExcludeUser
// ReplicatedSpansUserOnly includes just user keys, and no other replicated
// spans.
ReplicatedSpansUserOnly
)

// SelectOpts configures which spans for a Replica to return from Select.
// A Replica comprises replicated (i.e. belonging to the state machine) spans
// and unreplicated spans, and depending on external circumstances one may want
Expand All @@ -24,6 +37,10 @@ type SelectOpts struct {
// key. This includes user keys, range descriptors, and locks (separated
// intents).
ReplicatedBySpan roachpb.RSpan
// ReplicatedSpansFilter specifies which of the replicated spans indicated by
// ReplicatedBySpan should be returned or excluded. The zero value,
// ReplicatedSpansAll, returns all replicated spans.
ReplicatedSpansFilter ReplicatedSpansFilter
// ReplicatedByRangeID selects all RangeID-keyed replicated keys. An example
// of a key that falls into this Span is the GCThresholdKey.
ReplicatedByRangeID bool
Expand Down Expand Up @@ -60,27 +77,31 @@ func Select(rangeID roachpb.RangeID, opts SelectOpts) []roachpb.Span {
// See also the comment on KeySpan.
in := opts.ReplicatedBySpan
adjustedIn := in.KeySpan()
sl = append(sl, makeRangeLocalKeySpan(in))
if opts.ReplicatedSpansFilter != ReplicatedSpansUserOnly {
sl = append(sl, makeRangeLocalKeySpan(in))

// Lock table.
{
// Handle doubly-local lock table keys since range descriptor key
// is a range local key that can have a replicated lock acquired on it.
startRangeLocal, _ := keys.LockTableSingleKey(keys.MakeRangeKeyPrefix(in.Key), nil)
endRangeLocal, _ := keys.LockTableSingleKey(keys.MakeRangeKeyPrefix(in.EndKey), nil)
// Need adjusted start key to avoid overlapping with the local lock span right above.
startGlobal, _ := keys.LockTableSingleKey(adjustedIn.Key.AsRawKey(), nil)
endGlobal, _ := keys.LockTableSingleKey(adjustedIn.EndKey.AsRawKey(), nil)
sl = append(sl, roachpb.Span{
Key: startRangeLocal,
EndKey: endRangeLocal,
}, roachpb.Span{
Key: startGlobal,
EndKey: endGlobal,
})
// Lock table.
{
// Handle doubly-local lock table keys since range descriptor key
// is a range local key that can have a replicated lock acquired on it.
startRangeLocal, _ := keys.LockTableSingleKey(keys.MakeRangeKeyPrefix(in.Key), nil)
endRangeLocal, _ := keys.LockTableSingleKey(keys.MakeRangeKeyPrefix(in.EndKey), nil)
// Need adjusted start key to avoid overlapping with the local lock span right above.
startGlobal, _ := keys.LockTableSingleKey(adjustedIn.Key.AsRawKey(), nil)
endGlobal, _ := keys.LockTableSingleKey(adjustedIn.EndKey.AsRawKey(), nil)
sl = append(sl, roachpb.Span{
Key: startRangeLocal,
EndKey: endRangeLocal,
}, roachpb.Span{
Key: startGlobal,
EndKey: endGlobal,
})
}
}
if opts.ReplicatedSpansFilter != ReplicatedSpansExcludeUser {
// Adjusted span because r1's "normal" keyspace starts only at LocalMax, not RKeyMin.
sl = append(sl, adjustedIn.AsRawSpanWithNoLocals())
}
// Adjusted span because r1's "normal" keyspace starts only at LocalMax, not RKeyMin.
sl = append(sl, adjustedIn.AsRawSpanWithNoLocals())
}
return sl
}
23 changes: 21 additions & 2 deletions pkg/kv/kvserver/rditer/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func TestSelect(t *testing.T) {

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
for _, tc := range []struct {
name string
sp roachpb.RSpan
name string
sp roachpb.RSpan
filter ReplicatedSpansFilter
}{
{
name: "no_span",
Expand All @@ -49,6 +50,23 @@ func TestSelect(t *testing.T) {
Key: roachpb.RKey("a"),
EndKey: roachpb.RKey("c"),
},
filter: ReplicatedSpansAll,
},
{
name: "r2_excludeuser",
sp: roachpb.RSpan{
Key: roachpb.RKey("a"),
EndKey: roachpb.RKey("c"),
},
filter: ReplicatedSpansExcludeUser,
},
{
name: "r2_useronly",
sp: roachpb.RSpan{
Key: roachpb.RKey("a"),
EndKey: roachpb.RKey("c"),
},
filter: ReplicatedSpansUserOnly,
},
{
name: "r3",
Expand All @@ -64,6 +82,7 @@ func TestSelect(t *testing.T) {
for _, unreplicatedByRangeID := range []bool{false, true} {
opts := SelectOpts{
ReplicatedBySpan: tc.sp,
ReplicatedSpansFilter: tc.filter,
ReplicatedByRangeID: replicatedByRangeID,
UnreplicatedByRangeID: unreplicatedByRangeID,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
echo
----
+----------------------------------------+--------------------------------------------------------------+--------------+------------------------------------+------------------------------------------------------------------------------------+
| SPAN | KEY HEX | ENDKEY HEX | VERSION HEX | PRETTY |
+----------------------------------------+--------------------------------------------------------------+--------------+------------------------------------+------------------------------------------------------------------------------------+
| /Local/RangeID/1/{r""-s""} | 016989726162632d120ce61c175eb445878c36dcf4062ada4c0001 | | | /Local/RangeID/1/r/AbortSpan/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" |
| /Local/RangeID/1/{r""-s""} | 016989726162632d129855a1ef8eb94c06a106cab1dda78a2b0001 | | | /Local/RangeID/1/r/AbortSpan/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" |
| /Local/RangeID/1/{r""-s""} | 016989726c67632d | | | /Local/RangeID/1/r/RangeGCThreshold |
| /Local/RangeID/1/{r""-s""} | 016989727261736b | | | /Local/RangeID/1/r/RangeAppliedState |
| /Local/RangeID/1/{r""-s""} | 01698972726c6c2d | | | /Local/RangeID/1/r/RangeLease |
| /Local/RangeID/1/{r""-s""} | 016989723a61 | 016989723a78 | 000000000000000109 | /Local/RangeID/1/r":{a"-x"}/0.000000001,0 |
| /Local/RangeID/1/{u""-v""} | 0169897572667462 | | | /Local/RangeID/1/u/RangeTombstone |
| /Local/RangeID/1/{u""-v""} | 0169897572667468 | | | /Local/RangeID/1/u/RaftHardState |
| /Local/RangeID/1/{u""-v""} | 016989757266746c0000000000000001 | | | /Local/RangeID/1/u/RaftLog/logIndex:1 |
| /Local/RangeID/1/{u""-v""} | 016989757266746c0000000000000002 | | | /Local/RangeID/1/u/RaftLog/logIndex:2 |
| /Local/RangeID/1/{u""-v""} | 01698975726c7274 | | | /Local/RangeID/1/u/RangeLastReplicaGCTimestamp |
| /Local/RangeID/1/{u""-v""} | 016989753a61 | 016989753a78 | 000000000000000109 | /Local/RangeID/1/u":{a"-x"}/0.000000001,0 |
| /Local/Range"{a"-b"} | 016b1261000172647363 | | 0000000000000001 | /Local/Range"a"/RangeDescriptor/0.000000001,0 |
| /Local/Range"{a"-b"} | 016b1261000174786e2d0ce61c175eb445878c36dcf4062ada4c | | | /Local/Range"a"/Transaction/"0ce61c17-5eb4-4587-8c36-dcf4062ada4c" |
| /Local/Range"{a"-b"} | 016b126100ff000174786e2d9855a1ef8eb94c06a106cab1dda78a2b | | | /Local/Range"a\x00"/Transaction/"9855a1ef-8eb9-4c06-a106-cab1dda78a2b" |
| /Local/Range"{a"-b"} | 016b1261ffffffff000174786e2d295e727c8ca9437cbb5e8e2ebbad996f | | | /Local/Range"a\xff\xff\xff\xff"/Transaction/"295e727c-8ca9-437c-bb5e-8e2ebbad996f" |
| /Local/Lock/Intent/Local/Range"{a"-b"} | 017a6b12016b126100ff01726473630001 | | 030ce61c175eb445878c36dcf4062ada4c | /Local/Range"a"/RangeDescriptor |
| /Local/Lock/Intent"{a"-b"} | 017a6b12610001 | | 030ce61c175eb445878c36dcf4062ada4c | "a" |
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
+----------------------------------------+--------------------------------------------------------------+--------------+------------------------------------+------------------------------------------------------------------------------------+
Loading