Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crl-release-23.1: vfs: include size of write in DiskSlowInfo #2503

Merged
merged 2 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 1 addition & 18 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,7 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) {

// DiskSlowInfo contains the info for a disk slowness event when writing to a
// file.
type DiskSlowInfo struct {
// Path of file being written to.
Path string
// Operation being performed on the file.
OpType vfs.OpType
// Duration that has elapsed since this disk operation started.
Duration time.Duration
}

func (i DiskSlowInfo) String() string {
return redact.StringWithoutMarkers(i)
}

// SafeFormat implements redact.SafeFormatter.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs",
redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds()))
}
type DiskSlowInfo = vfs.DiskSlowInfo

// FlushInfo contains the info for a flush event.
type FlushInfo struct {
Expand Down
8 changes: 2 additions & 6 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,12 +1038,8 @@ func (o *Options) WithFSDefaults() *Options {
o.FS = vfs.Default
}
o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second,
func(name string, op vfs.OpType, duration time.Duration) {
o.EventListener.DiskSlow(DiskSlowInfo{
Path: name,
OpType: op,
Duration: duration,
})
func(info vfs.DiskSlowInfo) {
o.EventListener.DiskSlow(info)
})
return o
}
Expand Down
180 changes: 139 additions & 41 deletions vfs/disk_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,38 @@ import (
"fmt"
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/redact"
)

const (
// defaultTickInterval is the default interval between two ticks of each
// diskHealthCheckingFile loop iteration.
defaultTickInterval = 2 * time.Second
// preallocatedSlotCount is the default number of slots available for
// concurrent filesystem operations. The slot count may be exceeded, but
// each additional slot will incur an additional allocation. We choose 16
// here with the expectation that it is significantly more than required in
// practice. See the comment above the diskHealthCheckingFS type definition.
preallocatedSlotCount = 16
// nOffsetBits is the number of bits in the packed 64-bit integer used for
// identifying an offset from the file creation time (in nanoseconds).
nOffsetBits = 60
// deltaBits is the number of bits in the packed 64-bit integer used for
// identifying a delta from the file creation time in milliseconds.
deltaBits = 40
// writeSizeBits is the number of bits in the packed 64-bit integer used for
// identifying the size of the write operation, if the operation is sized. See
// writeSizePrecision below for precision of size.
writeSizeBits = 20
// Track size of writes at kilobyte precision. See comment above lastWritePacked for more.
writeSizePrecision = 1024
)

// Variables to enable testing.
var (
// defaultTickInterval is the default interval between two ticks of each
// diskHealthCheckingFile loop iteration.
defaultTickInterval = 2 * time.Second
)

