diff --git a/event.go b/event.go index f85e820517..ef0c1c0ec2 100644 --- a/event.go +++ b/event.go @@ -10,6 +10,7 @@ import ( "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/manifest" + "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/redact" ) @@ -118,6 +119,8 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) { type DiskSlowInfo struct { // Path of file being written to. Path string + // Operation being performed on the file. + OpType vfs.OpType // Duration that has elapsed since this disk operation started. Duration time.Duration } @@ -128,8 +131,8 @@ func (i DiskSlowInfo) String() string { // SafeFormat implements redact.SafeFormatter. func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("disk slowness detected: write to file %s has been ongoing for %0.1fs", - i.Path, redact.Safe(i.Duration.Seconds())) + w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs", + redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds())) } // FlushInfo contains the info for a flush event. diff --git a/options.go b/options.go index 73ba3b4104..73d59056d2 100644 --- a/options.go +++ b/options.go @@ -966,9 +966,10 @@ func (o *Options) EnsureDefaults() *Options { if o.FS == nil { o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second, - func(name string, duration time.Duration) { + func(name string, op vfs.OpType, duration time.Duration) { o.EventListener.DiskSlow(DiskSlowInfo{ Path: name, + OpType: op, Duration: duration, }) }) diff --git a/vfs/disk_health.go b/vfs/disk_health.go index ceb9046ee7..2f8f2b200a 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -5,6 +5,7 @@ package vfs import ( + "fmt" "io" "os" "sync" @@ -22,8 +23,61 @@ const ( // here with the expectation that it is significantly more than required in // practice. See the comment above the diskHealthCheckingFS type definition. 
preallocatedSlotCount = 16 + // nOffsetBits is the number of bits in the packed 64-bit integer used for + // identifying an offset from the file creation time (in nanoseconds). + nOffsetBits = 60 ) +// OpType is the type of IO operation being monitored by a +// diskHealthCheckingFile. +type OpType uint8 + +// The following OpTypes are limited to the subset of file system operations that +// a diskHealthCheckingFile supports (namely writes and syncs). +const ( + OpTypeUnknown OpType = iota + OpTypeWrite + OpTypeSync + OpTypeCreate + OpTypeLink + OpTypeMkdirAll + OpTypeRemove + OpTypeRemoveAll + OpTypeRename + OpTypeReuseForWrite + // Note: opTypeMax is just used in tests. It must appear last in the list + // of OpTypes. + opTypeMax +) + +// String implements fmt.Stringer. +func (o OpType) String() string { + switch o { + case OpTypeWrite: + return "write" + case OpTypeSync: + return "sync" + case OpTypeCreate: + return "create" + case OpTypeLink: + return "link" + case OpTypeMkdirAll: + return "mkdirall" + case OpTypeRemove: + return "remove" + case OpTypeRemoveAll: + return "removall" + case OpTypeRename: + return "rename" + case OpTypeReuseForWrite: + return "reuseforwrtie" + case OpTypeUnknown: + return "unknown" + default: + panic(fmt.Sprintf("vfs: unknown op type: %d", o)) + } +} + // diskHealthCheckingFile is a File wrapper to detect slow disk operations, and // call onSlowDisk if a disk operation is seen to exceed diskSlowThreshold. // @@ -34,18 +88,29 @@ const ( type diskHealthCheckingFile struct { File - onSlowDisk func(time.Duration) + onSlowDisk func(OpType, time.Duration) diskSlowThreshold time.Duration tickInterval time.Duration - stopper chan struct{} - lastWriteNanos int64 + stopper chan struct{} + // lastWritePacked is a 64-bit unsigned int, with the most significant 7.5 + // bytes (60 bits) representing an offset (in nanoseconds) from the file + // creation time. The least significant four bits contain the OpType. 
+ // + // The use of 60 bits for an offset provides ~36.5 years of effective + // monitoring time before the uint wraps around. 36.5 years of process uptime + // "ought to be enough for anybody". This allows for 16 operation types. + // + // NB: this packing scheme is not persisted, and is therefore safe to adjust + // across process boundaries. + lastWritePacked uint64 + createTime time.Time } // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the // specified time threshold and event listener. func newDiskHealthCheckingFile( - file File, diskSlowThreshold time.Duration, onSlowDisk func(time.Duration), + file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration), ) *diskHealthCheckingFile { return &diskHealthCheckingFile{ File: file, @@ -53,7 +118,8 @@ func newDiskHealthCheckingFile( diskSlowThreshold: diskSlowThreshold, tickInterval: defaultTickInterval, - stopper: make(chan struct{}), + stopper: make(chan struct{}), + createTime: time.Now(), } } @@ -74,16 +140,17 @@ func (d *diskHealthCheckingFile) startTicker() { return case <-ticker.C: - lastWriteNanos := atomic.LoadInt64(&d.lastWriteNanos) - if lastWriteNanos == 0 { + packed := atomic.LoadUint64(&d.lastWritePacked) + if packed == 0 { continue } - lastWrite := time.Unix(0, lastWriteNanos) + offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf) + lastWrite := d.createTime.Add(time.Duration(offsetNanos)) now := time.Now() if lastWrite.Add(d.diskSlowThreshold).Before(now) { // diskSlowThreshold was exceeded. Call the passed-in // listener. - d.onSlowDisk(now.Sub(lastWrite)) + d.onSlowDisk(op, now.Sub(lastWrite)) } } } @@ -97,7 +164,7 @@ func (d *diskHealthCheckingFile) stopTicker() { // Write implements the io.Writer interface. 
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) { - d.timeDiskOp(func() { + d.timeDiskOp(OpTypeWrite, func() { n, err = d.File.Write(p) }) return n, err @@ -111,7 +178,7 @@ func (d *diskHealthCheckingFile) Close() error { // Sync implements the io.Syncer interface. func (d *diskHealthCheckingFile) Sync() (err error) { - d.timeDiskOp(func() { + d.timeDiskOp(OpTypeSync, func() { err = d.File.Sync() }) return err @@ -119,15 +186,26 @@ func (d *diskHealthCheckingFile) Sync() (err error) { // timeDiskOp runs the specified closure and makes its timing visible to the // monitoring goroutine, in case it exceeds one of the slow disk durations. -func (d *diskHealthCheckingFile) timeDiskOp(op func()) { +func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) { if d == nil { op() return } - atomic.StoreInt64(&d.lastWriteNanos, time.Now().UnixNano()) + offsetNanos := time.Since(d.createTime).Nanoseconds() + // We have no guarantee of clock monotonicity. If we have a small regression + // in the clock, we set offsetNanos to zero, so we can still catch the operation + // if it happens to be slow. + if offsetNanos < 0 { + offsetNanos = 0 + } + if offsetNanos > 1< 1<