diff --git a/batch.go b/batch.go
index f9855d0554..0c204793a5 100644
--- a/batch.go
+++ b/batch.go
@@ -2056,7 +2056,7 @@ func (b *flushableBatch) newIter(o *IterOptions) internalIterator {
 }
 
 // newFlushIter is part of the flushable interface.
-func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
+func (b *flushableBatch) newFlushIter(o *IterOptions) internalIterator {
 	return &flushFlushableBatchIter{
 		flushableBatchIter: flushableBatchIter{
 			batch: b,
@@ -2065,7 +2065,6 @@ func (b *flushableBatch) newFlushIter(o *IterOptions, bytesFlushed *uint64) inte
 			cmp:   b.cmp,
 			index: -1,
 		},
-		bytesIterated: bytesFlushed,
 	}
 }
 
@@ -2327,7 +2326,6 @@ func (i *flushableBatchIter) SetContext(_ context.Context) {}
 // of number of bytes iterated.
 type flushFlushableBatchIter struct {
 	flushableBatchIter
-	bytesIterated *uint64
 }
 
 // flushFlushableBatchIter implements the base.InternalIterator interface.
@@ -2353,13 +2351,7 @@ func (i *flushFlushableBatchIter) SeekLT(key []byte, flags base.SeekLTFlags) *ba
 
 func (i *flushFlushableBatchIter) First() *base.InternalKV {
 	i.err = nil // clear cached iteration error
-	kv := i.flushableBatchIter.First()
-	if kv == nil {
-		return nil
-	}
-	entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
-	*i.bytesIterated += uint64(entryBytes) + i.valueSize()
-	return kv
+	return i.flushableBatchIter.First()
 }
 
 func (i *flushFlushableBatchIter) NextPrefix(succKey []byte) *base.InternalKV {
@@ -2376,41 +2368,13 @@ func (i *flushFlushableBatchIter) Next() *base.InternalKV {
 	if i.index == len(i.offsets) {
 		return nil
 	}
-	kv := i.getKV(i.index)
-	entryBytes := i.offsets[i.index].keyEnd - i.offsets[i.index].offset
-	*i.bytesIterated += uint64(entryBytes) + i.valueSize()
-	return kv
+	return i.getKV(i.index)
 }
 
 func (i flushFlushableBatchIter) Prev() *base.InternalKV {
 	panic("pebble: Prev unimplemented")
 }
 
-func (i flushFlushableBatchIter) valueSize() uint64 {
-	p := i.data[i.offsets[i.index].offset:]
-	if len(p) == 0 {
-		i.err = base.CorruptionErrorf("corrupted batch")
-		return 0
-	}
-	kind := InternalKeyKind(p[0])
-	if kind > InternalKeyKindMax {
-		i.err = base.CorruptionErrorf("corrupted batch")
-		return 0
-	}
-	var length uint64
-	switch kind {
-	case InternalKeyKindSet, InternalKeyKindMerge, InternalKeyKindRangeDelete:
-		keyEnd := i.offsets[i.index].keyEnd
-		v, n := binary.Uvarint(i.data[keyEnd:])
-		if n <= 0 {
-			i.err = base.CorruptionErrorf("corrupted batch")
-			return 0
-		}
-		length = v + uint64(n)
-	}
-	return length
-}
-
 // batchOptions holds the parameters to configure batch.
 type batchOptions struct {
 	initialSizeBytes int
diff --git a/batch_test.go b/batch_test.go
index 82985a673f..8a427fbea2 100644
--- a/batch_test.go
+++ b/batch_test.go
@@ -1279,34 +1279,6 @@ func scanKeyspanIterator(w io.Writer, ki keyspan.FragmentIterator) {
 	}
 }
 
-func TestFlushableBatchBytesIterated(t *testing.T) {
-	batch := newBatch(nil)
-	for j := 0; j < 1000; j++ {
-		key := make([]byte, 8+j%3)
-		value := make([]byte, 7+j%5)
-		batch.Set(key, value, nil)
-
-		fb, err := newFlushableBatch(batch, DefaultComparer)
-		require.NoError(t, err)
-
-		var bytesIterated uint64
-		it := fb.newFlushIter(nil, &bytesIterated)
-
-		var prevIterated uint64
-		for kv := it.First(); kv != nil; kv = it.Next() {
-			if bytesIterated < prevIterated {
-				t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
-			}
-			prevIterated = bytesIterated
-		}
-
-		expected := fb.inuseBytes()
-		if bytesIterated != expected {
-			t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
-		}
-	}
-}
-
 func TestEmptyFlushableBatch(t *testing.T) {
 	// Verify that we can create a flushable batch on an empty batch.
 	fb, err := newFlushableBatch(newBatch(nil), DefaultComparer)
diff --git a/compaction.go b/compaction.go
index b2125d8056..ed10007663 100644
--- a/compaction.go
+++ b/compaction.go
@@ -295,8 +295,6 @@ type compaction struct {
 
 	// flushing contains the flushables (aka memtables) that are being flushed.
 	flushing flushableList
-	// bytesIterated contains the number of bytes that have been flushed/compacted.
-	bytesIterated uint64
 	// bytesWritten contains the number of bytes that have been written to outputs.
 	bytesWritten int64
@@ -942,7 +940,7 @@ func (c *compaction) newInputIter(
 		// stored in c.flushing.
 		for i := range c.flushing {
 			f := c.flushing[i]
-			iters = append(iters, f.newFlushIter(nil, &c.bytesIterated))
+			iters = append(iters, f.newFlushIter(nil))
 			rangeDelIter := f.newRangeDelIter(nil)
 			if rangeDelIter != nil {
 				rangeDelIters = append(rangeDelIters, rangeDelIter)
@@ -959,8 +957,8 @@ func (c *compaction) newInputIter(
 		// deletions to compactions are handled below.
 		iters = append(iters, newLevelIter(context.Background(), iterOpts,
 			c.comparer, newIters, level.files.Iter(), l, internalIterOpts{
-				bytesIterated: &c.bytesIterated,
-				bufferPool:    &c.bufferPool,
+				compaction: true,
+				bufferPool: &c.bufferPool,
 			}))
 		// TODO(jackson): Use keyspanimpl.LevelIter to avoid loading all the range
 		// deletions into memory upfront. (See #2015, which reverted this.) There
@@ -1000,7 +998,7 @@ func (c *compaction) newInputIter(
 			iter := level.files.Iter()
 			for f := iter.First(); f != nil; f = iter.Next() {
 				rangeDelIter, closer, err := c.newRangeDelIter(
-					newIters, iter.Take(), iterOpts, l, &c.bytesIterated)
+					newIters, iter.Take(), iterOpts, l)
 				if err != nil {
 					// The error will already be annotated with the BackingFileNum, so
 					// we annotate it with the FileNum.
@@ -1113,17 +1111,13 @@ func (c *compaction) newInputIter(
 }
 
 func (c *compaction) newRangeDelIter(
-	newIters tableNewIters,
-	f manifest.LevelFile,
-	opts IterOptions,
-	l manifest.Level,
-	bytesIterated *uint64,
+	newIters tableNewIters, f manifest.LevelFile, opts IterOptions, l manifest.Level,
 ) (keyspan.FragmentIterator, io.Closer, error) {
 	opts.level = l
-	iterSet, err := newIters(context.Background(), f.FileMetadata,
-		&opts, internalIterOpts{
-			bytesIterated: &c.bytesIterated,
-			bufferPool:    &c.bufferPool,
+	iterSet, err := newIters(context.Background(), f.FileMetadata, &opts,
+		internalIterOpts{
+			compaction: true,
+			bufferPool: &c.bufferPool,
 		}, iterRangeDeletions)
 	if err != nil {
 		return nil, nil, err
@@ -1767,8 +1761,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 		d.mu.versions.logUnlock()
 	}
 
-	bytesFlushed = c.bytesIterated
-
 	// If err != nil, then the flush will be retried, and we will recalculate
 	// these metrics.
 	if err == nil {
@@ -1820,7 +1812,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
 		close(flushed[i].flushed)
 	}
 
-	return bytesFlushed, err
+	return inputBytes, err
 }
 
 // maybeTransitionSnapshotsToFileOnlyLocked transitions any "eventually
diff --git a/flushable.go b/flushable.go
index f243564d19..6f5aa49874 100644
--- a/flushable.go
+++ b/flushable.go
@@ -22,7 +22,7 @@ import (
 // flushable defines the interface for immutable memtables.
 type flushable interface {
 	newIter(o *IterOptions) internalIterator
-	newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator
+	newFlushIter(o *IterOptions) internalIterator
 	newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
 	newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
 	containsRangeKeys() bool
@@ -226,7 +226,7 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
 }
 
 // newFlushIter is part of the flushable interface.
-func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
+func (s *ingestedFlushable) newFlushIter(*IterOptions) internalIterator {
 	// newFlushIter is only used for writing memtables to disk as sstables.
 	// Since ingested sstables are already present on disk, they don't need to
 	// make use of a flush iter.
diff --git a/internal/arenaskl/flush_iterator.go b/internal/arenaskl/flush_iterator.go
index aa98c40e85..20baa0d32e 100644
--- a/internal/arenaskl/flush_iterator.go
+++ b/internal/arenaskl/flush_iterator.go
@@ -24,7 +24,6 @@ import "github.com/cockroachdb/pebble/internal/base"
 // simply value copying the struct.
 type flushIterator struct {
 	Iterator
-	bytesIterated *uint64
 }
 
 // flushIterator implements the base.InternalIterator interface.
@@ -51,12 +50,7 @@ func (it *flushIterator) SeekLT(key []byte, flags base.SeekLTFlags) *base.Intern
 // that First only checks the upper bound. It is up to the caller to ensure
 // that key is greater than or equal to the lower bound.
 func (it *flushIterator) First() *base.InternalKV {
-	kv := it.Iterator.First()
-	if kv == nil {
-		return nil
-	}
-	*it.bytesIterated += uint64(it.nd.allocSize)
-	return kv
+	return it.Iterator.First()
 }
 
 // Next advances to the next position. Returns the key and value if the
@@ -69,7 +63,6 @@ func (it *flushIterator) Next() *base.InternalKV {
 		return nil
 	}
 	it.decodeKey()
-	*it.bytesIterated += uint64(it.nd.allocSize)
 	it.kv.V = base.MakeInPlaceValue(it.value())
 	return &it.kv
 }
diff --git a/internal/arenaskl/skl.go b/internal/arenaskl/skl.go
index ef1ebfcf15..7f746fef38 100644
--- a/internal/arenaskl/skl.go
+++ b/internal/arenaskl/skl.go
@@ -307,10 +307,9 @@ func (s *Skiplist) NewIter(lower, upper []byte) *Iterator {
 
 // NewFlushIter returns a new flushIterator, which is similar to an Iterator
 // but also sets the current number of the bytes that have been iterated
 // through.
-func (s *Skiplist) NewFlushIter(bytesFlushed *uint64) base.InternalIterator {
+func (s *Skiplist) NewFlushIter() base.InternalIterator {
 	return &flushIterator{
-		Iterator:      Iterator{list: s, nd: s.head},
-		bytesIterated: bytesFlushed,
+		Iterator: Iterator{list: s, nd: s.head},
 	}
 }
diff --git a/internal/arenaskl/skl_test.go b/internal/arenaskl/skl_test.go
index f858a22c39..3357b25a4d 100644
--- a/internal/arenaskl/skl_test.go
+++ b/internal/arenaskl/skl_test.go
@@ -769,35 +769,6 @@ func TestIteratorBounds(t *testing.T) {
 	require.False(t, it.Prev())
 }
 
-func TestBytesIterated(t *testing.T) {
-	l := NewSkiplist(newArena(arenaSize), bytes.Compare)
-	emptySize := l.arena.Size()
-	for i := 0; i < 200; i++ {
-		bytesIterated := l.bytesIterated(t)
-		expected := uint64(l.arena.Size() - emptySize)
-		if bytesIterated != expected {
-			t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
-		}
-		l.Add(base.InternalKey{UserKey: []byte{byte(i)}}, nil)
-	}
-}
-
-// bytesIterated returns the number of bytes iterated in the skiplist.
-func (s *Skiplist) bytesIterated(t *testing.T) (bytesIterated uint64) {
-	x := s.NewFlushIter(&bytesIterated)
-	var prevIterated uint64
-	for kv := x.First(); kv != nil; kv = x.Next() {
-		if bytesIterated < prevIterated {
-			t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
-		}
-		prevIterated = bytesIterated
-	}
-	if x.Close() != nil {
-		return 0
-	}
-	return bytesIterated
-}
-
 func randomKey(rng *rand.Rand, b []byte) base.InternalKey {
 	key := rng.Uint32()
 	key2 := rng.Uint32()
diff --git a/level_iter.go b/level_iter.go
index 057cae8f25..783d720fed 100644
--- a/level_iter.go
+++ b/level_iter.go
@@ -17,7 +17,10 @@ import (
 )
 
 type internalIterOpts struct {
-	bytesIterated      *uint64
+	// if compaction is set, sstable-level iterators will be created using
+	// NewCompactionIter; these iterators have a more constrained interface
+	// and are optimized for the sequential scan of a compaction.
+	compaction         bool
 	bufferPool         *sstable.BufferPool
 	stats              *base.InternalIteratorStats
 	boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
diff --git a/mem_table.go b/mem_table.go
index aa55ca71b9..a91d8a6517 100644
--- a/mem_table.go
+++ b/mem_table.go
@@ -260,8 +260,8 @@ func (m *memTable) newIter(o *IterOptions) internalIterator {
 }
 
 // newFlushIter is part of the flushable interface.
-func (m *memTable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
-	return m.skl.NewFlushIter(bytesFlushed)
+func (m *memTable) newFlushIter(o *IterOptions) internalIterator {
+	return m.skl.NewFlushIter()
 }
 
 // newRangeDelIter is part of the flushable interface.
diff --git a/mem_table_test.go b/mem_table_test.go
index 41b066a14c..1b78edefbd 100644
--- a/mem_table_test.go
+++ b/mem_table_test.go
@@ -78,22 +78,6 @@ func (m *memTable) count() (n int) {
 	return n
 }
 
-// bytesIterated returns the number of bytes iterated in a DB.
-func (m *memTable) bytesIterated(t *testing.T) (bytesIterated uint64) {
-	x := m.newFlushIter(nil, &bytesIterated)
-	var prevIterated uint64
-	for kv := x.First(); kv != nil; kv = x.Next() {
-		if bytesIterated < prevIterated {
-			t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
-		}
-		prevIterated = bytesIterated
-	}
-	if x.Close() != nil {
-		return 0
-	}
-	return bytesIterated
-}
-
 func ikey(s string) InternalKey {
 	return base.MakeInternalKey([]byte(s), 0, InternalKeyKindSet)
 }
@@ -158,18 +142,6 @@ func TestMemTableCount(t *testing.T) {
 	}
 }
 
-func TestMemTableBytesIterated(t *testing.T) {
-	m := newMemTable(memTableOptions{})
-	for i := 0; i < 200; i++ {
-		bytesIterated := m.bytesIterated(t)
-		expected := m.inuseBytes()
-		if bytesIterated != expected {
-			t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
-		}
-		m.set(InternalKey{UserKey: []byte{byte(i)}}, nil)
-	}
-}
-
 func TestMemTableEmpty(t *testing.T) {
 	m := newMemTable(memTableOptions{})
 	if !m.empty() {
diff --git a/sstable/reader.go b/sstable/reader.go
index 009cbef05f..d88f304c06 100644
--- a/sstable/reader.go
+++ b/sstable/reader.go
@@ -388,18 +388,16 @@ func (r *Reader) NewIter(transforms IterTransforms, lower, upper []byte) (Iterat
 // after itself and returns a nil iterator.
 func (r *Reader) NewCompactionIter(
 	transforms IterTransforms,
-	bytesIterated *uint64,
 	categoryAndQoS CategoryAndQoS,
 	statsCollector *CategoryStatsCollector,
 	rp ReaderProvider,
 	bufferPool *BufferPool,
 ) (Iterator, error) {
-	return r.newCompactionIter(transforms, bytesIterated, categoryAndQoS, statsCollector, rp, nil, bufferPool)
+	return r.newCompactionIter(transforms, categoryAndQoS, statsCollector, rp, nil, bufferPool)
 }
 
 func (r *Reader) newCompactionIter(
 	transforms IterTransforms,
-	bytesIterated *uint64,
 	categoryAndQoS CategoryAndQoS,
 	statsCollector *CategoryStatsCollector,
 	rp ReaderProvider,
@@ -420,10 +418,7 @@ func (r *Reader) newCompactionIter(
 			return nil, err
 		}
 		i.setupForCompaction()
-		return &twoLevelCompactionIterator{
-			twoLevelIterator: i,
-			bytesIterated:    bytesIterated,
-		}, nil
+		return &twoLevelCompactionIterator{twoLevelIterator: i}, nil
 	}
 	i := singleLevelIterPool.Get().(*singleLevelIterator)
 	err := i.init(
@@ -434,10 +429,7 @@ func (r *Reader) newCompactionIter(
 		return nil, err
 	}
 	i.setupForCompaction()
-	return &compactionIterator{
-		singleLevelIterator: i,
-		bytesIterated:       bytesIterated,
-	}, nil
+	return &compactionIterator{singleLevelIterator: i}, nil
 }
 
 // NewRawRangeDelIter returns an internal iterator for the contents of the
diff --git a/sstable/reader_common.go b/sstable/reader_common.go
index 3e80297904..e71d565bc2 100644
--- a/sstable/reader_common.go
+++ b/sstable/reader_common.go
@@ -33,7 +33,6 @@ type CommonReader interface {
 	NewCompactionIter(
 		transforms IterTransforms,
-		bytesIterated *uint64,
 		categoryAndQoS CategoryAndQoS,
 		statsCollector *CategoryStatsCollector,
 		rp ReaderProvider,
diff --git a/sstable/reader_iter.go b/sstable/reader_iter.go
index edf6ca0b24..d552d71499 100644
--- a/sstable/reader_iter.go
+++ b/sstable/reader_iter.go
@@ -175,8 +175,6 @@ func checkRangeKeyFragmentBlockIterator(obj interface{}) {
 // bytes that have been iterated through.
 type compactionIterator struct {
 	*singleLevelIterator
-	bytesIterated *uint64
-	prevOffset    uint64
 }
 
 // compactionIterator implements the base.InternalIterator interface.
@@ -259,10 +257,6 @@ func (i *compactionIterator) skipForward(kv *base.InternalKV) *base.InternalKV {
 		}
 	}
 
-	curOffset := i.recordOffset()
-	*i.bytesIterated += uint64(curOffset - i.prevOffset)
-	i.prevOffset = curOffset
-
 	// We have an upper bound when the table is virtual.
 	if i.upper != nil && kv != nil {
 		cmp := i.cmp(kv.K.UserKey, i.upper)
diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go
index d83eed43ca..3c0a7b31e6 100644
--- a/sstable/reader_iter_single_lvl.go
+++ b/sstable/reader_iter_single_lvl.go
@@ -594,24 +594,6 @@ func (i *singleLevelIterator) trySeekLTUsingPrevWithinBlock(
 	return kv, false
 }
 
-func (i *singleLevelIterator) recordOffset() uint64 {
-	offset := i.dataBH.Offset
-	if i.data.valid() {
-		// - i.dataBH.Length/len(i.data.data) is the compression ratio. If
-		//   uncompressed, this is 1.
-		// - i.data.nextOffset is the uncompressed position of the current record
-		//   in the block.
-		// - i.dataBH.Offset is the offset of the block in the sstable before
-		//   decompression.
-		offset += (uint64(i.data.nextOffset) * i.dataBH.Length) / uint64(len(i.data.data))
-	} else {
-		// Last entry in the block must increment bytes iterated by the size of the block trailer
-		// and restart points.
-		offset += i.dataBH.Length + blockTrailerLen
-	}
-	return offset
-}
-
 // SeekGE implements internalIterator.SeekGE, as documented in the pebble
 // package. Note that SeekGE only checks the upper bound. It is up to the
 // caller to ensure that key is greater than or equal to the lower bound.
diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go
index 35d1aa9734..f475fa036c 100644
--- a/sstable/reader_iter_two_lvl.go
+++ b/sstable/reader_iter_two_lvl.go
@@ -995,8 +995,6 @@ func (i *twoLevelIterator) Close() error {
 // were separated due to performance.
 type twoLevelCompactionIterator struct {
 	*twoLevelIterator
-	bytesIterated *uint64
-	prevOffset    uint64
 }
 
 // twoLevelCompactionIterator implements the base.InternalIterator interface.
@@ -1083,10 +1081,6 @@ func (i *twoLevelCompactionIterator) skipForward(kv *base.InternalKV) *base.Inte
 		}
 	}
 
-	curOffset := i.recordOffset()
-	*i.bytesIterated += uint64(curOffset - i.prevOffset)
-	i.prevOffset = curOffset
-
 	// We have an upper bound when the table is virtual.
 	if i.upper != nil && kv != nil {
 		cmp := i.cmp(kv.K.UserKey, i.upper)
diff --git a/sstable/reader_test.go b/sstable/reader_test.go
index 373503c60d..817ba35d77 100644
--- a/sstable/reader_test.go
+++ b/sstable/reader_test.go
@@ -325,8 +325,7 @@ func runVirtualReaderTest(t *testing.T, path string, blockSize, indexBlockSize i
 			}
 
 			var rp ReaderProvider
-			var bytesIterated uint64
-			iter, err := v.NewCompactionIter(transforms, &bytesIterated, CategoryAndQoS{}, nil, rp, &bp)
+			iter, err := v.NewCompactionIter(transforms, CategoryAndQoS{}, nil, rp, &bp)
 			if err != nil {
 				return err.Error()
 			}
@@ -940,65 +939,6 @@ func checkValidPrefix(prefix, key []byte) bool {
 	return prefix == nil || bytes.HasPrefix(key, prefix)
 }
 
-func testBytesIteratedWithCompression(
-	t *testing.T,
-	compression Compression,
-	allowedSizeDeviationPercent uint64,
-	blockSizes []int,
-	maxNumEntries []uint64,
-) {
-	for i, blockSize := range blockSizes {
-		for _, indexBlockSize := range blockSizes {
-			for _, numEntries := range []uint64{0, 1, maxNumEntries[i]} {
-				r := buildTestTable(t, numEntries, blockSize, indexBlockSize, compression, nil)
-				var bytesIterated, prevIterated uint64
-				var pool BufferPool
-				pool.Init(5)
-				citer, err := r.NewCompactionIter(
-					NoTransforms, &bytesIterated, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
-				require.NoError(t, err)
-
-				for kv := citer.First(); kv != nil; kv = citer.Next() {
-					if bytesIterated < prevIterated {
-						t.Fatalf("bytesIterated moved backward: %d < %d", bytesIterated, prevIterated)
-					}
-					prevIterated = bytesIterated
-				}
-
-				expected := r.Properties.DataSize
-				allowedSizeDeviation := expected * allowedSizeDeviationPercent / 100
-				// There is some inaccuracy due to compression estimation.
-				if bytesIterated < expected-allowedSizeDeviation || bytesIterated > expected+allowedSizeDeviation {
-					t.Fatalf("bytesIterated: got %d, want %d", bytesIterated, expected)
-				}
-
-				require.NoError(t, citer.Close())
-				require.NoError(t, r.Close())
-				pool.Release()
-			}
-		}
-	}
-}
-
-func TestBytesIterated(t *testing.T) {
-	blockSizes := []int{10, 100, 1000, 4096, math.MaxInt32}
-	t.Run("Compressed", func(t *testing.T) {
-		testBytesIteratedWithCompression(t, SnappyCompression, 1, blockSizes, []uint64{1e5, 1e5, 1e5, 1e5, 1e5})
-	})
-	t.Run("Uncompressed", func(t *testing.T) {
-		testBytesIteratedWithCompression(t, NoCompression, 0, blockSizes, []uint64{1e5, 1e5, 1e5, 1e5, 1e5})
-	})
-	t.Run("Zstd", func(t *testing.T) {
-		// compression with zstd is extremely slow with small block size (esp the nocgo version).
-		// use less numEntries to make the test run at reasonable speed (under 10 seconds).
-		maxNumEntries := []uint64{1e2, 1e2, 1e3, 4e3, 1e5}
-		if useStandardZstdLib {
-			maxNumEntries = []uint64{1e3, 1e3, 1e4, 4e4, 1e5}
-		}
-		testBytesIteratedWithCompression(t, ZstdCompression, 1, blockSizes, maxNumEntries)
-	})
-}
-
 func TestCompactionIteratorSetupForCompaction(t *testing.T) {
 	tmpDir := path.Join(t.TempDir())
 	provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.Default, tmpDir))
@@ -1009,11 +949,10 @@ func TestCompactionIteratorSetupForCompaction(t *testing.T) {
 		for _, indexBlockSize := range blockSizes {
 			for _, numEntries := range []uint64{0, 1, 1e5} {
 				r := buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, DefaultCompression, nil)
-				var bytesIterated uint64
 				var pool BufferPool
 				pool.Init(5)
 				citer, err := r.NewCompactionIter(
-					NoTransforms, &bytesIterated, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
+					NoTransforms, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
 				require.NoError(t, err)
 				switch i := citer.(type) {
 				case *compactionIterator:
@@ -1069,7 +1008,7 @@ func TestReadaheadSetupForV3TablesWithMultipleVersions(t *testing.T) {
 	var pool BufferPool
 	pool.Init(5)
 	citer, err := r.NewCompactionIter(
-		NoTransforms, nil, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
+		NoTransforms, CategoryAndQoS{}, nil, TrivialReaderProvider{Reader: r}, &pool)
 	require.NoError(t, err)
 	defer citer.Close()
 	i := citer.(*compactionIterator)
@@ -1760,19 +1699,6 @@ func TestReader_TableFormat(t *testing.T) {
 	}
 }
 
-func buildTestTable(
-	t *testing.T,
-	numEntries uint64,
-	blockSize, indexBlockSize int,
-	compression Compression,
-	prefix []byte,
-) *Reader {
-	provider, err := objstorageprovider.Open(objstorageprovider.DefaultSettings(vfs.NewMem(), "" /* dirName */))
-	require.NoError(t, err)
-	defer provider.Close()
-	return buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, compression, prefix)
-}
-
 func buildTestTableWithProvider(
 	t *testing.T,
 	provider objstorage.Provider,
diff --git a/sstable/reader_virtual.go b/sstable/reader_virtual.go
index 3869fffded..29649119c8 100644
--- a/sstable/reader_virtual.go
+++ b/sstable/reader_virtual.go
@@ -91,14 +91,13 @@ func MakeVirtualReader(reader *Reader, p VirtualReaderParams) VirtualReader {
 // NewCompactionIter is the compaction iterator function for virtual readers.
 func (v *VirtualReader) NewCompactionIter(
 	transforms IterTransforms,
-	bytesIterated *uint64,
 	categoryAndQoS CategoryAndQoS,
 	statsCollector *CategoryStatsCollector,
 	rp ReaderProvider,
 	bufferPool *BufferPool,
 ) (Iterator, error) {
 	return v.reader.newCompactionIter(
-		transforms, bytesIterated, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool)
+		transforms, categoryAndQoS, statsCollector, rp, &v.vState, bufferPool)
 }
 
 // NewIterWithBlockPropertyFiltersAndContextEtc wraps
diff --git a/table_cache.go b/table_cache.go
index c069941dd1..61e8e6dc87 100644
--- a/table_cache.go
+++ b/table_cache.go
@@ -601,9 +601,9 @@ func (c *tableCacheShard) newPointIter(
 	if opts != nil {
 		categoryAndQoS = opts.CategoryAndQoS
 	}
-	if internalOpts.bytesIterated != nil {
+	if internalOpts.compaction {
 		iter, err = cr.NewCompactionIter(
-			transforms, internalOpts.bytesIterated, categoryAndQoS, dbOpts.sstStatsCollector, rp,
+			transforms, categoryAndQoS, dbOpts.sstStatsCollector, rp,
 			internalOpts.bufferPool)
 	} else {
 		iter, err = cr.NewIterWithBlockPropertyFiltersAndContextEtc(
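
A minimal sketch (not part of the patch) of the call-site change this diff implies. The struct and openIter below are simplified, hypothetical stand-ins for internalIterOpts in level_iter.go and the branch in tableCacheShard.newPointIter; they only illustrate how callers now request the compaction-optimized iterator instead of threading a *uint64 accumulator through the iterator stack.

package main

import "fmt"

// internalIterOpts, reduced to the field this patch touches: the old
// bytesIterated *uint64 accumulator is replaced by a boolean request
// for a compaction iterator.
type internalIterOpts struct {
	compaction bool
}

// openIter mirrors the updated branch in tableCacheShard.newPointIter,
// which previously tested internalOpts.bytesIterated != nil.
func openIter(opts internalIterOpts) string {
	if opts.compaction {
		return "NewCompactionIter"
	}
	return "NewIterWithBlockPropertyFiltersAndContextEtc"
}

func main() {
	fmt.Println(openIter(internalIterOpts{compaction: true})) // flush/compaction path
	fmt.Println(openIter(internalIterOpts{}))                 // regular read path
}

With bytesIterated gone, flush progress is no longer accumulated by the iterators: flush1 returns inputBytes (computed elsewhere in flush1, outside the hunks shown here) instead of c.bytesIterated.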