Skip to content

Commit

Permalink
vfs: include size of write in DiskSlowInfo
Browse files Browse the repository at this point in the history
This commit adds the size of a write to DiskSlowInfo, in cases where a write is
sized. A small write stalling out points at file system / disk issues, while
a large write taking time to complete may indicate CRDB issues with a certain
workload, etc.
  • Loading branch information
joshimhoff committed Mar 8, 2023
1 parent 470c6d4 commit 177cf81
Show file tree
Hide file tree
Showing 5 changed files with 307 additions and 109 deletions.
19 changes: 1 addition & 18 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,24 +116,7 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {

// DiskSlowInfo contains the info for a disk slowness event when writing to a
// file.
type DiskSlowInfo struct {
// Path of file being written to.
Path string
// Operation being performed on the file.
OpType vfs.OpType
// Duration that has elapsed since this disk operation started.
Duration time.Duration
}

func (i DiskSlowInfo) String() string {
return redact.StringWithoutMarkers(i)
}

// SafeFormat implements redact.SafeFormatter.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds()))
}
type DiskSlowInfo = vfs.DiskSlowInfo

// FlushInfo contains the info for a flush event.
type FlushInfo struct {
Expand Down
8 changes: 2 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,12 +1034,8 @@ func (o *Options) WithFSDefaults() *Options {
o.FS = vfs.Default
}
o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second,
func(name string, op vfs.OpType, duration time.Duration) {
o.EventListener.DiskSlow(DiskSlowInfo{
Path: name,
OpType: op,
Duration: duration,
})
func(info vfs.DiskSlowInfo) {
o.EventListener.DiskSlow(info)
})
return o
}
Expand Down
178 changes: 137 additions & 41 deletions vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,32 @@ import (
"time"

"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/redact"
)

const (
// defaultTickInterval is the default interval between two ticks of each
// diskHealthCheckingFile loop iteration.
defaultTickInterval = 2 * time.Second
// preallocatedSlotCount is the default number of slots available for
// concurrent filesystem operations. The slot count may be exceeded, but
// each additional slot will incur an additional allocation. We choose 16
// here with the expectation that it is significantly more than required in
// practice. See the comment above the diskHealthCheckingFS type definition.
preallocatedSlotCount = 16
// nOffsetBits is the number of bits in the packed 64-bit integer used for
// identifying an offset from the file creation time (in nanoseconds).
nOffsetBits = 60
// deltaBits is the number of bits in the packed 64-bit integer used for
// identifying a delta from the file creation time in milliseconds.
deltaBits = 40
// writeSizeBits is the number of bits in the packed 64-bit integer used for
// identifying the size of the write operation, if the operation is sized. See
// writeSizePrecision below for precision of size.
writeSizeBits = 20
)

// Variables to enable testing. These are declared with var rather than const
// so that they can be reassigned.
var (
// writeSizePrecision is the granularity, in bytes, at which write sizes are
// tracked: pack divides the byte count by it before storing the size, and
// unpack multiplies by it to recover an approximate byte count. The default
// of 1024 tracks writes at kilobyte precision. See comment above
// lastWritePacked for more.
writeSizePrecision = int64(1024)
// defaultTickInterval is the default interval between two ticks of each
// diskHealthCheckingFile loop iteration.
defaultTickInterval = 2 * time.Second
)