// OpType is the type of IO operation being monitored by a
Expand Down Expand Up @@ -98,18 +110,28 @@ func (o OpType) String() string {
// operation, as it reduces overhead per disk operation.
type diskHealthCheckingFile struct {
file File
onSlowDisk func(OpType, time.Duration)
onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration)
diskSlowThreshold time.Duration
tickInterval time.Duration

stopper chan struct{}
// lastWritePacked is a 64-bit unsigned int, with the most significant 7.5
// bytes (60 bits) representing an offset (in nanoseconds) from the file
// creation time. The least significant four bits contains the OpType.
// lastWritePacked is a 64-bit unsigned int. The most significant
// 40 bits represent a delta (in milliseconds) from the creation
// time of the diskHealthCheckingFile. The next most significant 20 bits
// represent the size of the write in KBs, if the write has a size. (If
// it doesn't, the 20 bits are zeroed). The least significant four bits
// contain the OpType.
//
// The use of 60 bits for an offset provides ~36.5 years of effective
// monitoring time before the uint wraps around. 36.5 years of process uptime
// "ought to be enough for anybody". This allows for 16 operation types.
// The use of 40 bits for a delta provides ~34 years of effective
// monitoring time before the uint wraps around, at millisecond precision.
// ~34 years of process uptime "ought to be enough for anybody". Millisecond
// precision is sufficient, given that we are monitoring for writes that take
// longer than one millisecond.
//
// The use of 20 bits for the size in KBs allows representing sizes up
// to nearly one GB. If the write is larger than that, we round down to ~one GB.
//
// The use of four bits for OpType allows for 16 operation types.
//
// NB: this packing scheme is not persisted, and is therefore safe to adjust
// across process boundaries.
Expand All @@ -120,7 +142,9 @@ type diskHealthCheckingFile struct {
// newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the
// specified time threshold and event listener.
func newDiskHealthCheckingFile(
file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration),
file File,
diskSlowThreshold time.Duration,
onSlowDisk func(OpType OpType, writeSizeInBytes int, duration time.Duration),
) *diskHealthCheckingFile {
return &diskHealthCheckingFile{
file: file,
Expand Down Expand Up @@ -154,13 +178,13 @@ func (d *diskHealthCheckingFile) startTicker() {
if packed == 0 {
continue
}
offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf)
lastWrite := d.createTime.Add(time.Duration(offsetNanos))
delta, writeSize, op := unpack(packed)
lastWrite := d.createTime.Add(delta)
now := time.Now()
if lastWrite.Add(d.diskSlowThreshold).Before(now) {
// diskSlowThreshold was exceeded. Call the passed-in
// listener.
d.onSlowDisk(op, now.Sub(lastWrite))
d.onSlowDisk(op, writeSize, now.Sub(lastWrite))
}
}
}
Expand Down Expand Up @@ -189,7 +213,7 @@ func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) {

// Write implements the io.Writer interface.
func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) {
d.timeDiskOp(OpTypeWrite, func() {
d.timeDiskOp(OpTypeWrite, int64(len(p)), func() {
n, err = d.file.Write(p)
})
return n, err
Expand All @@ -201,13 +225,14 @@ func (d *diskHealthCheckingFile) Close() error {
return d.file.Close()
}

// Prefetch implements (vfs.File).Prefetch. It forwards offset and length
// unchanged to the wrapped file and returns that call's error; unlike Write,
// Preallocate and the Sync variants it is not routed through timeDiskOp.
func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error {
return d.file.Prefetch(offset, length)
}

// Preallocate implements (vfs.File).Preallocate.
func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) {
d.timeDiskOp(OpTypePreallocate, func() {
d.timeDiskOp(OpTypePreallocate, n, func() {
err = d.file.Preallocate(off, n)
})
return err
Expand All @@ -220,47 +245,40 @@ func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) {

// Sync implements the io.Syncer interface.
func (d *diskHealthCheckingFile) Sync() (err error) {
d.timeDiskOp(OpTypeSync, func() {
d.timeDiskOp(OpTypeSync, 0, func() {
err = d.file.Sync()
})
return err
}

// SyncData implements (vfs.File).SyncData.
func (d *diskHealthCheckingFile) SyncData() (err error) {
d.timeDiskOp(OpTypeSyncData, func() {
d.timeDiskOp(OpTypeSyncData, 0, func() {
err = d.file.SyncData()
})
return err
}

// SyncTo implements (vfs.File).SyncTo.
func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) {
d.timeDiskOp(OpTypeSyncTo, func() {
d.timeDiskOp(OpTypeSyncTo, length, func() {
fullSync, err = d.file.SyncTo(length)
})
return fullSync, err
}

// timeDiskOp runs the specified closure and makes its timing visible to the
// monitoring goroutine, in case it exceeds one of the slow disk durations.
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
// opType should always be set. writeSizeInBytes should be set if the write
// operation is sized. If not, it should be set to zero.
func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, writeSizeInBytes int64, op func()) {
if d == nil {
op()
return
}

offsetNanos := time.Since(d.createTime).Nanoseconds()
// We have no guarantee of clock monotonicity. If we have a small regression
// in the clock, we set offsetNanos to zero, so we can still catch the operation
// if happens to be slow.
if offsetNanos < 0 {
offsetNanos = 0
}
if offsetNanos > 1<<nOffsetBits-1 {
panic("vfs: last write offset would result in integer wraparound")
}
packed := uint64(offsetNanos)<<(64-nOffsetBits) | uint64(opType)
delta := time.Since(d.createTime)
packed := pack(delta, writeSizeInBytes, opType)
if invariants.Enabled {
if !atomic.CompareAndSwapUint64(&d.lastWritePacked, 0, packed) {
panic("concurrent write operations detected on file")
Expand All @@ -280,6 +298,45 @@ func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) {
op()
}

// pack encodes the start of a disk operation into a single uint64 so that it
// can be published to the monitoring goroutine with one atomic store. From
// most to least significant, the layout is:
//
//   - deltaBits bits: delta since file creation, in milliseconds;
//   - writeSizeBits bits: size of the write in units of writeSizePrecision bytes;
//   - the remaining low bits: the OpType.
//
// Note the slight lack of symmetry between pack & unpack. pack takes an int64
// for writeSizeInBytes, since callers of pack use an int64. This is dictated by
// the vfs interface. unpack OTOH returns an int. This is safe because the
// largest size that can be stored is (1<<writeSizeBits - 1) * writeSizePrecision
// bytes, which fits comfortably in 32 bits.
//
// pack panics if delta does not fit in deltaBits bits of milliseconds.
func pack(delta time.Duration, writeSizeInBytes int64, opType OpType) uint64 {
	// We have no guarantee of clock monotonicity. If we have a small regression
	// in the clock, we set deltaMillis to zero, so we can still catch the operation
	// if happens to be slow.
	deltaMillis := delta.Milliseconds()
	if deltaMillis < 0 {
		deltaMillis = 0
	}
	// As of 3/7/2023, the use of 40 bits for a delta provides ~34 years
	// of effective monitoring time before the uint wraps around, at millisecond
	// precision.
	if deltaMillis > 1<<deltaBits-1 {
		panic("vfs: last write delta would result in integer wraparound")
	}

	// See writeSizePrecision to get the unit of writeSize. As of 1/26/2023, the unit is KBs.
	writeSize := writeSizeInBytes / writeSizePrecision
	const writeSizeCeiling = 1<<writeSizeBits - 1
	switch {
	case writeSize < 0:
		// A negative size is meaningless. Left as-is, the conversion to uint64
		// below would sign-extend it and set every delta bit, pushing the
		// recorded start time far into the future and hiding a stalled
		// operation from the monitor. Treat it as an unsized operation.
		writeSize = 0
	case writeSize > writeSizeCeiling:
		// If the size of the write is larger than we can store in the packed
		// int, store the max value we can store in the packed int.
		writeSize = writeSizeCeiling
	}

	return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType)
}

// unpack is the inverse of pack: it splits a packed uint64 into the delta since
// file creation, the write size in bytes and the operation type. The delta has
// millisecond resolution and the size is a multiple of writeSizePrecision.
func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) {
	const (
		deltaShift = 64 - deltaBits
		sizeShift  = deltaShift - writeSizeBits
		sizeMask   = 1<<writeSizeBits - 1
	)

	millis := packed >> deltaShift
	// Isolate the size field first, then scale it back up to bytes. The result
	// is at most sizeMask*writeSizePrecision, so converting to int cannot
	// truncate.
	sizeUnits := int64(packed>>sizeShift) & sizeMask
	sizeBytes := sizeUnits * writeSizePrecision

	return time.Duration(millis) * time.Millisecond, int(sizeBytes), OpType(packed & 0xf)
}

// diskHealthCheckingDir implements disk-health checking for directories. Unlike
// other files, we allow directories to receive concurrent write operations
// (Syncs are the only write operations supported by a directory.) Since the
Expand All @@ -300,6 +357,29 @@ func (d *diskHealthCheckingDir) Sync() (err error) {
return err
}

// DiskSlowInfo captures info about detected slow operations on the vfs. It is
// the value passed to the onSlowDisk callback supplied to WithDiskHealthChecks.
type DiskSlowInfo struct {
// Path of file being written to.
Path string
// Operation being performed on the file.
OpType OpType
// Size of write in bytes, if the write is sized. Zero otherwise; operations
// reported at the filesystem level always carry zero. For file-level
// operations the size has been through pack/unpack, so it is rounded down to
// a multiple of writeSizePrecision and saturates at
// (1<<writeSizeBits - 1) * writeSizePrecision bytes.
WriteSize int
// Duration that has elapsed since this disk operation started.
Duration time.Duration
}

// String implements fmt.Stringer. It returns the result of
// redact.StringWithoutMarkers applied to i.
func (i DiskSlowInfo) String() string {
return redact.StringWithoutMarkers(i)
}

// SafeFormat implements redact.SafeFormatter. It prints a one-line description
// of the slow operation: the operation type, the base name of the file, the
// write size in bytes and the elapsed time in seconds. Every argument is
// wrapped in redact.Safe, and only the final path element of Path is printed.
func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) {
	opName := redact.Safe(i.OpType.String())
	fileName := redact.Safe(filepath.Base(i.Path))
	sizeBytes := redact.Safe(i.WriteSize)
	elapsedSecs := redact.Safe(i.Duration.Seconds())

	w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs",
		opName, fileName, sizeBytes, elapsedSecs)
}

// diskHealthCheckingFS adds disk-health checking facilities to a VFS.
// It times disk write operations in two ways:
//
Expand Down Expand Up @@ -339,7 +419,7 @@ func (d *diskHealthCheckingDir) Sync() (err error) {
type diskHealthCheckingFS struct {
tickInterval time.Duration
diskSlowThreshold time.Duration
onSlowDisk func(string, OpType, time.Duration)
onSlowDisk func(DiskSlowInfo)
fs FS
mu struct {
sync.Mutex
Expand Down Expand Up @@ -376,7 +456,7 @@ var _ FS = (*diskHealthCheckingFS)(nil)
//
// A threshold of zero disables disk-health checking.
func WithDiskHealthChecks(
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, time.Duration),
innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(info DiskSlowInfo),
) (FS, io.Closer) {
if diskSlowThreshold == 0 {
return innerFS, noopCloser{}
Expand Down Expand Up @@ -492,7 +572,13 @@ func (d *diskHealthCheckingFS) startTickerLocked() {
}
d.mu.Unlock()
for i := range exceededSlots {
d.onSlowDisk(exceededSlots[i].name, exceededSlots[i].opType, now.Sub(time.Unix(0, exceededSlots[i].startNanos)))
d.onSlowDisk(
DiskSlowInfo{
Path: exceededSlots[i].name,
OpType: exceededSlots[i].opType,
WriteSize: 0, // writes at the fs level are not sized
Duration: now.Sub(time.Unix(0, exceededSlots[i].startNanos)),
})
}
case <-stopper:
return
Expand Down Expand Up @@ -543,8 +629,14 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) {
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(name, opType, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
d.onSlowDisk(
DiskSlowInfo{
Path: name,
OpType: opType,
WriteSize: writeSizeInBytes,
Duration: duration,
})
})
checkingFile.startTicker()
return checkingFile, nil
Expand Down Expand Up @@ -658,8 +750,14 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err
if d.diskSlowThreshold == 0 {
return f, nil
}
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) {
d.onSlowDisk(newname, opType, duration)
checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) {
d.onSlowDisk(
DiskSlowInfo{
Path: newname,
OpType: opType,
WriteSize: writeSizeInBytes,
Duration: duration,
})
})
checkingFile.startTicker()
return checkingFile, nil
Expand Down
Loading