Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vfs: capture operation type affected by disk slowness #2255

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/cockroachdb/pebble/internal/humanize"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/redact"
)

Expand Down Expand Up @@ -118,6 +119,8 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {
type DiskSlowInfo struct {
// Path of file being written to.
Path string
// Operation being performed on the file.
OpType vfs.OpType
// Duration that has elapsed since this disk operation started.
Duration time.Duration
}
Expand All @@ -128,8 +131,8 @@ func (i DiskSlowInfo) String() string {

// SafeFormat implements redact.SafeFormatter.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk slowness detected: write to file %s has been ongoing for %0.1fs",
i.Path, redact.Safe(i.Duration.Seconds()))
w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds()))
}

// FlushInfo contains the info for a flush event.
Expand Down
3 changes: 2 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,9 +966,10 @@ func (o *Options) EnsureDefaults() *Options {

if o.FS == nil {
o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second,
func(name string, duration time.Duration) {
func(name string, op vfs.OpType, duration time.Duration) {
o.EventListener.DiskSlow(DiskSlowInfo{
Path: name,
OpType: op,
Duration: duration,
})
})
Expand Down
143 changes: 112 additions & 31 deletions vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package vfs

import (
"fmt"
"io"
"os"
"sync"
Expand All @@ -22,8 +23,61 @@ const (
// here with the expectation that it is significantly more than required in
// practice. See the comment above the diskHealthCheckingFS type definition.
preallocatedSlotCount = 16
// nOffsetBits is the number of bits in the packed 64-bit integer used for
// identifying an offset from the file creation time (in nanoseconds).
nOffsetBits = 60
)

// OpType is the type of IO operation being monitored by a
// diskHealthCheckingFile.
type OpType uint8

// The following OpTypes is limited to the subset of file system operations that
// a diskHealthCheckingFile supports (namely writes and syncs).
const (
OpTypeUnknown OpType = iota
OpTypeWrite
OpTypeSync
OpTypeCreate
OpTypeLink
OpTypeMkdirAll
OpTypeRemove
OpTypeRemoveAll
OpTypeRename
OpTypeReuseForWrite
// Note: opTypeMax is just used in tests. It must appear last in the list
// of OpTypes.
opTypeMax
)

// String implements fmt.Stringer.
func (o OpType) String() string {
switch o {
case OpTypeWrite:
return "write"
case OpTypeSync:
return "sync"
case OpTypeCreate:
return "create"
case OpTypeLink:
return "link"
case OpTypeMkdirAll:
return "mkdirall"
case OpTypeRemove:
return "remove"
case OpTypeRemoveAll:
return "removall"
case OpTypeRename:
return "rename"
case OpTypeReuseForWrite:
return "reuseforwrtie"
case OpTypeUnknown:
return "unknown"
default:
panic(fmt.Sprintf("vfs: unknown op type: %d", o))
}
}

// diskHealthCheckingFile is a File wrapper to detect slow disk operations, and
// call onSlowDisk if a disk operation is seen to exceed diskSlowThreshold.
//
Expand All @@ -34,26 +88,38 @@ const (
type diskHealthCheckingFile struct {
File

onSlowDisk func(time.Duration)
onSlowDisk func(OpType, time.Duration)
diskSlowThreshold time.Duration
tickInterval time.Duration

stopper chan struct{}
lastWriteNanos int64
stopper chan struct{}
// lastWritePacked is a 64-bit unsigned int, with the most significant 7.5
// bytes (60 bits) representing an offset (in nanoseconds) from the file
// creation time. The least significant four bits contains the OpType.
//
// The use of 60 bits for an offset provides ~36.5 years of effective
// monitoring time before the uint wraps around. 36.5 years of process uptime
// "ought to be enough for anybody". This allows for 16 operation types.
//
// NB: this packing scheme is not persisted, and is therefore safe to adjust
// across process boundaries.
lastWritePacked uint64
createTime time.Time
}

// newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
// specified time threshold and event listener.
func newDiskHealthCheckingFile(
file File, diskSlowThreshold time.Duration, onSlowDisk func(time.Duration),
file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration),
) *diskHealthCheckingFile {
return &diskHealthCheckingFile{
File: file,
onSlowDisk: onSlowDisk,
diskSlowThreshold: diskSlowThreshold,
tickInterval: defaultTickInterval,

stopper: make(chan struct{}),
stopper: make(chan struct{}),
createTime: time.Now(),
}
}

Expand All @@ -74,16 +140,17 @@ func (d *diskHealthCheckingFile) startTicker() {
return

case <-ticker.C:
lastWriteNanos := atomic.LoadInt64(&d.lastWriteNanos)
if lastWriteNanos == 0 {
packed := atomic.LoadUint64(&d.lastWritePacked)
if packed == 0 {
continue
}
lastWrite := time.Unix(0, lastWriteNanos)
offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf)
lastWrite := d.createTime.Add(time.Duration(offsetNanos))
now := time.Now()
if lastWrite.Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Call the passed-in
// listener.
d.onSlowDisk(now.Sub(lastWrite))
d.onSlowDisk(op, now.Sub(lastWrite))
}
}
}
Expand All @@ -97,7 +164,7 @@ func (d *diskHealthCheckingFile) stopTicker() {

// Write implements the io.Writer interface.
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
d.timeDiskOp(func() {
d.timeDiskOp(OpTypeWrite, func() {
n, err = d.File.Write(p)
})
return n, err
Expand All @@ -111,23 +178,34 @@ func (d *diskHealthCheckingFile) Close() error {

// Sync implements the io.Syncer interface.
func (d *diskHealthCheckingFile) Sync() (err error) {
d.timeDiskOp(func() {
d.timeDiskOp(OpTypeSync, func() {
err = d.File.Sync()
})
return err
}

// timeDiskOp runs the specified closure and makes its timing visible to the
// monitoring goroutine, in case it exceeds one of the slow disk durations.
func (d *diskHealthCheckingFile) timeDiskOp(op func()) {
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
if d == nil {
op()
return
}

atomic.StoreInt64(&d.lastWriteNanos, time.Now().UnixNano())
offsetNanos := time.Since(d.createTime).Nanoseconds()
// We have no guarantee of clock monotonicity. If we have a small regression
// in the clock, we set offsetNanos to zero, so we can still catch the operation
// if happens to be slow.
if offsetNanos < 0 {
offsetNanos = 0
}
if offsetNanos > 1<<nOffsetBits-1 {
panic("vfs: last write offset would result in integer wraparound")
}
packed := uint64(offsetNanos)<<(64-nOffsetBits) | uint64(opType)
atomic.StoreUint64(&d.lastWritePacked, packed)
defer func() {
atomic.StoreInt64(&d.lastWriteNanos, 0)
atomic.StoreUint64(&d.lastWritePacked, 0)
}()
op()
}
Expand Down Expand Up @@ -171,7 +249,7 @@ func (d *diskHealthCheckingFile) timeDiskOp(op func()) {
type diskHealthCheckingFS struct {
tickInterval time.Duration
diskSlowThreshold time.Duration
onSlowDisk func(string, time.Duration)
onSlowDisk func(string, OpType, time.Duration)
fs FS
mu struct {
sync.Mutex
Expand All @@ -194,6 +272,7 @@ type diskHealthCheckingFS struct {

type slot struct {
name string
opType OpType
startNanos int64
}

Expand All @@ -207,7 +286,7 @@ var _ FS = (*diskHealthCheckingFS)(nil)
//
// A threshold of zero disables disk-health checking.
func WithDiskHealthChecks(
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, time.Duration),
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, time.Duration),
) (FS, io.Closer) {
if diskSlowThreshold == 0 {
return innerFS, noopCloser{}
Expand All @@ -230,7 +309,7 @@ func WithDiskHealthChecks(
return fs, fs
}

func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) {
func (d *diskHealthCheckingFS) timeFilesystemOp(name string, opType OpType, op func()) {
if d == nil {
op()
return
Expand Down Expand Up @@ -265,6 +344,7 @@ func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) {
// This slot is not in use. Claim it.
s = d.mu.inflight[i]
s.name = name
s.opType = opType
atomic.StoreInt64(&s.startNanos, startNanos)
break
}
Expand All @@ -277,6 +357,7 @@ func (d *diskHealthCheckingFS) timeFilesystemOp(name string, op func()) {
if s == nil {
s = &slot{
name: name,
opType: opType,
startNanos: startNanos,
}
d.mu.inflight = append(d.mu.inflight, s)
Expand Down Expand Up @@ -310,7 +391,7 @@ func (d *diskHealthCheckingFS) startTickerLocked() {
if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Invoke the provided
// callback.
d.onSlowDisk(d.mu.inflight[i].name, now.Sub(time.Unix(0, nanos)))
d.onSlowDisk(d.mu.inflight[i].name, d.mu.inflight[i].opType, now.Sub(time.Unix(0, nanos)))
}
}
d.mu.Unlock()
Expand Down Expand Up @@ -354,7 +435,7 @@ func (d *diskHealthCheckingFS) Close() error {
func (d *diskHealthCheckingFS) Create(name string) (File, error) {
var f File
var err error
d.timeFilesystemOp(name, func() {
d.timeFilesystemOp(name, OpTypeCreate, func() {
f, err = d.fs.Create(name)
})
if err != nil {
Expand All @@ -363,8 +444,8 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) {
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) {
d.onSlowDisk(name, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
Expand All @@ -378,7 +459,7 @@ func (d *diskHealthCheckingFS) GetDiskUsage(path string) (DiskUsage, error) {
// Link implements the FS interface.
func (d *diskHealthCheckingFS) Link(oldname, newname string) error {
var err error
d.timeFilesystemOp(newname, func() {
d.timeFilesystemOp(newname, OpTypeLink, func() {
err = d.fs.Link(oldname, newname)
})
return err
Expand All @@ -397,7 +478,7 @@ func (d *diskHealthCheckingFS) Lock(name string) (io.Closer, error) {
// MkdirAll implements the FS interface.
func (d *diskHealthCheckingFS) MkdirAll(dir string, perm os.FileMode) error {
var err error
d.timeFilesystemOp(dir, func() {
d.timeFilesystemOp(dir, OpTypeMkdirAll, func() {
err = d.fs.MkdirAll(dir, perm)
})
return err
Expand All @@ -416,8 +497,8 @@ func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
}
// Directories opened with OpenDir must be opened with health checking,
// because they may be explicitly synced.
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) {
d.onSlowDisk(name, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
Expand All @@ -441,7 +522,7 @@ func (d *diskHealthCheckingFS) PathDir(path string) string {
// Remove implements the FS interface.
func (d *diskHealthCheckingFS) Remove(name string) error {
var err error
d.timeFilesystemOp(name, func() {
d.timeFilesystemOp(name, OpTypeRemove, func() {
err = d.fs.Remove(name)
})
return err
Expand All @@ -450,7 +531,7 @@ func (d *diskHealthCheckingFS) Remove(name string) error {
// RemoveAll implements the FS interface.
func (d *diskHealthCheckingFS) RemoveAll(name string) error {
var err error
d.timeFilesystemOp(name, func() {
d.timeFilesystemOp(name, OpTypeRemoveAll, func() {
err = d.fs.RemoveAll(name)
})
return err
Expand All @@ -459,7 +540,7 @@ func (d *diskHealthCheckingFS) RemoveAll(name string) error {
// Rename implements the FS interface.
func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
var err error
d.timeFilesystemOp(newname, func() {
d.timeFilesystemOp(newname, OpTypeRename, func() {
err = d.fs.Rename(oldname, newname)
})
return err
Expand All @@ -469,7 +550,7 @@ func (d *diskHealthCheckingFS) Rename(oldname, newname string) error {
func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, error) {
var f File
var err error
d.timeFilesystemOp(newname, func() {
d.timeFilesystemOp(newname, OpTypeReuseForWrite, func() {
f, err = d.fs.ReuseForWrite(oldname, newname)
})
if err != nil {
Expand All @@ -478,8 +559,8 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(duration time.Duration) {
d.onSlowDisk(newname, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(newname, opType, duration)
})
checkingFile.startTicker()
return WithFd(f, checkingFile), nil
Expand Down
Loading