From 5164e727a2505c8bbd5fd40c70f9ae2be919a60f Mon Sep 17 00:00:00 2001 From: Nick Travers Date: Wed, 27 Apr 2022 13:32:09 -0700 Subject: [PATCH] vfs: capture operation type affected by disk slowness Currently, if a Pebble DB is backed by `vfs.FS` that is wrapped with a `vfs.diskHealthCheckingFS`, the DB can be made aware of operations that are taking longer than some threshold. The current implementation does not make any distinction between the operation type (write, sync, etc.) that was observed as slow. Capture the type of operation being performed when emitting a disk slowness event. --- event.go | 7 +- options.go | 3 +- vfs/disk_health.go | 143 +++++++++++++++++++++++++++-------- vfs/disk_health_test.go | 161 ++++++++++++++++++++++++++++++---------- vfs/fd_test.go | 2 +- vfs/vfs_test.go | 10 +++ 6 files changed, 252 insertions(+), 74 deletions(-) diff --git a/event.go b/event.go index f85e820517..ef0c1c0ec2 100644 --- a/event.go +++ b/event.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/redact" ) @@ -118,6 +119,8 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) { type DiskSlowInfo struct { // Path of file being written to. Path string + // Operation being performed on the file. + OpType vfs.OpType // Duration that has elapsed since this disk operation started. Duration time.Duration } @@ -128,8 +131,8 @@ func (i DiskSlowInfo) String() string { // SafeFormat implements redact.SafeFormatter. func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("disk slowness detected: write to file %s has been ongoing for %0.1fs", - i.Path, redact.Safe(i.Duration.Seconds())) + w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs", + redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds())) } // FlushInfo contains the info for a flush event. diff --git a/options.go b/options.go index 73ba3b4104..73d59056d2 100644 --- a/options.go +++ b/options.go @@ -966,9 +966,10 @@ func (o *Options) EnsureDefaults() *Options { if o.FS == nil { o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second, - func(name string, duration time.Duration) { + func(name string, op vfs.OpType, duration time.Duration) { o.EventListener.DiskSlow(DiskSlowInfo{ Path: name, + OpType: op, Duration: duration, }) }) diff --git a/vfs/disk_health.go b/vfs/disk_health.go index ceb9046ee7..5e050b11ce 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -5,6 +5,7 @@ package vfs import ( + "fmt" "io" "os" "sync" @@ -22,8 +23,61 @@ const ( // here with the expectation that it is significantly more than required in // practice. See the comment above the diskHealthCheckingFS type definition. preallocatedSlotCount = 16 + // nOffsetBits is the number of bits in the packed 64-bit integer used for + // identifying an offset from the file creation time (in nanoseconds). + nOffsetBits = 60 ) +// OpType is the type of IO operation being monitored by a +// diskHealthCheckingFile. +type OpType uint8 + +// The following OpTypes is limited to the subset of file system operations that +// a diskHealthCheckingFile supports (namely writes and syncs). +const ( + OpTypeUnknown OpType = iota + OpTypeWrite + OpTypeSync + OpTypeCreate + OpTypeLink + OpTypeMkdirAll + OpTypeRemove + OpTypeRemoveAll + OpTypeRename + OpTypeReuseForWrite + // Note: opTypeMax is just used in tests. It must appear last in the list + // of OpTypes. + opTypeMax +) + +// String implements fmt.Stringer. +func (o OpType) String() string { + switch o { + case OpTypeWrite: + return "write" + case OpTypeSync: + return "sync" + case OpTypeCreate: + return "create" + case OpTypeLink: + return "link" + case OpTypeMkdirAll: + return "mkdirall" + case OpTypeRemove: + return "remove" + case OpTypeRemoveAll: + return "removall" + case OpTypeRename: + return "rename" + case OpTypeReuseForWrite: + return "reuseforwrtie" + case OpTypeUnknown: + return "unknown" + default: + panic(fmt.Sprintf("vfs: unknown op type: %d", o)) + } +} + // diskHealthCheckingFile is a File wrapper to detect slow disk operations, and // call onSlowDisk if a disk operation is seen to exceed diskSlowThreshold. // @@ -34,18 +88,29 @@ const ( type diskHealthCheckingFile struct { File - onSlowDisk func(time.Duration) + onSlowDisk func(OpType, time.Duration) diskSlowThreshold time.Duration tickInterval time.Duration - stopper chan struct{} - lastWriteNanos int64 + stopper chan struct{} + // lastWritePacked is a 64-bit unsigned int, with the most significant 7.5 + // bytes (60 bits) representing an offset (in nanoseconds) from the file + // creation time. The least significant four bits contains the OpType. + // + // The use of 60 bits for an offset provides ~36.5 years of effective + // monitoring time before the uint wraps around. 36.5 years of process uptime + // "ought to be enough for anybody". This also allows for 16 operation types. + // + // NB: this packing scheme is not persisted, and is therefore safe to adjust + // across process boundaries. + lastWritePacked uint64 + createTime time.Time } // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the // specified time threshold and event listener. func newDiskHealthCheckingFile( - file File, diskSlowThreshold time.Duration, onSlowDisk func(time.Duration), + file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration), ) *diskHealthCheckingFile { return &diskHealthCheckingFile{ File: file, @@ -53,7 +118,8 @@ func newDiskHealthCheckingFile( diskSlowThreshold: diskSlowThreshold, tickInterval: defaultTickInterval, - stopper: make(chan struct{}), + stopper: make(chan struct{}), + createTime: time.Now(), } } @@ -74,16 +140,17 @@ func (d *diskHealthCheckingFile) startTicker() { return case <-ticker.C: - lastWriteNanos := atomic.LoadInt64(&d.lastWriteNanos) - if lastWriteNanos == 0 { + packed := atomic.LoadUint64(&d.lastWritePacked) + if packed == 0 { continue } - lastWrite := time.Unix(0, lastWriteNanos) + offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf) + lastWrite := d.createTime.Add(time.Duration(offsetNanos)) now := time.Now() if lastWrite.Add(d.diskSlowThreshold).Before(now) { // diskSlowThreshold was exceeded. Call the passed-in // listener. - d.onSlowDisk(now.Sub(lastWrite)) + d.onSlowDisk(op, now.Sub(lastWrite)) } } } @@ -97,7 +164,7 @@ func (d *diskHealthCheckingFile) stopTicker() { // Write implements the io.Writer interface. func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) { - d.timeDiskOp(func() { + d.timeDiskOp(OpTypeWrite, func() { n, err = d.File.Write(p) }) return n, err @@ -111,7 +178,7 @@ func (d *diskHealthCheckingFile) Close() error { // Sync implements the io.Syncer interface. func (d *diskHealthCheckingFile) Sync() (err error) { - d.timeDiskOp(func() { + d.timeDiskOp(OpTypeSync, func() { err = d.File.Sync() }) return err @@ -119,15 +186,26 @@ func (d *diskHealthCheckingFile) Sync() (err error) { // timeDiskOp runs the specified closure and makes its timing visible to the // monitoring goroutine, in case it exceeds one of the slow disk durations. -func (d *diskHealthCheckingFile) timeDiskOp(op func()) { +func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) { if d == nil { op() return } - atomic.StoreInt64(&d.lastWriteNanos, time.Now().UnixNano()) + offsetNanos := time.Since(d.createTime).Nanoseconds() + // We have no guarantee of clock monotonicity. If we have a small regression + // in the clock, we set offsetNanos to zero, so we can still catch the operation + // if happens to be slow. + if offsetNanos < 0 { + offsetNanos = 0 + } + if (offsetNanos > 1< 1<