// OpType is the type of IO operation being monitored by a
Expand Down Expand Up @@ -98,18 +109,28 @@ func (o OpType) String() string {
// operation, as it reduces overhead per disk operation.
type diskHealthCheckingFile struct {
file File
onSlowDisk func(OpType, time.Duration)
onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration)
diskSlowThreshold time.Duration
tickInterval time.Duration

stopper chan struct{}
// lastWritePacked is a 64-bit unsigned int, with the most significant 7.5
// bytes (60 bits) representing an offset (in nanoseconds) from the file
// creation time. The least significant four bits contains the OpType.
// lastWritePacked is a 64-bit unsigned int. The most significant
// 40 bits represent a delta (in milliseconds) from the creation
// time of the diskHealthCheckingFile. The next most significant 20 bits
// represent the size of the write in KBs, if the write has a size. (If
// it doesn't, the 20 bits are zeroed.) The least significant four bits
// contain the OpType.
//
// The use of 60 bits for an offset provides ~36.5 years of effective
// monitoring time before the uint wraps around. 36.5 years of process uptime
// "ought to be enough for anybody". This allows for 16 operation types.
// The use of 40 bits for a delta provides ~34 years of effective
// monitoring time before the uint wraps around, at millisecond precision.
// ~34 years of process uptime "ought to be enough for anybody". Millisecond
// precision is sufficient, given that we are monitoring for writes that take
// longer than one millisecond.
//
// The use of 20 bits for the size in KBs allows representing sizes up
// to nearly one GB. If the write is larger than that, we round down to ~one GB.
//
// The use of four bits for OpType allows for 16 operation types.
//
// NB: this packing scheme is not persisted, and is therefore safe to adjust
// across process boundaries.
Expand All @@ -120,7 +141,9 @@ type diskHealthCheckingFile struct {
// newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
// specified time threshold and event listener.
func newDiskHealthCheckingFile(
file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration),
file File,
diskSlowThreshold time.Duration,
onSlowDisk func(OpType OpType, writeSizeInBytes int, duration time.Duration),
) *diskHealthCheckingFile {
return &diskHealthCheckingFile{
file: file,
Expand Down Expand Up @@ -154,13 +177,13 @@ func (d *diskHealthCheckingFile) startTicker() {
if packed == 0 {
continue
}
offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf)
lastWrite := d.createTime.Add(time.Duration(offsetNanos))
delta, writeSize, op := unpack(packed)
lastWrite := d.createTime.Add(delta)
now := time.Now()
if lastWrite.Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Call the passed-in
// listener.
d.onSlowDisk(op, now.Sub(lastWrite))
d.onSlowDisk(op, writeSize, now.Sub(lastWrite))
}
}
}
Expand Down Expand Up @@ -189,7 +212,7 @@ func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {

// Write implements the io.Writer interface.
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
d.timeDiskOp(OpTypeWrite, func() {
d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
n, err = d.file.Write(p)
})
return n, err
Expand All @@ -201,13 +224,14 @@ func (d *diskHealthCheckingFile) Close() error {
return d.file.Close()
}

// Prefetch implements (vfs.File).Prefetch. It forwards the call straight to
// the wrapped file and returns its error unchanged; unlike Write, Preallocate
// and the Sync variants, it is not routed through timeDiskOp, so it is not
// timed by the disk-health monitor.
func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
return d.file.Prefetch(offset, length)
}

