diff --git a/event.go b/event.go index 2da40ff8b7..59551906a9 100644 --- a/event.go +++ b/event.go @@ -116,24 +116,7 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) { // DiskSlowInfo contains the info for a disk slowness event when writing to a // file. -type DiskSlowInfo struct { - // Path of file being written to. - Path string - // Operation being performed on the file. - OpType vfs.OpType - // Duration that has elapsed since this disk operation started. - Duration time.Duration -} - -func (i DiskSlowInfo) String() string { - return redact.StringWithoutMarkers(i) -} - -// SafeFormat implements redact.SafeFormatter. -func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs", - redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds())) -} +type DiskSlowInfo = vfs.DiskSlowInfo // FlushInfo contains the info for a flush event. type FlushInfo struct { diff --git a/options.go b/options.go index 0d2bbbc84d..d23b80f209 100644 --- a/options.go +++ b/options.go @@ -1034,12 +1034,8 @@ func (o *Options) WithFSDefaults() *Options { o.FS = vfs.Default } o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second, - func(name string, op vfs.OpType, duration time.Duration) { - o.EventListener.DiskSlow(DiskSlowInfo{ - Path: name, - OpType: op, - Duration: duration, - }) + func(info vfs.DiskSlowInfo) { + o.EventListener.DiskSlow(info) }) return o } diff --git a/vfs/disk_health.go b/vfs/disk_health.go index 8b6c7e54a9..33ce3d2e88 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -13,21 +13,32 @@ import ( "time" "github.com/cockroachdb/pebble/internal/invariants" + "github.com/cockroachdb/redact" ) const ( - // defaultTickInterval is the default interval between two ticks of each - // diskHealthCheckingFile loop iteration. - defaultTickInterval = 2 * time.Second // preallocatedSlotCount is the default number of slots available for // concurrent filesystem operations. The slot count may be exceeded, but // each additional slot will incur an additional allocation. We choose 16 // here with the expectation that it is significantly more than required in // practice. See the comment above the diskHealthCheckingFS type definition. preallocatedSlotCount = 16 - // nOffsetBits is the number of bits in the packed 64-bit integer used for - // identifying an offset from the file creation time (in nanoseconds). - nOffsetBits = 60 + // deltaBits is the number of bits in the packed 64-bit integer used for + // identifying a delta from the file creation time in milliseconds. + deltaBits = 40 + // writeSizeBits is the number of bits in the packed 64-bit integer used for + // identifying the size of the write operation, if the operation is sized. See + // writeSizePrecision below for precision of size. + writeSizeBits = 20 + // Track size of writes at kilobyte precision. See comment above lastWritePacked for more. + writeSizePrecision = 1024 +) + +// Variables to enable testing. +var ( + // defaultTickInterval is the default interval between two ticks of each + // diskHealthCheckingFile loop iteration. + defaultTickInterval = 2 * time.Second ) // OpType is the type of IO operation being monitored by a @@ -98,18 +109,28 @@ func (o OpType) String() string { // operation, as it reduces overhead per disk operation. type diskHealthCheckingFile struct { file File - onSlowDisk func(OpType, time.Duration) + onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration) diskSlowThreshold time.Duration tickInterval time.Duration stopper chan struct{} - // lastWritePacked is a 64-bit unsigned int, with the most significant 7.5 - // bytes (60 bits) representing an offset (in nanoseconds) from the file - // creation time. The least significant four bits contains the OpType. + // lastWritePacked is a 64-bit unsigned int. The most significant + // 40 bits represent an delta (in milliseconds) from the creation + // time of the diskHealthCheckingFile. The next most significant 20 bits + // represent the size of the write in KBs, if the write has a size. (If + // it doesn't, the 20 bits are zeroed). The least significant four bits + // contains the OpType. // - // The use of 60 bits for an offset provides ~36.5 years of effective - // monitoring time before the uint wraps around. 36.5 years of process uptime - // "ought to be enough for anybody". This allows for 16 operation types. + // The use of 40 bits for an delta provides ~34 years of effective + // monitoring time before the uint wraps around, at millisecond precision. + // ~34 years of process uptime "ought to be enough for anybody". Millisecond + // writeSizePrecision is sufficient, given that we are monitoring for writes that take + // longer than one millisecond. + // + // The use of 20 bits for the size in KBs allows representing sizes up + // to nearly one GB. If the write is larger than that, we round down to ~one GB. + // + // The use of four bits for OpType allows for 16 operation types. // // NB: this packing scheme is not persisted, and is therefore safe to adjust // across process boundaries. @@ -120,7 +141,9 @@ type diskHealthCheckingFile struct { // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the // specified time threshold and event listener. func newDiskHealthCheckingFile( - file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration), + file File, + diskSlowThreshold time.Duration, + onSlowDisk func(OpType OpType, writeSizeInBytes int, duration time.Duration), ) *diskHealthCheckingFile { return &diskHealthCheckingFile{ file: file, @@ -154,13 +177,13 @@ func (d *diskHealthCheckingFile) startTicker() { if packed == 0 { continue } - offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf) - lastWrite := d.createTime.Add(time.Duration(offsetNanos)) + delta, writeSize, op := unpack(packed) + lastWrite := d.createTime.Add(delta) now := time.Now() if lastWrite.Add(d.diskSlowThreshold).Before(now) { // diskSlowThreshold was exceeded. Call the passed-in // listener. - d.onSlowDisk(op, now.Sub(lastWrite)) + d.onSlowDisk(op, writeSize, now.Sub(lastWrite)) } } } @@ -189,7 +212,7 @@ func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) { // Write implements the io.Writer interface. func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) { - d.timeDiskOp(OpTypeWrite, func() { + d.timeDiskOp(OpTypeWrite, int64(len(p)), func() { n, err = d.file.Write(p) }) return n, err @@ -201,13 +224,14 @@ func (d *diskHealthCheckingFile) Close() error { return d.file.Close() } +// Prefetch implements (vfs.File).Prefetch. func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error { return d.file.Prefetch(offset, length) } // Preallocate implements (vfs.File).Preallocate. func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) { - d.timeDiskOp(OpTypePreallocate, func() { + d.timeDiskOp(OpTypePreallocate, n, func() { err = d.file.Preallocate(off, n) }) return err @@ -220,7 +244,7 @@ func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) { // Sync implements the io.Syncer interface. func (d *diskHealthCheckingFile) Sync() (err error) { - d.timeDiskOp(OpTypeSync, func() { + d.timeDiskOp(OpTypeSync, 0, func() { err = d.file.Sync() }) return err @@ -228,7 +252,7 @@ func (d *diskHealthCheckingFile) Sync() (err error) { // SyncData implements (vfs.File).SyncData. func (d *diskHealthCheckingFile) SyncData() (err error) { - d.timeDiskOp(OpTypeSyncData, func() { + d.timeDiskOp(OpTypeSyncData, 0, func() { err = d.file.SyncData() }) return err @@ -236,7 +260,7 @@ func (d *diskHealthCheckingFile) SyncData() (err error) { // SyncTo implements (vfs.File).SyncTo. func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) { - d.timeDiskOp(OpTypeSyncTo, func() { + d.timeDiskOp(OpTypeSyncTo, length, func() { fullSync, err = d.file.SyncTo(length) }) return fullSync, err @@ -244,23 +268,16 @@ func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) // timeDiskOp runs the specified closure and makes its timing visible to the // monitoring goroutine, in case it exceeds one of the slow disk durations. -func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) { +// opType should always be set. writeSizeInBytes should be set if the write +// operation is sized. If not, it should be set to zero. +func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, writeSizeInBytes int64, op func()) { if d == nil { op() return } - offsetNanos := time.Since(d.createTime).Nanoseconds() - // We have no guarantee of clock monotonicity. If we have a small regression - // in the clock, we set offsetNanos to zero, so we can still catch the operation - // if happens to be slow. - if offsetNanos < 0 { - offsetNanos = 0 - } - if offsetNanos > 1< 1< writeSizeCeiling { + writeSize = writeSizeCeiling + } + + return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType) +} + +func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) { + delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond + wz := int64(packed>>(64-deltaBits-writeSizeBits)) & ((1 << writeSizeBits) - 1) * writeSizePrecision + // Given the packing scheme, converting wz to an int will not truncate anything. + writeSizeInBytes = int(wz) + opType = OpType(packed & 0xf) + return delta, writeSizeInBytes, opType +} + // diskHealthCheckingDir implements disk-health checking for directories. Unlike // other files, we allow directories to receive concurrent write operations // (Syncs are the only write operations supported by a directory.) Since the @@ -300,6 +356,28 @@ func (d *diskHealthCheckingDir) Sync() (err error) { return err } +// DiskSlowInfo captures info about detected slow operations on the vfs. +type DiskSlowInfo struct { + // Path of file being written to. + Path string + // Operation being performed on the file. + OpType OpType + // Size of write in bytes, if the write is sized. + WriteSize int + // Duration that has elapsed since this disk operation started. + Duration time.Duration +} + +func (i DiskSlowInfo) String() string { + return redact.StringWithoutMarkers(i) +} + +// SafeFormat implements redact.SafeFormatter. +func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs", + redact.Safe(i.OpType.String()), i.Path, i.WriteSize, redact.Safe(i.Duration.Seconds())) +} + // diskHealthCheckingFS adds disk-health checking facilities to a VFS. // It times disk write operations in two ways: // @@ -339,7 +417,7 @@ func (d *diskHealthCheckingDir) Sync() (err error) { type diskHealthCheckingFS struct { tickInterval time.Duration diskSlowThreshold time.Duration - onSlowDisk func(string, OpType, time.Duration) + onSlowDisk func(DiskSlowInfo) fs FS mu struct { sync.Mutex @@ -376,7 +454,7 @@ var _ FS = (*diskHealthCheckingFS)(nil) // // A threshold of zero disables disk-health checking. func WithDiskHealthChecks( - innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, time.Duration), + innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(info DiskSlowInfo), ) (FS, io.Closer) { if diskSlowThreshold == 0 { return innerFS, noopCloser{} @@ -492,7 +570,13 @@ func (d *diskHealthCheckingFS) startTickerLocked() { } d.mu.Unlock() for i := range exceededSlots { - d.onSlowDisk(exceededSlots[i].name, exceededSlots[i].opType, now.Sub(time.Unix(0, exceededSlots[i].startNanos))) + d.onSlowDisk( + DiskSlowInfo{ + Path: exceededSlots[i].name, + OpType: exceededSlots[i].opType, + WriteSize: 0, // writes at the fs level are not sized + Duration: now.Sub(time.Unix(0, exceededSlots[i].startNanos)), + }) } case <-stopper: return @@ -543,8 +627,14 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) { if d.diskSlowThreshold == 0 { return f, nil } - checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) { - d.onSlowDisk(name, opType, duration) + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) { + d.onSlowDisk( + DiskSlowInfo{ + Path: name, + OpType: opType, + WriteSize: writeSizeInBytes, + Duration: duration, + }) }) checkingFile.startTicker() return checkingFile, nil @@ -658,8 +748,14 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err if d.diskSlowThreshold == 0 { return f, nil } - checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) { - d.onSlowDisk(newname, opType, duration) + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) { + d.onSlowDisk( + DiskSlowInfo{ + Path: newname, + OpType: opType, + WriteSize: writeSizeInBytes, + Duration: duration, + }) }) checkingFile.startTicker() return checkingFile, nil diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go index fc972e6e33..77c48fd056 100644 --- a/vfs/disk_health_test.go +++ b/vfs/disk_health_test.go @@ -205,87 +205,207 @@ func (m mockFS) GetDiskUsage(path string) (DiskUsage, error) { var _ FS = &mockFS{} func TestDiskHealthChecking_File(t *testing.T) { + oldTickInterval := defaultTickInterval + defaultTickInterval = time.Millisecond + defer func() { defaultTickInterval = oldTickInterval }() + const ( - slowThreshold = 1 * time.Second - syncAndWriteDuration = 3 * time.Second + slowThreshold = 50 * time.Millisecond ) + + fiveKB := make([]byte, 5*writeSizePrecision) testCases := []struct { - op OpType - fn func(f File) + op OpType + writeSize int + writeDuration time.Duration + fn func(f File) + createWriteDelta time.Duration + expectStall bool }{ + // No false negatives. { - OpTypeWrite, - func(f File) { f.Write([]byte("uh oh")) }, + op: OpTypeWrite, + writeSize: 5 * writeSizePrecision, // five KB + writeDuration: 100 * time.Millisecond, + fn: func(f File) { f.Write(fiveKB) }, + expectStall: true, }, { - OpTypeSync, - func(f File) { f.Sync() }, + op: OpTypeSync, + writeSize: 0, + writeDuration: 100 * time.Millisecond, + fn: func(f File) { f.Sync() }, + expectStall: true, + }, + // No false positives. + { + op: OpTypeWrite, + writeSize: 5, + writeDuration: 25 * time.Millisecond, + fn: func(f File) { f.Write([]byte("uh oh")) }, + createWriteDelta: 100 * time.Millisecond, + expectStall: false, + }, + { + op: OpTypeSync, + writeSize: 0, + writeDuration: 25 * time.Millisecond, + fn: func(f File) { f.Sync() }, + expectStall: false, }, } for _, tc := range testCases { t.Run(tc.op.String(), func(t *testing.T) { - type info struct { - opType OpType - duration time.Duration - } - diskSlow := make(chan info, 1) + diskSlow := make(chan DiskSlowInfo, 1) mockFS := &mockFS{create: func(name string) (File, error) { - return mockFile{syncAndWriteDuration: syncAndWriteDuration}, nil + return mockFile{syncAndWriteDuration: tc.writeDuration}, nil }} fs, closer := WithDiskHealthChecks(mockFS, slowThreshold, - func(s string, opType OpType, duration time.Duration) { - diskSlow <- info{ - opType: opType, - duration: duration, - } + func(info DiskSlowInfo) { + diskSlow <- info }) defer closer.Close() dhFile, _ := fs.Create("test") defer dhFile.Close() + // Writing after file creation tests computation of delta between file + // creation time & write time. + time.Sleep(tc.createWriteDelta) + tc.fn(dhFile) - select { - case i := <-diskSlow: - d := i.duration - if d.Seconds() < slowThreshold.Seconds() { - t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) + + if tc.expectStall { // no false negatives + select { + case i := <-diskSlow: + d := i.Duration + if d.Seconds() < slowThreshold.Seconds() { + t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) + } + require.Equal(t, tc.writeSize, i.WriteSize) + require.Equal(t, tc.op, i.OpType) + case <-time.After(200 * time.Millisecond): + t.Fatal("disk stall detector did not detect slow disk operation") + } + } else { // no false positives + select { + case <-diskSlow: + t.Fatal("disk stall detector detected a slow disk operation") + case <-time.After(200 * time.Millisecond): + return } - require.Equal(t, tc.op, i.opType) - case <-time.After(5 * time.Second): - t.Fatal("disk stall detector did not detect slow disk operation") } }) } } func TestDiskHealthChecking_NotTooManyOps(t *testing.T) { - numBitsForOpType := 64 - nOffsetBits + numBitsForOpType := 64 - deltaBits - writeSizeBits numOpTypesAllowed := int(math.Pow(2, float64(numBitsForOpType))) numOpTypes := int(opTypeMax) require.LessOrEqual(t, numOpTypes, numOpTypesAllowed) } +func TestDiskHealthChecking_File_PackingAndUnpacking(t *testing.T) { + testCases := []struct { + desc string + delta time.Duration + writeSize int64 + opType OpType + wantDelta time.Duration + wantWriteSize int + }{ + // Write op with write size in bytes. + { + desc: "write, sized op", + delta: 3000 * time.Millisecond, + writeSize: 1024, // 1 KB. + opType: OpTypeWrite, + wantDelta: 3000 * time.Millisecond, + wantWriteSize: 1024, + }, + // Sync op. No write size. Max-ish delta that packing scheme can handle. + { + desc: "sync, no write size", + delta: 34 * time.Hour * 24 * 365, + writeSize: 0, + opType: OpTypeSync, + wantDelta: 34 * time.Hour * 24 * 365, + wantWriteSize: 0, + }, + // Delta is negative (e.g. due to clock sync). Set to + // zero. + { + desc: "delta negative", + delta: -5, + writeSize: 5120, // 5 KB + opType: OpTypeWrite, + wantDelta: 0, + wantWriteSize: 5120, + }, + // Write size in bytes is larger than can fit in 20 bits. + // Round down to max that can fit in 20 bits. + { + desc: "write size truncated", + delta: 231 * time.Millisecond, + writeSize: 2097152000, // too big! + opType: OpTypeWrite, + wantDelta: 231 * time.Millisecond, + wantWriteSize: 1073740800, // (2^20-1) * writeSizePrecision ~= a bit less than one GB + }, + // Write size in bytes is max representable less than the ceiling. + { + desc: "write size barely not truncated", + delta: 231 * time.Millisecond, + writeSize: 1073739776, // max representable less than the ceiling + opType: OpTypeWrite, + wantDelta: 231 * time.Millisecond, + wantWriteSize: 1073739776, // since can fit, unchanged + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + packed := pack(tc.delta, tc.writeSize, tc.opType) + gotDelta, gotWriteSize, gotOpType := unpack(packed) + + require.Equal(t, tc.wantDelta, gotDelta) + require.Equal(t, tc.wantWriteSize, gotWriteSize) + require.Equal(t, tc.opType, gotOpType) + }) + } +} + func TestDiskHealthChecking_File_Underflow(t *testing.T) { f := &mockFile{} - hcFile := newDiskHealthCheckingFile(f, 1*time.Second, func(opType OpType, duration time.Duration) { + hcFile := newDiskHealthCheckingFile(f, 1*time.Second, func(opType OpType, writeSizeInBytes int, duration time.Duration) { // We expect to panic before sending the event. t.Fatalf("unexpected slow disk event") }) defer hcFile.Close() - // Set the file creation to the UNIX epoch, which is earlier than the max - // offset of the health check. - tEpoch := time.Unix(0, 0) - hcFile.createTime = tEpoch + t.Run("too large delta leads to panic", func(t *testing.T) { + // Given the packing scheme, 35 years of process uptime will lead to a delta + // that is too large to fit in the packed int64. + tCreate := time.Now().Add(-35 * time.Hour * 24 * 365) + hcFile.createTime = tCreate - // Assert that the time since the epoch (in nanoseconds) is indeed greater - // than the max offset. - require.True(t, time.Since(tEpoch).Nanoseconds() > 1< 1<