Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metric: expose pebble fsync latency with a callback #2001

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestCommitPipelineWALClose(t *testing.T) {
}

// A basic commitEnv which writes to a WAL.
wal := record.NewLogWriter(sf, 0 /* logNum */)
wal := record.NewLogWriter(sf, 0 /* logNum */, record.LogWriterConfig{})
var walDone sync.WaitGroup
testEnv := commitEnv{
logSeqNum: new(uint64),
Expand Down Expand Up @@ -235,7 +235,7 @@ func BenchmarkCommitPipeline(b *testing.B) {
b.Run(fmt.Sprintf("parallel=%d", parallelism), func(b *testing.B) {
b.SetParallelism(parallelism)
mem := newMemTable(memTableOptions{})
wal := record.NewLogWriter(io.Discard, 0 /* logNum */)
wal := record.NewLogWriter(io.Discard, 0 /* logNum */, record.LogWriterConfig{})

nullCommitEnv := commitEnv{
logSeqNum: new(uint64),
Expand Down
2 changes: 1 addition & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3190,7 +3190,7 @@ func TestCompactFlushQueuedMemTableAndFlushMetrics(t *testing.T) {
func() {
begin := time.Now()
for {
metrics := d.InternalIntervalMetrics()
metrics := d.Metrics()
require.NotNil(t, metrics)
if int64(50<<10) < metrics.Flush.WriteThroughput.Bytes {
// The writes (during which the flush is idle) and the flush work
Expand Down
47 changes: 4 additions & 43 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"sync/atomic"
"time"

"github.com/HdrHistogram/hdrhistogram-go"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/arenaskl"
"github.com/cockroachdb/pebble/internal/base"
Expand Down Expand Up @@ -415,10 +414,6 @@ type DB struct {
disabled int
}

// Stores the Metrics for the previous call to InternalIntervalMetrics() in
// order to compute the current intervals metrics
lastMetrics *Metrics

// The list of active snapshots.
snapshots snapshotList

Expand Down Expand Up @@ -1510,42 +1505,6 @@ func (d *DB) AsyncFlush() (<-chan struct{}, error) {
return flushed, nil
}

// InternalIntervalMetrics returns the InternalIntervalMetrics and resets for
// the next interval (which is until the next call to this method).
//
// Deprecated: Use Metrics.Flush and Metrics.LogWriter instead.
func (d *DB) InternalIntervalMetrics() *InternalIntervalMetrics {
	// NOTE(review): Metrics() is deliberately called before acquiring d.mu —
	// presumably it takes the mutex itself, so reordering would deadlock.
	// Confirm before restructuring.
	m := d.Metrics()
	d.mu.Lock()
	defer d.mu.Unlock()

	// Copy the relevant metrics, ensuring to perform a deep clone of the histogram
	// to avoid mutating it. The Export/Import round-trip produces a fresh
	// histogram allocation, so the Subtract below cannot modify the snapshot
	// that is saved into d.mu.lastMetrics.
	logDelta := m.LogWriter
	if m.LogWriter.SyncLatencyMicros != nil {
		logDelta.SyncLatencyMicros = hdrhistogram.Import(m.LogWriter.SyncLatencyMicros.Export())
	}
	flushDelta := m.Flush.WriteThroughput

	// Subtract the cumulative metrics at the time of the last InternalIntervalMetrics call,
	// if any, in order to compute the delta. On the first call lastMetrics is
	// nil and the "interval" is everything since the DB was opened.
	if d.mu.lastMetrics != nil {
		logDelta.Subtract(&d.mu.lastMetrics.LogWriter)
		flushDelta.Subtract(d.mu.lastMetrics.Flush.WriteThroughput)
	}

	// Save the *Metrics we used so that a subsequent call to InternalIntervalMetrics
	// can compute the delta relative to it.
	d.mu.lastMetrics = m

	// Normalize the mean queue lengths into [0,1] utilization fractions using
	// the LogWriter's fixed capacities.
	iim := &InternalIntervalMetrics{}
	iim.LogWriter.PendingBufferUtilization = logDelta.PendingBufferLen.Mean() / record.CapAllocatedBlocks
	iim.LogWriter.SyncQueueUtilization = logDelta.SyncQueueLen.Mean() / record.SyncConcurrency
	iim.LogWriter.SyncLatencyMicros = logDelta.SyncLatencyMicros
	iim.LogWriter.WriteThroughput = logDelta.WriteThroughput
	iim.Flush.WriteThroughput = flushDelta
	return iim
}

// Metrics returns metrics about the database.
func (d *DB) Metrics() *Metrics {
metrics := &Metrics{}
Expand Down Expand Up @@ -1990,8 +1949,10 @@ func (d *DB) makeRoomForWrite(b *Batch) error {

if !d.opts.DisableWAL {
d.mu.log.queue = append(d.mu.log.queue, fileInfo{fileNum: newLogNum, fileSize: newLogSize})
d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum)
d.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
d.mu.log.LogWriter = record.NewLogWriter(newLogFile, newLogNum, record.LogWriterConfig{
OnFsync: d.opts.MetricEventListener.WALFsyncLatency,
WALMinSyncInterval: d.opts.WALMinSyncInterval,
})
}

immMem := d.mu.mem.mutable
Expand Down
81 changes: 0 additions & 81 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"io"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -1314,83 +1313,3 @@ func verifyGetNotFound(t *testing.T, r Reader, key []byte) {
t.Fatalf("expected nil, but got %s", val)
}
}

// TestFlushIntervalMetrics verifies that the per-interval metrics returned by
// InternalIntervalMetrics sum up to the cumulative metrics reported by
// Metrics: it performs two write+flush cycles, accumulating the interval
// metrics after each, and then compares the accumulated totals against the
// cumulative DB metrics.
func TestFlushIntervalMetrics(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("test is flaky on windows")
	}
	sumIntervalMetrics := InternalIntervalMetrics{}
	mem := vfs.NewMem()
	d, err := Open("", testingRandomized(&Options{
		FS: mem,
	}))
	require.NoError(t, err)
	// Flush metrics are updated after and non-atomically with the memtable
	// being removed from the queue, so poll until the flush work becomes
	// visible, merging each interval's metrics into the running sum.
	waitAndAccumulateIntervalMetrics := func() {
		begin := time.Now()
		for {
			metrics := d.InternalIntervalMetrics()
			// Check non-nil before the Merge calls dereference metrics (the
			// original checked after the first use, defeating the assertion).
			require.NotNil(t, metrics)
			sumIntervalMetrics.Flush.WriteThroughput.Merge(metrics.Flush.WriteThroughput)
			sumIntervalMetrics.LogWriter.WriteThroughput.Merge(metrics.LogWriter.WriteThroughput)
			// The histogram has no usable zero value: adopt the first non-nil
			// histogram, and Merge subsequent intervals into it.
			if sumIntervalMetrics.LogWriter.SyncLatencyMicros == nil {
				sumIntervalMetrics.LogWriter.SyncLatencyMicros = metrics.LogWriter.SyncLatencyMicros
			} else {
				sumIntervalMetrics.LogWriter.SyncLatencyMicros.Merge(metrics.LogWriter.SyncLatencyMicros)
			}

			if int64(50<<10) < metrics.Flush.WriteThroughput.Bytes {
				// The writes (during which the flush is idle) and the flush work
				// should not be so fast as to be unrealistic. If these turn out to be
				// flaky we could instead inject a clock.
				tinyInterval := int64(50 * time.Microsecond)
				require.Less(t, tinyInterval, int64(metrics.Flush.WriteThroughput.WorkDuration))
				require.Less(t, tinyInterval, int64(metrics.Flush.WriteThroughput.IdleDuration))
				return
			}
			if time.Since(begin) > 2*time.Second {
				t.Fatal("flush write throughput did not exceed 50KB within 2s")
			}
			time.Sleep(time.Millisecond)
		}
	}

	writeAndFlush := func() {
		// Add the key "a" to the memtable, then fill up the memtable with the key
		// prefix "b". The compaction will only overlap with the queued memtable,
		// not the mutable memtable.
		// NB: The initial memtable size is 256KB, which is filled up with random
		// values which typically don't compress well. The test also appends the
		// random value to the "b" key to limit overwriting of the same key, which
		// would get collapsed at flush time since there are no open snapshots.
		value := make([]byte, 50)
		rand.Read(value)
		require.NoError(t, d.Set([]byte("a"), value, nil))
		for {
			rand.Read(value)
			require.NoError(t, d.Set(append([]byte("b"), value...), value, nil))
			d.mu.Lock()
			done := len(d.mu.mem.queue) == 2
			d.mu.Unlock()
			if done {
				break
			}
		}

		// Compacting the "a" key range forces the queued memtable to flush.
		require.NoError(t, d.Compact([]byte("a"), []byte("a\x00"), false))
		d.mu.Lock()
		require.Equal(t, 1, len(d.mu.mem.queue))
		d.mu.Unlock()
	}

	writeAndFlush()
	waitAndAccumulateIntervalMetrics()
	writeAndFlush()
	waitAndAccumulateIntervalMetrics()

	require.NoError(t, d.Close())
	// NOTE(review): Metrics() is read after Close(); this appears to work today
	// but reading a closed DB is fragile — consider capturing the cumulative
	// metrics before Close.
	dbMetrics := d.Metrics()
	require.Equal(t, dbMetrics.Flush.WriteThroughput.Bytes, sumIntervalMetrics.Flush.WriteThroughput.Bytes)
	require.Equal(t, dbMetrics.LogWriter.SyncLatencyMicros.Export(), sumIntervalMetrics.LogWriter.SyncLatencyMicros.Export())
}
39 changes: 0 additions & 39 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package pebble
import (
"fmt"

"github.com/HdrHistogram/hdrhistogram-go"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/humanize"
Expand Down Expand Up @@ -433,41 +432,3 @@ func hitRate(hits, misses int64) float64 {
}
return 100 * float64(hits) / float64(sum)
}

// InternalIntervalMetrics exposes metrics about internal subsystems, that can
// be useful for deep observability purposes, and for higher-level admission
// control systems that are trying to estimate the capacity of the DB. These
// are experimental and subject to change, since they expose internal
// implementation details, so do not rely on these without discussion with the
// Pebble team.
//
// These represent the metrics over the interval of time from the last call to
// retrieve these metrics (i.e. the last call to DB.InternalIntervalMetrics).
// These are not cumulative, unlike Metrics. The main challenge in making these
// cumulative is the hdrhistogram.Histogram, which does not have the ability to
// subtract a histogram from a preceding metric retrieval.
type InternalIntervalMetrics struct {
	// LogWriter metrics.
	LogWriter struct {
		// WriteThroughput is the WAL throughput.
		WriteThroughput ThroughputMetric
		// PendingBufferUtilization is the utilization of the WAL writer's
		// finite-sized pending blocks buffer. It provides an additional signal
		// regarding how close to "full" the WAL writer is. The value is in the
		// interval [0,1].
		PendingBufferUtilization float64
		// SyncQueueUtilization is the utilization of the WAL writer's
		// finite-sized queue of work that is waiting to sync. The value is in the
		// interval [0,1].
		SyncQueueUtilization float64
		// SyncLatencyMicros is a distribution of the fsync latency observed by
		// the WAL writer. It can be nil if there were no fsyncs during the
		// interval.
		SyncLatencyMicros *hdrhistogram.Histogram
	}
	// Flush loop metrics.
	Flush struct {
		// WriteThroughput is the flushing throughput.
		WriteThroughput ThroughputMetric
	}
	// NB: the LogWriter throughput and the Flush throughput are not directly
	// comparable because the former does not compress, unlike the latter.
}
7 changes: 5 additions & 2 deletions open.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,11 @@ func Open(dirname string, opts *Options) (db *DB, _ error) {
BytesPerSync: d.opts.WALBytesPerSync,
PreallocateSize: d.walPreallocateSize(),
})
d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum)
d.mu.log.LogWriter.SetMinSyncInterval(d.opts.WALMinSyncInterval)
logWriterConfig := record.LogWriterConfig{
WALMinSyncInterval: d.opts.WALMinSyncInterval,
OnFsync: d.opts.MetricEventListener.WALFsyncLatency,
}
d.mu.log.LogWriter = record.NewLogWriter(logFile, newLogNum, logWriterConfig)
d.mu.versions.metrics.WAL.Files++
}
d.updateReadStateLocked(d.opts.DebugCheck)
Expand Down
13 changes: 13 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,10 @@ type Options struct {
// The default merger concatenates values.
Merger *Merger

// MetricEventListener contains a set of callbacks that will be invoked when
// various metric related events occur.
MetricEventListener MetricEventListener

// MaxConcurrentCompactions specifies the maximum number of concurrent
// compactions. The default is 1. Concurrent compactions are performed
// - when L0 read-amplification passes the L0CompactionConcurrency threshold
Expand Down Expand Up @@ -794,6 +798,15 @@ type Options struct {
}
}

// MetricEventListener is a struct that contains the callbacks that pebble will
// call when metric related events occur. Any of the callbacks may be left nil.
type MetricEventListener struct {
	// WALFsyncLatency is invoked with the duration of every WAL Fsync. It is
	// invoked synchronously within the WAL's flush loop, so this function should
	// not run for an excessive amount of time to avoid stalling writes.
	WALFsyncLatency func(duration time.Duration)
}

// DebugCheckLevels calls CheckLevels on the provided database.
// It may be set in the DebugCheck field of Options to check
// level invariants whenever a new version is installed.
Expand Down
Loading