From d789c46cd38d2d7786faa5d2b8a0e07c5f5ed10c Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Fri, 8 Jul 2022 11:25:09 +0000 Subject: [PATCH] rditer: replace `ReplicaDataEngineIterator` with `IterateReplicaKeySpans` This patch replaces `ReplicaDataEngineIterator` with `IterateReplicaKeySpans()`. The Raft snapshot protocol requires an unusual iteration order: for each of the range's key spans, first send all point keys then send all range keys. This allows ingesting SSTs that only span each key span (rather than across wide swaths of the keyspace), with efficient, separate iteration over point/range keys. While the current `ReplicaDataEngineIterator` already does this, the iteration order is rather peculiar, which can be unexpected to users. `IterateReplicaKeySpans` attempts to make this iteration order more explicit. Other approaches were considered but rejected: * Use an iterator generator, which returns the next iterator in the sequence. This was very unergonomic to use, since it required nested iteration loops (one over the iterators, and one per iterator) which make variable naming/reuse and iterator closing awkward. * Use a visitor for each key instead of keyspan. This would again end up hiding the iteration sequence/structure. It also would make it impossible to skip across key spans, or do partial processing of a key span, since it could only abort the entire iteration. Release note: None --- pkg/cli/debug.go | 41 ++-- pkg/kv/kvserver/client_merge_test.go | 73 ++++--- pkg/kv/kvserver/rditer/BUILD.bazel | 1 + pkg/kv/kvserver/rditer/replica_data_iter.go | 181 +++++------------- .../kvserver/rditer/replica_data_iter_test.go | 126 ++++++------ pkg/kv/kvserver/replica_learner_test.go | 71 ++++--- pkg/kv/kvserver/replica_raftstorage.go | 10 +- pkg/kv/kvserver/replica_test.go | 40 ++-- pkg/kv/kvserver/store_snapshot.go | 102 +++++----- 9 files changed, 290 insertions(+), 355 deletions(-) diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index b084cc6a40af..1ef845134631 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -442,27 +442,26 @@ func runDebugRangeData(cmd *cobra.Command, args []string) error { snapshot := db.NewSnapshot() defer snapshot.Close() - iter := rditer.NewReplicaEngineDataIterator(&desc, snapshot, debugCtx.replicated) - defer iter.Close() - results := 0 - var ok bool - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - if hasPoint, _ := iter.HasPointAndRange(); !hasPoint { - // TODO(erikgrinaker): For now, just skip range keys. We should print - // them. - continue - } - var key storage.EngineKey - if key, err = iter.UnsafeKey(); err != nil { - break - } - kvserver.PrintEngineKeyValue(key, iter.UnsafeValue()) - results++ - if results == debugCtx.maxResults { - break - } - } - return err + var results int + return rditer.IterateReplicaKeySpans(&desc, snapshot, debugCtx.replicated, + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + if keyType == storage.IterKeyTypeRangesOnly { + // TODO(erikgrinaker): We should handle range keys, but we skip them for now. + return nil + } + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + var key storage.EngineKey + if key, err = iter.UnsafeEngineKey(); err != nil { + return err + } + kvserver.PrintEngineKeyValue(key, iter.UnsafeValue()) + results++ + if results == debugCtx.maxResults { + return iterutil.StopIteration() + } + } + return err + }) } var debugRangeDescriptorsCmd = &cobra.Command{ diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index ab204545a57d..15c09e036efb 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3761,48 +3761,65 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { // ultimately keep the last one. sendingEngSnapshot := sendingEng.NewSnapshot() defer sendingEngSnapshot.Close() - keySpans := rditer.MakeReplicatedKeySpans(inSnap.Desc) - it := rditer.NewReplicaEngineDataIterator( - inSnap.Desc, sendingEngSnapshot, true /* replicatedOnly */) - defer it.Close() - // Write a range deletion tombstone to each of the SSTs then put in the - // kv entries from the sender of the snapshot. + // Write a Pebble range deletion tombstone to each of the SSTs then put in + // the kv entries from the sender of the snapshot. ctx := context.Background() st := cluster.MakeTestingClusterSettings() - ok, err := it.SeekStart() - require.NoError(t, err) - for _, s := range keySpans { - sstFile := &storage.MemFile{} - sst := storage.MakeIngestionSSTWriter(ctx, st, sstFile) - if err := sst.ClearRawRange(s.Key, s.EndKey); err != nil { + type sstFileWriter struct { + span roachpb.Span + file *storage.MemFile + writer storage.SSTWriter + } + keySpans := rditer.MakeReplicatedKeySpans(inSnap.Desc) + sstFileWriters := map[string]sstFileWriter{} + for _, span := range keySpans { + file := &storage.MemFile{} + writer := storage.MakeIngestionSSTWriter(ctx, st, file) + if err := writer.ClearRawRange(span.Key, span.EndKey); err != nil { return err } + sstFileWriters[string(span.Key)] = sstFileWriter{ + span: span, + file: file, + writer: writer, + } + } - // Keep adding kv data to the SST until the key exceeds the - // bounds of the range, then proceed to the next range. - for ; ok && err == nil; ok, err = it.Next() { - var key storage.EngineKey - if key, err = it.UnsafeKey(); err != nil { - return err + err := rditer.IterateReplicaKeySpans(inSnap.Desc, sendingEngSnapshot, true, /* replicatedOnly */ + func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error { + fw, ok := sstFileWriters[string(span.Key)] + if !ok || !fw.span.Equal(span) { + return errors.Errorf("unexpected span %s", span) } - if s.EndKey.Compare(key.Key) <= 0 { - break + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + var key storage.EngineKey + if key, err = iter.UnsafeEngineKey(); err != nil { + return err + } + if err := fw.writer.PutEngineKey(key, iter.UnsafeValue()); err != nil { + return err + } } - if err := sst.PutEngineKey(key, it.UnsafeValue()); err != nil { + if err != nil { return err } - } - if err != nil { - return err - } - if err := sst.Finish(); err != nil { + return nil + }) + if err != nil { + return err + } + + for _, span := range keySpans { + fw := sstFileWriters[string(span.Key)] + if err := fw.writer.Finish(); err != nil { return err } - sst.Close() - expectedSSTs = append(expectedSSTs, sstFile.Data()) + expectedSSTs = append(expectedSSTs, fw.file.Data()) } + if len(expectedSSTs) != 5 { return errors.Errorf("len of expectedSSTs should expected to be %d, but got %d", 5, len(expectedSSTs)) diff --git a/pkg/kv/kvserver/rditer/BUILD.bazel b/pkg/kv/kvserver/rditer/BUILD.bazel index c172aa6a73ed..223f2d938374 100644 --- a/pkg/kv/kvserver/rditer/BUILD.bazel +++ b/pkg/kv/kvserver/rditer/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/roachpb", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/iterutil", ], ) diff --git a/pkg/kv/kvserver/rditer/replica_data_iter.go b/pkg/kv/kvserver/rditer/replica_data_iter.go index 9a6f32312daf..078835b8c467 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" ) // ReplicaDataIteratorOptions defines ReplicaMVCCDataIterator creation options. @@ -33,11 +34,11 @@ type ReplicaDataIteratorOptions struct { // the range's data. This cannot be used to iterate over keys that are not // representable as MVCCKeys, except when such non-MVCCKeys are limited to // intents, which can be made to look like interleaved MVCCKeys. Most callers -// want the real keys, and should use ReplicaEngineDataIterator. +// want the real keys, and should use IterateReplicaKeySpans. // // A ReplicaMVCCDataIterator provides a subset of the engine.MVCCIterator interface. // -// TODO(sumeer): merge with ReplicaEngineDataIterator. We can use an EngineIterator +// TODO(sumeer): merge with IterateReplicaKeySpans. We can use an EngineIterator // for MVCC key spans and convert from EngineKey to MVCCKey. type ReplicaMVCCDataIterator struct { ReplicaDataIteratorOptions @@ -51,27 +52,6 @@ type ReplicaMVCCDataIterator struct { err error } -// ReplicaEngineDataIterator provides a complete iteration over all data in a -// range, including system-local metadata and user data. The ranges Span slice -// specifies the key spans which comprise the range's data. -// -// The iterator iterates over both point keys and range keys (i.e. MVCC range -// tombstones), but in a somewhat peculiar order: for each key span, it first -// iterates over all point keys in order, then over all range keys in order, -// signalled via HasPointAndRange(). This allows efficient non-interleaved -// iteration of point/range keys, and keeps them grouped by key span for -// efficient Raft snapshot ingestion into a single SST per key span. -// -// TODO(erikgrinaker): Reconsider the above ordering/scheme for point/range -// keys. -type ReplicaEngineDataIterator struct { - reader storage.Reader - curIndex int - curKeyType storage.IterKeyType - spans []roachpb.Span - it storage.EngineIterator -} - // MakeAllKeySpans returns all key spans for the given Range, in // sorted order. func MakeAllKeySpans(d *roachpb.RangeDescriptor) []roachpb.Span { @@ -223,7 +203,7 @@ func MakeUserKeySpan(d *roachpb.RangeDescriptor) roachpb.Span { // and not both upper and lower bound. // // TODO(erikgrinaker): ReplicaMVCCDataIterator does not support MVCC range keys. -// This should be deprecated in favor of e.g. ReplicaEngineDataIterator. +// This should be deprecated in favor of e.g. IterateReplicaKeySpans. func NewReplicaMVCCDataIterator( d *roachpb.RangeDescriptor, reader storage.Reader, opts ReplicaDataIteratorOptions, ) *ReplicaMVCCDataIterator { @@ -371,124 +351,51 @@ func (ri *ReplicaMVCCDataIterator) HasPointAndRange() (bool, bool) { return ri.it.HasPointAndRange() } -// NewReplicaEngineDataIterator creates a ReplicaEngineDataIterator for the -// given replica. -func NewReplicaEngineDataIterator( - desc *roachpb.RangeDescriptor, reader storage.Reader, replicatedOnly bool, -) *ReplicaEngineDataIterator { +// IterateReplicaKeySpans iterates over each of a range's key spans, and calls +// the given visitor with an iterator over its data. Specifically, it iterates +// over the spans returned by either MakeAllKeySpans or MakeReplicatedKeySpans, +// and for each one provides first a point key iterator and then a range key +// iterator. This is the expected order for Raft snapshots. +// +// The iterator will be pre-seeked to the span, and is provided along with the +// key span and key type (point or range). Iterators that have no data are +// skipped (i.e. when the seek exhausts the iterator). The iterator will +// automatically be closed when done. To halt iteration over key spans, return +// iterutil.StopIteration(). +// +// Must use a reader with consistent iterators. +func IterateReplicaKeySpans( + desc *roachpb.RangeDescriptor, + reader storage.Reader, + replicatedOnly bool, + visitor func(storage.EngineIterator, roachpb.Span, storage.IterKeyType) error, +) error { if !reader.ConsistentIterators() { - panic("ReplicaEngineDataIterator requires consistent iterators") + panic("reader must provide consistent iterators") } - - var ranges []roachpb.Span + var spans []roachpb.Span if replicatedOnly { - ranges = MakeReplicatedKeySpans(desc) + spans = MakeReplicatedKeySpans(desc) } else { - ranges = MakeAllKeySpans(desc) + spans = MakeAllKeySpans(desc) } - - return &ReplicaEngineDataIterator{ - reader: reader, - spans: ranges, - } -} - -// nextIter creates an iterator for the next non-empty key span/type and seeks -// it, closing the existing iterator if any. Returns false if all key spans and -// key types have been exhausted. -// -// TODO(erikgrinaker): Rather than creating a new iterator for each key span, -// we could expose an API to reconfigure the iterator with new bounds. However, -// the caller could also use e.g. a pebbleReadOnly which reuses the iterator -// internally. This should be benchmarked. -func (ri *ReplicaEngineDataIterator) nextIter() (bool, error) { - for { - if ri.it == nil { - ri.curIndex = 0 - ri.curKeyType = storage.IterKeyTypePointsOnly - } else if ri.curKeyType == storage.IterKeyTypePointsOnly { - ri.curKeyType = storage.IterKeyTypeRangesOnly - } else if ri.curIndex+1 < len(ri.spans) { - ri.curIndex++ - ri.curKeyType = storage.IterKeyTypePointsOnly - } else { - break - } - if ri.it != nil { - ri.it.Close() - } - keySpan := ri.spans[ri.curIndex] - ri.it = ri.reader.NewEngineIterator(storage.IterOptions{ - KeyTypes: ri.curKeyType, - LowerBound: keySpan.Key, - UpperBound: keySpan.EndKey, - }) - if ok, err := ri.it.SeekEngineKeyGE(storage.EngineKey{Key: keySpan.Key}); ok || err != nil { - return ok, err + keyTypes := []storage.IterKeyType{storage.IterKeyTypePointsOnly, storage.IterKeyTypeRangesOnly} + for _, span := range spans { + for _, keyType := range keyTypes { + iter := reader.NewEngineIterator(storage.IterOptions{ + KeyTypes: keyType, + LowerBound: span.Key, + UpperBound: span.EndKey, + }) + ok, err := iter.SeekEngineKeyGE(storage.EngineKey{Key: span.Key}) + if err == nil && ok { + err = visitor(iter, span, keyType) + } + iter.Close() + if err != nil { + return iterutil.Map(err) + } } } - return false, nil -} - -// Close the underlying iterator. -func (ri *ReplicaEngineDataIterator) Close() { - if ri.it != nil { - ri.it.Close() - } -} - -// SeekStart seeks the iterator to the start of the key spans. -// It returns false if the iterator did not find any data. -func (ri *ReplicaEngineDataIterator) SeekStart() (bool, error) { - if ri.it != nil { - ri.it.Close() - ri.it = nil - } - return ri.nextIter() -} - -// Next advances to the next key in the iteration. -func (ri *ReplicaEngineDataIterator) Next() (bool, error) { - ok, err := ri.it.NextEngineKey() - if !ok && err == nil { - ok, err = ri.nextIter() - } - return ok, err -} - -// Value returns the current value. Only used in tests. -func (ri *ReplicaEngineDataIterator) Value() []byte { - return append([]byte{}, ri.it.UnsafeValue()...) -} - -// UnsafeKey returns the current key, but the memory is invalidated on the -// next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) UnsafeKey() (storage.EngineKey, error) { - return ri.it.UnsafeEngineKey() -} - -// UnsafeValue returns the same value as Value, but the memory is invalidated on -// the next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) UnsafeValue() []byte { - return ri.it.UnsafeValue() -} - -// HasPointAndRange returns whether the current position has a point or range -// key. ReplicaEngineDataIterator will never expose both a point key and range -// key on the same position. See struct comment for details. -func (ri *ReplicaEngineDataIterator) HasPointAndRange() (bool, bool) { - return ri.curKeyType == storage.IterKeyTypePointsOnly, - ri.curKeyType == storage.IterKeyTypeRangesOnly -} - -// RangeBounds returns the current range key bounds, but the memory is -// invalidated on the next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) RangeBounds() (roachpb.Span, error) { - return ri.it.EngineRangeBounds() -} - -// RangeKeys returns the current range keys, but the memory is invalidated on the -// next call to {Next,Close}. -func (ri *ReplicaEngineDataIterator) RangeKeys() []storage.EngineRangeKeyValue { - return ri.it.EngineRangeKeys() + return nil } diff --git a/pkg/kv/kvserver/rditer/replica_data_iter_test.go b/pkg/kv/kvserver/rditer/replica_data_iter_test.go index 8eb219e9e614..ffe1bb4ae70d 100644 --- a/pkg/kv/kvserver/rditer/replica_data_iter_test.go +++ b/pkg/kv/kvserver/rditer/replica_data_iter_test.go @@ -203,49 +203,59 @@ func verifyRDReplicatedOnlyMVCCIter( }) } -// verifyRDEngineIter verifies that the ReplicaEngineDataIterator returns the +// verifyIterateReplicaKeySpans verifies that IterateReplicaKeySpans returns the // expected keys in the expected order. The expected keys can be either MVCCKey // or MVCCRangeKey. -func verifyRDEngineIter( +func verifyIterateReplicaKeySpans( t *testing.T, desc *roachpb.RangeDescriptor, eng storage.Engine, replicatedOnly bool, expectedKeys []interface{}, ) { - readWriter := eng.NewReadOnly(storage.StandardDurability) + readWriter := eng.NewSnapshot() defer readWriter.Close() - iter := NewReplicaEngineDataIterator(desc, readWriter, replicatedOnly) - defer iter.Close() actualKeys := []interface{}{} - var ok bool - var err error - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - hasPoint, hasRange := iter.HasPointAndRange() - if hasPoint { - key, err := iter.UnsafeKey() - require.NoError(t, err) - require.True(t, key.IsMVCCKey()) - mvccKey, err := key.ToMVCCKey() - require.NoError(t, err) - actualKeys = append(actualKeys, mvccKey.Clone()) - } - if hasRange { - bounds, err := iter.RangeBounds() - require.NoError(t, err) - for _, rk := range iter.RangeKeys() { - ts, err := storage.DecodeMVCCTimestampSuffix(rk.Version) + require.NoError(t, IterateReplicaKeySpans(desc, readWriter, replicatedOnly, + func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error { + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + // Span should not be empty. + require.NotZero(t, span) + + // Key should be in the given span. + key, err := iter.UnsafeEngineKey() require.NoError(t, err) - actualKeys = append(actualKeys, storage.MVCCRangeKey{ - StartKey: bounds.Key.Clone(), - EndKey: bounds.EndKey.Clone(), - Timestamp: ts, - }) + require.True(t, key.IsMVCCKey()) + require.True(t, span.ContainsKey(key.Key), "%s not in %s", key, span) + + switch keyType { + case storage.IterKeyTypePointsOnly: + mvccKey, err := key.ToMVCCKey() + require.NoError(t, err) + actualKeys = append(actualKeys, mvccKey.Clone()) + + case storage.IterKeyTypeRangesOnly: + bounds, err := iter.EngineRangeBounds() + require.NoError(t, err) + require.True(t, span.Contains(bounds), "%s not contained in %s", bounds, span) + for _, rk := range iter.EngineRangeKeys() { + ts, err := storage.DecodeMVCCTimestampSuffix(rk.Version) + require.NoError(t, err) + actualKeys = append(actualKeys, storage.MVCCRangeKey{ + StartKey: bounds.Key.Clone(), + EndKey: bounds.EndKey.Clone(), + Timestamp: ts, + }) + } + + default: + t.Fatalf("unexpected key type %v", keyType) + } } - } - } - require.NoError(t, err) + return err + })) require.Equal(t, expectedKeys, actualKeys) } @@ -264,8 +274,8 @@ func TestReplicaDataIteratorEmptyRange(t *testing.T) { } verifyRDReplicatedOnlyMVCCIter(t, desc, eng, []storage.MVCCKey{}, []storage.MVCCRangeKey{}) - verifyRDEngineIter(t, desc, eng, false, []interface{}{}) - verifyRDEngineIter(t, desc, eng, true, []interface{}{}) + verifyIterateReplicaKeySpans(t, desc, eng, false, []interface{}{}) + verifyIterateReplicaKeySpans(t, desc, eng, true, []interface{}{}) } // TestReplicaDataIterator creates three ranges (a-b, b-c, c-d) and fills each @@ -311,8 +321,8 @@ func TestReplicaDataIterator(t *testing.T) { t.Run(tc.desc.RSpan().String(), func(t *testing.T) { // Verify the replicated and unreplicated engine contents. - verifyRDEngineIter(t, &tc.desc, eng, false, tc.allKeys) - verifyRDEngineIter(t, &tc.desc, eng, true, tc.replicatedKeys) + verifyIterateReplicaKeySpans(t, &tc.desc, eng, false, tc.allKeys) + verifyIterateReplicaKeySpans(t, &tc.desc, eng, true, tc.replicatedKeys) // Verify the replicated MVCC contents. var pointKeys []storage.MVCCKey @@ -379,11 +389,8 @@ func TestReplicaDataIteratorGlobalRangeKey(t *testing.T) { } for _, desc := range descs { t.Run(desc.KeySpan().String(), func(t *testing.T) { - // An iterator should see range keys spanning all relevant key spans. + // Iterators should see range keys spanning all relevant key spans. testutils.RunTrueAndFalse(t, "replicatedOnly", func(t *testing.T, replicatedOnly bool) { - rangeIter := NewReplicaEngineDataIterator(&desc, snapshot, replicatedOnly) - defer rangeIter.Close() - var expectedSpans []roachpb.Span if replicatedOnly { expectedSpans = MakeReplicatedKeySpans(&desc) @@ -392,14 +399,24 @@ func TestReplicaDataIteratorGlobalRangeKey(t *testing.T) { } var actualSpans []roachpb.Span - var ok bool - var err error - for ok, err = rangeIter.SeekStart(); ok && err == nil; ok, err = rangeIter.Next() { - bounds, err := rangeIter.RangeBounds() - require.NoError(t, err) - actualSpans = append(actualSpans, bounds.Clone()) - } - require.NoError(t, err) + require.NoError(t, IterateReplicaKeySpans(&desc, snapshot, replicatedOnly, + func(iter storage.EngineIterator, span roachpb.Span, keyType storage.IterKeyType) error { + // We should never see any point keys. + require.Equal(t, storage.IterKeyTypeRangesOnly, keyType) + + // The iterator should already be positioned on the range key, which should + // span the entire key span and be the only range key. + bounds, err := iter.EngineRangeBounds() + require.NoError(t, err) + require.Equal(t, span, bounds) + actualSpans = append(actualSpans, bounds.Clone()) + + ok, err := iter.NextEngineKey() + require.NoError(t, err) + require.False(t, ok) + + return nil + })) require.Equal(t, expectedSpans, actualSpans) }) }) @@ -490,14 +507,15 @@ func benchReplicaEngineDataIterator(b *testing.B, numRanges, numKeysPerRange, va for i := 0; i < b.N; i++ { for _, desc := range descs { - iter := NewReplicaEngineDataIterator(&desc, snapshot, false /* replicatedOnly */) - defer iter.Close() - var ok bool - var err error - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - _, _ = iter.UnsafeKey() - _ = iter.UnsafeValue() - } + err := IterateReplicaKeySpans(&desc, snapshot, false, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, _ storage.IterKeyType) error { + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + _, _ = iter.UnsafeEngineKey() + _ = iter.UnsafeValue() + } + return err + }) if err != nil { require.NoError(b, err) } diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 615e1849f7ed..138d608e76fe 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -1711,44 +1712,42 @@ func getExpectedSnapshotSizeBytes( } defer snap.Close() - totalBytes := int64(0) - var b storage.Batch - defer func() { - b.Close() - }() - b = originStore.Engine().NewUnindexedBatch(true) - iter := snap.Iter - var ok bool - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - hasPoint, hasRange := iter.HasPointAndRange() - if hasPoint { - var unsafeKey storage.EngineKey - if unsafeKey, err = iter.UnsafeKey(); err != nil { - return 0, err - } - if err := b.PutEngineKey(unsafeKey, iter.UnsafeValue()); err != nil { - return 0, err - } - } - if hasRange { - bounds, err := iter.RangeBounds() - if err != nil { - return 0, err - } - for _, rkv := range iter.RangeKeys() { - err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) - if err != nil { - return 0, err + b := originStore.Engine().NewUnindexedBatch(true) + defer b.Close() + + err = rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + switch keyType { + case storage.IterKeyTypePointsOnly: + unsafeKey, err := iter.UnsafeEngineKey() + if err != nil { + return err + } + if err := b.PutEngineKey(unsafeKey, iter.UnsafeValue()); err != nil { + return err + } + + case storage.IterKeyTypeRangesOnly: + bounds, err := iter.EngineRangeBounds() + if err != nil { + return err + } + for _, rkv := range iter.EngineRangeKeys() { + err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) + if err != nil { + return err + } + } + + default: + return errors.Errorf("unexpected key type %v", keyType) } } - } - } - if err != nil { - return 0, err - } - totalBytes += int64(b.Len()) - - return totalBytes, nil + return err + }) + return int64(b.Len()), err } // Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index 2aa04a45a7ba..c2312ccab4c3 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -497,10 +497,8 @@ type OutgoingSnapshot struct { SnapUUID uuid.UUID // The Raft snapshot message to send. Contains SnapUUID as its data. RaftSnap raftpb.Snapshot - // The RocksDB snapshot that will be streamed from. + // The Pebble snapshot that will be streamed from. EngineSnap storage.Reader - // The complete range iterator for the snapshot to stream. - Iter *rditer.ReplicaEngineDataIterator // The replica state within the snapshot. State kvserverpb.ReplicaState // Allows access the original Replica's sideloaded storage. Note that @@ -525,7 +523,6 @@ func (s OutgoingSnapshot) SafeFormat(w redact.SafePrinter, _ rune) { // Close releases the resources associated with the snapshot. func (s *OutgoingSnapshot) Close() { - s.Iter.Close() s.EngineSnap.Close() if s.onClose != nil { s.onClose() @@ -598,15 +595,10 @@ func snapshot( return OutgoingSnapshot{}, errors.Wrapf(err, "failed to fetch term of %d", state.RaftAppliedIndex) } - // Intentionally let this iterator and the snapshot escape so that the - // streamer can send chunks from it bit by bit. - iter := rditer.NewReplicaEngineDataIterator(&desc, snap, true /* replicatedOnly */) - return OutgoingSnapshot{ RaftEntryCache: eCache, WithSideloaded: withSideloaded, EngineSnap: snap, - Iter: iter, State: state, SnapUUID: snapUUID, RaftSnap: raftpb.Snapshot{ diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 9a0207b4d31c..7ec4672b0c28 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -7068,38 +7068,36 @@ func TestReplicaDestroy(t *testing.T) { tc.Start(ctx, t, stopper) repl, err := tc.store.GetReplica(1) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) func() { tc.repl.raftMu.Lock() defer tc.repl.raftMu.Unlock() - if _, err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, tc.repl, repl.Desc().NextReplicaID, RemoveOptions{ + _, err := tc.store.removeInitializedReplicaRaftMuLocked(ctx, tc.repl, repl.Desc().NextReplicaID, RemoveOptions{ DestroyData: true, - }); err != nil { - t.Fatal(err) - } + }) + require.NoError(t, err) }() engSnapshot := tc.repl.store.Engine().NewSnapshot() defer engSnapshot.Close() - iter := rditer.NewReplicaEngineDataIterator(tc.repl.Desc(), engSnapshot, - false /* replicatedOnly */) - defer iter.Close() - ok, err := iter.SeekStart() - require.NoError(t, err) - require.True(t, ok, "expected a tombstone key, but iterator was empty") - // If the range is destroyed, only a tombstone key should be there. - k1, err := iter.UnsafeKey() - require.NoError(t, err) - require.Equal(t, keys.RangeTombstoneKey(tc.repl.RangeID), k1.Key) - - ok, err = iter.Next() - require.NoError(t, err) - require.False(t, ok, "expected destroyed replica to only have a tombstone key, but found more") + expectedKeys := []roachpb.Key{keys.RangeTombstoneKey(tc.repl.RangeID)} + actualKeys := []roachpb.Key{} + + require.NoError(t, rditer.IterateReplicaKeySpans(tc.repl.Desc(), engSnapshot, false, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + require.Equal(t, storage.IterKeyTypePointsOnly, keyType) + var err error + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + key, err := iter.UnsafeEngineKey() + require.NoError(t, err) + actualKeys = append(actualKeys, key.Key.Clone()) + } + return err + })) + require.Equal(t, expectedKeys, actualKeys) } // TestQuotaPoolReleasedOnFailedProposal tests that the quota acquired by diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index c932ff7c221b..5f057d332400 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -403,11 +403,11 @@ func (kvSS *kvBatchSnapshotStrategy) Send( // bytesSent is updated as key-value batches are sent with sendBatch. It does // not reflect the log entries sent (which are never sent in newer versions of // CRDB, as of VersionUnreplicatedTruncatedState). - bytesSent := int64(0) - - // Iterate over all keys (point keys and range keys) using the provided - // iterator and stream out batches of key-values. + var bytesSent int64 var kvs, rangeKVs int + + // Iterate over all keys (point keys and range keys) and stream out batches of + // key-values. var b storage.Batch defer func() { if b != nil { @@ -427,57 +427,61 @@ func (kvSS *kvBatchSnapshotStrategy) Send( return nil } - iter := snap.Iter - var ok bool - var err error - for ok, err = iter.SeekStart(); ok && err == nil; ok, err = iter.Next() { - // NB: EngineReplicaDataIterator will never expose point/range keys - // together: it first iterates over all point keys in a key span, then over - // all range keys in a key span, then moves onto the next key span. - hasPoint, hasRange := iter.HasPointAndRange() - - if hasPoint { - kvs++ - var unsafeKey storage.EngineKey - if unsafeKey, err = iter.UnsafeKey(); err != nil { - return 0, err - } - unsafeValue := iter.UnsafeValue() - if b == nil { - b = kvSS.newBatch() - } - if err := b.PutEngineKey(unsafeKey, unsafeValue); err != nil { - return 0, err - } - if int64(b.Len()) >= kvSS.batchSize { - if err = flushBatch(); err != nil { - return 0, err - } - } + maybeFlushBatch := func() error { + if int64(b.Len()) >= kvSS.batchSize { + return flushBatch() } + return nil + } - if hasRange { - bounds, err := iter.RangeBounds() - if err != nil { - return 0, err - } - for _, rkv := range iter.RangeKeys() { - rangeKVs++ - if b == nil { - b = kvSS.newBatch() - } - err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) - if err != nil { - return 0, err + err := rditer.IterateReplicaKeySpans(snap.State.Desc, snap.EngineSnap, true, /* replicatedOnly */ + func(iter storage.EngineIterator, _ roachpb.Span, keyType storage.IterKeyType) error { + var err error + switch keyType { + case storage.IterKeyTypePointsOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + kvs++ + if b == nil { + b = kvSS.newBatch() + } + key, err := iter.UnsafeEngineKey() + if err != nil { + return err + } + if err = b.PutEngineKey(key, iter.UnsafeValue()); err != nil { + return err + } + if err = maybeFlushBatch(); err != nil { + return err + } } - if bLen := int64(b.Len()); bLen >= kvSS.batchSize { - if err = flushBatch(); err != nil { - return 0, err + + case storage.IterKeyTypeRangesOnly: + for ok := true; ok && err == nil; ok, err = iter.NextEngineKey() { + bounds, err := iter.EngineRangeBounds() + if err != nil { + return err + } + for _, rkv := range iter.EngineRangeKeys() { + rangeKVs++ + if b == nil { + b = kvSS.newBatch() + } + err := b.PutEngineRangeKey(bounds.Key, bounds.EndKey, rkv.Version, rkv.Value) + if err != nil { + return err + } + if err = maybeFlushBatch(); err != nil { + return err + } } } + + default: + return errors.AssertionFailedf("unexpected key type %v", keyType) } - } - } + return err + }) if err != nil { return 0, err }