From 4e3ea0accc741ffe3357d5b9532879a1f672a125 Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Tue, 25 Apr 2023 14:30:33 -0400
Subject: [PATCH] kvserver,storage: BatchCommitStats plumbing for raft appends
 and metrics

Pebble now produces detailed stats for a batch commit. These are
partially plumbed into kvserver, for (a) log statements related to raft
appends, and (b) cumulative duration metrics applicable for all writes
(raft and state machine).

Informs https://github.com/cockroachdb/pebble/issues/1943

Epic: none

Release note: None
---
 pkg/kv/kvserver/logstore/logstore.go          | 15 +++-
 .../kvserver/logstore/logstore_bench_test.go  |  2 +-
 pkg/kv/kvserver/logstore/sync_waiter.go       |  6 +-
 pkg/kv/kvserver/metrics.go                    | 80 +++++++++++++++++++
 pkg/kv/kvserver/replica_raft.go               | 13 ++-
 pkg/kv/kvserver/replica_raft_test.go          | 15 +++-
 pkg/kv/kvserver/spanset/batch.go              |  4 +
 .../testdata/handle_raft_ready_stats.txt      |  2 +-
 pkg/storage/engine.go                         | 44 +++++++++-
 pkg/storage/pebble.go                         | 27 ++++++-
 pkg/storage/pebble_batch.go                   | 35 +++++---
 pkg/storage/pebble_iterator.go                | 10 +--
 12 files changed, 223 insertions(+), 30 deletions(-)

diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go
index 5db622574419..ece2ffd87f5b 100644
--- a/pkg/kv/kvserver/logstore/logstore.go
+++ b/pkg/kv/kvserver/logstore/logstore.go
@@ -87,6 +87,9 @@ type AppendStats struct {
 	PebbleBegin time.Time
 	PebbleEnd   time.Time
 	PebbleBytes int64
+	// Only set when !NonBlocking, which means almost never, since
+	// kv.raft_log.non_blocking_synchronization.enabled defaults to true.
+	PebbleCommitStats storage.BatchCommitStats
 
 	Sync bool
 	// If true, PebbleEnd-PebbleBegin does not include the sync time.
@@ -113,8 +116,9 @@ type LogStore struct {
 // SyncCallback is a callback that is notified when a raft log write has been
 // durably committed to disk. The function is handed the response messages that
 // are associated with the MsgStorageAppend that triggered the fsync.
+// commitStats is populated iff this was a non-blocking sync.
 type SyncCallback interface {
-	OnLogSync(context.Context, []raftpb.Message)
+	OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats)
 }
 
 func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
@@ -257,6 +261,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
 			ctx:            ctx,
 			cb:             cb,
 			msgs:           m.Responses,
+			batch:          batch,
 			metrics:        s.Metrics,
 			logCommitBegin: stats.PebbleBegin,
 		}
@@ -269,10 +274,11 @@
 			return RaftState{}, errors.Wrap(err, expl)
 		}
 		stats.PebbleEnd = timeutil.Now()
+		stats.PebbleCommitStats = batch.CommitStats()
 		if wantsSync {
 			logCommitEnd := stats.PebbleEnd
 			s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
-			cb.OnLogSync(ctx, m.Responses)
+			cb.OnLogSync(ctx, m.Responses, storage.BatchCommitStats{})
 		}
 	}
 	stats.Sync = wantsSync
@@ -325,6 +331,8 @@ type nonBlockingSyncWaiterCallback struct {
 	ctx  context.Context
 	cb   SyncCallback
 	msgs []raftpb.Message
+	// Used to extract stats. This is the batch that has been synced.
+	batch storage.WriteBatch
 	// Used to record Metrics.
 	metrics        Metrics
 	logCommitBegin time.Time
@@ -334,7 +342,8 @@ type nonBlockingSyncWaiterCallback struct {
 func (cb *nonBlockingSyncWaiterCallback) run() {
 	dur := timeutil.Since(cb.logCommitBegin).Nanoseconds()
 	cb.metrics.RaftLogCommitLatency.RecordValue(dur)
-	cb.cb.OnLogSync(cb.ctx, cb.msgs)
+	commitStats := cb.batch.CommitStats()
+	cb.cb.OnLogSync(cb.ctx, cb.msgs, commitStats)
 	cb.release()
 }
 
diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go
index da7604427ed6..a22268c24057 100644
--- a/pkg/kv/kvserver/logstore/logstore_bench_test.go
+++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go
@@ -38,7 +38,7 @@ func (b *discardBatch) Commit(bool) error {
 
 type noopSyncCallback struct{}
 
-func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {}
+func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats) {}
 
 func BenchmarkLogStore_StoreEntries(b *testing.B) {
 	defer log.Scope(b).Close(b)
diff --git a/pkg/kv/kvserver/logstore/sync_waiter.go b/pkg/kv/kvserver/logstore/sync_waiter.go
index 31880796dfc0..9e422b294008 100644
--- a/pkg/kv/kvserver/logstore/sync_waiter.go
+++ b/pkg/kv/kvserver/logstore/sync_waiter.go
@@ -95,8 +95,8 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
 			if err := w.wg.SyncWait(); err != nil {
 				log.Fatalf(ctx, "SyncWait error: %+v", err)
 			}
-			w.wg.Close()
 			w.cb.run()
+			w.wg.Close()
 		case <-stopper.ShouldQuiesce():
 			return
 		}
@@ -108,7 +108,9 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
 // durably committed. It may never be called in case the stopper stops.
 //
 // The syncWaiter will be Closed after its SyncWait method completes. It must
-// not be Closed by the caller.
+// not be Closed by the caller. The cb is called before the syncWaiter is
+// closed, in case the cb implementation needs to extract something from the
+// syncWaiter.
 //
 // If the SyncWaiterLoop has already been stopped, the callback will never be
 // called.
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index df31b2c6cdf7..3712fd528669 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -759,6 +759,61 @@ bytes preserved during flushes and compactions over the lifetime of the process.
 		Measurement: "Bytes",
 		Unit:        metric.Unit_BYTES,
 	}
+	metaBatchCommitCount = metric.Metadata{
+		Name:        "storage.batch-commit.count",
+		Help:        "Count of batch commits. See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Commit Ops",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaBatchCommitDuration = metric.Metadata{
+		Name: "storage.batch-commit.duration",
+		Help: "Cumulative time spent in batch commit. " +
+			"See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaBatchCommitSemWaitDuration = metric.Metadata{
+		Name: "storage.batch-commit.sem-wait.duration",
+		Help: "Cumulative time spent in semaphore wait, for batch commit. " +
+			"See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaBatchCommitWALQWaitDuration = metric.Metadata{
+		Name: "storage.batch-commit.wal-queue-wait.duration",
+		Help: "Cumulative time spent waiting for memory blocks in the WAL queue, for batch commit. " +
+			"See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaBatchCommitMemStallDuration = metric.Metadata{
+		Name: "storage.batch-commit.mem-stall.duration",
+		Help: "Cumulative time spent in a write stall due to too many memtables, for batch commit. " +
+			"See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaBatchCommitL0StallDuration = metric.Metadata{
+		Name: "storage.batch-commit.l0-stall.duration",
+		Help: "Cumulative time spent in a write stall due to high read amplification in L0, for batch commit. " +
+			"See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaBatchCommitWALRotDuration = metric.Metadata{
+		Name: "storage.batch-commit.wal-rotation.duration",
+		Help: "Cumulative time spent waiting for WAL rotation, for batch commit. " +
+			"See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
+	metaBatchCommitCommitWaitDuration = metric.Metadata{
+		Name: "storage.batch-commit.commit-wait.duration",
+		Help: "Cumulative time spent waiting for WAL sync, for batch commit. " +
+			"See storage.AggregatedBatchCommitStats for details.",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
 )
 
 var (
@@ -2068,6 +2123,14 @@ type StoreMetrics struct {
 	FlushableIngestCount      *metric.Gauge
 	FlushableIngestTableCount *metric.Gauge
 	FlushableIngestTableSize  *metric.Gauge
+	BatchCommitCount              *metric.Gauge
+	BatchCommitDuration           *metric.Gauge
+	BatchCommitSemWaitDuration    *metric.Gauge
+	BatchCommitWALQWaitDuration   *metric.Gauge
+	BatchCommitMemStallDuration   *metric.Gauge
+	BatchCommitL0StallDuration    *metric.Gauge
+	BatchCommitWALRotWaitDuration *metric.Gauge
+	BatchCommitCommitWaitDuration *metric.Gauge
 
 	RdbCheckpoints *metric.Gauge
 
@@ -2684,6 +2747,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		FlushableIngestCount:      metric.NewGauge(metaFlushableIngestCount),
 		FlushableIngestTableCount: metric.NewGauge(metaFlushableIngestTableCount),
 		FlushableIngestTableSize:  metric.NewGauge(metaFlushableIngestTableBytes),
+		BatchCommitCount:              metric.NewGauge(metaBatchCommitCount),
+		BatchCommitDuration:           metric.NewGauge(metaBatchCommitDuration),
+		BatchCommitSemWaitDuration:    metric.NewGauge(metaBatchCommitSemWaitDuration),
+		BatchCommitWALQWaitDuration:   metric.NewGauge(metaBatchCommitWALQWaitDuration),
+		BatchCommitMemStallDuration:   metric.NewGauge(metaBatchCommitMemStallDuration),
+		BatchCommitL0StallDuration:    metric.NewGauge(metaBatchCommitL0StallDuration),
+		BatchCommitWALRotWaitDuration: metric.NewGauge(metaBatchCommitWALRotDuration),
+		BatchCommitCommitWaitDuration: metric.NewGauge(metaBatchCommitCommitWaitDuration),
 
 		RdbCheckpoints: metric.NewGauge(metaRdbCheckpoints),
 
@@ -3035,6 +3106,15 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
 	sm.FlushableIngestCount.Update(int64(m.Flush.AsIngestCount))
 	sm.FlushableIngestTableCount.Update(int64(m.Flush.AsIngestTableCount))
 	sm.FlushableIngestTableSize.Update(int64(m.Flush.AsIngestBytes))
+	sm.BatchCommitCount.Update(int64(m.BatchCommitStats.Count))
+	sm.BatchCommitDuration.Update(int64(m.BatchCommitStats.TotalDuration))
+	sm.BatchCommitSemWaitDuration.Update(int64(m.BatchCommitStats.SemaphoreWaitDuration))
+	sm.BatchCommitWALQWaitDuration.Update(int64(m.BatchCommitStats.WALQueueWaitDuration))
+	sm.BatchCommitMemStallDuration.Update(int64(m.BatchCommitStats.MemTableWriteStallDuration))
+	sm.BatchCommitL0StallDuration.Update(int64(m.BatchCommitStats.L0ReadAmpWriteStallDuration))
+	sm.BatchCommitWALRotWaitDuration.Update(int64(m.BatchCommitStats.WALRotationDuration))
+	sm.BatchCommitCommitWaitDuration.Update(int64(m.BatchCommitStats.CommitWaitDuration))
+
 	// Update the maximum number of L0 sub-levels seen.
 	sm.l0SublevelsTracker.Lock()
 	sm.l0SublevelsTracker.swag.Record(timeutil.Now(), float64(m.Levels[0].Sublevels))
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index 80d3b162410b..d7dd421e8db7 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -688,6 +688,9 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) {
 			p.Printf(", snapshot ignored")
 		}
 	}
+	if !(s.append.PebbleCommitStats == storage.BatchCommitStats{}) {
+		p.Printf(" pebble stats: [%s]", s.append.PebbleCommitStats)
+	}
 }
 
 func (s handleRaftReadyStats) String() string {
@@ -1515,9 +1518,15 @@ func (r *Replica) maybeCoalesceHeartbeat(
 // replicaSyncCallback implements the logstore.SyncCallback interface.
 type replicaSyncCallback Replica
 
-func (r *replicaSyncCallback) OnLogSync(ctx context.Context, msgs []raftpb.Message) {
+func (r *replicaSyncCallback) OnLogSync(
+	ctx context.Context, msgs []raftpb.Message, commitStats storage.BatchCommitStats,
+) {
+	repl := (*Replica)(r)
 	// Send MsgStorageAppend's responses.
-	(*Replica)(r).sendRaftMessages(ctx, msgs, nil /* blocked */, false /* willDeliverLocal */)
+	repl.sendRaftMessages(ctx, msgs, nil /* blocked */, false /* willDeliverLocal */)
+	if commitStats.TotalDuration > defaultReplicaRaftMuWarnThreshold {
+		log.Infof(repl.raftCtx, "slow non-blocking raft commit: %s", commitStats)
+	}
 }
 
 // sendRaftMessages sends a slice of Raft messages.
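[Editorial sketch] For illustration, a minimal, self-contained sketch of the slow-commit logging pattern in OnLogSync above. The stats type and the 500ms threshold here are stand-ins, not the real storage.BatchCommitStats or defaultReplicaRaftMuWarnThreshold (whose value is defined elsewhere in the kvserver package):

package main

import (
	"fmt"
	"time"
)

// batchCommitStats stands in for storage.BatchCommitStats.
type batchCommitStats struct {
	TotalDuration      time.Duration
	CommitWaitDuration time.Duration
}

// warnThreshold stands in for defaultReplicaRaftMuWarnThreshold; the real
// value lives in the kvserver package.
const warnThreshold = 500 * time.Millisecond

// onLogSync mirrors the shape of replicaSyncCallback.OnLogSync: deliver the
// raft responses first, then log only the unusually slow commits so the
// common case stays quiet.
func onLogSync(stats batchCommitStats) {
	// (raft responses would be sent here via sendRaftMessages)
	if stats.TotalDuration > warnThreshold {
		fmt.Printf("slow non-blocking raft commit: total=%s commit-wait=%s\n",
			stats.TotalDuration, stats.CommitWaitDuration)
	}
}

func main() {
	onLogSync(batchCommitStats{
		TotalDuration:      700 * time.Millisecond,
		CommitWaitDuration: 650 * time.Millisecond,
	})
}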
diff --git a/pkg/kv/kvserver/replica_raft_test.go b/pkg/kv/kvserver/replica_raft_test.go
index 95c344c4f59a..5a6c80869d38 100644
--- a/pkg/kv/kvserver/replica_raft_test.go
+++ b/pkg/kv/kvserver/replica_raft_test.go
@@ -17,11 +17,13 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/pebble"
 	"github.com/cockroachdb/redact"
 	"github.com/stretchr/testify/assert"
 	"go.etcd.io/raft/v3/tracker"
@@ -98,7 +100,18 @@ func Test_handleRaftReadyStats_SafeFormat(t *testing.T) {
 			PebbleBegin: ts(3),
 			PebbleEnd:   ts(4),
 			PebbleBytes: 1024 * 5,
-			Sync:        true,
+			PebbleCommitStats: storage.BatchCommitStats{
+				BatchCommitStats: pebble.BatchCommitStats{
+					TotalDuration:               100 * time.Millisecond,
+					SemaphoreWaitDuration:       2 * time.Millisecond,
+					WALQueueWaitDuration:        5 * time.Millisecond,
+					MemTableWriteStallDuration:  8 * time.Millisecond,
+					L0ReadAmpWriteStallDuration: 11 * time.Millisecond,
+					WALRotationDuration:         14 * time.Millisecond,
+					CommitWaitDuration:          17 * time.Millisecond,
+				},
+			},
+			Sync: true,
 		},
 		tSnapBegin: ts(4),
 		tSnapEnd:   ts(5),
diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go
index 805fef2bcd8d..5d2a773379ae 100644
--- a/pkg/kv/kvserver/spanset/batch.go
+++ b/pkg/kv/kvserver/spanset/batch.go
@@ -785,6 +785,10 @@ func (s spanSetBatch) Repr() []byte {
 	return s.b.Repr()
 }
 
+func (s spanSetBatch) CommitStats() storage.BatchCommitStats {
+	return s.b.CommitStats()
+}
+
 // NewBatch returns a storage.Batch that asserts access of the underlying
 // Batch against the given SpanSet. We only consider span boundaries, associated
 // timestamps are not considered.
diff --git a/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt b/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt
index 0dfb67499ba7..1713c37c67dc 100644
--- a/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt
+++ b/pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt
@@ -1,3 +1,3 @@
 echo
 ----
-raft ready handling: 5.00s [append=1.00s, apply=1.00s, sync=1.00s, snap=1.00s, other=1.00s], wrote [append-batch=5.0 KiB, append-ent=1.0 KiB (7), append-sst=5.0 MiB (3), apply=3 B (2 in 9 batches)], state_assertions=4, snapshot applied
+raft ready handling: 5.00s [append=1.00s, apply=1.00s, sync=1.00s, snap=1.00s, other=1.00s], wrote [append-batch=5.0 KiB, append-ent=1.0 KiB (7), append-sst=5.0 MiB (3), apply=3 B (2 in 9 batches)], state_assertions=4, snapshot applied pebble stats: [commit-wait 17ms wal-q 5ms mem-stall 8ms l0-stall 11ms wal-rot 14ms sem 2ms]
diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go
index 16f3904c7367..3ae1dd5a5123 100644
--- a/pkg/storage/engine.go
+++ b/pkg/storage/engine.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble"
 	"github.com/cockroachdb/pebble/vfs"
+	"github.com/cockroachdb/redact"
 	prometheusgo "github.com/prometheus/client_model/go"
 )
 
@@ -392,7 +393,7 @@ type EngineIterator interface {
 // a clone of an existing iterator.
 type CloneContext struct {
 	rawIter       pebbleiter.Iterator
-	statsReporter statsReporter
+	statsReporter iterStatsReporter
 }
 
 // IterOptions contains options used to create an {MVCC,Engine}Iterator.
@@ -1024,13 +1025,43 @@ type WriteBatch interface {
 	// Repr returns the underlying representation of the batch and can be used to
 	// reconstitute the batch on a remote node using Writer.ApplyBatchRepr().
 	Repr() []byte
+	// CommitStats returns stats related to committing the batch. Should be
+	// called after Batch.Commit. If CommitNoSyncWait is used, it should be
+	// called after the call to SyncWait.
+	CommitStats() BatchCommitStats
+}
+
+type BatchCommitStats struct {
+	pebble.BatchCommitStats
+}
+
+// SafeFormat implements redact.SafeFormatter. It does not print the total
+// duration.
+func (stats BatchCommitStats) SafeFormat(p redact.SafePrinter, _ rune) {
+	p.Printf("commit-wait %s", stats.CommitWaitDuration)
+	if stats.WALQueueWaitDuration > 0 {
+		p.Printf(" wal-q %s", stats.WALQueueWaitDuration)
+	}
+	if stats.MemTableWriteStallDuration > 0 {
+		p.Printf(" mem-stall %s", stats.MemTableWriteStallDuration)
+	}
+	if stats.L0ReadAmpWriteStallDuration > 0 {
+		p.Printf(" l0-stall %s", stats.L0ReadAmpWriteStallDuration)
+	}
+	if stats.WALRotationDuration > 0 {
+		p.Printf(" wal-rot %s", stats.WALRotationDuration)
+	}
+	if stats.SemaphoreWaitDuration > 0 {
+		p.Printf(" sem %s", stats.SemaphoreWaitDuration)
+	}
 }
 
 // Metrics is a set of Engine metrics. Most are contained in the embedded
 // *pebble.Metrics struct, which has its own documentation.
 type Metrics struct {
 	*pebble.Metrics
-	Iterator AggregatedIteratorStats
+	Iterator         AggregatedIteratorStats
+	BatchCommitStats AggregatedBatchCommitStats
 	// DiskSlowCount counts the number of times Pebble records disk slowness.
 	DiskSlowCount int64
 	// DiskStallCount counts the number of times Pebble observes slow writes
@@ -1092,6 +1123,15 @@ type AggregatedIteratorStats struct {
 	InternalSteps int
 }
 
+// AggregatedBatchCommitStats holds cumulative stats summed over all the
+// batches that committed at the engine. Since these are durations, only the
+// mean (over an interval) can be recovered. We can change some of these to
+// histograms once we know which ones are more useful.
+type AggregatedBatchCommitStats struct {
+	Count uint64
+	BatchCommitStats
+}
+
 // MetricsForInterval is a set of pebble.Metrics that need to be saved in order to
 // compute metrics according to an interval.
 type MetricsForInterval struct {
diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go
index 815920edf785..dbbd22a57c5a 100644
--- a/pkg/storage/pebble.go
+++ b/pkg/storage/pebble.go
@@ -773,7 +773,10 @@ type Pebble struct {
 		syncutil.Mutex
 		AggregatedIteratorStats
 	}
-
+	batchCommitStats struct {
+		syncutil.Mutex
+		AggregatedBatchCommitStats
+	}
 	// Relevant options copied over from pebble.Options.
 	unencryptedFS vfs.FS
 	logCtx        context.Context
@@ -1350,6 +1353,19 @@ func (p *Pebble) aggregateIterStats(stats IteratorStats) {
 	p.iterStats.InternalSteps += stats.Stats.ForwardStepCount[pebble.InternalIterCall] + stats.Stats.ReverseStepCount[pebble.InternalIterCall]
 }
 
+func (p *Pebble) aggregateBatchCommitStats(stats BatchCommitStats) {
+	p.batchCommitStats.Lock()
+	p.batchCommitStats.Count++
+	p.batchCommitStats.TotalDuration += stats.TotalDuration
+	p.batchCommitStats.SemaphoreWaitDuration += stats.SemaphoreWaitDuration
+	p.batchCommitStats.WALQueueWaitDuration += stats.WALQueueWaitDuration
+	p.batchCommitStats.MemTableWriteStallDuration += stats.MemTableWriteStallDuration
+	p.batchCommitStats.L0ReadAmpWriteStallDuration += stats.L0ReadAmpWriteStallDuration
+	p.batchCommitStats.WALRotationDuration += stats.WALRotationDuration
+	p.batchCommitStats.CommitWaitDuration += stats.CommitWaitDuration
+	p.batchCommitStats.Unlock()
+}
+
 // Closed implements the Engine interface.
 func (p *Pebble) Closed() bool {
 	return p.closed
@@ -1785,6 +1801,9 @@ func (p *Pebble) GetMetrics() Metrics {
 	p.iterStats.Lock()
 	m.Iterator = p.iterStats.AggregatedIteratorStats
 	p.iterStats.Unlock()
+	p.batchCommitStats.Lock()
+	m.BatchCommitStats = p.batchCommitStats.AggregatedBatchCommitStats
+	p.batchCommitStats.Unlock()
 	return m
 }
 
@@ -1892,7 +1911,7 @@ func (p *Pebble) GetAuxiliaryDir() string {
 
 // NewBatch implements the Engine interface.
 func (p *Pebble) NewBatch() Batch {
-	return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings, p)
+	return newPebbleBatch(p.db, p.db.NewIndexedBatch(), false /* writeOnly */, p.settings, p, p)
 }
 
 // NewReadOnly implements the Engine interface.
@@ -1902,12 +1921,12 @@ func (p *Pebble) NewReadOnly(durability DurabilityRequirement) ReadWriter {
 
 // NewUnindexedBatch implements the Engine interface.
 func (p *Pebble) NewUnindexedBatch() Batch {
-	return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings, p)
+	return newPebbleBatch(p.db, p.db.NewBatch(), false /* writeOnly */, p.settings, p, p)
 }
 
 // NewWriteBatch implements the Engine interface.
 func (p *Pebble) NewWriteBatch() WriteBatch {
-	return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings, p)
+	return newPebbleBatch(p.db, p.db.NewBatch(), true /* writeOnly */, p.settings, p, p)
 }
 
 // NewSnapshot implements the Engine interface.
diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go
index ec152ca8f12c..fcfa7e7789f0 100644
--- a/pkg/storage/pebble_batch.go
+++ b/pkg/storage/pebble_batch.go
@@ -55,7 +55,8 @@ type pebbleBatch struct {
 	// scratch space for wrappedIntentWriter.
 	scratch []byte
 
-	statsReporter                    statsReporter
+	iterStatsReporter                iterStatsReporter
+	batchStatsReporter               batchStatsReporter
 	settings                         *cluster.Settings
 	shouldWriteLocalTimestamps       bool
 	shouldWriteLocalTimestampsCached bool
@@ -69,13 +70,18 @@ var pebbleBatchPool = sync.Pool{
 	},
 }
 
+type batchStatsReporter interface {
+	aggregateBatchCommitStats(stats BatchCommitStats)
+}
+
 // Instantiates a new pebbleBatch.
 func newPebbleBatch(
 	db *pebble.DB,
 	batch *pebble.Batch,
 	writeOnly bool,
 	settings *cluster.Settings,
-	statsReporter statsReporter,
+	iterStatsReporter iterStatsReporter,
+	batchStatsReporter batchStatsReporter,
 ) *pebbleBatch {
 	pb := pebbleBatchPool.Get().(*pebbleBatch)
 	*pb = pebbleBatch{
@@ -102,9 +108,10 @@ func newPebbleBatch(
 			upperBoundBuf: pb.normalEngineIter.upperBoundBuf,
 			reusable:      true,
 		},
-		writeOnly:     writeOnly,
-		statsReporter: statsReporter,
-		settings:      settings,
+		writeOnly:          writeOnly,
+		iterStatsReporter:  iterStatsReporter,
+		batchStatsReporter: batchStatsReporter,
+		settings:           settings,
 	}
 	pb.wrappedIntentWriter = wrapIntentWriter(pb)
 	return pb
@@ -185,14 +192,14 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M
 	if iter.inuse {
 		return newPebbleIteratorByCloning(CloneContext{
 			rawIter:       p.iter,
-			statsReporter: p.statsReporter,
+			statsReporter: p.iterStatsReporter,
 		}, opts, StandardDurability)
 	}
 
 	if iter.iter != nil {
 		iter.setOptions(opts, StandardDurability)
 	} else {
-		iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.statsReporter)
+		iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter)
 		if p.iter == nil {
 			// For future cloning.
 			p.iter = iter.iter
@@ -221,14 +228,14 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator {
 	if iter.inuse {
 		return newPebbleIteratorByCloning(CloneContext{
 			rawIter:       p.iter,
-			statsReporter: p.statsReporter,
+			statsReporter: p.iterStatsReporter,
 		}, opts, StandardDurability)
 	}
 
 	if iter.iter != nil {
 		iter.setOptions(opts, StandardDurability)
 	} else {
-		iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.statsReporter)
+		iter.initReuseOrCreate(handle, p.iter, p.iterUsed, opts, StandardDurability, p.iterStatsReporter)
 		if p.iter == nil {
 			// For future cloning.
 			p.iter = iter.iter
@@ -557,6 +564,8 @@ func (p *pebbleBatch) Commit(sync bool) error {
 		// or don't have after they receive an error from this method.
 		panic(err)
 	}
+	p.batchStatsReporter.aggregateBatchCommitStats(
+		BatchCommitStats{p.batch.CommitStats()})
 	return err
 }
 
@@ -595,6 +604,9 @@ func (p *pebbleBatch) SyncWait() error {
 		// or don't have after they receive an error from this method.
 		panic(err)
 	}
+	p.batchStatsReporter.aggregateBatchCommitStats(
+		BatchCommitStats{p.batch.CommitStats()})
+
 	return err
 }
 
@@ -624,6 +636,11 @@ func (p *pebbleBatch) Repr() []byte {
 	return reprCopy
 }
 
+// CommitStats implements the Batch interface.
+func (p *pebbleBatch) CommitStats() BatchCommitStats {
+	return BatchCommitStats{BatchCommitStats: p.batch.CommitStats()}
+}
+
 // ShouldWriteLocalTimestamps implements the Writer interface.
 func (p *pebbleBatch) ShouldWriteLocalTimestamps(ctx context.Context) bool {
 	// pebbleBatch is short-lived, so cache the value for performance.
diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go
index a69ae1b9dc74..973cfe5e007a 100644
--- a/pkg/storage/pebble_iterator.go
+++ b/pkg/storage/pebble_iterator.go
@@ -49,7 +49,7 @@ type pebbleIterator struct {
 	// statsReporter is used to sum iterator stats across all the iterators
 	// during the lifetime of the Engine when the iterator is closed or its
 	// stats reset. It's intended to be used with (*Pebble). It must not be nil.
-	statsReporter statsReporter
+	statsReporter iterStatsReporter
 	// Set to true to govern whether to call SeekPrefixGE or SeekGE. Skips
 	// SSTables based on MVCC/Engine key when true.
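[Editorial sketch] The pebbleBatch plumbing above forwards pebble.Batch.CommitStats, the source of all these numbers. A hedged, self-contained sketch against the underlying pebble API, using an in-memory store and the blocking Commit path; per the WriteBatch contract above, with the non-blocking CommitNoSyncWait path CommitStats must only be read after SyncWait returns:

package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// Open an in-memory pebble instance.
	db, err := pebble.Open("", &pebble.Options{FS: vfs.NewMem()})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	b := db.NewBatch()
	if err := b.Set([]byte("k"), []byte("v"), nil); err != nil {
		log.Fatal(err)
	}
	// Commit with a WAL sync; the stats are valid once Commit returns.
	if err := b.Commit(pebble.Sync); err != nil {
		log.Fatal(err)
	}
	stats := b.CommitStats()
	fmt.Printf("total=%s sem=%s wal-q=%s commit-wait=%s\n",
		stats.TotalDuration, stats.SemaphoreWaitDuration,
		stats.WALQueueWaitDuration, stats.CommitWaitDuration)
	_ = b.Close()
}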
@@ -75,7 +75,7 @@ type pebbleIterator struct {
 	mvccDone bool
 }
 
-type statsReporter interface {
+type iterStatsReporter interface {
 	aggregateIterStats(IteratorStats)
 }
 
@@ -99,7 +99,7 @@ func newPebbleIterator(
 	handle pebble.Reader,
 	opts IterOptions,
 	durability DurabilityRequirement,
-	statsReporter statsReporter,
+	statsReporter iterStatsReporter,
 ) *pebbleIterator {
 	p := pebbleIterPool.Get().(*pebbleIterator)
 	p.reusable = false // defensive
@@ -158,7 +158,7 @@ func (p *pebbleIterator) init(
 	iter pebbleiter.Iterator,
 	opts IterOptions,
 	durability DurabilityRequirement,
-	statsReporter statsReporter,
+	statsReporter iterStatsReporter,
 ) {
 	*p = pebbleIterator{
 		iter: iter,
@@ -185,7 +185,7 @@ func (p *pebbleIterator) initReuseOrCreate(
 	clone bool,
 	opts IterOptions,
 	durability DurabilityRequirement,
-	statsReporter statsReporter,
+	statsReporter iterStatsReporter,
 ) {
 	if iter != nil && !clone {
 		p.init(iter, opts, durability, statsReporter)
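[Editorial sketch] A closing note on the metrics: because updateEngineMetrics publishes AggregatedBatchCommitStats as cumulative gauges, a monitoring pipeline can recover only the mean over a scrape interval, as the engine.go comment observes. A small sketch of that derivation, with invented scrape values:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Two successive scrapes of the cumulative gauges
	// storage.batch-commit.count and storage.batch-commit.duration
	// (values invented for illustration).
	count1, dur1 := uint64(1000), 2*time.Second
	count2, dur2 := uint64(1500), 3*time.Second

	// Mean commit latency over the interval is the ratio of the deltas.
	mean := (dur2 - dur1) / time.Duration(count2-count1)
	fmt.Println(mean) // 2ms
}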