diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index fa95bdcaa7d4..ed679e796f12 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -416,7 +416,7 @@ var ( Unit: metric.Unit_COUNT, } - // RocksDB/Pebble metrics. + // Pebble metrics. metaRdbBlockCacheHits = metric.Metadata{ Name: "rocksdb.block.cache.hits", Help: "Count of block cache hits", @@ -616,6 +616,60 @@ Raft applied index at which this checkpoint was taken.`, Unit: metric.Unit_COUNT, } + metaBlockBytes = metric.Metadata{ + Name: "storage.iterator.block-load.bytes", + Help: "Bytes loaded by storage engine iterators (possibly cached). See storage.AggregatedIteratorStats for details.", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaBlockBytesInCache = metric.Metadata{ + Name: "storage.iterator.block-load.cached-bytes", + Help: "Bytes loaded by storage engine iterators from the block cache. See storage.AggregatedIteratorStats for details.", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaBlockReadDuration = metric.Metadata{ + Name: "storage.iterator.block-load.read-duration", + Help: "Cumulative time storage engine iterators spent loading blocks from durable storage. See storage.AggregatedIteratorStats for details.", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + metaIterExternalSeeks = metric.Metadata{ + Name: "storage.iterator.external.seeks", + Help: "Cumulative count of seeks performed on storage engine iterators. See storage.AggregatedIteratorStats for details.", + Measurement: "Iterator Ops", + Unit: metric.Unit_COUNT, + } + metaIterExternalSteps = metric.Metadata{ + Name: "storage.iterator.external.steps", + Help: "Cumulative count of steps performed on storage engine iterators. 
See storage.AggregatedIteratorStats for details.", + Measurement: "Iterator Ops", + Unit: metric.Unit_COUNT, + } + metaIterInternalSeeks = metric.Metadata{ + Name: "storage.iterator.internal.seeks", + Help: `Cumulative count of seeks performed internally within storage engine iterators. + +A value high relative to 'storage.iterator.external.seeks' +is a good indication that there's an accumulation of garbage +internally within the storage engine. + +See storage.AggregatedIteratorStats for details.`, + Measurement: "Iterator Ops", + Unit: metric.Unit_COUNT, + } + metaIterInternalSteps = metric.Metadata{ + Name: "storage.iterator.internal.steps", + Help: `Cumulative count of steps performed internally within storage engine iterators. + +A value high relative to 'storage.iterator.external.steps' +is a good indication that there's an accumulation of garbage +internally within the storage engine. + +See storage.AggregatedIteratorStats for more details.`, + Measurement: "Iterator Ops", + Unit: metric.Unit_COUNT, + } metaSharedStorageBytesWritten = metric.Metadata{ Name: "storage.shared-storage.write", Help: "Bytes written to external storage", @@ -1843,9 +1897,17 @@ type StoreMetrics struct { // before pebble, and this name is kept for backwards compatibility despite // the backing metrics now originating from pebble. // - // All of these are cumulative values. They are maintained by pebble and + // All of these are cumulative values. Most are maintained by pebble and // so we have to expose them as gauges (lest we start tracking deltas from // the respective last stats we got from pebble). + // + // There's a bit of a semantic mismatch here because the mechanism of + // updating these metrics is a gauge (eg, we're reading the current value, + // not incrementing) but semantically some of them are monotonically + // increasing counters. + // + // TODO(jackson): Reconcile this mismatch so that metrics that are + // semantically counters are exported as such to Prometheus. 
See #99922. RdbBlockCacheHits *metric.Gauge RdbBlockCacheMisses *metric.Gauge RdbBlockCacheUsage *metric.Gauge @@ -1876,6 +1938,13 @@ type StoreMetrics struct { RdbWriteStallNanos *metric.Gauge SharedStorageBytesRead *metric.Gauge SharedStorageBytesWritten *metric.Gauge + IterBlockBytes *metric.Gauge + IterBlockBytesInCache *metric.Gauge + IterBlockReadDuration *metric.Gauge + IterExternalSeeks *metric.Gauge + IterExternalSteps *metric.Gauge + IterInternalSeeks *metric.Gauge + IterInternalSteps *metric.Gauge FlushableIngestCount *metric.Gauge FlushableIngestTableCount *metric.Gauge FlushableIngestTableSize *metric.Gauge @@ -2393,7 +2462,16 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { ReadWithinUncertaintyIntervalErrorServerSideRetrySuccess: metric.NewCounter(metaReadWithinUncertaintyIntervalErrorServerSideRetrySuccess), ReadWithinUncertaintyIntervalErrorServerSideRetryFailure: metric.NewCounter(metaReadWithinUncertaintyIntervalErrorServerSideRetryFailure), - // RocksDB/Pebble metrics. + // Pebble metrics. + // + // These are all gauges today because almost all of them are cumulative + // metrics recorded within Pebble. Internally within Pebble some of + // these are monotonically increasing counters. There's a bit of a + // semantic mismatch here because the mechanism of updating the metric + // is a gauge (eg, we're reading the current value, not incrementing) + // but the meaning of the metric itself is a counter. + // TODO(jackson): Reconcile this mismatch so that metrics that are + // semantically counters are exported as such to Prometheus. See #99922. 
RdbBlockCacheHits: metric.NewGauge(metaRdbBlockCacheHits), RdbBlockCacheMisses: metric.NewGauge(metaRdbBlockCacheMisses), RdbBlockCacheUsage: metric.NewGauge(metaRdbBlockCacheUsage), @@ -2422,6 +2500,13 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RdbLevelScore: rdbLevelScore, RdbWriteStalls: metric.NewGauge(metaRdbWriteStalls), RdbWriteStallNanos: metric.NewGauge(metaRdbWriteStallNanos), + IterBlockBytes: metric.NewGauge(metaBlockBytes), + IterBlockBytesInCache: metric.NewGauge(metaBlockBytesInCache), + IterBlockReadDuration: metric.NewGauge(metaBlockReadDuration), + IterExternalSeeks: metric.NewGauge(metaIterExternalSeeks), + IterExternalSteps: metric.NewGauge(metaIterExternalSteps), + IterInternalSeeks: metric.NewGauge(metaIterInternalSeeks), + IterInternalSteps: metric.NewGauge(metaIterInternalSteps), SharedStorageBytesRead: metric.NewGauge(metaSharedStorageBytesRead), SharedStorageBytesWritten: metric.NewGauge(metaSharedStorageBytesWritten), FlushableIngestCount: metric.NewGauge(metaFlushableIngestCount), @@ -2755,6 +2840,13 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { sm.RdbWriteStallNanos.Update(m.WriteStallDuration.Nanoseconds()) sm.DiskSlow.Update(m.DiskSlowCount) sm.DiskStalled.Update(m.DiskStallCount) + sm.IterBlockBytes.Update(int64(m.Iterator.BlockBytes)) + sm.IterBlockBytesInCache.Update(int64(m.Iterator.BlockBytesInCache)) + sm.IterBlockReadDuration.Update(int64(m.Iterator.BlockReadDuration)) + sm.IterExternalSeeks.Update(int64(m.Iterator.ExternalSeeks)) + sm.IterExternalSteps.Update(int64(m.Iterator.ExternalSteps)) + sm.IterInternalSeeks.Update(int64(m.Iterator.InternalSeeks)) + sm.IterInternalSteps.Update(int64(m.Iterator.InternalSteps)) sm.SharedStorageBytesRead.Update(m.SharedStorageReadBytes) sm.SharedStorageBytesWritten.Update(m.SharedStorageWriteBytes) sm.RdbL0Sublevels.Update(int64(m.Levels[0].Sublevels)) diff --git a/pkg/kv/kvserver/spanset/BUILD.bazel b/pkg/kv/kvserver/spanset/BUILD.bazel 
index 3451ad4a0d1b..5e7ecff39184 100644 --- a/pkg/kv/kvserver/spanset/BUILD.bazel +++ b/pkg/kv/kvserver/spanset/BUILD.bazel @@ -14,7 +14,6 @@ go_library( "//pkg/keys", "//pkg/roachpb", "//pkg/storage", - "//pkg/storage/pebbleiter", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/protoutil", diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index 704023e1fd54..805fef2bcd8d 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/storage/pebbleiter" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -421,9 +420,9 @@ func (i *EngineIterator) UnsafeRawEngineKey() []byte { return i.i.UnsafeRawEngineKey() } -// GetRawIter is part of the storage.EngineIterator interface. -func (i *EngineIterator) GetRawIter() pebbleiter.Iterator { - return i.i.GetRawIter() +// CloneContext is part of the storage.EngineIterator interface. +func (i *EngineIterator) CloneContext() storage.CloneContext { + return i.i.CloneContext() } // Stats is part of the storage.EngineIterator interface. 
diff --git a/pkg/storage/bench_pebble_test.go b/pkg/storage/bench_pebble_test.go index 110ef6ff1394..d0f7fbbdca4c 100644 --- a/pkg/storage/bench_pebble_test.go +++ b/pkg/storage/bench_pebble_test.go @@ -223,11 +223,11 @@ func BenchmarkMVCCGet_Pebble(b *testing.B) { ctx := context.Background() for _, batch := range []bool{false, true} { b.Run(fmt.Sprintf("batch=%t", batch), func(b *testing.B) { - for _, numVersions := range []int{1, 10, 100} { + for _, numVersions := range []int{10} { b.Run(fmt.Sprintf("versions=%d", numVersions), func(b *testing.B) { for _, valueSize := range []int{8} { b.Run(fmt.Sprintf("valueSize=%d", valueSize), func(b *testing.B) { - for _, numRangeKeys := range []int{0, 1, 100} { + for _, numRangeKeys := range []int{0} { b.Run(fmt.Sprintf("numRangeKeys=%d", numRangeKeys), func(b *testing.B) { runMVCCGet(ctx, b, mvccBenchData{ numVersions: numVersions, diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 464b1103191c..3d9bee4ac5c6 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -361,9 +361,9 @@ type EngineIterator interface { // Value returns the current value as a byte slice. // REQUIRES: latest positioning function returned valid=true. Value() ([]byte, error) - // GetRawIter is a low-level method only for use in the storage package, - // that returns the underlying pebble Iterator. - GetRawIter() pebbleiter.Iterator + // CloneContext is a low-level method only for use in the storage package, + // that provides sufficient context that the iterator may be cloned. + CloneContext() CloneContext // SeekEngineKeyGEWithLimit is similar to SeekEngineKeyGE, but takes an // additional exclusive upper limit parameter. The limit is semantically // best-effort, and is an optimization to avoid O(n^2) iteration behavior in @@ -388,6 +388,13 @@ type EngineIterator interface { Stats() IteratorStats } +// CloneContext is an opaque type encapsulating sufficient context to construct +// a clone of an existing iterator. 
+type CloneContext struct { + rawIter pebbleiter.Iterator + statsReporter statsReporter +} + // IterOptions contains options used to create an {MVCC,Engine}Iterator. // // For performance, every {MVCC,Engine}Iterator must specify either Prefix or @@ -462,6 +469,7 @@ type IterOptions struct { // Range keys themselves are not affected by the masking, and will be // emitted as normal. RangeKeyMaskingBelow hlc.Timestamp + // useL6Filters allows the caller to opt into reading filter blocks for // L6 sstables. Only for use with Prefix = true. Helpful if a lot of prefix // Seeks are expected in quick succession, that are also likely to not @@ -1013,15 +1021,7 @@ type WriteBatch interface { // *pebble.Metrics struct, which has its own documentation. type Metrics struct { *pebble.Metrics - // WriteStallCount counts the number of times Pebble intentionally delayed - // incoming writes. Currently, the only two reasons for this to happen are: - // - "memtable count limit reached" - // - "L0 file count limit exceeded" - // - // We do not split this metric across these two reasons, but they can be - // distinguished in the pebble logs. - WriteStallCount int64 - WriteStallDuration time.Duration + Iterator AggregatedIteratorStats // DiskSlowCount counts the number of times Pebble records disk slowness. DiskSlowCount int64 // DiskStallCount counts the number of times Pebble observes slow writes @@ -1031,6 +1031,56 @@ type Metrics struct { SharedStorageWriteBytes int64 // SharedStorageReadBytes counts the number of bytes read from shared storage. SharedStorageReadBytes int64 + // WriteStallCount counts the number of times Pebble intentionally delayed + // incoming writes. Currently, the only two reasons for this to happen are: + // - "memtable count limit reached" + // - "L0 file count limit exceeded" + // + // We do not split this metric across these two reasons, but they can be + // distinguished in the pebble logs. 
+ WriteStallCount int64 + WriteStallDuration time.Duration +} + +// AggregatedIteratorStats holds cumulative stats, collected and summed over all +// of an engine's iterators. +type AggregatedIteratorStats struct { + // BlockBytes holds the sum of sizes of all loaded blocks. If the block was + // compressed, this is the compressed bytes. This value includes blocks that + // were loaded from the cache, and bytes that needed to be read from + // persistent storage. + // + // Currently, there may be some gaps in coverage. (At the time of writing, + // 2nd-level index blocks are excluded.) + BlockBytes uint64 + // BlockBytesInCache holds the subset of BlockBytes that were already in the + // block cache, requiring no I/O. + BlockBytesInCache uint64 + // BlockReadDuration accumulates the duration spent fetching blocks due to + // block cache misses. + // + // Currently, there may be some gaps in coverage. (At the time of writing, + // range deletion and range key blocks, meta index blocks and properties + // blocks are all excluded.) + BlockReadDuration time.Duration + // ExternalSeeks is the total count of seeks in forward and backward + // directions performed on pebble.Iterators. + ExternalSeeks int + // ExternalSteps is the total count of relative positioning operations (eg, + // Nexts, Prevs, NextPrefix, NextWithLimit, etc) in forward and backward + // directions performed on pebble.Iterators. + ExternalSteps int + // InternalSeeks is the total count of seeks in forward and backward + // directions performed on Pebble's internal iterator. If this is high + // relative to ExternalSeeks, it's a good indication that there's an + // accumulation of garbage within the LSM (NOT MVCC garbage). + InternalSeeks int + // InternalSteps is the total count of relative positioning operations (eg, + // Nexts, Prevs, NextPrefix, etc) in forward and backward directions + // performed on pebble's internal iterator. 
If this is high relative to + // ExternalSteps, it's a good indication that there's an accumulation of + // garbage within the LSM (NOT MVCC garbage). + InternalSteps int } // MetricsForInterval is a set of pebble.Metrics that need to be saved in order to diff --git a/pkg/storage/intent_interleaving_iter.go b/pkg/storage/intent_interleaving_iter.go index f2c31525b0c6..fe0d45cb51a7 100644 --- a/pkg/storage/intent_interleaving_iter.go +++ b/pkg/storage/intent_interleaving_iter.go @@ -267,7 +267,7 @@ func newIntentInterleavingIterator(reader Reader, opts IterOptions) MVCCIterator if reader.ConsistentIterators() { iter = maybeUnwrapUnsafeIter(reader.NewMVCCIterator(MVCCKeyIterKind, opts)).(*pebbleIterator) } else { - iter = newPebbleIteratorByCloning(intentIter.GetRawIter(), opts, StandardDurability) + iter = newPebbleIteratorByCloning(intentIter.CloneContext(), opts, StandardDurability) } *iiIter = intentInterleavingIter{ diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index bf324a0f482c..fb1798e988f3 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -5052,7 +5052,7 @@ func MVCCResolveWriteIntentRange( mvccIter = rw.NewMVCCIterator(MVCCKeyIterKind, iterOpts) } else { // For correctness, we need mvccIter to be consistent with engineIter. - mvccIter = newPebbleIteratorByCloning(engineIter.GetRawIter(), iterOpts, StandardDurability) + mvccIter = newPebbleIteratorByCloning(engineIter.CloneContext(), iterOpts, StandardDurability) } iterAndBuf := GetBufUsingIter(mvccIter) defer func() { diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 3ada2534b01c..7f546f39fb1b 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -768,6 +768,10 @@ type Pebble struct { diskStallCount int64 sharedBytesRead int64 sharedBytesWritten int64 + iterStats struct { + syncutil.Mutex + AggregatedIteratorStats + } // Relevant options copied over from pebble.Options. 
unencryptedFS vfs.FS @@ -1306,6 +1310,21 @@ func (p *Pebble) Close() { } } +// aggregateIterStats is propagated to all of an engine's iterators, aggregating +// iterator stats when an iterator is closed or its stats are reset. These +// aggregated stats are exposed through GetMetrics. +func (p *Pebble) aggregateIterStats(stats IteratorStats) { + p.iterStats.Lock() + defer p.iterStats.Unlock() + p.iterStats.BlockBytes += stats.Stats.InternalStats.BlockBytes + p.iterStats.BlockBytesInCache += stats.Stats.InternalStats.BlockBytesInCache + p.iterStats.BlockReadDuration += stats.Stats.InternalStats.BlockReadDuration + p.iterStats.ExternalSeeks += stats.Stats.ForwardSeekCount[pebble.InterfaceCall] + stats.Stats.ReverseSeekCount[pebble.InterfaceCall] + p.iterStats.ExternalSteps += stats.Stats.ForwardStepCount[pebble.InterfaceCall] + stats.Stats.ReverseStepCount[pebble.InterfaceCall] + p.iterStats.InternalSeeks += stats.Stats.ForwardSeekCount[pebble.InternalIterCall] + stats.Stats.ReverseSeekCount[pebble.InternalIterCall] + p.iterStats.InternalSteps += stats.Stats.ForwardStepCount[pebble.InternalIterCall] + stats.Stats.ReverseStepCount[pebble.InternalIterCall] +} + // Closed implements the Engine interface. func (p *Pebble) Closed() bool { return p.closed @@ -1338,13 +1357,13 @@ func (p *Pebble) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) MVCCIt return maybeWrapInUnsafeIter(iter) } - iter := newPebbleIterator(p.db, opts, StandardDurability) + iter := newPebbleIterator(p.db, opts, StandardDurability, p) return maybeWrapInUnsafeIter(iter) } // NewEngineIterator implements the Engine interface. func (p *Pebble) NewEngineIterator(opts IterOptions) EngineIterator { - return newPebbleIterator(p.db, opts, StandardDurability) + return newPebbleIterator(p.db, opts, StandardDurability, p) } // ConsistentIterators implements the Engine interface. @@ -1729,9 +1748,8 @@ func (p *Pebble) Flush() error { // GetMetrics implements the Engine interface. 
func (p *Pebble) GetMetrics() Metrics { - m := p.db.Metrics() - return Metrics{ - Metrics: m, + m := Metrics{ + Metrics: p.db.Metrics(), WriteStallCount: atomic.LoadInt64(&p.writeStallCount), WriteStallDuration: time.Duration(atomic.LoadInt64((*int64)(&p.writeStallDuration))), DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount), @@ -1739,6 +1757,10 @@ func (p *Pebble) GetMetrics() Metrics { SharedStorageReadBytes: atomic.LoadInt64(&p.sharedBytesRead), SharedStorageWriteBytes: atomic.LoadInt64(&p.sharedBytesWritten), } + p.iterStats.Lock() + m.Iterator = p.iterStats.AggregatedIteratorStats + p.iterStats.Unlock() + return m } // GetEncryptionRegistries implements the Engine interface. @@ -1834,7 +1856,7 @@ func (p *Pebble) GetAuxiliaryDir() string { // NewBatch implements the Engine interface. func (p *Pebble) NewBatch() Batch { - return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings) + return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings, p) } // NewReadOnly implements the Engine interface. @@ -1844,12 +1866,12 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter { // NewUnindexedBatch implements the Engine interface. func (p *Pebble) NewUnindexedBatch() Batch { - return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings) + return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings, p) } // NewWriteBatch implements the Engine interface. func (p *Pebble) NewWriteBatch() WriteBatch { - return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings) + return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings, p) } // NewSnapshot implements the Engine interface. 
@@ -2158,13 +2180,16 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions iter = &p.prefixIter } if iter.inuse { - return newPebbleIteratorByCloning(p.iter, opts, p.durability) + return newPebbleIteratorByCloning(CloneContext{ + rawIter: p.iter, + statsReporter: p.parent, + }, opts, p.durability) } if iter.iter != nil { iter.setOptions(opts, p.durability) } else { - iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability) + iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability, p.parent) if p.iter == nil { // For future cloning. p.iter = iter.iter @@ -2188,13 +2213,16 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { iter = &p.prefixEngineIter } if iter.inuse { - return newPebbleIteratorByCloning(p.iter, opts, p.durability) + return newPebbleIteratorByCloning(CloneContext{ + rawIter: p.iter, + statsReporter: p.parent, + }, opts, p.durability) } if iter.iter != nil { iter.setOptions(opts, p.durability) } else { - iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability) + iter.initReuseOrCreate(p.parent.db, p.iter, p.iterUsed, opts, p.durability, p.parent) if p.iter == nil { // For future cloning. p.iter = iter.iter @@ -2383,13 +2411,13 @@ func (p *pebbleSnapshot) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions return maybeWrapInUnsafeIter(iter) } - iter := MVCCIterator(newPebbleIterator(p.snapshot, opts, StandardDurability)) + iter := MVCCIterator(newPebbleIterator(p.snapshot, opts, StandardDurability, p.parent)) return maybeWrapInUnsafeIter(iter) } // NewEngineIterator implements the Reader interface. func (p pebbleSnapshot) NewEngineIterator(opts IterOptions) EngineIterator { - return newPebbleIterator(p.snapshot, opts, StandardDurability) + return newPebbleIterator(p.snapshot, opts, StandardDurability, p.parent) } // ConsistentIterators implements the Reader interface. 
diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 0bc9dc625263..ec152ca8f12c 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -55,6 +55,7 @@ type pebbleBatch struct { // scratch space for wrappedIntentWriter. scratch []byte + statsReporter statsReporter settings *cluster.Settings shouldWriteLocalTimestamps bool shouldWriteLocalTimestampsCached bool @@ -70,7 +71,11 @@ var pebbleBatchPool = sync.Pool{ // Instantiates a new pebbleBatch. func newPebbleBatch( - db *pebble.DB, batch *pebble.Batch, writeOnly bool, settings *cluster.Settings, + db *pebble.DB, + batch *pebble.Batch, + writeOnly bool, + settings *cluster.Settings, + statsReporter statsReporter, ) *pebbleBatch { pb := pebbleBatchPool.Get().(*pebbleBatch) *pb = pebbleBatch{ @@ -97,8 +102,9 @@ func newPebbleBatch( upperBoundBuf: pb.normalEngineIter.upperBoundBuf, reusable: true, }, - writeOnly: writeOnly, - settings: settings, + writeOnly: writeOnly, + statsReporter: statsReporter, + settings: settings, } pb.wrappedIntentWriter = wrapIntentWriter(pb) return pb @@ -177,13 +183,16 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M handle = p.db } if iter.inuse { - return newPebbleIteratorByCloning(p.iter, opts, StandardDurability) + return newPebbleIteratorByCloning(CloneContext{ + rawIter: p.iter, + statsReporter: p.statsReporter, + }, opts, StandardDurability) } if iter.iter != nil { iter.setOptions(opts, StandardDurability) } else { - iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability) + iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.statsReporter) if p.iter == nil { // For future cloning. 
p.iter = iter.iter @@ -210,13 +219,16 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { handle = p.db } if iter.inuse { - return newPebbleIteratorByCloning(p.iter, opts, StandardDurability) + return newPebbleIteratorByCloning(CloneContext{ + rawIter: p.iter, + statsReporter: p.statsReporter, + }, opts, StandardDurability) } if iter.iter != nil { iter.setOptions(opts, StandardDurability) } else { - iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability) + iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.statsReporter) if p.iter == nil { // For future cloning. p.iter = iter.iter diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go index 95ef7ec31845..a69ae1b9dc74 100644 --- a/pkg/storage/pebble_iterator.go +++ b/pkg/storage/pebble_iterator.go @@ -46,6 +46,10 @@ type pebbleIterator struct { // Buffer used to store MVCCRangeKeyVersions returned by RangeKeys(). Lazily // initialized the first time an iterator's RangeKeys() method is called. mvccRangeKeyVersions []MVCCRangeKeyVersion + // statsReporter is used to sum iterator stats across all the iterators + // during the lifetime of the Engine when the iterator is closed or its + // stats reset. It's intended to be used with (*Pebble). It must not be nil. + statsReporter statsReporter // Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips // SSTables based on MVCC/Engine key when true. @@ -71,6 +75,16 @@ type pebbleIterator struct { mvccDone bool } +type statsReporter interface { + aggregateIterStats(IteratorStats) +} + +var noopStatsReporter = noopStatsReporterImpl{} + +type noopStatsReporterImpl struct{} + +func (noopStatsReporterImpl) aggregateIterStats(IteratorStats) {} + var _ MVCCIterator = &pebbleIterator{} var _ EngineIterator = &pebbleIterator{} @@ -82,11 +96,14 @@ var pebbleIterPool = sync.Pool{ // newPebbleIterator creates a new Pebble iterator for the given Pebble reader. 
func newPebbleIterator( - handle pebble.Reader, opts IterOptions, durability DurabilityRequirement, + handle pebble.Reader, + opts IterOptions, + durability DurabilityRequirement, + statsReporter statsReporter, ) *pebbleIterator { p := pebbleIterPool.Get().(*pebbleIterator) p.reusable = false // defensive - p.init(nil, opts, durability) + p.init(nil, opts, durability, statsReporter) p.iter = pebbleiter.MaybeWrap(handle.NewIter(&p.options)) return p } @@ -94,13 +111,13 @@ func newPebbleIterator( // newPebbleIteratorByCloning creates a new Pebble iterator by cloning the given // iterator and reconfiguring it. func newPebbleIteratorByCloning( - iter pebbleiter.Iterator, opts IterOptions, durability DurabilityRequirement, + cloneCtx CloneContext, opts IterOptions, durability DurabilityRequirement, ) *pebbleIterator { var err error p := pebbleIterPool.Get().(*pebbleIterator) p.reusable = false // defensive - p.init(nil, opts, durability) - p.iter, err = iter.Clone(pebble.CloneOptions{ + p.init(nil, opts, durability, cloneCtx.statsReporter) + p.iter, err = cloneCtx.rawIter.Clone(pebble.CloneOptions{ IterOptions: &p.options, RefreshBatchView: true, }) @@ -117,7 +134,7 @@ func newPebbleSSTIterator( ) (*pebbleIterator, error) { p := pebbleIterPool.Get().(*pebbleIterator) p.reusable = false // defensive - p.init(nil, opts, StandardDurability) + p.init(nil, opts, StandardDurability, noopStatsReporter) var externalIterOpts []pebble.ExternalIterOption if forwardOnly { @@ -138,7 +155,10 @@ func newPebbleSSTIterator( // reconfiguring the given iter. It is valid to pass a nil iter and then create // p.iter using p.options, to avoid redundant reconfiguration via SetOptions(). 
func (p *pebbleIterator) init( - iter pebbleiter.Iterator, opts IterOptions, durability DurabilityRequirement, + iter pebbleiter.Iterator, + opts IterOptions, + durability DurabilityRequirement, + statsReporter statsReporter, ) { *p = pebbleIterator{ iter: iter, @@ -146,6 +166,7 @@ func (p *pebbleIterator) init( lowerBoundBuf: p.lowerBoundBuf, upperBoundBuf: p.upperBoundBuf, rangeKeyMaskingBuf: p.rangeKeyMaskingBuf, + statsReporter: statsReporter, reusable: p.reusable, } p.setOptions(opts, durability) @@ -164,13 +185,14 @@ func (p *pebbleIterator) initReuseOrCreate( clone bool, opts IterOptions, durability DurabilityRequirement, + statsReporter statsReporter, ) { if iter != nil && !clone { - p.init(iter, opts, durability) + p.init(iter, opts, durability, statsReporter) return } - p.init(nil, opts, durability) + p.init(nil, opts, durability, statsReporter) if iter == nil { p.iter = pebbleiter.MaybeWrap(handle.NewIter(&p.options)) } else if clone { @@ -286,6 +308,12 @@ func (p *pebbleIterator) Close() { } p.inuse = false + // Report the iterator's stats so they can be accumulated and exposed + // through time-series metrics. + if p.iter != nil { + p.statsReporter.aggregateIterStats(p.Stats()) + } + if p.reusable { p.iter.ResetStats() return @@ -887,9 +915,9 @@ func (p *pebbleIterator) IsPrefix() bool { return p.prefix } -// GetRawIter is part of the EngineIterator interface. -func (p *pebbleIterator) GetRawIter() pebbleiter.Iterator { - return p.iter +// CloneContext is part of the EngineIterator interface. 
+func (p *pebbleIterator) CloneContext() CloneContext { + return CloneContext{rawIter: p.iter, statsReporter: p.statsReporter} } func (p *pebbleIterator) getBlockPropertyFilterMask() pebble.BlockPropertyFilterMask { diff --git a/pkg/storage/pebble_iterator_test.go b/pkg/storage/pebble_iterator_test.go index efaa2b3aac0e..04e6b106c7ce 100644 --- a/pkg/storage/pebble_iterator_test.go +++ b/pkg/storage/pebble_iterator_test.go @@ -73,7 +73,7 @@ func TestPebbleIterator_Corruption(t *testing.T) { LowerBound: []byte("a"), UpperBound: []byte("z"), } - iter := newPebbleIterator(p.db, iterOpts, StandardDurability) + iter := newPebbleIterator(p.db, iterOpts, StandardDurability, noopStatsReporter) // Seeking into the table catches the corruption. ok, err := iter.SeekEngineKeyGE(ek) diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 684a93288cbf..ee2d19977cf6 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3249,6 +3249,29 @@ var charts = []sectionDescription{ Title: "WAL Fsync Latency", Metrics: []string{"storage.wal.fsync.latency"}, }, + { + Title: "Iterator Block Loads", + Metrics: []string{ + "storage.iterator.block-load.bytes", + "storage.iterator.block-load.cached-bytes", + }, + AxisLabel: "Bytes", + }, + { + Title: "Iterator I/O", + Metrics: []string{"storage.iterator.block-load.read-duration"}, + AxisLabel: "Duration (nanos)", + }, + { + Title: "Iterator Operations", + Metrics: []string{ + "storage.iterator.external.seeks", + "storage.iterator.external.steps", + "storage.iterator.internal.seeks", + "storage.iterator.internal.steps", + }, + AxisLabel: "Ops", + }, }, }, {