// Preallocate implements (vfs.File).Preallocate.
func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
d.timeDiskOp(OpTypePreallocate, func() {
d.timeDiskOp(OpTypePreallocate, n, func() {
err = d.file.Preallocate(off, n)
})
return err
Expand All @@ -220,47 +244,40 @@ func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) {

// Sync implements the io.Syncer interface.
func (d *diskHealthCheckingFile) Sync() (err error) {
d.timeDiskOp(OpTypeSync, func() {
d.timeDiskOp(OpTypeSync, 0, func() {
err = d.file.Sync()
})
return err
}

// SyncData implements (vfs.File).SyncData.
func (d *diskHealthCheckingFile) SyncData() (err error) {
d.timeDiskOp(OpTypeSyncData, func() {
d.timeDiskOp(OpTypeSyncData, 0, func() {
err = d.file.SyncData()
})
return err
}

// SyncTo implements (vfs.File).SyncTo.
func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
d.timeDiskOp(OpTypeSyncTo, func() {
d.timeDiskOp(OpTypeSyncTo, length, func() {
fullSync, err = d.file.SyncTo(length)
})
return fullSync, err
}

// timeDiskOp runs the specified closure and makes its timing visible to the
// monitoring goroutine, in case it exceeds one of the slow disk durations.
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
// opType should always be set. writeSizeInBytes should be set if the write
// operation is sized. If not, it should be set to zero.
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, writeSizeInBytes int64, op func()) {
if d == nil {
op()
return
}

offsetNanos := time.Since(d.createTime).Nanoseconds()
// We have no guarantee of clock monotonicity. If we have a small regression
// in the clock, we set offsetNanos to zero, so we can still catch the operation
// if happens to be slow.
if offsetNanos < 0 {
offsetNanos = 0
}
if offsetNanos > 1<<nOffsetBits-1 {
panic("vfs: last write offset would result in integer wraparound")
}
packed := uint64(offsetNanos)<<(64-nOffsetBits) | uint64(opType)
delta := time.Since(d.createTime)
packed := pack(delta, writeSizeInBytes, opType)
if invariants.Enabled {
if !atomic.CompareAndSwapUint64(&d.lastWritePacked, 0, packed) {
panic("concurrent write operations detected on file")
Expand All @@ -280,6 +297,45 @@ func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
op()
}

// pack encodes the start of a timed operation into a single uint64 suitable
// for storing in lastWritePacked. From most to least significant bits, the
// layout is:
//
//   - deltaBits bits: delta since file creation, in milliseconds;
//   - writeSizeBits bits: the write size in units of writeSizePrecision bytes;
//   - the remaining low bits: the OpType.
//
// delta is clamped to zero if negative. writeSizeInBytes should be zero for
// operations that are not sized; negative values are treated as unsized, and
// values too large for writeSizeBits are clamped to the largest representable
// size. pack panics if delta does not fit in deltaBits.
//
// Note the slight lack of symmetry between pack & unpack. pack takes an int64
// for writeSizeInBytes, since callers of pack use an int64. This is dictated
// by the vfs interface. unpack OTOH returns an int. This is safe because the
// packing scheme implies we only actually need 32 bits.
func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
	// We have no guarantee of clock monotonicity. If we have a small regression
	// in the clock, we set deltaMillis to zero, so we can still catch the
	// operation if it happens to be slow.
	deltaMillis := delta.Milliseconds()
	if deltaMillis < 0 {
		deltaMillis = 0
	}
	// As of 3/7/2023, the use of 40 bits for a delta provides ~34 years
	// of effective monitoring time before the uint wraps around, at millisecond
	// precision.
	if deltaMillis > 1<<deltaBits-1 {
		panic("vfs: last write delta would result in integer wraparound")
	}

	// See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the
	// unit is KBs.
	const writeSizeCeiling = 1<<writeSizeBits - 1
	writeSize := writeSizeInBytes / writeSizePrecision
	switch {
	case writeSize < 0:
		// A negative size would sign-extend when converted to uint64 below and
		// overwrite the delta bits. Treat it as an unsized operation instead.
		writeSize = 0
	case writeSize > writeSizeCeiling:
		// If the size of the write is larger than we can store in the packed
		// int, store the max value we can store in the packed int.
		writeSize = writeSizeCeiling
	}

	return uint64(deltaMillis)<<(64-deltaBits) |
		uint64(writeSize)<<(64-deltaBits-writeSizeBits) |
		uint64(opType)
}

// unpack is the inverse of pack: it splits a packed uint64 into the delta
// since file creation, the write size in bytes, and the operation type. The
// delta has millisecond resolution and the size is a multiple of
// writeSizePrecision, since that is all the packed form stores.
func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
	const (
		sizeShift = 64 - deltaBits - writeSizeBits
		sizeMask  = 1<<writeSizeBits - 1
	)

	elapsedMillis := packed >> (64 - deltaBits)
	// Isolate the size field first, then scale it back up to bytes.
	sizeUnits := int64(packed>>sizeShift) & sizeMask
	// Given the packing scheme, the byte count fits in an int without
	// truncation.
	sizeBytes := int(sizeUnits * writeSizePrecision)

	return time.Duration(elapsedMillis) * time.Millisecond, sizeBytes, OpType(packed & 0xf)
}

// diskHealthCheckingDir implements disk-health checking for directories. Unlike
// other files, we allow directories to receive concurrent write operations
// (Syncs are the only write operations supported by a directory.) Since the
Expand All @@ -300,6 +356,28 @@ func (d *diskHealthCheckingDir) Sync() (err error) {
return err
}

// DiskSlowInfo captures info about detected slow operations on the vfs. It is
// the value handed to the onSlowDisk callback supplied to WithDiskHealthChecks.
type DiskSlowInfo struct {
// Path of file being written to.
Path string
// Operation being performed on the file.
OpType OpType
// Size of write in bytes, if the write is sized; zero otherwise. Events
// raised for filesystem-level operations always report zero. For file-level
// operations the size is recovered from a packed representation, so it is
// rounded down to a multiple of writeSizePrecision and capped at the largest
// size that representation can hold.
WriteSize int
// Duration that has elapsed since this disk operation started.
Duration time.Duration
}

// String returns the event as plain text by passing it through
// redact.StringWithoutMarkers; the message itself is produced by SafeFormat.
func (i DiskSlowInfo) String() string {
return redact.StringWithoutMarkers(i)
}

// SafeFormat implements redact.SafeFormatter.
//
// The operation type, write size and elapsed duration are wrapped in
// redact.Safe because they carry no user data and are needed to diagnose the
// stall from redacted logs. Path is passed as-is so that it remains subject to
// redaction.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
	w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
		redact.Safe(i.OpType.String()), i.Path,
		redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds()))
}

// diskHealthCheckingFS adds disk-health checking facilities to a VFS.
// It times disk write operations in two ways:
//
Expand Down Expand Up @@ -339,7 +417,7 @@ func (d *diskHealthCheckingDir) Sync() (err error) {
type diskHealthCheckingFS struct {
tickInterval time.Duration
diskSlowThreshold time.Duration
onSlowDisk func(string, OpType, time.Duration)
onSlowDisk func(DiskSlowInfo)
fs FS
mu struct {
sync.Mutex
Expand Down Expand Up @@ -376,7 +454,7 @@ var _ FS = (*diskHealthCheckingFS)(nil)
//
// A threshold of zero disables disk-health checking.
func WithDiskHealthChecks(
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, time.Duration),
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(info DiskSlowInfo),
) (FS, io.Closer) {
if diskSlowThreshold == 0 {
return innerFS, noopCloser{}
Expand Down Expand Up @@ -492,7 +570,13 @@ func (d *diskHealthCheckingFS) startTickerLocked() {
}
d.mu.Unlock()
for i := range exceededSlots {
d.onSlowDisk(exceededSlots[i].name, exceededSlots[i].opType, now.Sub(time.Unix(0, exceededSlots[i].startNanos)))
d.onSlowDisk(
DiskSlowInfo{
Path: exceededSlots[i].name,
OpType: exceededSlots[i].opType,
WriteSize: 0, // writes at the fs level are not sized
Duration: now.Sub(time.Unix(0, exceededSlots[i].startNanos)),
})
}
case <-stopper:
return
Expand Down Expand Up @@ -543,8 +627,14 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) {
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
d.onSlowDisk(
DiskSlowInfo{
Path: name,
OpType: opType,
WriteSize: writeSizeInBytes,
Duration: duration,
})
})
checkingFile.startTicker()
return checkingFile, nil
Expand Down Expand Up @@ -658,8 +748,14 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(newname, opType, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
d.onSlowDisk(
DiskSlowInfo{
Path: newname,
OpType: opType,
WriteSize: writeSizeInBytes,
Duration: duration,
})
})
checkingFile.startTicker()
return checkingFile, nil
Expand Down
Loading

0 comments on commit 177cf81

Please sign in to comment.