From e88f67e2d3dc51c8c3b15320bedd629dc3c3e069 Mon Sep 17 00:00:00 2001
From: Leon Fattakhov
Date: Mon, 10 Oct 2022 10:22:38 -0400
Subject: [PATCH] metric: expose pebble fsync latency with a callback

The fsync latency was previously computed with an HDR histogram;
however, that histogram generates too many buckets of varying widths.
To allow clients to store the fsync latency in their preferred format,
this change introduces a callback. The callback is defined on
`pebble.Options`, and is invoked each time pebble records the fsync
latency.

Additionally, `record.NewLogWriter` now takes a configuration struct
that configures the `LogWriter` with the specified callbacks.
---
 commit_test.go            |   4 +-
 db.go                     |   6 +-
 open.go                   |   7 ++-
 options.go                |  13 +++++
 record/log_writer.go      |  89 +++++++++--------------------
 record/log_writer_test.go | 115 +++++++------------------------------
 record/record_test.go     |  22 ++++----
 7 files changed, 80 insertions(+), 176 deletions(-)

diff --git a/commit_test.go b/commit_test.go
index 9d97864132..a9ee3266ae 100644
--- a/commit_test.go
+++ b/commit_test.go
@@ -184,7 +184,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
 	}
 
 	// A basic commitEnv which writes to a WAL.
-	wal := record.NewLogWriter(sf, 0 /* logNum */)
+	wal := record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{})
 	var walDone sync.WaitGroup
 	testEnv := commitEnv{
 		logSeqNum: new(uint64),
@@ -235,7 +235,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
 		b.Run(fmt.Sprintf("parallel=%d", parallelism), func(b *testing.B) {
 			b.SetParallelism(parallelism)
 			mem := newMemTable(memTableOptions{})
-			wal := record.NewLogWriter(io.Discard, 0 /* logNum */)
+			wal := record.NewLogWriter(io.Discard, 0 /* logNum */, record.LogWriterConfig{})
 
 			nullCommitEnv := commitEnv{
 				logSeqNum: new(uint64),
diff --git a/db.go b/db.go
index b7a051aff0..da683488cb 100644
--- a/db.go
+++ b/db.go
@@ -1949,8 +1949,10 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
 
 		if !d.opts.DisableWAL {
 			d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: newLogSize})
-			d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum)
-			d.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
+			d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
+				OnFsync:            d.opts.MetricEventListener.WALFsyncLatency,
+				WALMinSyncInterval: d.opts.WALMinSyncInterval,
+			})
 		}
 
 		immMem := d.mu.mem.mutable
diff --git a/open.go b/open.go
index bef870042f..456a5d248b 100644
--- a/open.go
+++ b/open.go
@@ -418,8 +418,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
 			BytesPerSync:    d.opts.WALBytesPerSync,
 			PreallocateSize: d.walPreallocateSize(),
 		})
-		d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum)
-		d.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
+		logWriterConfig := record.LogWriterConfig{
+			WALMinSyncInterval: d.opts.WALMinSyncInterval,
+			OnFsync:            d.opts.MetricEventListener.WALFsyncLatency,
+		}
+		d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
 		d.mu.versions.metrics.WAL.Files++
 	}
 	d.updateReadStateLocked(d.opts.DebugCheck)
diff --git a/options.go b/options.go
index 09cfc0271e..f205fbb2f8 100644
--- a/options.go
+++ b/options.go
@@ -683,6 +683,10 @@ type Options struct {
 	// The default merger concatenates values.
 	Merger *Merger
 
+	// MetricEventListener contains a set of callbacks that will be invoked when
+	// various metric-related events occur.
+	MetricEventListener MetricEventListener
+
 	// MaxConcurrentCompactions specifies the maximum number of concurrent
 	// compactions. The default is 1. Concurrent compactions are performed
 	// - when L0 read-amplification passes the L0CompactionConcurrency threshold
@@ -759,6 +763,7 @@ type Options struct {
 	// changing options dynamically?
 	WALMinSyncInterval func() time.Duration
 
+
 	// private options are only used by internal tests or are used internally
 	// for facilitating upgrade paths of unconfigurable functionality.
 	private struct {
@@ -794,6 +799,14 @@ type Options struct {
 	}
 }
 
+// MetricEventListener is a struct that contains the callbacks that pebble will
+// call when metric-related events occur.
+type MetricEventListener struct {
+	// WALFsyncLatency is invoked after each WAL fsync. As a result, this
+	// function should not run for an excessive amount of time.
+	WALFsyncLatency func(duration time.Duration)
+}
+
 // DebugCheckLevels calls CheckLevels on the provided database.
 // It may be set in the DebugCheck field of Options to check
 // level invariants whenever a new version is installed.
diff --git a/record/log_writer.go b/record/log_writer.go
index 8220a8c663..d1061d39ea 100644
--- a/record/log_writer.go
+++ b/record/log_writer.go
@@ -13,11 +13,9 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/HdrHistogram/hdrhistogram-go"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/internal/crc"
-	"github.com/cockroachdb/pebble/internal/invariants"
 )
 
 var walSyncLabels = pprof.Labels("pebble", "wal-sync")
@@ -229,6 +227,7 @@ func (c *flusherCond) Unlock() {
 }
 
 type durationFunc func() time.Duration
+type recordValueFunc func(value time.Duration)
 
 // syncTimer is an interface for timers, modeled on the closure callback mode
 // of time.Timer. See time.AfterFunc and LogWriter.afterFunc. syncTimer is used
@@ -281,10 +280,11 @@ type LogWriter struct {
 		// Accumulated flush error.
 		err error
 		// minSyncInterval is the minimum duration between syncs.
-		minSyncInterval durationFunc
-		pending         []*block
-		syncQ           syncQueue
-		metrics         *LogWriterMetrics
+		minSyncInterval      durationFunc
+		onFsyncLatencyMetric recordValueFunc
+		pending              []*block
+		syncQ                syncQueue
+		metrics              *LogWriterMetrics
 	}
 
 	// afterFunc is a hook to allow tests to mock out the timer functionality
@@ -293,12 +293,18 @@ type LogWriter struct {
 	afterFunc func(d time.Duration, f func()) syncTimer
 }
 
+// LogWriterConfig is a struct used for configuring new LogWriters.
+type LogWriterConfig struct {
+	WALMinSyncInterval durationFunc
+	OnFsync            recordValueFunc
+}
+
 // CapAllocatedBlocks is the maximum number of blocks allocated by the
 // LogWriter.
 const CapAllocatedBlocks = 16
 
 // NewLogWriter returns a new LogWriter.
-func NewLogWriter(w io.Writer, logNum base.FileNum) *LogWriter {
+func NewLogWriter(w io.Writer, logNum base.FileNum, logWriterConfig LogWriterConfig) *LogWriter {
 	c, _ := w.(io.Closer)
 	s, _ := w.(syncer)
 	r := &LogWriter{
@@ -322,25 +328,17 @@ func NewLogWriter(w io.Writer, logNum base.FileNum) *LogWriter {
 	r.flusher.closed = make(chan struct{})
 	r.flusher.pending = make([]*block, 0, cap(r.free.blocks))
 	r.flusher.metrics = &LogWriterMetrics{}
-	// Histogram with max value of 30s. We are not trying to detect anomalies
-	// with this, and normally latencies range from 0.5ms to 25ms.
-	r.flusher.metrics.SyncLatencyMicros = hdrhistogram.New(
-		0, (time.Second * 30).Microseconds(), 2)
+
+	f := &r.flusher
+	f.minSyncInterval = logWriterConfig.WALMinSyncInterval
+	f.onFsyncLatencyMetric = logWriterConfig.OnFsync
+
 	go func() {
 		pprof.Do(context.Background(), walSyncLabels, r.flushLoop)
 	}()
 	return r
 }
 
-// SetMinSyncInterval sets the closure to invoke for retrieving the minimum
-// sync duration between syncs.
-func (w *LogWriter) SetMinSyncInterval(minSyncInterval durationFunc) {
-	f := &w.flusher
-	f.Lock()
-	f.minSyncInterval = minSyncInterval
-	f.Unlock()
-}
-
 func (w *LogWriter) flushLoop(context.Context) {
 	f := &w.flusher
 	f.Lock()
@@ -453,8 +451,8 @@ func (w *LogWriter) flushLoop(context.Context) {
 		f.Unlock()
 		synced, syncLatency, bytesWritten, err := w.flushPending(data, pending, head, tail)
 		f.Lock()
-		if synced {
-			f.metrics.SyncLatencyMicros.RecordValue(syncLatency.Microseconds())
+		if synced && f.onFsyncLatencyMetric != nil {
+			f.onFsyncLatencyMetric(syncLatency)
 		}
 		f.err = err
 		if f.err != nil {
@@ -608,7 +606,9 @@ func (w *LogWriter) Close() error {
 		syncLatency, err = w.syncWithLatency()
 	}
 	f.Lock()
-	f.metrics.SyncLatencyMicros.RecordValue(syncLatency.Microseconds())
+	if f.onFsyncLatencyMetric != nil {
+		f.onFsyncLatencyMetric(syncLatency)
+	}
 	f.Unlock()
 
 	if w.c != nil {
@@ -731,10 +731,9 @@ func (w *LogWriter) Metrics() *LogWriterMetrics {
 
 // LogWriterMetrics contains misc metrics for the log writer.
 type LogWriterMetrics struct {
-	WriteThroughput   base.ThroughputMetric
-	PendingBufferLen  base.GaugeSampleMetric
-	SyncQueueLen      base.GaugeSampleMetric
-	SyncLatencyMicros *hdrhistogram.Histogram
+	WriteThroughput  base.ThroughputMetric
+	PendingBufferLen base.GaugeSampleMetric
+	SyncQueueLen     base.GaugeSampleMetric
 }
 
 // Merge merges metrics from x. Requires that x is non-nil.
@@ -742,41 +741,5 @@ func (m *LogWriterMetrics) Merge(x *LogWriterMetrics) error {
 	m.WriteThroughput.Merge(x.WriteThroughput)
 	m.PendingBufferLen.Merge(x.PendingBufferLen)
 	m.SyncQueueLen.Merge(x.SyncQueueLen)
-	if x.SyncLatencyMicros == nil {
-		return nil
-	} else if m.SyncLatencyMicros == nil {
-		m.SyncLatencyMicros = hdrhistogram.Import(x.SyncLatencyMicros.Export())
-	} else {
-		dropped := m.SyncLatencyMicros.Merge(x.SyncLatencyMicros)
-		if dropped > 0 {
-			// This should never happen since we use a consistent min, max when
-			// creating these histograms, and out-of-range is the only reason for the
-			// merge to drop samples.
-			return errors.Errorf("sync latency histogram merge dropped %d samples", dropped)
-		}
-	}
 	return nil
 }
-
-// Subtract merges metrics from x. Requires that x is non-nil.
-func (m *LogWriterMetrics) Subtract(x *LogWriterMetrics) {
-	m.WriteThroughput.Subtract(x.WriteThroughput)
-	m.PendingBufferLen.Subtract(x.PendingBufferLen)
-	m.SyncQueueLen.Subtract(x.SyncQueueLen)
-	if x.SyncLatencyMicros != nil {
-		m.SyncLatencyMicros = subtract(m.SyncLatencyMicros, x.SyncLatencyMicros)
-	}
-}
-
-// subtract returns a hdrhistogram.Histogram produce from subtracting two histograms
-func subtract(h *hdrhistogram.Histogram, g *hdrhistogram.Histogram) *hdrhistogram.Histogram {
-	snapg, snaph := g.Export(), h.Export()
-	for i := range snapg.Counts {
-		difference := snaph.Counts[i] - snapg.Counts[i]
-		if invariants.Enabled && difference < 0 {
-			panic("histogram subtraction lead to unexpected negative counts in buckets")
-		}
-		snaph.Counts[i] = difference
-	}
-	return hdrhistogram.Import(snaph)
-}
diff --git a/record/log_writer_test.go b/record/log_writer_test.go
index 1f14b673db..df7ad776ef 100644
--- a/record/log_writer_test.go
+++ b/record/log_writer_test.go
@@ -13,7 +13,6 @@ import (
 
 	"github.com/HdrHistogram/hdrhistogram-go"
 	"github.com/cockroachdb/errors"
-	"github.com/cockroachdb/pebble/internal/base"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/stretchr/testify/require"
 )
@@ -138,7 +137,7 @@ func TestSyncError(t *testing.T) {
 	require.NoError(t, err)
 
 	injectedErr := errors.New("injected error")
-	w := NewLogWriter(syncErrorFile{f, injectedErr}, 0)
+	w := NewLogWriter(syncErrorFile{f, injectedErr}, 0, LogWriterConfig{})
 
 	syncRecord := func() {
 		var syncErr error
@@ -176,7 +175,7 @@ func (f *syncFile) Sync() error {
 
 func TestSyncRecord(t *testing.T) {
 	f := &syncFile{}
-	w := NewLogWriter(f, 0)
+	w := NewLogWriter(f, 0, LogWriterConfig{})
 
 	var syncErr error
 	for i := 0; i < 100000; i++ {
@@ -222,9 +221,10 @@ func TestMinSyncInterval(t *testing.T) {
 	const minSyncInterval = 100 * time.Millisecond
 	f := &syncFile{}
 
-	w := NewLogWriter(f, 0)
-	w.SetMinSyncInterval(func() time.Duration {
-		return minSyncInterval
+	w := NewLogWriter(f, 0, LogWriterConfig{
+		WALMinSyncInterval: func() time.Duration {
+			return minSyncInterval
+		},
 	})
 
 	var timer fakeTimer
@@ -291,9 +291,10 @@ func TestMinSyncIntervalClose(t *testing.T) {
 	const minSyncInterval = 100 * time.Millisecond
 	f := &syncFile{}
 
-	w := NewLogWriter(f, 0)
-	w.SetMinSyncInterval(func() time.Duration {
-		return minSyncInterval
+	w := NewLogWriter(f, 0, LogWriterConfig{
+		WALMinSyncInterval: func() time.Duration {
+			return minSyncInterval
+		},
 	})
 
 	var timer fakeTimer
@@ -343,7 +344,7 @@ func (f *syncFileWithWait) Sync() error {
 func TestMetricsWithoutSync(t *testing.T) {
 	f := &syncFileWithWait{}
 	f.writeWG.Add(1)
-	w := NewLogWriter(f, 0)
+	w := NewLogWriter(f, 0, LogWriterConfig{})
 	offset, err := w.SyncRecord([]byte("hello"), nil, nil)
 	require.NoError(t, err)
 	const recordSize = 16
@@ -372,7 +373,13 @@
 func TestMetricsWithSync(t *testing.T) {
 	f := &syncFileWithWait{}
 	f.syncWG.Add(1)
-	w := NewLogWriter(f, 0)
+	syncLatencyMicros := hdrhistogram.New(0, (time.Second * 30).Microseconds(), 2)
+	w := NewLogWriter(f, 0, LogWriterConfig{
+		OnFsync: func(duration time.Duration) {
+			err := syncLatencyMicros.RecordValue(duration.Microseconds())
+			require.NoError(t, err)
+		},
+	})
 	var wg sync.WaitGroup
 	wg.Add(100)
 	for i := 0; i < 100; i++ {
@@ -392,90 +399,6 @@ func TestMetricsWithSync(t *testing.T) {
 	// Allow for some inaccuracy in sleep and for two syncs, one of which was
 	// fast.
 	require.LessOrEqual(t, int64(syncLatency/(2*time.Microsecond)),
-		m.SyncLatencyMicros.ValueAtQuantile(90))
+		syncLatencyMicros.ValueAtQuantile(90))
 	require.LessOrEqual(t, int64(syncLatency/2), int64(m.WriteThroughput.WorkDuration))
 }
-
-func TestLogWriterMetricsMergeWithNilSyncLatency(t *testing.T) {
-	lm1 := LogWriterMetrics{}
-	lm2 := LogWriterMetrics{
-		SyncLatencyMicros: hdrhistogram.New(1, 1, 1),
-	}
-	err := lm1.Merge(&lm2)
-	if err != nil {
-		require.Errorf(t, err, "Unexpected error when merging two LogWriterMetrics")
-	}
-	require.Equal(t, lm2.SyncLatencyMicros, lm1.SyncLatencyMicros)
-}
-
-const significantValueDigits = 3
-const lowestDiscernibleValue = 1
-const highestTrackableValue = 1000
-
-func TestSubtractToZeroCounts(t *testing.T) {
-	h1 := hdrhistogram.New(lowestDiscernibleValue, highestTrackableValue, significantValueDigits)
-	for i := 0; i < 100; i++ {
-		handleRecordValue(t, h1, i)
-	}
-
-	h1 = subtract(h1, h1)
-
-	if v, want := h1.ValueAtQuantile(50), int64(0); v != want {
-		t.Errorf("Median was %v, but expected %v", v, want)
-	}
-}
-
-func TestSubtractAfterAdd(t *testing.T) {
-	h1 := hdrhistogram.New(lowestDiscernibleValue, 5, significantValueDigits)
-	handleRecordValues(t, h1, 1, 1)
-	handleRecordValues(t, h1, 2, 2)
-	handleRecordValues(t, h1, 3, 3)
-	handleRecordValues(t, h1, 4, 2)
-	handleRecordValues(t, h1, 5, 1)
-
-	h2 := hdrhistogram.New(lowestDiscernibleValue, 10, significantValueDigits)
-	handleRecordValues(t, h2, 5, 1)
-	handleRecordValues(t, h2, 6, 2)
-	handleRecordValues(t, h2, 7, 3)
-	handleRecordValues(t, h2, 8, 10)
-	handleRecordValues(t, h2, 9, 1)
-
-	h1Original := hdrhistogram.Import(h1.Export())
-	h1.Merge(h2)
-
-	if v, want := h1.ValueAtQuantile(50), int64(7); v != want {
-		t.Errorf("Median was %v, but expected %v", v, want)
-	}
-
-	h3 := subtract(h1, h2)
-	if !h1Original.Equals(h3) {
-		t.Errorf("Expected Histograms to be equal")
-	}
-}
-
-func TestLogWriterSubtractInvalidHistograms(t *testing.T) {
-	lwm := LogWriterMetrics{
-		WriteThroughput: base.ThroughputMetric{
-			Bytes:        0,
-			WorkDuration: 0,
-			IdleDuration: 0,
-		},
-		PendingBufferLen:  base.GaugeSampleMetric{},
-		SyncQueueLen:      base.GaugeSampleMetric{},
-		SyncLatencyMicros: nil,
-	}
-	lwm.Subtract(&lwm)
-	require.Nil(t, lwm.SyncLatencyMicros)
-
-}
-
-func handleRecordValues(t *testing.T, h *hdrhistogram.Histogram, valueToRecord int, n int) {
-	err := h.RecordValues(int64(valueToRecord), int64(n))
-	if err != nil {
-		t.Fatal(err)
-	}
-}
-
-func handleRecordValue(t *testing.T, h *hdrhistogram.Histogram, valueToRecord int) {
-	handleRecordValues(t, h, valueToRecord, 1)
-}
diff --git a/record/record_test.go b/record/record_test.go
index 8d2e87cc4b..d6ba14c377 100644
--- a/record/record_test.go
+++ b/record/record_test.go
@@ -90,7 +90,7 @@ func testGenerator(t *testing.T, reset func(), gen func() (string, bool)) {
 
 	t.Run("LogWriter", func(t *testing.T) {
 		testGeneratorWriter(t, reset, gen, func(w io.Writer) recordWriter {
-			return NewLogWriter(w, 0 /* logNum */)
+			return NewLogWriter(w, 0 /* logNum */, LogWriterConfig{})
 		})
 	})
 }
@@ -797,7 +797,7 @@ func TestNoLastRecordOffset(t *testing.T) {
 
 func TestInvalidLogNum(t *testing.T) {
 	var buf bytes.Buffer
-	w := NewLogWriter(&buf, 1)
+	w := NewLogWriter(&buf, 1, LogWriterConfig{})
 	for i := 0; i < 10; i++ {
 		s := fmt.Sprintf("%04d\n", i)
 		_, err := w.WriteRecord([]byte(s))
@@ -891,7 +891,7 @@ func TestRecycleLog(t *testing.T) {
 			limit: blocks,
 		}
 
-		w := NewLogWriter(limitedBuf, base.FileNum(i))
+		w := NewLogWriter(limitedBuf, base.FileNum(i), LogWriterConfig{})
 		sizes := make([]int, 10+rnd.Intn(100))
 		for j := range sizes {
 			data := randBlock()
@@ -934,7 +934,7 @@ func TestRecycleLog(t *testing.T) {
 
 func TestTruncatedLog(t *testing.T) {
 	backing := make([]byte, 2*blockSize)
-	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1))
+	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1), LogWriterConfig{})
 	// Write a record that spans 2 blocks.
 	_, err := w.WriteRecord(bytes.Repeat([]byte("s"), blockSize+100))
 	require.NoError(t, err)
@@ -949,14 +949,14 @@ func TestTruncatedLog(t *testing.T) {
 
 func TestRecycleLogWithPartialBlock(t *testing.T) {
 	backing := make([]byte, 27)
-	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1))
+	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(1), LogWriterConfig{})
 	// Will write a chunk with 11 byte header + 5 byte payload.
 	_, err := w.WriteRecord([]byte("aaaaa"))
 	require.NoError(t, err)
 	// Close will write a 11-byte EOF chunk.
 	require.NoError(t, w.Close())
 
-	w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(2))
+	w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(2), LogWriterConfig{})
 	// Will write a chunk with 11 byte header + 1 byte payload.
 	_, err = w.WriteRecord([]byte("a"))
 	require.NoError(t, err)
@@ -978,14 +978,14 @@ func TestRecycleLogNumberOverflow(t *testing.T) {
 	// interpreted correctly.
 
 	backing := make([]byte, 27)
-	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32))
+	w := NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32), LogWriterConfig{})
 	// Will write a chunk with 11 byte header + 5 byte payload.
 	_, err := w.WriteRecord([]byte("aaaaa"))
 	require.NoError(t, err)
 	// Close will write a 11-byte EOF chunk.
 	require.NoError(t, w.Close())
 
-	w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32+1))
+	w = NewLogWriter(bytes.NewBuffer(backing[:0]), base.FileNum(math.MaxUint32+1), LogWriterConfig{})
 	// Will write a chunk with 11 byte header + 1 byte payload.
 	_, err = w.WriteRecord([]byte("a"))
 	require.NoError(t, err)
@@ -1006,7 +1006,7 @@ func TestRecycleLogWithPartialRecord(t *testing.T) {
 
 	// Write a record that is larger than the log block size.
 	backing1 := make([]byte, 2*blockSize)
-	w := NewLogWriter(bytes.NewBuffer(backing1[:0]), base.FileNum(1))
+	w := NewLogWriter(bytes.NewBuffer(backing1[:0]), base.FileNum(1), LogWriterConfig{})
 	_, err := w.WriteRecord(bytes.Repeat([]byte("a"), recordSize))
 	require.NoError(t, err)
 	require.NoError(t, w.Close())
@@ -1014,7 +1014,7 @@ func TestRecycleLogWithPartialRecord(t *testing.T) {
 	// Write another record to a new incarnation of the WAL that is larger than
 	// the block size.
 	backing2 := make([]byte, 2*blockSize)
-	w = NewLogWriter(bytes.NewBuffer(backing2[:0]), base.FileNum(2))
+	w = NewLogWriter(bytes.NewBuffer(backing2[:0]), base.FileNum(2), LogWriterConfig{})
 	_, err = w.WriteRecord(bytes.Repeat([]byte("b"), recordSize))
 	require.NoError(t, err)
 	require.NoError(t, w.Close())
@@ -1036,7 +1036,7 @@ func TestRecycleLogWithPartialRecord(t *testing.T) {
 
 func BenchmarkRecordWrite(b *testing.B) {
 	for _, size := range []int{8, 16, 32, 64, 256, 1028, 4096, 65_536} {
 		b.Run(fmt.Sprintf("size=%d", size), func(b *testing.B) {
-			w := NewLogWriter(io.Discard, 0 /* logNum */)
+			w := NewLogWriter(io.Discard, 0 /* logNum */, LogWriterConfig{})
 			defer w.Close()
 			buf := make([]byte, size)
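
Usage sketch: the snippet below shows how a client might consume the new callback. Only `Options.MetricEventListener`, `MetricEventListener.WALFsyncLatency`, and the `pebble.Open` signature come from this patch; the HDR histogram sink, its bounds, and the `example-db` path are illustrative assumptions, not part of the change.

```go
package main

import (
	"log"
	"time"

	"github.com/HdrHistogram/hdrhistogram-go"
	"github.com/cockroachdb/pebble"
)

func main() {
	// Hypothetical client-side sink: a 2-significant-digit HDR histogram
	// capped at 30s, mirroring the bounds the removed in-LogWriter
	// histogram used. Any client-owned format works here.
	hist := hdrhistogram.New(0, (30 * time.Second).Microseconds(), 2)

	opts := &pebble.Options{
		MetricEventListener: pebble.MetricEventListener{
			// Invoked after each WAL fsync; keep it cheap, since it runs
			// on the LogWriter's flush loop.
			WALFsyncLatency: func(d time.Duration) {
				if err := hist.RecordValue(d.Microseconds()); err != nil {
					log.Printf("fsync latency out of range: %v", err)
				}
			},
		},
	}

	db, err := pebble.Open("example-db", opts)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```

Because the callback fires on the flush loop (see the `flushLoop` and `Close` hunks above), anything more expensive than recording into an in-memory histogram should be handed off to another goroutine.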