Skip to content

Commit

Permalink
rditer: replace ReplicaDataEngineIterator with `IterateReplicaKeySp…
Browse files Browse the repository at this point in the history
…ans`

This patch replaces `ReplicaDataEngineIterator` with
`IterateReplicaKeySpans()`.

The Raft snapshot protocol requires an unusual iteration order: for each
of the range's key spans, first send all point keys then send all range
keys. This allows ingesting SSTs that only span each key span (rather
than across wide swaths of the keyspace), with efficient, separate
iteration over point/range keys.

While the current `ReplicaDataEngineIterator` already does this, the
iteration order is rather peculiar, which can be unexpected to users.
`IterateReplicaKeySpans` attempts to make this iteration order more
explicit.

Other approaches were considered but rejected:

* Use an iterator generator, which returns the next iterator in the
  sequence. This was very unergonomic to use, since it required nested
  iteration loops (one over the iterators, and one per iterator) which
  make variable naming/reuse and iterator closing awkward.

* Use a visitor for each key instead of keyspan. This would again end up
  hiding the iteration sequence/structure. It also would make it
  impossible to skip across key spans, or do partial processing of a key
  span, since it could only abort the entire iteration.

Release note: None
  • Loading branch information
erikgrinaker committed Jul 26, 2022
1 parent 80b5818 commit d789c46
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 355 deletions.
41 changes: 20 additions & 21 deletions pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,27 +442,26 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error {
snapshot := db.NewSnapshot()
defer snapshot.Close()

iter := rditer.NewReplicaEngineDataIterator(&desc, snapshot, debugCtx.replicated)
defer iter.Close()
results := 0
var ok bool
for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() {
if hasPoint, _ := iter.HasPointAndRange(); !hasPoint {
// TODO(erikgrinaker): For now, just skip range keys. We should print
// them.
continue
}
var key storage.EngineKey
if key, err = iter.UnsafeKey(); err != nil {
break
}
kvserver.PrintEngineKeyValue(key, iter.UnsafeValue())
results++
if results == debugCtx.maxResults {
break
}
}
return err
var results int
return rditer.IterateReplicaKeySpans(&desc, snapshot, debugCtx.replicated,
func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error {
if keyType == storage.IterKeyTypeRangesOnly {
// TODO(erikgrinaker): We should handle range keys, but we skip them for now.
return nil
}
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
var key storage.EngineKey
if key, err = iter.UnsafeEngineKey(); err != nil {
return err
}
kvserver.PrintEngineKeyValue(key, iter.UnsafeValue())
results++
if results == debugCtx.maxResults {
return iterutil.StopIteration()
}
}
return err
})
}

