Skip to content

Commit

Permalink
vfs: capture operation type affected by disk slowness
Browse files Browse the repository at this point in the history
Currently, if a Pebble DB is backed by `vfs.FS` that is wrapped with a
`vfs.diskHealthCheckingFS`, the DB can be made aware of operations that
are taking longer than some threshold. The current implementation does not make
any distinction between the operation type (write, sync, etc.) that was
observed as slow.

Capture the type of operation being performed when emitting a disk
slowness event.
  • Loading branch information
nicktrav authored and joshimhoff committed Jan 19, 2023
1 parent f91ad39 commit 5164e72
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 74 deletions.
7 changes: 5 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -118,6 +119,8 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
type DiskSlowInfo struct {
// Path of file being written to.
Path string
// Operation being performed on the file.
OpType vfs.OpType
// Duration that has elapsed since this disk operation started.
Duration time.Duration
}
Expand All @@ -128,8 +131,8 @@ func (i DiskSlowInfo) String() string {

// SafeFormat implements redact.SafeFormatter.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk slowness detected: write to file %s has been ongoing for %0.1fs",
i.Path, redact.Safe(i.Duration.Seconds()))
w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds()))
}

// FlushInfo contains the info for a flush event.
Expand Down
3 changes: 2 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,9 +966,10 @@ func (o *Options) EnsureDefaults() *Options {

if o.FS == nil {
o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second,
func(name string, duration time.Duration) {
func(name string, op vfs.OpType, duration time.Duration) {
o.EventListener.DiskSlow(DiskSlowInfo{
Path: name,
OpType: op,
Duration: duration,
})
})
Expand Down
143 changes: 112 additions & 31 deletions vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package vfs

import (
"fmt"
"io"
"os"
"sync"
Expand All @@ -22,8 +23,61 @@ const (
// here with the expectation that it is significantly more than required in
// practice. See the comment above the diskHealthCheckingFS type definition.
preallocatedSlotCount = 16
// nOffsetBits is the number of bits in the packed 64-bit integer used for
// identifying an offset from the file creation time (in nanoseconds).
nOffsetBits = 60
)

// OpType is the type of IO operation being monitored by a
// diskHealthCheckingFile.
type OpType uint8

// The following OpTypes is limited to the subset of file system operations that
// a diskHealthCheckingFile supports (namely writes and syncs).
const (
OpTypeUnknown OpType = iota
OpTypeWrite
OpTypeSync
OpTypeCreate
OpTypeLink
OpTypeMkdirAll
OpTypeRemove
OpTypeRemoveAll
OpTypeRename
OpTypeReuseForWrite
// Note: opTypeMax is just used in tests. It must appear last in the list
// of OpTypes.
opTypeMax
)

// String implements fmt.Stringer.
func (o OpType) String() string {
switch o {
case OpTypeWrite:
return "write"
case OpTypeSync:
return "sync"
case OpTypeCreate:
return "create"
case OpTypeLink:
return "link"
case OpTypeMkdirAll:
return "mkdirall"
case OpTypeRemove:
return "remove"
case OpTypeRemoveAll:
return "removall"
case OpTypeRename:
return "rename"
case OpTypeReuseForWrite:
return "reuseforwrtie"
case OpTypeUnknown:
return "unknown"
default:
panic(fmt.Sprintf("vfs: unknown op type: %d", o))
}
}

// diskHealthCheckingFile is a File wrapper to detect slow disk operations, and
// call onSlowDisk if a disk operation is seen to exceed diskSlowThreshold.
//
Expand All @@ -34,26 +88,38 @@ const (
type diskHealthCheckingFile struct {
File

onSlowDisk func(time.Duration)
onSlowDisk func(OpType, time.Duration)
diskSlowThreshold time.Duration
tickInterval time.Duration

stopper chan struct{}
lastWriteNanos int64
stopper chan struct{}
// lastWritePacked is a 64-bit unsigned int, with the most significant 7.5
// bytes (60 bits) representing an offset (in nanoseconds) from the file
// creation time. The least significant four bits contains the OpType.
//
// The use of 60 bits for an offset provides ~36.5 years of effective
// monitoring time before the uint wraps around. 36.5 years of process uptime
// "ought to be enough for anybody". This also allows for 16 operation types.
//
// NB: this packing scheme is not persisted, and is therefore safe to adjust
// across process boundaries.
lastWritePacked uint64
createTime time.Time
}

// newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
// specified time threshold and event listener.
func newDiskHealthCheckingFile(
file File, diskSlowThreshold time.Duration, onSlowDisk func(time.Duration),
file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration),
) *diskHealthCheckingFile {
return &diskHealthCheckingFile{
File: file,
onSlowDisk: onSlowDisk,
diskSlowThreshold: diskSlowThreshold,
tickInterval: defaultTickInterval,

stopper: make(chan struct{}),
stopper: make(chan struct{}),
createTime: time.Now(),
}
}

Expand All @@ -74,16 +140,17 @@ func (d *diskHealthCheckingFile) startTicker() {
return

case <-ticker.C:
lastWriteNanos := atomic.LoadInt64(&d.lastWriteNanos)
if lastWriteNanos == 0 {
packed := atomic.LoadUint64(&d.lastWritePacked)
if packed == 0 {
continue
}
lastWrite := time.Unix(0, lastWriteNanos)
offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf)
lastWrite := d.createTime.Add(time.Duration(offsetNanos))
now := time.Now()
if lastWrite.Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Call the passed-in
// listener.
d.onSlowDisk(now.Sub(lastWrite))
d.onSlowDisk(op, now.Sub(lastWrite))
}
}
}
Expand All @@ -97,7 +164,7 @@ func (d *diskHealthCheckingFile) stopTicker() {

// Write implements the io.Writer interface.
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
d.timeDiskOp(func() {
d.timeDiskOp(OpTypeWrite, func() {
n, err = d.File.Write(p)
})
return n, err
Expand All @@ -111,23 +178,34 @@ func (d *diskHealthCheckingFile) Close() error {

// Sync implements the io.Syncer interface.
func (d *diskHealthCheckingFile) Sync() (err error) {
d.timeDiskOp(func() {
d.timeDiskOp(OpTypeSync, func() {
err = d.File.Sync()
})
return err
}

// timeDiskOp runs the specified closure and makes its timing visible to the
// monitoring goroutine, in case it exceeds one of the slow disk durations.
func (d *diskHealthCheckingFile) timeDiskOp(op func()) {
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
if d == nil {
op()
return
}

atomic.StoreInt64(&d.lastWriteNanos, time.Now().UnixNano())
offsetNanos := time.Since(d.createTime).Nanoseconds()
// We have no guarantee of clock monotonicity. If we have a small regression
// in the clock, we set offsetNanos to zero, so we can still catch the operation
// if happens to be slow.
if offsetNanos < 0 {
offsetNanos = 0
}
if (offsetNanos > 1<<nOffsetBits-1) {
panic("vfs: last write offset would result in integer wraparound")
}
packed := uint64(offsetNanos)<<(64-nOffsetBits) | uint64(opType)
atomic.StoreUint64(&d.lastWritePacked, packed)
defer func() {
atomic.StoreInt64(&d.lastWriteNanos, 0)
atomic.StoreUint64(&d.lastWritePacked, 0)
}()
op()
}
Expand Down Expand Up @@ -171,7 +249,7 @@ func (d *diskHealthCheckingFile) timeDiskOp(op func()) {
type diskHealthCheckingFS struct {
tickInterval time.Duration
diskSlowThreshold time.Duration
onSlowDisk func(string, time.Duration)
onSlowDisk func(string, OpType, time.Duration)
fs FS
mu struct {
sync.Mutex
Expand All @@ -194,6 +272,7 @@ type diskHealthCheckingFS struct {

type slot struct {
name string
opType OpType
startNanos int64
}

Expand All @@ -207,7 +286,7 @@ var _ FS = (*diskHealthCheckingFS)(nil)
//
// A threshold of zero disables disk-health checking.
func WithDiskHealthChecks(
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, time.Duration),
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, time.Duration),
) (FS, io.Closer) {
if diskSlowThreshold == 0 {
return innerFS, noopCloser{}
Expand All @@ -230,7 +309,7 @@ func WithDiskHealthChecks(
return fs, fs
}

func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) {
func (d *diskHealthCheckingFS) timeFilesystemOp(name string, opType OpType, op func()) {
if d == nil {
op()
return
Expand Down Expand Up @@ -265,6 +344,7 @@ func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) {
// This slot is not in use. Claim it.
s = d.mu.inflight[i]
s.name = name
s.opType = opType
atomic.StoreInt64(&s.startNanos, startNanos)
break
}
Expand All @@ -277,6 +357,7 @@ func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) {
if s == nil {
s = &slot{
name: name,
opType: opType,
startNanos: startNanos,
}
d.mu.inflight = append(d.mu.inflight, s)
Expand Down Expand Up @@ -310,7 +391,7 @@ func (d *diskHealthCheckingFS) startTickerLocked() {
if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Invoke the provided
// callback.
d.onSlowDisk(d.mu.inflight[i].name, now.Sub(time.Unix(0, nanos)))
d.onSlowDisk(d.mu.inflight[i].name, d.mu.inflight[i].opType, now.Sub(time.Unix(0, nanos)))
}
}
d.mu.Unlock()
Expand Down Expand Up @@ -354,7 +435,7 @@ func (d *diskHealthCheckingFS) Close() error {
func (d *diskHealthCheckingFS) Create(name string) (File, error) {
var f File
var err error
d.timeFilesystemOp(name, func() {
d.timeFilesystemOp(name, OpTypeCreate, func() {
f, err = d.fs.Create(name)
})
if err != nil {
Expand All @@ -363,8 +444,8 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) {
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) {
d.onSlowDisk(name, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
Expand All @@ -378,7 +459,7 @@ func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
// Link implements the FS interface.
func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
var err error
d.timeFilesystemOp(newname, func() {
d.timeFilesystemOp(newname, OpTypeLink, func() {
err = d.fs.Link(oldname, newname)
})
return err
Expand All @@ -397,7 +478,7 @@ func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
// MkdirAll implements the FS interface.
func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
var err error
d.timeFilesystemOp(dir, func() {
d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
err = d.fs.MkdirAll(dir, perm)
})
return err
Expand All @@ -416,8 +497,8 @@ func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
}
// Directories opened with OpenDir must be opened with health checking,
// because they may be explicitly synced.
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) {
d.onSlowDisk(name, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
Expand All @@ -441,7 +522,7 @@ func (d *diskHealthCheckingFS) PathDir(path string) string {
// Remove implements the FS interface.
func (d *diskHealthCheckingFS) Remove(name string) error {
var err error
d.timeFilesystemOp(name, func() {
d.timeFilesystemOp(name, OpTypeRemove, func() {
err = d.fs.Remove(name)
})
return err
Expand All @@ -450,7 +531,7 @@ func (d *diskHealthCheckingFS) Remove(name string) error {
// RemoveAll implements the FS interface.
func (d *diskHealthCheckingFS) RemoveAll(name string) error {
var err error
d.timeFilesystemOp(name, func() {
d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
err = d.fs.RemoveAll(name)
})
return err
Expand All @@ -459,7 +540,7 @@ func (d *diskHealthCheckingFS) RemoveAll(name string) error {
// Rename implements the FS interface.
func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
var err error
d.timeFilesystemOp(newname, func() {
d.timeFilesystemOp(newname, OpTypeRename, func() {
err = d.fs.Rename(oldname, newname)
})
return err
Expand All @@ -469,7 +550,7 @@ func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) {
var f File
var err error
d.timeFilesystemOp(newname, func() {
d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
f, err = d.fs.ReuseForWrite(oldname, newname)
})
if err != nil {
Expand All @@ -478,8 +559,8 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) {
d.onSlowDisk(newname, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(newname, opType, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
Expand Down
Loading

0 comments on commit 5164e72

Please sign in to comment.