Skip to content

Commit

Permalink
vfs: include size of write in DiskSlowInfo
Browse files Browse the repository at this point in the history
This commit adds the size of a write to DiskSlowInfo, in cases where a write is
sized. A small write stalling out points at file system / disk issues, while
a large write taking time to complete may indicate CRDB issues with a certain
workload, etc.
  • Loading branch information
joshimhoff committed Jan 26, 2023
1 parent 169e5db commit 9787bb1
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 60 deletions.
8 changes: 6 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type DiskSlowInfo struct {
Path string
// Operation being performed on the file.
OpType vfs.OpType
// Size of write in kilobytes, if the write is sized.
WriteSize int
// Duration that has elapsed since this disk operation started.
Duration time.Duration
}
Expand All @@ -131,8 +133,10 @@ func (i DiskSlowInfo) String() string {

// SafeFormat implements redact.SafeFormatter.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds()))
// TODO(before merge): Should we mention the ceiling of ~1.04 GB here? Any reviewers have an
// opinion?
w.Printf("disk slowness detected: %s on file %s (%d kilobytes) has been ongoing for %0.1fs",
redact.Safe(i.OpType.String()), i.Path, i.WriteSize, redact.Safe(i.Duration.Seconds()))
}

// FlushInfo contains the info for a flush event.
Expand Down
9 changes: 5 additions & 4 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,11 +966,12 @@ func (o *Options) EnsureDefaults() *Options {

if o.FS == nil {
o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(vfs.Default, 5*time.Second,
func(name string, op vfs.OpType, duration time.Duration) {
func(name string, op vfs.OpType, writeSize int, duration time.Duration) {
o.EventListener.DiskSlow(DiskSlowInfo{
Path: name,
OpType: op,
Duration: duration,
Path: name,
OpType: op,
WriteSize: writeSize,
Duration: duration,
})
})
}
Expand Down
113 changes: 77 additions & 36 deletions vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ const (
preallocatedSlotCount = 16
// nOffsetBits is the number of bits in the packed 64-bit integer used for
// identifying an offset from the file creation time (in nanoseconds).
nOffsetBits = 60
nOffsetBits = 40
// nWriteSizeBits is the number of bits in the packed 64-bit integer used for
// identifying the size of the write operation, if the operation is sized.
nWriteSizeBits = 20
)

// Track size of writes at kilobyte precision. See comment above lastWritePacked for more.
var writeSizePrecision = 1000

// OpType is the type of IO operation being monitored by a
// diskHealthCheckingFile.
type OpType uint8
Expand Down Expand Up @@ -88,29 +94,44 @@ func (o OpType) String() string {
type diskHealthCheckingFile struct {
File

onSlowDisk func(OpType, time.Duration)
onSlowDisk func(OpType, int, time.Duration)
diskSlowThreshold time.Duration
tickInterval time.Duration

stopper chan struct{}
// lastWritePacked is a 64-bit unsigned int, with the most significant 7.5
// bytes (60 bits) representing an offset (in nanoseconds) from the file
// creation time. The least significant four bits contains the OpType.
// lastWritePacked is a 64-bit unsigned int. The most significant
// 40 bits represent an offset (in milliseconds) from the creation
// time of the diskHealthCheckingFile. The next most significant 20 bits
// represent the size of the write in KBs, if the write has a size. (If
// it doesn't, the 20 bits are zeroed). The least significant four bits
// contains the OpType.
//
// The use of 40 bits for an offset provides ~34 years of effective
// monitoring time before the uint wraps around, at millisecond precision.
// ~34 years of process uptime "ought to be enough for anybody". Millisecond
// writeSizePrecision is sufficient, given that we are monitoring for writes that take
// longer than one millisecond.
//
// The use of 20 bits for the size in KBs allows representing sizes of up
// to ~1.04 GBs. If the write is larger than that, we round down to ~1.04 GBs.
//
// The use of 60 bits for an offset provides ~36.5 years of effective
// monitoring time before the uint wraps around. 36.5 years of process uptime
// "ought to be enough for anybody". This allows for 16 operation types.
// The use of four bits for OpType allows for 16 operation types.
//
// NB: this packing scheme is not persisted, and is therefore safe to adjust
// across process boundaries.
//
// TODO(before merge): 1 KB to ~1.04 GB is range of writes that we can represent via
// this scheme, with bigger writes being rounded down to ~1.04 GB. Not sure how big
// writes in pebble are, on both low side, average side, & high side. Need to check
// on that, & adjust scheme if needed.
lastWritePacked uint64
createTime time.Time
}

// newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
// specified time threshold and event listener.
func newDiskHealthCheckingFile(
file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration),
file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, int, time.Duration),
) *diskHealthCheckingFile {
return &diskHealthCheckingFile{
File: file,
Expand Down Expand Up @@ -138,19 +159,18 @@ func (d *diskHealthCheckingFile) startTicker() {
select {
case <-d.stopper:
return

case <-ticker.C:
packed := atomic.LoadUint64(&d.lastWritePacked)
if packed == 0 {
continue
}
offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf)
lastWrite := d.createTime.Add(time.Duration(offsetNanos))
offsetMillis, writeSize, op := unpack(packed)
lastWrite := d.createTime.Add(time.Duration(offsetMillis))
now := time.Now()
if lastWrite.Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Call the passed-in
// listener.
d.onSlowDisk(op, now.Sub(lastWrite))
d.onSlowDisk(op, writeSize, now.Sub(lastWrite))
}
}
}
Expand All @@ -164,7 +184,7 @@ func (d *diskHealthCheckingFile) stopTicker() {

// Write implements the io.Writer interface.
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
d.timeDiskOp(OpTypeWrite, func() {
d.timeDiskOp(OpTypeWrite, len(p), func() {
n, err = d.File.Write(p)
})
return n, err
Expand All @@ -178,38 +198,59 @@ func (d *diskHealthCheckingFile) Close() error {

// Sync implements the io.Syncer interface.
func (d *diskHealthCheckingFile) Sync() (err error) {
d.timeDiskOp(OpTypeSync, func() {
d.timeDiskOp(OpTypeSync, 0, func() {
err = d.File.Sync()
})
return err
}

// timeDiskOp runs the specified closure and makes its timing visible to the
// monitoring goroutine, in case it exceeds one of the slow disk durations.
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
// opType should always be set. writeSizeInBytes should be set if the write
// operation is sized. If not, it should be set to zero.
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, writeSizeInBytes int, op func()) {
if d == nil {
op()
return
}

offsetNanos := time.Since(d.createTime).Nanoseconds()
// We have no guarantee of clock monotonicity. If we have a small regression
// in the clock, we set offsetNanos to zero, so we can still catch the operation
// if happens to be slow.
if offsetNanos < 0 {
offsetNanos = 0
}
if offsetNanos > 1<<nOffsetBits-1 {
panic("vfs: last write offset would result in integer wraparound")
}
packed := uint64(offsetNanos)<<(64-nOffsetBits) | uint64(opType)
offsetMillis := time.Since(d.createTime).Milliseconds()
packed := pack(offsetMillis, writeSizeInBytes, opType)
atomic.StoreUint64(&d.lastWritePacked, packed)
defer func() {
atomic.StoreUint64(&d.lastWritePacked, 0)
}()
op()
}

func pack(offsetMillis int64, writeSizeInBytes int, opType OpType) uint64 {
// We have no guarantee of clock monotonicity. If we have a small regression
// in the clock, we set offsetMillis to zero, so we can still catch the operation
// if happens to be slow.
if offsetMillis < 0 {
offsetMillis = 0
}
if offsetMillis > 1<<nOffsetBits-1 {
panic("vfs: last write offset would result in integer wraparound")
}

// See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
writeSize := writeSizeInBytes / writeSizePrecision
// If the size of the write is larger than we can store in the packed int, store the max
// value we can store in the packed int.
if writeSize > 1<<nWriteSizeBits-1 {
writeSize = 1<<nWriteSizeBits-1
}

return uint64(offsetMillis)<<(64-nOffsetBits) | uint64(writeSize)<<(64-nOffsetBits-nWriteSizeBits) | uint64(opType)

}

// See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
func unpack(packed uint64) (offsetMillis int64, writeSize int, opType OpType) {
return int64(packed>>(64-nOffsetBits)), int(packed>>(64-nOffsetBits-nWriteSizeBits))&((1<<nWriteSizeBits)-1), OpType(packed&0xf)
}

// diskHealthCheckingFS adds disk-health checking facilities to a VFS.
// It times disk write operations in two ways:
//
Expand Down Expand Up @@ -249,7 +290,7 @@ func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
type diskHealthCheckingFS struct {
tickInterval time.Duration
diskSlowThreshold time.Duration
onSlowDisk func(string, OpType, time.Duration)
onSlowDisk func(string, OpType, int, time.Duration)
fs FS
mu struct {
sync.Mutex
Expand Down Expand Up @@ -286,7 +327,7 @@ var _ FS = (*diskHealthCheckingFS)(nil)
//
// A threshold of zero disables disk-health checking.
func WithDiskHealthChecks(
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, time.Duration),
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, int, time.Duration),
) (FS, io.Closer) {
if diskSlowThreshold == 0 {
return innerFS, noopCloser{}
Expand Down Expand Up @@ -391,7 +432,7 @@ func (d *diskHealthCheckingFS) startTickerLocked() {
if nanos != 0 && time.Unix(0, nanos).Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Invoke the provided
// callback.
d.onSlowDisk(d.mu.inflight[i].name, d.mu.inflight[i].opType, now.Sub(time.Unix(0, nanos)))
d.onSlowDisk(d.mu.inflight[i].name, d.mu.inflight[i].opType, 0, now.Sub(time.Unix(0, nanos)))
}
}
d.mu.Unlock()
Expand Down Expand Up @@ -444,8 +485,8 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) {
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSize int, duration time.Duration) {
d.onSlowDisk(name, opType, writeSize, duration)
})
checkingFile.startTicker()
return checkingFile, nil
Expand Down Expand Up @@ -497,8 +538,8 @@ func (d *diskHealthCheckingFS) OpenDir(name string) (File, error) {
}
// Directories opened with OpenDir must be opened with health checking,
// because they may be explicitly synced.
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSize int, duration time.Duration) {
d.onSlowDisk(name, opType, writeSize, duration)
})
checkingFile.startTicker()
return checkingFile, nil
Expand Down Expand Up @@ -559,8 +600,8 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(newname, opType, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSize int, duration time.Duration) {
d.onSlowDisk(newname, opType, writeSize, duration)
})
checkingFile.startTicker()
return checkingFile, nil
Expand Down
Loading

0 comments on commit 9787bb1

Please sign in to comment.