var debugRangeDescriptorsCmd = &cobra.Command{
Expand Down
73 changes: 45 additions & 28 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3761,48 +3761,65 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// ultimately keep the last one.
sendingEngSnapshot := sendingEng.NewSnapshot()
defer sendingEngSnapshot.Close()
keySpans := rditer.MakeReplicatedKeySpans(inSnap.Desc)
it := rditer.NewReplicaEngineDataIterator(
inSnap.Desc, sendingEngSnapshot, true /* replicatedOnly */)
defer it.Close()

// Write a range deletion tombstone to each of the SSTs then put in the
// kv entries from the sender of the snapshot.
// Write a Pebble range deletion tombstone to each of the SSTs then put in
// the kv entries from the sender of the snapshot.
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()

ok, err := it.SeekStart()
require.NoError(t, err)
for _, s := range keySpans {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile)
if err := sst.ClearRawRange(s.Key, s.EndKey); err != nil {
type sstFileWriter struct {
span roachpb.Span
file *storage.MemFile
writer storage.SSTWriter
}
keySpans := rditer.MakeReplicatedKeySpans(inSnap.Desc)
sstFileWriters := map[string]sstFileWriter{}
for _, span := range keySpans {
file := &storage.MemFile{}
writer := storage.MakeIngestionSSTWriter(ctx, st, file)
if err := writer.ClearRawRange(span.Key, span.EndKey); err != nil {
return err
}
sstFileWriters[string(span.Key)] = sstFileWriter{
span: span,
file: file,
writer: writer,
}
}

// Keep adding kv data to the SST until the key exceeds the
// bounds of the range, then proceed to the next range.
for ; ok && err == nil; ok, err = it.Next() {
var key storage.EngineKey
if key, err = it.UnsafeKey(); err != nil {
return err
err := rditer.IterateReplicaKeySpans(inSnap.Desc, sendingEngSnapshot, true, /* replicatedOnly */
func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error {
fw, ok := sstFileWriters[string(span.Key)]
if !ok || !fw.span.Equal(span) {
return errors.Errorf("unexpected span %s", span)
}
if s.EndKey.Compare(key.Key) <= 0 {
break
var err error
for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() {
var key storage.EngineKey
if key, err = iter.UnsafeEngineKey(); err != nil {
return err
}
if err := fw.writer.PutEngineKey(key, iter.UnsafeValue()); err != nil {
return err
}
}
if err := sst.PutEngineKey(key, it.UnsafeValue()); err != nil {
if err != nil {
return err
}
}
if err != nil {
return err
}
if err := sst.Finish(); err != nil {
return nil
})
if err != nil {
return err
}

for _, span := range keySpans {
fw := sstFileWriters[string(span.Key)]
if err := fw.writer.Finish(); err != nil {
return err
}
sst.Close()
expectedSSTs = append(expectedSSTs, sstFile.Data())
expectedSSTs = append(expectedSSTs, fw.file.Data())
}

if len(expectedSSTs) != 5 {
return errors.Errorf("len of expectedSSTs should expected to be %d, but got %d",
5, len(expectedSSTs))
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rditer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/roachpb",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/iterutil",
],
)

Expand Down
181 changes: 44 additions & 137 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
)

// ReplicaDataIteratorOptions defines ReplicaMVCCDataIterator creation options.
Expand All @@ -33,11 +34,11 @@ type ReplicaDataIteratorOptions struct {
// the range's data. This cannot be used to iterate over keys that are not
// representable as MVCCKeys, except when such non-MVCCKeys are limited to
// intents, which can be made to look like interleaved MVCCKeys. Most callers
// want the real keys, and should use ReplicaEngineDataIterator.
// want the real keys, and should use IterateReplicaKeySpans.
//
// A ReplicaMVCCDataIterator provides a subset of the engine.MVCCIterator interface.
//
// TODO(sumeer): merge with ReplicaEngineDataIterator. We can use an EngineIterator
// TODO(sumeer): merge with IterateReplicaKeySpans. We can use an EngineIterator
// for MVCC key spans and convert from EngineKey to MVCCKey.
type ReplicaMVCCDataIterator struct {
ReplicaDataIteratorOptions
Expand All @@ -51,27 +52,6 @@ type ReplicaMVCCDataIterator struct {
err error
}

// ReplicaEngineDataIterator provides a complete iteration over all data in a
// range, including system-local metadata and user data. The ranges Span slice
// specifies the key spans which comprise the range's data.
//
// The iterator iterates over both point keys and range keys (i.e. MVCC range
// tombstones), but in a somewhat peculiar order: for each key span, it first
// iterates over all point keys in order, then over all range keys in order,
// signalled via HasPointAndRange(). This allows efficient non-interleaved
// iteration of point/range keys, and keeps them grouped by key span for
// efficient Raft snapshot ingestion into a single SST per key span.
//
// TODO(erikgrinaker): Reconsider the above ordering/scheme for point/range
// keys.
type ReplicaEngineDataIterator struct {
reader storage.Reader
curIndex int
curKeyType storage.IterKeyType
spans []roachpb.Span
it storage.EngineIterator
}

// MakeAllKeySpans returns all key spans for the given Range, in
// sorted order.
func MakeAllKeySpans(d *roachpb.RangeDescriptor) []roachpb.Span {
Expand Down Expand Up @@ -223,7 +203,7 @@ func MakeUserKeySpan(d *roachpb.RangeDescriptor) roachpb.Span {
// and not both upper and lower bound.
//
// TODO(erikgrinaker): ReplicaMVCCDataIterator does not support MVCC range keys.
// This should be deprecated in favor of e.g. ReplicaEngineDataIterator.
// This should be deprecated in favor of e.g. IterateReplicaKeySpans.
func NewReplicaMVCCDataIterator(
d *roachpb.RangeDescriptor, reader storage.Reader, opts ReplicaDataIteratorOptions,
) *ReplicaMVCCDataIterator {
Expand Down Expand Up @@ -371,124 +351,51 @@ func (ri *ReplicaMVCCDataIterator) HasPointAndRange() (bool, bool) {
return ri.it.HasPointAndRange()
}

// NewReplicaEngineDataIterator creates a ReplicaEngineDataIterator for the
// given replica.
func NewReplicaEngineDataIterator(
desc *roachpb.RangeDescriptor, reader storage.Reader, replicatedOnly bool,
) *ReplicaEngineDataIterator {
// IterateReplicaKeySpans iterates over each of a range's key spans, and calls
// the given visitor with an iterator over its data. Specifically, it iterates
// over the spans returned by either MakeAllKeySpans or MakeReplicatedKeySpans,
// and for each one provides first a point key iterator and then a range key
// iterator. This is the expected order for Raft snapshots.
//
// The iterator will be pre-seeked to the span, and is provided along with the
// key span and key type (point or range). Iterators that have no data are
// skipped (i.e. when the seek exhausts the iterator). The iterator will
// automatically be closed when done. To halt iteration over key spans, return
// iterutil.StopIteration().
//
// Must use a reader with consistent iterators.
func IterateReplicaKeySpans(
desc *roachpb.RangeDescriptor,
reader storage.Reader,
replicatedOnly bool,
visitor func(storage.EngineIterator, roachpb.Span, storage.IterKeyType) error,
) error {
if !reader.ConsistentIterators() {
panic("ReplicaEngineDataIterator requires consistent iterators")
panic("reader must provide consistent iterators")
}

var ranges []roachpb.Span
var spans []roachpb.Span
if replicatedOnly {
ranges = MakeReplicatedKeySpans(desc)
spans = MakeReplicatedKeySpans(desc)
} else {
ranges = MakeAllKeySpans(desc)
spans = MakeAllKeySpans(desc)
}

return &ReplicaEngineDataIterator{
reader: reader,
spans: ranges,
}
}

// nextIter creates an iterator for the next non-empty key span/type and seeks
// it, closing the existing iterator if any. Returns false if all key spans and
// key types have been exhausted.
//
// TODO(erikgrinaker): Rather than creating a new iterator for each key span,
// we could expose an API to reconfigure the iterator with new bounds. However,
// the caller could also use e.g. a pebbleReadOnly which reuses the iterator
// internally. This should be benchmarked.
func (ri *ReplicaEngineDataIterator) nextIter() (bool, error) {
for {
if ri.it == nil {
ri.curIndex = 0
ri.curKeyType = storage.IterKeyTypePointsOnly
} else if ri.curKeyType == storage.IterKeyTypePointsOnly {
ri.curKeyType = storage.IterKeyTypeRangesOnly
} else if ri.curIndex+1 < len(ri.spans) {
ri.curIndex++
ri.curKeyType = storage.IterKeyTypePointsOnly
} else {
break
}
if ri.it != nil {
ri.it.Close()
}
keySpan := ri.spans[ri.curIndex]
ri.it = ri.reader.NewEngineIterator(storage.IterOptions{
KeyTypes: ri.curKeyType,
LowerBound: keySpan.Key,
UpperBound: keySpan.EndKey,
})
if ok, err := ri.it.SeekEngineKeyGE(storage.EngineKey{Key: keySpan.Key}); ok || err != nil {
return ok, err
keyTypes := []storage.IterKeyType{storage.IterKeyTypePointsOnly, storage.IterKeyTypeRangesOnly}
for _, span := range spans {
for _, keyType := range keyTypes {
iter := reader.NewEngineIterator(storage.IterOptions{
KeyTypes: keyType,
LowerBound: span.Key,
UpperBound: span.EndKey,
})
ok, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: span.Key})
if err == nil && ok {
err = visitor(iter, span, keyType)
}
iter.Close()
if err != nil {
return iterutil.Map(err)
}
}
}
return false, nil
}

// Close the underlying iterator.
func (ri *ReplicaEngineDataIterator) Close() {
if ri.it != nil {
ri.it.Close()
}
}

// SeekStart seeks the iterator to the start of the key spans.
// It returns false if the iterator did not find any data.
func (ri *ReplicaEngineDataIterator) SeekStart() (bool, error) {
if ri.it != nil {
ri.it.Close()
ri.it = nil
}
return ri.nextIter()
}

// Next advances to the next key in the iteration.
func (ri *ReplicaEngineDataIterator) Next() (bool, error) {
ok, err := ri.it.NextEngineKey()
if !ok && err == nil {
ok, err = ri.nextIter()
}
return ok, err
}

// Value returns the current value. Only used in tests.
func (ri *ReplicaEngineDataIterator) Value() []byte {
return append([]byte{}, ri.it.UnsafeValue()...)
}

// UnsafeKey returns the current key, but the memory is invalidated on the
// next call to {Next,Close}.
func (ri *ReplicaEngineDataIterator) UnsafeKey() (storage.EngineKey, error) {
return ri.it.UnsafeEngineKey()
}

// UnsafeValue returns the same value as Value, but the memory is invalidated on
// the next call to {Next,Close}.
func (ri *ReplicaEngineDataIterator) UnsafeValue() []byte {
return ri.it.UnsafeValue()
}

// HasPointAndRange returns whether the current position has a point or range
// key. ReplicaEngineDataIterator will never expose both a point key and range
// key on the same position. See struct comment for details.
func (ri *ReplicaEngineDataIterator) HasPointAndRange() (bool, bool) {
return ri.curKeyType == storage.IterKeyTypePointsOnly,
ri.curKeyType == storage.IterKeyTypeRangesOnly
}

// RangeBounds returns the current range key bounds, but the memory is
// invalidated on the next call to {Next,Close}.
func (ri *ReplicaEngineDataIterator) RangeBounds() (roachpb.Span, error) {
return ri.it.EngineRangeBounds()
}

// RangeKeys returns the current range keys, but the memory is invalidated on the
// next call to {Next,Close}.
func (ri *ReplicaEngineDataIterator) RangeKeys() []storage.EngineRangeKeyValue {
return ri.it.EngineRangeKeys()
return nil
}
Loading

0 comments on commit d789c46

Please sign in to comment.