diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index b084cc6a40af..1ef845134631 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -442,27 +442,26 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error { snapshot := db.NewSnapshot() defer snapshot.Close() - iter := rditer.NewReplicaEngineDataIterator(&desc, snapshot, debugCtx.replicated) - defer iter.Close() - results := 0 - var ok bool - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - if hasPoint, _ := iter.HasPointAndRange(); !hasPoint { - // TODO(erikgrinaker): For now, just skip range keys. We should print - // them. - continue - } - var key storage.EngineKey - if key, err = iter.UnsafeKey(); err != nil { - break - } - kvserver.PrintEngineKeyValue(key, iter.UnsafeValue()) - results++ - if results == debugCtx.maxResults { - break - } - } - return err + var results int + return rditer.IterateReplicaKeySpans(&desc, snapshot, debugCtx.replicated, + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + if keyType == storage.IterKeyTypeRangesOnly { + // TODO(erikgrinaker): We should handle range keys, but we skip them for now. + return nil + } + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + var key storage.EngineKey + if key, err = iter.UnsafeEngineKey(); err != nil { + return err + } + kvserver.PrintEngineKeyValue(key, iter.UnsafeValue()) + results++ + if results == debugCtx.maxResults { + return iterutil.StopIteration() + } + } + return err + }) } var debugRangeDescriptorsCmd = &cobra.Command{ diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index ab204545a57d..15c09e036efb 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3761,48 +3761,65 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { // ultimately keep the last one. sendingEngSnapshot := sendingEng.NewSnapshot() defer sendingEngSnapshot.Close() - keySpans := rditer.MakeReplicatedKeySpans(inSnap.Desc) - it := rditer.NewReplicaEngineDataIterator( - inSnap.Desc, sendingEngSnapshot, true /* replicatedOnly */) - defer it.Close() - // Write a range deletion tombstone to each of the SSTs then put in the - // kv entries from the sender of the snapshot. + // Write a Pebble range deletion tombstone to each of the SSTs then put in + // the kv entries from the sender of the snapshot. ctx := context.Background() st := cluster.MakeTestingClusterSettings() - ok, err := it.SeekStart() - require.NoError(t, err) - for _, s := range keySpans { - sstFile := &storage.MemFile{} - sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile) - if err := sst.ClearRawRange(s.Key, s.EndKey); err != nil { + type sstFileWriter struct { + span roachpb.Span + file *storage.MemFile + writer storage.SSTWriter + } + keySpans := rditer.MakeReplicatedKeySpans(inSnap.Desc) + sstFileWriters := map[string]sstFileWriter{} + for _, span := range keySpans { + file := &storage.MemFile{} + writer := storage.MakeIngestionSSTWriter(ctx, st, file) + if err := writer.ClearRawRange(span.Key, span.EndKey); err != nil { return err } + sstFileWriters[string(span.Key)] = sstFileWriter{ + span: span, + file: file, + writer: writer, + } + } - // Keep adding kv data to the SST until the key exceeds the - // bounds of the range, then proceed to the next range. - for ; ok && err == nil; ok, err = it.Next() { - var key storage.EngineKey - if key, err = it.UnsafeKey(); err != nil { - return err + err := rditer.IterateReplicaKeySpans(inSnap.Desc, sendingEngSnapshot, true, /* replicatedOnly */ + func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error { + fw, ok := sstFileWriters[string(span.Key)] + if !ok || !fw.span.Equal(span) { + return errors.Errorf("unexpected span %s", span) } - if s.EndKey.Compare(key.Key) <= 0 { - break + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + var key storage.EngineKey + if key, err = iter.UnsafeEngineKey(); err != nil { + return err + } + if err := fw.writer.PutEngineKey(key, iter.UnsafeValue()); err != nil { + return err + } } - if err := sst.PutEngineKey(key, it.UnsafeValue()); err != nil { + if err != nil { return err } - } - if err != nil { - return err - } - if err := sst.Finish(); err != nil { + return nil + }) + if err != nil { + return err + } + + for _, span := range keySpans { + fw := sstFileWriters[string(span.Key)] + if err := fw.writer.Finish(); err != nil { return err } - sst.Close() - expectedSSTs = append(expectedSSTs, sstFile.Data()) + expectedSSTs = append(expectedSSTs, fw.file.Data()) } + if len(expectedSSTs) != 5 { return errors.Errorf("len of expectedSSTs should expected to be %d, but got %d", 5, len(expectedSSTs)) diff --git a/pkg/kv/kvserver/rditer/BUILD.bazel b/pkg/kv/kvserver/rditer/BUILD.bazel index c172aa6a73ed..223f2d938374 100644 --- a/pkg/kv/kvserver/rditer/BUILD.bazel +++ b/pkg/kv/kvserver/rditer/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/roachpb", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/iterutil", ], ) diff --git a/pkg/kv/kvserver/rditer/replica_data_iter.go b/pkg/kv/kvserver/rditer/replica_data_iter.go index 9a6f32312daf..078835b8c467 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" ) // ReplicaDataIteratorOptions defines ReplicaMVCCDataIterator creation options. @@ -33,11 +34,11 @@ type ReplicaDataIteratorOptions struct { // the range's data. This cannot be used to iterate over keys that are not // representable as MVCCKeys, except when such non-MVCCKeys are limited to // intents, which can be made to look like interleaved MVCCKeys. Most callers -// want the real keys, and should use ReplicaEngineDataIterator. +// want the real keys, and should use IterateReplicaKeySpans. // // A ReplicaMVCCDataIterator provides a subset of the engine.MVCCIterator interface. // -// TODO(sumeer): merge with ReplicaEngineDataIterator. We can use an EngineIterator +// TODO(sumeer): merge with IterateReplicaKeySpans. We can use an EngineIterator // for MVCC key spans and convert from EngineKey to MVCCKey. type ReplicaMVCCDataIterator struct { ReplicaDataIteratorOptions @@ -51,27 +52,6 @@ type ReplicaMVCCDataIterator struct { err error } -// ReplicaEngineDataIterator provides a complete iteration over all data in a -// range, including system-local metadata and user data. The ranges Span slice -// specifies the key spans which comprise the range's data. -// -// The iterator iterates over both point keys and range keys (i.e. MVCC range -// tombstones), but in a somewhat peculiar order: for each key span, it first -// iterates over all point keys in order, then over all range keys in order, -// signalled via HasPointAndRange(). This allows efficient non-interleaved -// iteration of point/range keys, and keeps them grouped by key span for -// efficient Raft snapshot ingestion into a single SST per key span. -// -// TODO(erikgrinaker): Reconsider the above ordering/scheme for point/range -// keys. -type ReplicaEngineDataIterator struct { - reader storage.Reader - curIndex int - curKeyType storage.IterKeyType - spans []roachpb.Span - it storage.EngineIterator -} - // MakeAllKeySpans returns all key spans for the given Range, in // sorted order. func MakeAllKeySpans(d *roachpb.RangeDescriptor) []roachpb.Span { @@ -223,7 +203,7 @@ func MakeUserKeySpan(d *roachpb.RangeDescriptor) roachpb.Span { // and not both upper and lower bound. // // TODO(erikgrinaker): ReplicaMVCCDataIterator does not support MVCC range keys. -// This should be deprecated in favor of e.g. ReplicaEngineDataIterator. +// This should be deprecated in favor of e.g. IterateReplicaKeySpans. func NewReplicaMVCCDataIterator( d *roachpb.RangeDescriptor, reader storage.Reader, opts ReplicaDataIteratorOptions, ) *ReplicaMVCCDataIterator { @@ -371,124 +351,51 @@ func (ri *ReplicaMVCCDataIterator) HasPointAndRange() (bool, bool) { return ri.it.HasPointAndRange() } -// NewReplicaEngineDataIterator creates a ReplicaEngineDataIterator for the -// given replica. -func NewReplicaEngineDataIterator( - desc *roachpb.RangeDescriptor, reader storage.Reader, replicatedOnly bool, -) *ReplicaEngineDataIterator { +// IterateReplicaKeySpans iterates over each of a range's key spans, and calls +// the given visitor with an iterator over its data. Specifically, it iterates +// over the spans returned by either MakeAllKeySpans or MakeReplicatedKeySpans, +// and for each one provides first a point key iterator and then a range key +// iterator. This is the expected order for Raft snapshots. +// +// The iterator will be pre-seeked to the span, and is provided along with the +// key span and key type (point or range). Iterators that have no data are +// skipped (i.e. when the seek exhausts the iterator). The iterator will +// automatically be closed when done. To halt iteration over key spans, return +// iterutil.StopIteration(). +// +// Must use a reader with consistent iterators. +func IterateReplicaKeySpans( + desc *roachpb.RangeDescriptor, + reader storage.Reader, + replicatedOnly bool, + visitor func(storage.EngineIterator, roachpb.Span, storage.IterKeyType) error, +) error { if !reader.ConsistentIterators() { - panic("ReplicaEngineDataIterator requires consistent iterators") + panic("reader must provide consistent iterators") } - - var ranges []roachpb.Span + var spans []roachpb.Span if replicatedOnly { - ranges = MakeReplicatedKeySpans(desc) + spans = MakeReplicatedKeySpans(desc) } else { - ranges = MakeAllKeySpans(desc) + spans = MakeAllKeySpans(desc) } - - return &ReplicaEngineDataIterator{ - reader: reader, - spans: ranges, - } -} - -// nextIter creates an iterator for the next non-empty key span/type and seeks -// it, closing the existing iterator if any. Returns false if all key spans and -// key types have been exhausted. -// -// TODO(erikgrinaker): Rather than creating a new iterator for each key span, -// we could expose an API to reconfigure the iterator with new bounds. However, -// the caller could also use e.g. a pebbleReadOnly which reuses the iterator -// internally. This should be benchmarked. -func (ri *ReplicaEngineDataIterator) nextIter() (bool, error) { - for { - if ri.it == nil { - ri.curIndex = 0 - ri.curKeyType = storage.IterKeyTypePointsOnly - } else if ri.curKeyType == storage.IterKeyTypePointsOnly { - ri.curKeyType = storage.IterKeyTypeRangesOnly - } else if ri.curIndex+1 < len(ri.spans) { - ri.curIndex++ - ri.curKeyType = storage.IterKeyTypePointsOnly - } else { - break - } - if ri.it != nil { - ri.it.Close() - } - keySpan := ri.spans[ri.curIndex] - ri.it = ri.reader.NewEngineIterator(storage.IterOptions{ - KeyTypes: ri.curKeyType, - LowerBound: keySpan.Key, - UpperBound: keySpan.EndKey, - }) - if ok, err := ri.it.SeekEngineKeyGE(storage.EngineKey{Key: keySpan.Key}); ok || err != nil { - return ok, err + keyTypes := []storage.IterKeyType{storage.IterKeyTypePointsOnly, storage.IterKeyTypeRangesOnly} + for _, span := range spans { + for _, keyType := range keyTypes { + iter := reader.NewEngineIterator(storage.IterOptions{ + KeyTypes: keyType, + LowerBound: span.Key, + UpperBound: span.EndKey, + }) + ok, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: span.Key}) + if err == nil && ok { + err = visitor(iter, span, keyType) + } + iter.Close() + if err != nil { + return iterutil.Map(err) + } } } - return false, nil -} - -// Close the underlying iterator. -func (ri *ReplicaEngineDataIterator) Close() { - if ri.it != nil { - ri.it.Close() - } -} - -// SeekStart seeks the iterator to the start of the key spans. -// It returns false if the iterator did not find any data. -func (ri *ReplicaEngineDataIterator) SeekStart() (bool, error) { - if ri.it != nil { - ri.it.Close() - ri.it = nil - } - return ri.nextIter() -} - -// Next advances to the next key in the iteration. -func (ri *ReplicaEngineDataIterator) Next() (bool, error) { - ok, err := ri.it.NextEngineKey() - if !ok && err == nil { - ok, err = ri.nextIter() - } - return ok, err -} - -// Value returns the current value. Only used in tests. -func (ri *ReplicaEngineDataIterator) Value() []byte { - return append([]byte{}, ri.it.UnsafeValue()...) -} - -// UnsafeKey returns the current key, but the memory is invalidated on the -// next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) UnsafeKey() (storage.EngineKey, error) { - return ri.it.UnsafeEngineKey() -} - -// UnsafeValue returns the same value as Value, but the memory is invalidated on -// the next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) UnsafeValue() []byte { - return ri.it.UnsafeValue() -} - -// HasPointAndRange returns whether the current position has a point or range -// key. ReplicaEngineDataIterator will never expose both a point key and range -// key on the same position. See struct comment for details. -func (ri *ReplicaEngineDataIterator) HasPointAndRange() (bool, bool) { - return ri.curKeyType == storage.IterKeyTypePointsOnly, - ri.curKeyType == storage.IterKeyTypeRangesOnly -} - -// RangeBounds returns the current range key bounds, but the memory is -// invalidated on the next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) RangeBounds() (roachpb.Span, error) { - return ri.it.EngineRangeBounds() -} - -// RangeKeys returns the current range keys, but the memory is invalidated on the -// next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) RangeKeys() []storage.EngineRangeKeyValue { - return ri.it.EngineRangeKeys() + return nil } diff --git a/pkg/kv/kvserver/rditer/replica_data_iter_test.go b/pkg/kv/kvserver/rditer/replica_data_iter_test.go index 8eb219e9e614..ffe1bb4ae70d 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter_test.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter_test.go @@ -203,49 +203,59 @@ func verifyRDReplicatedOnlyMVCCIter( }) } -// verifyRDEngineIter verifies that the ReplicaEngineDataIterator returns the +// verifyIterateReplicaKeySpans verifies that IterateReplicaKeySpans returns the // expected keys in the expected order. The expected keys can be either MVCCKey // or MVCCRangeKey. -func verifyRDEngineIter( +func verifyIterateReplicaKeySpans( t *testing.T, desc *roachpb.RangeDescriptor, eng storage.Engine, replicatedOnly bool, expectedKeys []interface{}, ) { - readWriter := eng.NewReadOnly(storage.StandardDurability) + readWriter := eng.NewSnapshot() defer readWriter.Close() - iter := NewReplicaEngineDataIterator(desc, readWriter, replicatedOnly) - defer iter.Close() actualKeys := []interface{}{} - var ok bool - var err error - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - hasPoint, hasRange := iter.HasPointAndRange() - if hasPoint { - key, err := iter.UnsafeKey() - require.NoError(t, err) - require.True(t, key.IsMVCCKey()) - mvccKey, err := key.ToMVCCKey() - require.NoError(t, err) - actualKeys = append(actualKeys, mvccKey.Clone()) - } - if hasRange { - bounds, err := iter.RangeBounds() - require.NoError(t, err) - for _, rk := range iter.RangeKeys() { - ts, err := storage.DecodeMVCCTimestampSuffix(rk.Version) + require.NoError(t, IterateReplicaKeySpans(desc, readWriter, replicatedOnly, + func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error { + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + // Span should not be empty. + require.NotZero(t, span) + + // Key should be in the given span. + key, err := iter.UnsafeEngineKey() require.NoError(t, err) - actualKeys = append(actualKeys, storage.MVCCRangeKey{ - StartKey: bounds.Key.Clone(), - EndKey: bounds.EndKey.Clone(), - Timestamp: ts, - }) + require.True(t, key.IsMVCCKey()) + require.True(t, span.ContainsKey(key.Key), "%s not in %s", key, span) + + switch keyType { + case storage.IterKeyTypePointsOnly: + mvccKey, err := key.ToMVCCKey() + require.NoError(t, err) + actualKeys = append(actualKeys, mvccKey.Clone()) + + case storage.IterKeyTypeRangesOnly: + bounds, err := iter.EngineRangeBounds() + require.NoError(t, err) + require.True(t, span.Contains(bounds), "%s not contained in %s", bounds, span) + for _, rk := range iter.EngineRangeKeys() { + ts, err := storage.DecodeMVCCTimestampSuffix(rk.Version) + require.NoError(t, err) + actualKeys = append(actualKeys, storage.MVCCRangeKey{ + StartKey: bounds.Key.Clone(), + EndKey: bounds.EndKey.Clone(), + Timestamp: ts, + }) + } + + default: + t.Fatalf("unexpected key type %v", keyType) + } } - } - } - require.NoError(t, err) + return err + })) require.Equal(t, expectedKeys, actualKeys) } @@ -264,8 +274,8 @@ func TestReplicaDataIteratorEmptyRange(t *testing.T) { } verifyRDReplicatedOnlyMVCCIter(t, desc, eng, []storage.MVCCKey{}, []storage.MVCCRangeKey{}) - verifyRDEngineIter(t, desc, eng, false, []interface{}{}) - verifyRDEngineIter(t, desc, eng, true, []interface{}{}) + verifyIterateReplicaKeySpans(t, desc, eng, false, []interface{}{}) + verifyIterateReplicaKeySpans(t, desc, eng, true, []interface{}{}) } // TestReplicaDataIterator creates three ranges (a-b, b-c, c-d) and fills each @@ -311,8 +321,8 @@ func TestReplicaDataIterator(t *testing.T) { t.Run(tc.desc.RSpan().String(), func(t *testing.T) { // Verify the replicated and unreplicated engine contents. - verifyRDEngineIter(t, &tc.desc, eng, false, tc.allKeys) - verifyRDEngineIter(t, &tc.desc, eng, true, tc.replicatedKeys) + verifyIterateReplicaKeySpans(t, &tc.desc, eng, false, tc.allKeys) + verifyIterateReplicaKeySpans(t, &tc.desc, eng, true, tc.replicatedKeys) // Verify the replicated MVCC contents. var pointKeys []storage.MVCCKey @@ -379,11 +389,8 @@ func TestReplicaDataIteratorGlobalRangeKey(t *testing.T) { } for _, desc := range descs { t.Run(desc.KeySpan().String(), func(t *testing.T) { - // An iterator should see range keys spanning all relevant key spans. + // Iterators should see range keys spanning all relevant key spans. testutils.RunTrueAndFalse(t, "replicatedOnly", func(t *testing.T, replicatedOnly bool) { - rangeIter := NewReplicaEngineDataIterator(&desc, snapshot, replicatedOnly) - defer rangeIter.Close() - var expectedSpans []roachpb.Span if replicatedOnly { expectedSpans = MakeReplicatedKeySpans(&desc) @@ -392,14 +399,24 @@ func TestReplicaDataIteratorGlobalRangeKey(t *testing.T) { } var actualSpans []roachpb.Span - var ok bool - var err error - for ok, err = rangeIter.SeekStart(); ok && err == nil; ok, err = rangeIter.Next() { - bounds, err := rangeIter.RangeBounds() - require.NoError(t, err) - actualSpans = append(actualSpans, bounds.Clone()) - } - require.NoError(t, err) + require.NoError(t, IterateReplicaKeySpans(&desc, snapshot, replicatedOnly, + func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error { + // We should never see any point keys. + require.Equal(t, storage.IterKeyTypeRangesOnly, keyType) + + // The iterator should already be positioned on the range key, which should + // span the entire key span and be the only range key. + bounds, err := iter.EngineRangeBounds() + require.NoError(t, err) + require.Equal(t, span, bounds) + actualSpans = append(actualSpans, bounds.Clone()) + + ok, err := iter.NextEngineKey() + require.NoError(t, err) + require.False(t, ok) + + return nil + })) require.Equal(t, expectedSpans, actualSpans) }) }) @@ -490,14 +507,15 @@ func benchReplicaEngineDataIterator(b *testing.B, numRanges, numKeysPerRange, va for i := 0; i < b.N; i++ { for _, desc := range descs { - iter := NewReplicaEngineDataIterator(&desc, snapshot, false /* replicatedOnly */) - defer iter.Close() - var ok bool - var err error - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - _, _ = iter.UnsafeKey() - _ = iter.UnsafeValue() - } + err := IterateReplicaKeySpans(&desc, snapshot, false, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, _ storage.IterKeyType) error { + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + _, _ = iter.UnsafeEngineKey() + _ = iter.UnsafeValue() + } + return err + }) if err != nil { require.NoError(b, err) } diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 615e1849f7ed..138d608e76fe 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -1711,44 +1712,42 @@ func getExpectedSnapshotSizeBytes( } defer snap.Close() - totalBytes := int64(0) - var b storage.Batch - defer func() { - b.Close() - }() - b = originStore.Engine().NewUnindexedBatch(true) - iter := snap.Iter - var ok bool - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - hasPoint, hasRange := iter.HasPointAndRange() - if hasPoint { - var unsafeKey storage.EngineKey - if unsafeKey, err = iter.UnsafeKey(); err != nil { - return 0, err - } - if err := b.PutEngineKey(unsafeKey, iter.UnsafeValue()); err != nil { - return 0, err - } - } - if hasRange { - bounds, err := iter.RangeBounds() - if err != nil { - return 0, err - } - for _, rkv := range iter.RangeKeys() { - err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) - if err != nil { - return 0, err + b := originStore.Engine().NewUnindexedBatch(true) + defer b.Close() + + err = rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + switch keyType { + case storage.IterKeyTypePointsOnly: + unsafeKey, err := iter.UnsafeEngineKey() + if err != nil { + return err + } + if err := b.PutEngineKey(unsafeKey, iter.UnsafeValue()); err != nil { + return err + } + + case storage.IterKeyTypeRangesOnly: + bounds, err := iter.EngineRangeBounds() + if err != nil { + return err + } + for _, rkv := range iter.EngineRangeKeys() { + err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) + if err != nil { + return err + } + } + + default: + return errors.Errorf("unexpected key type %v", keyType) } } - } - } - if err != nil { - return 0, err - } - totalBytes += int64(b.Len()) - - return totalBytes, nil + return err + }) + return int64(b.Len()), err } // Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 2aa04a45a7ba..c2312ccab4c3 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -497,10 +497,8 @@ type OutgoingSnapshot struct { SnapUUID uuid.UUID // The Raft snapshot message to send. Contains SnapUUID as its data. RaftSnap raftpb.Snapshot - // The RocksDB snapshot that will be streamed from. + // The Pebble snapshot that will be streamed from. EngineSnap storage.Reader - // The complete range iterator for the snapshot to stream. - Iter *rditer.ReplicaEngineDataIterator // The replica state within the snapshot. State kvserverpb.ReplicaState // Allows access the original Replica's sideloaded storage. Note that @@ -525,7 +523,6 @@ func (s OutgoingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) { // Close releases the resources associated with the snapshot. func (s *OutgoingSnapshot) Close() { - s.Iter.Close() s.EngineSnap.Close() if s.onClose != nil { s.onClose() @@ -598,15 +595,10 @@ func snapshot( return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex) } - // Intentionally let this iterator and the snapshot escape so that the - // streamer can send chunks from it bit by bit. - iter := rditer.NewReplicaEngineDataIterator(&desc, snap, true /* replicatedOnly */) - return OutgoingSnapshot{ RaftEntryCache: eCache, WithSideloaded: withSideloaded, EngineSnap: snap, - Iter: iter, State: state, SnapUUID: snapUUID, RaftSnap: raftpb.Snapshot{ diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 9a0207b4d31c..7ec4672b0c28 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -7068,38 +7068,36 @@ func TestReplicaDestroy(t *testing.T) { tc.Start(ctx, t, stopper) repl, err := tc.store.GetReplica(1) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) func() { tc.repl.raftMu.Lock() defer tc.repl.raftMu.Unlock() - if _, err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, tc.repl, repl.Desc().NextReplicaID, RemoveOptions{ + _, err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, tc.repl, repl.Desc().NextReplicaID, RemoveOptions{ DestroyData: true, - }); err != nil { - t.Fatal(err) - } + }) + require.NoError(t, err) }() engSnapshot := tc.repl.store.Engine().NewSnapshot() defer engSnapshot.Close() - iter := rditer.NewReplicaEngineDataIterator(tc.repl.Desc(), engSnapshot, - false /* replicatedOnly */) - defer iter.Close() - ok, err := iter.SeekStart() - require.NoError(t, err) - require.True(t, ok, "expected a tombstone key, but iterator was empty") - // If the range is destroyed, only a tombstone key should be there. - k1, err := iter.UnsafeKey() - require.NoError(t, err) - require.Equal(t, keys.RangeTombstoneKey(tc.repl.RangeID), k1.Key) - - ok, err = iter.Next() - require.NoError(t, err) - require.False(t, ok, "expected destroyed replica to only have a tombstone key, but found more") + expectedKeys := []roachpb.Key{keys.RangeTombstoneKey(tc.repl.RangeID)} + actualKeys := []roachpb.Key{} + + require.NoError(t, rditer.IterateReplicaKeySpans(tc.repl.Desc(), engSnapshot, false, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + require.Equal(t, storage.IterKeyTypePointsOnly, keyType) + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + key, err := iter.UnsafeEngineKey() + require.NoError(t, err) + actualKeys = append(actualKeys, key.Key.Clone()) + } + return err + })) + require.Equal(t, expectedKeys, actualKeys) } // TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index c932ff7c221b..5f057d332400 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -403,11 +403,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // bytesSent is updated as key-value batches are sent with sendBatch. It does // not reflect the log entries sent (which are never sent in newer versions of // CRDB, as of VersionUnreplicatedTruncatedState). - bytesSent := int64(0) - - // Iterate over all keys (point keys and range keys) using the provided - // iterator and stream out batches of key-values. + var bytesSent int64 var kvs, rangeKVs int + + // Iterate over all keys (point keys and range keys) and stream out batches of + // key-values. var b storage.Batch defer func() { if b != nil { @@ -427,57 +427,61 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return nil } - iter := snap.Iter - var ok bool - var err error - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - // NB: EngineReplicaDataIterator will never expose point/range keys - // together: it first iterates over all point keys in a key span, then over - // all range keys in a key span, then moves onto the next key span. - hasPoint, hasRange := iter.HasPointAndRange() - - if hasPoint { - kvs++ - var unsafeKey storage.EngineKey - if unsafeKey, err = iter.UnsafeKey(); err != nil { - return 0, err - } - unsafeValue := iter.UnsafeValue() - if b == nil { - b = kvSS.newBatch() - } - if err := b.PutEngineKey(unsafeKey, unsafeValue); err != nil { - return 0, err - } - if int64(b.Len()) >= kvSS.batchSize { - if err = flushBatch(); err != nil { - return 0, err - } - } + maybeFlushBatch := func() error { + if int64(b.Len()) >= kvSS.batchSize { + return flushBatch() } + return nil + } - if hasRange { - bounds, err := iter.RangeBounds() - if err != nil { - return 0, err - } - for _, rkv := range iter.RangeKeys() { - rangeKVs++ - if b == nil { - b = kvSS.newBatch() - } - err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) - if err != nil { - return 0, err + err := rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + var err error + switch keyType { + case storage.IterKeyTypePointsOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + kvs++ + if b == nil { + b = kvSS.newBatch() + } + key, err := iter.UnsafeEngineKey() + if err != nil { + return err + } + if err = b.PutEngineKey(key, iter.UnsafeValue()); err != nil { + return err + } + if err = maybeFlushBatch(); err != nil { + return err + } } - if bLen := int64(b.Len()); bLen >= kvSS.batchSize { - if err = flushBatch(); err != nil { - return 0, err + + case storage.IterKeyTypeRangesOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + bounds, err := iter.EngineRangeBounds() + if err != nil { + return err + } + for _, rkv := range iter.EngineRangeKeys() { + rangeKVs++ + if b == nil { + b = kvSS.newBatch() + } + err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) + if err != nil { + return err + } + if err = maybeFlushBatch(); err != nil { + return err + } } } + + default: + return errors.AssertionFailedf("unexpected key type %v", keyType) } - } - } + return err + }) if err != nil { return 0, err }