metric: expose pebble fsync latency with a callback
The fsync latency was previously computed using an HDR histogram, but
this histogram generates too many buckets of varying widths. To allow
clients to store the fsync latency in their preferred format, a
callback was introduced. The callback is defined on `pebble.Options`,
which lets clients provide a function that is invoked each time pebble
produces the fsync latency metric.

Additionally, the `record.NewLogWriter` function was updated to take a
configuration struct that configures the `LogWriter` with the specified
callbacks.
coolcom200 committed Oct 11, 2022
1 parent c53a4cc commit e88f67e
Showing 7 changed files with 80 additions and 176 deletions.
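For illustration, here is a minimal sketch (not part of this commit) of how a client might consume the new callback when opening a database. The `latencySink` type and the `"demo-db"` path are hypothetical stand-ins for whatever metric format and location the client prefers.

```go
package main

import (
	"log"
	"sync"
	"time"

	"github.com/cockroachdb/pebble"
)

// latencySink is a hypothetical stand-in for the client's preferred metric
// format (e.g. a histogram in a metrics registry).
type latencySink struct {
	mu      sync.Mutex
	samples []time.Duration
}

// record appends one fsync latency sample; it should stay cheap because it is
// invoked from the WAL flush loop.
func (s *latencySink) record(d time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.samples = append(s.samples, d)
}

func main() {
	sink := &latencySink{}
	opts := &pebble.Options{
		MetricEventListener: pebble.MetricEventListener{
			// Called by pebble after each WAL fsync with the observed latency.
			WALFsyncLatency: sink.record,
		},
	}
	db, err := pebble.Open("demo-db", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```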
4 changes: 2 additions & 2 deletions commit_test.go
@@ -184,7 +184,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
}

// A basic commitEnv which writes to a WAL.
- wal := record.NewLogWriter(sf, 0 /* logNum */)
+ wal := record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{})
var walDone sync.WaitGroup
testEnv := commitEnv{
logSeqNum: new(uint64),
@@ -235,7 +235,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
b.Run(fmt.Sprintf("parallel=%d", parallelism), func(b *testing.B) {
b.SetParallelism(parallelism)
mem := newMemTable(memTableOptions{})
- wal := record.NewLogWriter(io.Discard, 0 /* logNum */)
+ wal := record.NewLogWriter(io.Discard, 0 /* logNum */, record.LogWriterConfig{})

nullCommitEnv := commitEnv{
logSeqNum: new(uint64),
6 changes: 4 additions & 2 deletions db.go
@@ -1949,8 +1949,10 @@ func (d *DB) makeRoomForWrite(b *Batch) error {

if !d.opts.DisableWAL {
d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: newLogSize})
- d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum)
- d.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
+ d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
+ OnFsync: d.opts.MetricEventListener.WALFsyncLatency,
+ WALMinSyncInterval: d.opts.WALMinSyncInterval,
+ })
}

immMem := d.mu.mem.mutable
7 changes: 5 additions & 2 deletions open.go
@@ -418,8 +418,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
BytesPerSync: d.opts.WALBytesPerSync,
PreallocateSize: d.walPreallocateSize(),
})
- d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum)
- d.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
+ logWriterConfig := record.LogWriterConfig{
+ WALMinSyncInterval: d.opts.WALMinSyncInterval,
+ OnFsync: d.opts.MetricEventListener.WALFsyncLatency,
+ }
+ d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
d.mu.versions.metrics.WAL.Files++
}
d.updateReadStateLocked(d.opts.DebugCheck)
13 changes: 13 additions & 0 deletions options.go
@@ -683,6 +683,10 @@ type Options struct {
// The default merger concatenates values.
Merger *Merger

+ // MetricEventListener contains a set of callbacks that will be invoked when
+ // various metric related events occur.
+ MetricEventListener MetricEventListener

// MaxConcurrentCompactions specifies the maximum number of concurrent
// compactions. The default is 1. Concurrent compactions are performed
// - when L0 read-amplification passes the L0CompactionConcurrency threshold
@@ -759,6 +763,7 @@ type Options struct {
// changing options dynamically?
WALMinSyncInterval func() time.Duration


// private options are only used by internal tests or are used internally
// for facilitating upgrade paths of unconfigurable functionality.
private struct {
@@ -794,6 +799,14 @@ type Options struct {
}
}

+ // MetricEventListener is a struct that contains the callbacks that pebble will
+ // call when metric related events occur
+ type MetricEventListener struct {
+ // WALFsyncLatency is invoked after each WAL Fsync. As a result, this function
+ // should not run for an excessive amount of time.
+ WALFsyncLatency func(duration time.Duration)
+ }

// DebugCheckLevels calls CheckLevels on the provided database.
// It may be set in the DebugCheck field of Options to check
// level invariants whenever a new version is installed.
89 changes: 26 additions & 63 deletions record/log_writer.go
@@ -13,11 +13,9 @@ import (
"sync/atomic"
"time"

"github.com/HdrHistogram/hdrhistogram-go"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/crc"
"github.com/cockroachdb/pebble/internal/invariants"
)

var walSyncLabels = pprof.Labels("pebble", "wal-sync")
@@ -229,6 +227,7 @@ func (c *flusherCond) Unlock() {
}

type durationFunc func() time.Duration
+ type recordValueFunc func(value time.Duration)

// syncTimer is an interface for timers, modeled on the closure callback mode
// of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
@@ -281,10 +280,11 @@ type LogWriter struct {
// Accumulated flush error.
err error
// minSyncInterval is the minimum duration between syncs.
- minSyncInterval durationFunc
- pending []*block
- syncQ syncQueue
- metrics *LogWriterMetrics
+ minSyncInterval durationFunc
+ onFsyncLatencyMetric recordValueFunc
+ pending []*block
+ syncQ syncQueue
+ metrics *LogWriterMetrics
}

// afterFunc is a hook to allow tests to mock out the timer functionality
@@ -293,12 +293,18 @@ type LogWriter struct {
afterFunc func(d time.Duration, f func()) syncTimer
}

+ // LogWriterConfig is a struct used for configuring new LogWriters
+ type LogWriterConfig struct {
+ WALMinSyncInterval durationFunc
+ OnFsync recordValueFunc
+ }

// CapAllocatedBlocks is the maximum number of blocks allocated by the
// LogWriter.
const CapAllocatedBlocks = 16

// NewLogWriter returns a new LogWriter.
- func NewLogWriter(w io.Writer, logNum base.FileNum) *LogWriter {
+ func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter {
c, _ := w.(io.Closer)
s, _ := w.(syncer)
r := &LogWriter{
@@ -322,25 +328,17 @@ func NewLogWriter(w io.Writer, logNum base.FileNum) *LogWriter {
r.flusher.closed = make(chan struct{})
r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
r.flusher.metrics = &LogWriterMetrics{}
- // Histogram with max value of 30s. We are not trying to detect anomalies
- // with this, and normally latencies range from 0.5ms to 25ms.
- r.flusher.metrics.SyncLatencyMicros = hdrhistogram.New(
- 0, (time.Second * 30).Microseconds(), 2)

+ f := &r.flusher
+ f.minSyncInterval = logWriterConfig.WALMinSyncInterval
+ f.onFsyncLatencyMetric = logWriterConfig.OnFsync

go func() {
pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
}()
return r
}

- // SetMinSyncInterval sets the closure to invoke for retrieving the minimum
- // sync duration between syncs.
- func (w *LogWriter) SetMinSyncInterval(minSyncInterval durationFunc) {
- f := &w.flusher
- f.Lock()
- f.minSyncInterval = minSyncInterval
- f.Unlock()
- }

func (w *LogWriter) flushLoop(context.Context) {
f := &w.flusher
f.Lock()
@@ -453,8 +451,8 @@ func (w *LogWriter) flushLoop(context.Context) {
f.Unlock()
synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, head, tail)
f.Lock()
- if synced {
- f.metrics.SyncLatencyMicros.RecordValue(syncLatency.Microseconds())
+ if synced && f.onFsyncLatencyMetric != nil {
+ f.onFsyncLatencyMetric(syncLatency)
}
f.err = err
if f.err != nil {
@@ -608,7 +606,9 @@ func (w *LogWriter) Close() error {
syncLatency, err = w.syncWithLatency()
}
f.Lock()
- f.metrics.SyncLatencyMicros.RecordValue(syncLatency.Microseconds())
+ if f.onFsyncLatencyMetric != nil {
+ f.onFsyncLatencyMetric(syncLatency)
+ }
f.Unlock()

if w.c != nil {
@@ -731,52 +731,15 @@ func (w *LogWriter) Metrics() *LogWriterMetrics {

// LogWriterMetrics contains misc metrics for the log writer.
type LogWriterMetrics struct {
- WriteThroughput base.ThroughputMetric
- PendingBufferLen base.GaugeSampleMetric
- SyncQueueLen base.GaugeSampleMetric
- SyncLatencyMicros *hdrhistogram.Histogram
+ WriteThroughput base.ThroughputMetric
+ PendingBufferLen base.GaugeSampleMetric
+ SyncQueueLen base.GaugeSampleMetric

// Merge merges metrics from x. Requires that x is non-nil.
func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
m.WriteThroughput.Merge(x.WriteThroughput)
m.PendingBufferLen.Merge(x.PendingBufferLen)
m.SyncQueueLen.Merge(x.SyncQueueLen)
- if x.SyncLatencyMicros == nil {
- return nil
- } else if m.SyncLatencyMicros == nil {
- m.SyncLatencyMicros = hdrhistogram.Import(x.SyncLatencyMicros.Export())
- } else {
- dropped := m.SyncLatencyMicros.Merge(x.SyncLatencyMicros)
- if dropped > 0 {
- // This should never happen since we use a consistent min, max when
- // creating these histograms, and out-of-range is the only reason for the
- // merge to drop samples.
- return errors.Errorf("sync latency histogram merge dropped %d samples", dropped)
- }
- }
return nil
}

// Subtract merges metrics from x. Requires that x is non-nil.
func (m *LogWriterMetrics) Subtract(x *LogWriterMetrics) {
m.WriteThroughput.Subtract(x.WriteThroughput)
m.PendingBufferLen.Subtract(x.PendingBufferLen)
m.SyncQueueLen.Subtract(x.SyncQueueLen)
- if x.SyncLatencyMicros != nil {
- m.SyncLatencyMicros = subtract(m.SyncLatencyMicros, x.SyncLatencyMicros)
- }
}

- // subtract returns a hdrhistogram.Histogram produce from subtracting two histograms
- func subtract(h *hdrhistogram.Histogram, g *hdrhistogram.Histogram) *hdrhistogram.Histogram {
- snapg, snaph := g.Export(), h.Export()
- for i := range snapg.Counts {
- difference := snaph.Counts[i] - snapg.Counts[i]
- if invariants.Enabled && difference < 0 {
- panic("histogram subtraction lead to unexpected negative counts in buckets")
- }
- snaph.Counts[i] = difference
- }
- return hdrhistogram.Import(snaph)
- }
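For reference, a corresponding sketch (also not from this commit) of constructing a `LogWriter` directly with the new `LogWriterConfig`, mirroring the updated tests above; the interval and callback values are arbitrary placeholders.

```go
package main

import (
	"io"
	"time"

	"github.com/cockroachdb/pebble/record"
)

func main() {
	// Both fields are optional: the flush loop checks OnFsync for nil before
	// invoking it, and a nil WALMinSyncInterval means no minimum sync interval
	// is enforced.
	cfg := record.LogWriterConfig{
		WALMinSyncInterval: func() time.Duration { return time.Millisecond },
		OnFsync:            func(d time.Duration) { _ = d /* record in the preferred sink */ },
	}
	w := record.NewLogWriter(io.Discard, 0 /* logNum */, cfg)
	defer w.Close()

	// io.Discard does not implement Sync, so OnFsync will not actually fire
	// here; a file-backed writer that supports syncing would trigger it.
	if _, err := w.WriteRecord([]byte("hello")); err != nil {
		panic(err)
	}
}
```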