Skip to content

Commit

Permalink
kvserver,storage: BatchCommitStats plumbing for raft appends and metrics
Browse files Browse the repository at this point in the history
Pebble now produces detailed stats for a batch commit. These are partially
plumbed into kvserver, for (a) log statements related to raft appends,
(b) cumulative duration metrics applicable for all writes (raft and state
machine).

Informs cockroachdb/pebble#1943

Epic: none

Release note: None
  • Loading branch information
sumeerbhola committed May 25, 2023
1 parent 562f083 commit 4e3ea0a
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 30 deletions.
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/logstore/logstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ type AppendStats struct {
PebbleBegin time.Time
PebbleEnd time.Time
PebbleBytes int64
// Only set when !NonBlocking, which means almost never, since
// kv.raft_log.non_blocking_synchronization.enabled defaults to true.
PebbleCommitStats storage.BatchCommitStats

Sync bool
// If true, PebbleEnd-PebbleBegin does not include the sync time.
Expand All @@ -113,8 +116,9 @@ type LogStore struct {
// SyncCallback is a callback that is notified when a raft log write has been
// durably committed to disk. The function is handed the response messages that
// are associated with the MsgStorageAppend that triggered the fsync.
// commitStats is populated iff this was a non-blocking sync.
type SyncCallback interface {
OnLogSync(context.Context, []raftpb.Message)
OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats)
}

func newStoreEntriesBatch(eng storage.Engine) storage.Batch {
Expand Down Expand Up @@ -257,6 +261,7 @@ func (s *LogStore) storeEntriesAndCommitBatch(
ctx: ctx,
cb: cb,
msgs: m.Responses,
batch: batch,
metrics: s.Metrics,
logCommitBegin: stats.PebbleBegin,
}
Expand All @@ -269,10 +274,11 @@ func (s *LogStore) storeEntriesAndCommitBatch(
return RaftState{}, errors.Wrap(err, expl)
}
stats.PebbleEnd = timeutil.Now()
stats.PebbleCommitStats = batch.CommitStats()
if wantsSync {
logCommitEnd := stats.PebbleEnd
s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds())
cb.OnLogSync(ctx, m.Responses)
cb.OnLogSync(ctx, m.Responses, storage.BatchCommitStats{})
}
}
stats.Sync = wantsSync
Expand Down Expand Up @@ -325,6 +331,8 @@ type nonBlockingSyncWaiterCallback struct {
ctx context.Context
cb SyncCallback
msgs []raftpb.Message
// Used to extract stats. This is the batch that has been synced.
batch storage.WriteBatch
// Used to record Metrics.
metrics Metrics
logCommitBegin time.Time
Expand All @@ -334,7 +342,8 @@ type nonBlockingSyncWaiterCallback struct {
func (cb *nonBlockingSyncWaiterCallback) run() {
dur := timeutil.Since(cb.logCommitBegin).Nanoseconds()
cb.metrics.RaftLogCommitLatency.RecordValue(dur)
cb.cb.OnLogSync(cb.ctx, cb.msgs)
commitStats := cb.batch.CommitStats()
cb.cb.OnLogSync(cb.ctx, cb.msgs, commitStats)
cb.release()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/logstore/logstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (b *discardBatch) Commit(bool) error {

type noopSyncCallback struct{}

func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {}
func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message, storage.BatchCommitStats) {}

func BenchmarkLogStore_StoreEntries(b *testing.B) {
defer log.Scope(b).Close(b)
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/logstore/sync_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
if err := w.wg.SyncWait(); err != nil {
log.Fatalf(ctx, "SyncWait error: %+v", err)
}
w.wg.Close()
w.cb.run()
w.wg.Close()
case <-stopper.ShouldQuiesce():
return
}
Expand All @@ -108,7 +108,9 @@ func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) {
// durably committed. It may never be called in case the stopper stops.
//
// The syncWaiter will be Closed after its SyncWait method completes. It must
// not be Closed by the caller.
// not be Closed by the caller. The cb is called before the syncWaiter is
// closed, in case the cb implementation needs to extract something form the
// syncWaiter.
//
// If the SyncWaiterLoop has already been stopped, the callback will never be
// called.
Expand Down
80 changes: 80 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,61 @@ bytes preserved during flushes and compactions over the lifetime of the process.
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaBatchCommitCount = metric.Metadata{
Name: "storage.batch-commit.count",
Help: "Count of batch commits. See storage.AggregatedBatchCommitStats for details.",
Measurement: "Commit Ops",
Unit: metric.Unit_COUNT,
}
metaBatchCommitDuration = metric.Metadata{
Name: "storage.batch-commit.duration",
Help: "Cumulative time spent in batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitSemWaitDuration = metric.Metadata{
Name: "storage.batch-commit.sem-wait.duration",
Help: "Cumulative time spent in semaphore wait, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitWALQWaitDuration = metric.Metadata{
Name: "storage.batch-commit.wal-queue-wait.duration",
Help: "Cumulative time spent waiting for memory blocks in the WAL queue, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitMemStallDuration = metric.Metadata{
Name: "storage.batch-commit.mem-stall.duration",
Help: "Cumulative time spent in a write stall due to too many memtables, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitL0StallDuration = metric.Metadata{
Name: "storage.batch-commit.l0-stall.duration",
Help: "Cumulative time spent in a write stall due to high read amplification in L0, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitWALRotDuration = metric.Metadata{
Name: "storage.batch-commit.wal-rotation.duration",
Help: "Cumulative time spent waiting for WAL rotation, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaBatchCommitCommitWaitDuration = metric.Metadata{
Name: "storage.batch-commit.commit-wait.duration",
Help: "Cumulative time spent waiting for WAL sync, for batch commit. " +
"See storage.AggregatedBatchCommitStats for details.",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
)

var (
Expand Down Expand Up @@ -2068,6 +2123,14 @@ type StoreMetrics struct {
FlushableIngestCount *metric.Gauge
FlushableIngestTableCount *metric.Gauge
FlushableIngestTableSize *metric.Gauge
BatchCommitCount *metric.Gauge
BatchCommitDuration *metric.Gauge
BatchCommitSemWaitDuration *metric.Gauge
BatchCommitWALQWaitDuration *metric.Gauge
BatchCommitMemStallDuration *metric.Gauge
BatchCommitL0StallDuration *metric.Gauge
BatchCommitWALRotWaitDuration *metric.Gauge
BatchCommitCommitWaitDuration *metric.Gauge

RdbCheckpoints *metric.Gauge

Expand Down Expand Up @@ -2684,6 +2747,14 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
FlushableIngestCount: metric.NewGauge(metaFlushableIngestCount),
FlushableIngestTableCount: metric.NewGauge(metaFlushableIngestTableCount),
FlushableIngestTableSize: metric.NewGauge(metaFlushableIngestTableBytes),
BatchCommitCount: metric.NewGauge(metaBatchCommitCount),
BatchCommitDuration: metric.NewGauge(metaBatchCommitDuration),
BatchCommitSemWaitDuration: metric.NewGauge(metaBatchCommitSemWaitDuration),
BatchCommitWALQWaitDuration: metric.NewGauge(metaBatchCommitWALQWaitDuration),
BatchCommitMemStallDuration: metric.NewGauge(metaBatchCommitMemStallDuration),
BatchCommitL0StallDuration: metric.NewGauge(metaBatchCommitL0StallDuration),
BatchCommitWALRotWaitDuration: metric.NewGauge(metaBatchCommitWALRotDuration),
BatchCommitCommitWaitDuration: metric.NewGauge(metaBatchCommitCommitWaitDuration),

RdbCheckpoints: metric.NewGauge(metaRdbCheckpoints),

Expand Down Expand Up @@ -3035,6 +3106,15 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.FlushableIngestCount.Update(int64(m.Flush.AsIngestCount))
sm.FlushableIngestTableCount.Update(int64(m.Flush.AsIngestTableCount))
sm.FlushableIngestTableSize.Update(int64(m.Flush.AsIngestBytes))
sm.BatchCommitCount.Update(int64(m.BatchCommitStats.Count))
sm.BatchCommitDuration.Update(int64(m.BatchCommitStats.TotalDuration))
sm.BatchCommitSemWaitDuration.Update(int64(m.BatchCommitStats.SemaphoreWaitDuration))
sm.BatchCommitWALQWaitDuration.Update(int64(m.BatchCommitStats.WALQueueWaitDuration))
sm.BatchCommitMemStallDuration.Update(int64(m.BatchCommitStats.MemTableWriteStallDuration))
sm.BatchCommitL0StallDuration.Update(int64(m.BatchCommitStats.L0ReadAmpWriteStallDuration))
sm.BatchCommitWALRotWaitDuration.Update(int64(m.BatchCommitStats.WALRotationDuration))
sm.BatchCommitCommitWaitDuration.Update(int64(m.BatchCommitStats.CommitWaitDuration))

// Update the maximum number of L0 sub-levels seen.
sm.l0SublevelsTracker.Lock()
sm.l0SublevelsTracker.swag.Record(timeutil.Now(), float64(m.Levels[0].Sublevels))
Expand Down
13 changes: 11 additions & 2 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,9 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) {
p.Printf(", snapshot ignored")
}
}
if !(s.append.PebbleCommitStats == storage.BatchCommitStats{}) {
p.Printf(" pebble stats: [%s]", s.append.PebbleCommitStats)
}
}

func (s handleRaftReadyStats) String() string {
Expand Down Expand Up @@ -1515,9 +1518,15 @@ func (r *Replica) maybeCoalesceHeartbeat(
// replicaSyncCallback implements the logstore.SyncCallback interface.
type replicaSyncCallback Replica

func (r *replicaSyncCallback) OnLogSync(ctx context.Context, msgs []raftpb.Message) {
func (r *replicaSyncCallback) OnLogSync(
ctx context.Context, msgs []raftpb.Message, commitStats storage.BatchCommitStats,
) {
repl := (*Replica)(r)
// Send MsgStorageAppend's responses.
(*Replica)(r).sendRaftMessages(ctx, msgs, nil /* blocked */, false /* willDeliverLocal */)
repl.sendRaftMessages(ctx, msgs, nil /* blocked */, false /* willDeliverLocal */)
if commitStats.TotalDuration > defaultReplicaRaftMuWarnThreshold {
log.Infof(repl.raftCtx, "slow non-blocking raft commit: %s", commitStats)
}
}

// sendRaftMessages sends a slice of Raft messages.
Expand Down
15 changes: 14 additions & 1 deletion pkg/kv/kvserver/replica_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/redact"
"github.com/stretchr/testify/assert"
"go.etcd.io/raft/v3/tracker"
Expand Down Expand Up @@ -98,7 +100,18 @@ func Test_handleRaftReadyStats_SafeFormat(t *testing.T) {
PebbleBegin: ts(3),
PebbleEnd: ts(4),
PebbleBytes: 1024 * 5,
Sync: true,
PebbleCommitStats: storage.BatchCommitStats{
BatchCommitStats: pebble.BatchCommitStats{
TotalDuration: 100 * time.Millisecond,
SemaphoreWaitDuration: 2 * time.Millisecond,
WALQueueWaitDuration: 5 * time.Millisecond,
MemTableWriteStallDuration: 8 * time.Millisecond,
L0ReadAmpWriteStallDuration: 11 * time.Millisecond,
WALRotationDuration: 14 * time.Millisecond,
CommitWaitDuration: 17 * time.Millisecond,
},
},
Sync: true,
},
tSnapBegin: ts(4),
tSnapEnd: ts(5),
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,10 @@ func (s spanSetBatch) Repr() []byte {
return s.b.Repr()
}

func (s spanSetBatch) CommitStats() storage.BatchCommitStats {
return s.b.CommitStats()
}

// NewBatch returns a storage.Batch that asserts access of the underlying
// Batch against the given SpanSet. We only consider span boundaries, associated
// timestamps are not considered.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/testdata/handle_raft_ready_stats.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
raft ready handling: 5.00s [append=1.00s, apply=1.00s, sync=1.00s, snap=1.00s, other=1.00s], wrote [append-batch=5.0 KiB, append-ent=1.0 KiB (7), append-sst=5.0 MiB (3), apply=3 B (2 in 9 batches)], state_assertions=4, snapshot applied
raft ready handling: 5.00s [append=1.00s, apply=1.00s, sync=1.00s, snap=1.00s, other=1.00s], wrote [append-batch=5.0 KiB, append-ent=1.0 KiB (7), append-sst=5.0 MiB (3), apply=3 B (2 in 9 batches)], state_assertions=4, snapshot applied pebble stats: [commit-wait 17ms wal-q 5ms mem-stall 8ms l0-stall 11ms wal-rot 14ms sem 2ms]
44 changes: 42 additions & 2 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/redact"
prometheusgo "github.com/prometheus/client_model/go"
)

Expand Down Expand Up @@ -392,7 +393,7 @@ type EngineIterator interface {
// a clone of an existing iterator.
type CloneContext struct {
rawIter pebbleiter.Iterator
statsReporter statsReporter
statsReporter iterStatsReporter
}

// IterOptions contains options used to create an {MVCC,Engine}Iterator.
Expand Down Expand Up @@ -1024,13 +1025,43 @@ type WriteBatch interface {
// Repr returns the underlying representation of the batch and can be used to
// reconstitute the batch on a remote node using Writer.ApplyBatchRepr().
Repr() []byte
// CommitStats returns stats related to committing the batch. Should be
// called after Batch.Commit. If CommitNoSyncWait is used, it should be
// called after the call to SyncWait.
CommitStats() BatchCommitStats
}

type BatchCommitStats struct {
pebble.BatchCommitStats
}

// SafeFormat implements redact.SafeFormatter. It does not print the total
// duration.
func (stats BatchCommitStats) SafeFormat(p redact.SafePrinter, _ rune) {
p.Printf("commit-wait %s", stats.CommitWaitDuration)
if stats.WALQueueWaitDuration > 0 {
p.Printf(" wal-q %s", stats.WALQueueWaitDuration)
}
if stats.MemTableWriteStallDuration > 0 {
p.Printf(" mem-stall %s", stats.MemTableWriteStallDuration)
}
if stats.L0ReadAmpWriteStallDuration > 0 {
p.Printf(" l0-stall %s", stats.L0ReadAmpWriteStallDuration)
}
if stats.WALRotationDuration > 0 {
p.Printf(" wal-rot %s", stats.WALRotationDuration)
}
if stats.SemaphoreWaitDuration > 0 {
p.Printf(" sem %s", stats.SemaphoreWaitDuration)
}
}

// Metrics is a set of Engine metrics. Most are contained in the embedded
// *pebble.Metrics struct, which has its own documentation.
type Metrics struct {
*pebble.Metrics
Iterator AggregatedIteratorStats
Iterator AggregatedIteratorStats
BatchCommitStats AggregatedBatchCommitStats
// DiskSlowCount counts the number of times Pebble records disk slowness.
DiskSlowCount int64
// DiskStallCount counts the number of times Pebble observes slow writes
Expand Down Expand Up @@ -1092,6 +1123,15 @@ type AggregatedIteratorStats struct {
InternalSteps int
}

// AggregatedBatchCommitStats hold cumulative stats summed over all the
// batches that committed at the engine. Since these are durations, only the
// mean (over an interval) can be recovered. We can change some of these to
// histograms once we know which ones are more useful.
type AggregatedBatchCommitStats struct {
Count uint64
BatchCommitStats
}

// MetricsForInterval is a set of pebble.Metrics that need to be saved in order to
// compute metrics according to an interval.
type MetricsForInterval struct {
Expand Down
Loading

0 comments on commit 4e3ea0a

Please sign in to comment.