From 9c8ac76daa256a7b68ac7df4987d21e2a2177cd1 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 12 May 2023 16:31:36 -0400 Subject: [PATCH 1/2] Revert "vfs: mark file basename as safe to avoid log redaction" This reverts commit 11c9b6d0a8524310ab34d4e536b6a372eb48f15c. --- vfs/disk_health.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/vfs/disk_health.go b/vfs/disk_health.go index 1c0b9c5976..33ce3d2e88 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -8,7 +8,6 @@ import ( "fmt" "io" "os" - "path/filepath" "sync" "sync/atomic" "time" @@ -376,8 +375,7 @@ func (i DiskSlowInfo) String() string { // SafeFormat implements redact.SafeFormatter. func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs", - redact.Safe(i.OpType.String()), redact.Safe(filepath.Base(i.Path)), - redact.Safe(i.WriteSize), redact.Safe(i.Duration.Seconds())) + redact.Safe(i.OpType.String()), i.Path, i.WriteSize, redact.Safe(i.Duration.Seconds())) } // diskHealthCheckingFS adds disk-health checking facilities to a VFS. From a045c794336121a5fe2de2a8abde8ff7871e6cf2 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 12 May 2023 16:31:38 -0400 Subject: [PATCH 2/2] Revert "vfs: include size of write in DiskSlowInfo" This reverts commit e599d0d6b23cd2faaa14e53119b7b231ab51298c. --- event.go | 19 +++- options.go | 8 +- vfs/disk_health.go | 178 ++++++++-------------------------- vfs/disk_health_test.go | 207 +++++++++------------------------------- vfs/fd_test.go | 2 +- 5 files changed, 109 insertions(+), 305 deletions(-) diff --git a/event.go b/event.go index f243e24c1d..a2d7643b00 100644 --- a/event.go +++ b/event.go @@ -119,7 +119,24 @@ func (i levelInfos) SafeFormat(w redact.SafePrinter, _ rune) { // DiskSlowInfo contains the info for a disk slowness event when writing to a // file. -type DiskSlowInfo = vfs.DiskSlowInfo +type DiskSlowInfo struct { + // Path of file being written to. + Path string + // Operation being performed on the file. + OpType vfs.OpType + // Duration that has elapsed since this disk operation started. + Duration time.Duration +} + +func (i DiskSlowInfo) String() string { + return redact.StringWithoutMarkers(i) +} + +// SafeFormat implements redact.SafeFormatter. +func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { + w.Printf("disk slowness detected: %s on file %s has been ongoing for %0.1fs", + redact.Safe(i.OpType.String()), i.Path, redact.Safe(i.Duration.Seconds())) +} // FlushInfo contains the info for a flush event. type FlushInfo struct { diff --git a/options.go b/options.go index a72a78c35f..edb4675763 100644 --- a/options.go +++ b/options.go @@ -1049,8 +1049,12 @@ func (o *Options) WithFSDefaults() *Options { o.FS = vfs.Default } o.FS, o.private.fsCloser = vfs.WithDiskHealthChecks(o.FS, 5*time.Second, - func(info vfs.DiskSlowInfo) { - o.EventListener.DiskSlow(info) + func(name string, op vfs.OpType, duration time.Duration) { + o.EventListener.DiskSlow(DiskSlowInfo{ + Path: name, + OpType: op, + Duration: duration, + }) }) return o } diff --git a/vfs/disk_health.go b/vfs/disk_health.go index 33ce3d2e88..8b6c7e54a9 100644 --- a/vfs/disk_health.go +++ b/vfs/disk_health.go @@ -13,32 +13,21 @@ import ( "time" "github.com/cockroachdb/pebble/internal/invariants" - "github.com/cockroachdb/redact" ) const ( + // defaultTickInterval is the default interval between two ticks of each + // diskHealthCheckingFile loop iteration. + defaultTickInterval = 2 * time.Second // preallocatedSlotCount is the default number of slots available for // concurrent filesystem operations. The slot count may be exceeded, but // each additional slot will incur an additional allocation. We choose 16 // here with the expectation that it is significantly more than required in // practice. See the comment above the diskHealthCheckingFS type definition. preallocatedSlotCount = 16 - // deltaBits is the number of bits in the packed 64-bit integer used for - // identifying a delta from the file creation time in milliseconds. - deltaBits = 40 - // writeSizeBits is the number of bits in the packed 64-bit integer used for - // identifying the size of the write operation, if the operation is sized. See - // writeSizePrecision below for precision of size. - writeSizeBits = 20 - // Track size of writes at kilobyte precision. See comment above lastWritePacked for more. - writeSizePrecision = 1024 -) - -// Variables to enable testing. -var ( - // defaultTickInterval is the default interval between two ticks of each - // diskHealthCheckingFile loop iteration. - defaultTickInterval = 2 * time.Second + // nOffsetBits is the number of bits in the packed 64-bit integer used for + // identifying an offset from the file creation time (in nanoseconds). + nOffsetBits = 60 ) // OpType is the type of IO operation being monitored by a @@ -109,28 +98,18 @@ func (o OpType) String() string { // operation, as it reduces overhead per disk operation. type diskHealthCheckingFile struct { file File - onSlowDisk func(opType OpType, writeSizeInBytes int, duration time.Duration) + onSlowDisk func(OpType, time.Duration) diskSlowThreshold time.Duration tickInterval time.Duration stopper chan struct{} - // lastWritePacked is a 64-bit unsigned int. The most significant - // 40 bits represent an delta (in milliseconds) from the creation - // time of the diskHealthCheckingFile. The next most significant 20 bits - // represent the size of the write in KBs, if the write has a size. (If - // it doesn't, the 20 bits are zeroed). The least significant four bits - // contains the OpType. + // lastWritePacked is a 64-bit unsigned int, with the most significant 7.5 + // bytes (60 bits) representing an offset (in nanoseconds) from the file + // creation time. The least significant four bits contains the OpType. // - // The use of 40 bits for an delta provides ~34 years of effective - // monitoring time before the uint wraps around, at millisecond precision. - // ~34 years of process uptime "ought to be enough for anybody". Millisecond - // writeSizePrecision is sufficient, given that we are monitoring for writes that take - // longer than one millisecond. - // - // The use of 20 bits for the size in KBs allows representing sizes up - // to nearly one GB. If the write is larger than that, we round down to ~one GB. - // - // The use of four bits for OpType allows for 16 operation types. + // The use of 60 bits for an offset provides ~36.5 years of effective + // monitoring time before the uint wraps around. 36.5 years of process uptime + // "ought to be enough for anybody". This allows for 16 operation types. // // NB: this packing scheme is not persisted, and is therefore safe to adjust // across process boundaries. @@ -141,9 +120,7 @@ type diskHealthCheckingFile struct { // newDiskHealthCheckingFile instantiates a new diskHealthCheckingFile, with the // specified time threshold and event listener. func newDiskHealthCheckingFile( - file File, - diskSlowThreshold time.Duration, - onSlowDisk func(OpType OpType, writeSizeInBytes int, duration time.Duration), + file File, diskSlowThreshold time.Duration, onSlowDisk func(OpType, time.Duration), ) *diskHealthCheckingFile { return &diskHealthCheckingFile{ file: file, @@ -177,13 +154,13 @@ func (d *diskHealthCheckingFile) startTicker() { if packed == 0 { continue } - delta, writeSize, op := unpack(packed) - lastWrite := d.createTime.Add(delta) + offsetNanos, op := int64(packed>>(64-nOffsetBits)), OpType(packed&0xf) + lastWrite := d.createTime.Add(time.Duration(offsetNanos)) now := time.Now() if lastWrite.Add(d.diskSlowThreshold).Before(now) { // diskSlowThreshold was exceeded. Call the passed-in // listener. - d.onSlowDisk(op, writeSize, now.Sub(lastWrite)) + d.onSlowDisk(op, now.Sub(lastWrite)) } } } @@ -212,7 +189,7 @@ func (d *diskHealthCheckingFile) ReadAt(p []byte, off int64) (int, error) { // Write implements the io.Writer interface. func (d *diskHealthCheckingFile) Write(p []byte) (n int, err error) { - d.timeDiskOp(OpTypeWrite, int64(len(p)), func() { + d.timeDiskOp(OpTypeWrite, func() { n, err = d.file.Write(p) }) return n, err @@ -224,14 +201,13 @@ func (d *diskHealthCheckingFile) Close() error { return d.file.Close() } -// Prefetch implements (vfs.File).Prefetch. func (d *diskHealthCheckingFile) Prefetch(offset, length int64) error { return d.file.Prefetch(offset, length) } // Preallocate implements (vfs.File).Preallocate. func (d *diskHealthCheckingFile) Preallocate(off, n int64) (err error) { - d.timeDiskOp(OpTypePreallocate, n, func() { + d.timeDiskOp(OpTypePreallocate, func() { err = d.file.Preallocate(off, n) }) return err @@ -244,7 +220,7 @@ func (d *diskHealthCheckingFile) Stat() (os.FileInfo, error) { // Sync implements the io.Syncer interface. func (d *diskHealthCheckingFile) Sync() (err error) { - d.timeDiskOp(OpTypeSync, 0, func() { + d.timeDiskOp(OpTypeSync, func() { err = d.file.Sync() }) return err @@ -252,7 +228,7 @@ func (d *diskHealthCheckingFile) Sync() (err error) { // SyncData implements (vfs.File).SyncData. func (d *diskHealthCheckingFile) SyncData() (err error) { - d.timeDiskOp(OpTypeSyncData, 0, func() { + d.timeDiskOp(OpTypeSyncData, func() { err = d.file.SyncData() }) return err @@ -260,7 +236,7 @@ func (d *diskHealthCheckingFile) SyncData() (err error) { // SyncTo implements (vfs.File).SyncTo. func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) { - d.timeDiskOp(OpTypeSyncTo, length, func() { + d.timeDiskOp(OpTypeSyncTo, func() { fullSync, err = d.file.SyncTo(length) }) return fullSync, err @@ -268,16 +244,23 @@ func (d *diskHealthCheckingFile) SyncTo(length int64) (fullSync bool, err error) // timeDiskOp runs the specified closure and makes its timing visible to the // monitoring goroutine, in case it exceeds one of the slow disk durations. -// opType should always be set. writeSizeInBytes should be set if the write -// operation is sized. If not, it should be set to zero. -func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, writeSizeInBytes int64, op func()) { +func (d *diskHealthCheckingFile) timeDiskOp(opType OpType, op func()) { if d == nil { op() return } - delta := time.Since(d.createTime) - packed := pack(delta, writeSizeInBytes, opType) + offsetNanos := time.Since(d.createTime).Nanoseconds() + // We have no guarantee of clock monotonicity. If we have a small regression + // in the clock, we set offsetNanos to zero, so we can still catch the operation + // if happens to be slow. + if offsetNanos < 0 { + offsetNanos = 0 + } + if offsetNanos > 1< 1< writeSizeCeiling { - writeSize = writeSizeCeiling - } - - return uint64(deltaMillis)<<(64-deltaBits) | uint64(writeSize)<<(64-deltaBits-writeSizeBits) | uint64(opType) -} - -func unpack(packed uint64) (delta time.Duration, writeSizeInBytes int, opType OpType) { - delta = time.Duration(packed>>(64-deltaBits)) * time.Millisecond - wz := int64(packed>>(64-deltaBits-writeSizeBits)) & ((1 << writeSizeBits) - 1) * writeSizePrecision - // Given the packing scheme, converting wz to an int will not truncate anything. - writeSizeInBytes = int(wz) - opType = OpType(packed & 0xf) - return delta, writeSizeInBytes, opType -} - // diskHealthCheckingDir implements disk-health checking for directories. Unlike // other files, we allow directories to receive concurrent write operations // (Syncs are the only write operations supported by a directory.) Since the @@ -356,28 +300,6 @@ func (d *diskHealthCheckingDir) Sync() (err error) { return err } -// DiskSlowInfo captures info about detected slow operations on the vfs. -type DiskSlowInfo struct { - // Path of file being written to. - Path string - // Operation being performed on the file. - OpType OpType - // Size of write in bytes, if the write is sized. - WriteSize int - // Duration that has elapsed since this disk operation started. - Duration time.Duration -} - -func (i DiskSlowInfo) String() string { - return redact.StringWithoutMarkers(i) -} - -// SafeFormat implements redact.SafeFormatter. -func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { - w.Printf("disk slowness detected: %s on file %s (%d bytes) has been ongoing for %0.1fs", - redact.Safe(i.OpType.String()), i.Path, i.WriteSize, redact.Safe(i.Duration.Seconds())) -} - // diskHealthCheckingFS adds disk-health checking facilities to a VFS. // It times disk write operations in two ways: // @@ -417,7 +339,7 @@ func (i DiskSlowInfo) SafeFormat(w redact.SafePrinter, _ rune) { type diskHealthCheckingFS struct { tickInterval time.Duration diskSlowThreshold time.Duration - onSlowDisk func(DiskSlowInfo) + onSlowDisk func(string, OpType, time.Duration) fs FS mu struct { sync.Mutex @@ -454,7 +376,7 @@ var _ FS = (*diskHealthCheckingFS)(nil) // // A threshold of zero disables disk-health checking. func WithDiskHealthChecks( - innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(info DiskSlowInfo), + innerFS FS, diskSlowThreshold time.Duration, onSlowDisk func(string, OpType, time.Duration), ) (FS, io.Closer) { if diskSlowThreshold == 0 { return innerFS, noopCloser{} @@ -570,13 +492,7 @@ func (d *diskHealthCheckingFS) startTickerLocked() { } d.mu.Unlock() for i := range exceededSlots { - d.onSlowDisk( - DiskSlowInfo{ - Path: exceededSlots[i].name, - OpType: exceededSlots[i].opType, - WriteSize: 0, // writes at the fs level are not sized - Duration: now.Sub(time.Unix(0, exceededSlots[i].startNanos)), - }) + d.onSlowDisk(exceededSlots[i].name, exceededSlots[i].opType, now.Sub(time.Unix(0, exceededSlots[i].startNanos))) } case <-stopper: return @@ -627,14 +543,8 @@ func (d *diskHealthCheckingFS) Create(name string) (File, error) { if d.diskSlowThreshold == 0 { return f, nil } - checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) { - d.onSlowDisk( - DiskSlowInfo{ - Path: name, - OpType: opType, - WriteSize: writeSizeInBytes, - Duration: duration, - }) + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) { + d.onSlowDisk(name, opType, duration) }) checkingFile.startTicker() return checkingFile, nil @@ -748,14 +658,8 @@ func (d *diskHealthCheckingFS) ReuseForWrite(oldname, newname string) (File, err if d.diskSlowThreshold == 0 { return f, nil } - checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, writeSizeInBytes int, duration time.Duration) { - d.onSlowDisk( - DiskSlowInfo{ - Path: newname, - OpType: opType, - WriteSize: writeSizeInBytes, - Duration: duration, - }) + checkingFile := newDiskHealthCheckingFile(f, d.diskSlowThreshold, func(opType OpType, duration time.Duration) { + d.onSlowDisk(newname, opType, duration) }) checkingFile.startTicker() return checkingFile, nil diff --git a/vfs/disk_health_test.go b/vfs/disk_health_test.go index 77c48fd056..fc972e6e33 100644 --- a/vfs/disk_health_test.go +++ b/vfs/disk_health_test.go @@ -205,207 +205,87 @@ func (m mockFS) GetDiskUsage(path string) (DiskUsage, error) { var _ FS = &mockFS{} func TestDiskHealthChecking_File(t *testing.T) { - oldTickInterval := defaultTickInterval - defaultTickInterval = time.Millisecond - defer func() { defaultTickInterval = oldTickInterval }() - const ( - slowThreshold = 50 * time.Millisecond + slowThreshold = 1 * time.Second + syncAndWriteDuration = 3 * time.Second ) - - fiveKB := make([]byte, 5*writeSizePrecision) testCases := []struct { - op OpType - writeSize int - writeDuration time.Duration - fn func(f File) - createWriteDelta time.Duration - expectStall bool + op OpType + fn func(f File) }{ - // No false negatives. { - op: OpTypeWrite, - writeSize: 5 * writeSizePrecision, // five KB - writeDuration: 100 * time.Millisecond, - fn: func(f File) { f.Write(fiveKB) }, - expectStall: true, + OpTypeWrite, + func(f File) { f.Write([]byte("uh oh")) }, }, { - op: OpTypeSync, - writeSize: 0, - writeDuration: 100 * time.Millisecond, - fn: func(f File) { f.Sync() }, - expectStall: true, - }, - // No false positives. - { - op: OpTypeWrite, - writeSize: 5, - writeDuration: 25 * time.Millisecond, - fn: func(f File) { f.Write([]byte("uh oh")) }, - createWriteDelta: 100 * time.Millisecond, - expectStall: false, - }, - { - op: OpTypeSync, - writeSize: 0, - writeDuration: 25 * time.Millisecond, - fn: func(f File) { f.Sync() }, - expectStall: false, + OpTypeSync, + func(f File) { f.Sync() }, }, } for _, tc := range testCases { t.Run(tc.op.String(), func(t *testing.T) { - diskSlow := make(chan DiskSlowInfo, 1) + type info struct { + opType OpType + duration time.Duration + } + diskSlow := make(chan info, 1) mockFS := &mockFS{create: func(name string) (File, error) { - return mockFile{syncAndWriteDuration: tc.writeDuration}, nil + return mockFile{syncAndWriteDuration: syncAndWriteDuration}, nil }} fs, closer := WithDiskHealthChecks(mockFS, slowThreshold, - func(info DiskSlowInfo) { - diskSlow <- info + func(s string, opType OpType, duration time.Duration) { + diskSlow <- info{ + opType: opType, + duration: duration, + } }) defer closer.Close() dhFile, _ := fs.Create("test") defer dhFile.Close() - // Writing after file creation tests computation of delta between file - // creation time & write time. - time.Sleep(tc.createWriteDelta) - tc.fn(dhFile) - - if tc.expectStall { // no false negatives - select { - case i := <-diskSlow: - d := i.Duration - if d.Seconds() < slowThreshold.Seconds() { - t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) - } - require.Equal(t, tc.writeSize, i.WriteSize) - require.Equal(t, tc.op, i.OpType) - case <-time.After(200 * time.Millisecond): - t.Fatal("disk stall detector did not detect slow disk operation") - } - } else { // no false positives - select { - case <-diskSlow: - t.Fatal("disk stall detector detected a slow disk operation") - case <-time.After(200 * time.Millisecond): - return + select { + case i := <-diskSlow: + d := i.duration + if d.Seconds() < slowThreshold.Seconds() { + t.Fatalf("expected %0.1f to be greater than threshold %0.1f", d.Seconds(), slowThreshold.Seconds()) } + require.Equal(t, tc.op, i.opType) + case <-time.After(5 * time.Second): + t.Fatal("disk stall detector did not detect slow disk operation") } }) } } func TestDiskHealthChecking_NotTooManyOps(t *testing.T) { - numBitsForOpType := 64 - deltaBits - writeSizeBits + numBitsForOpType := 64 - nOffsetBits numOpTypesAllowed := int(math.Pow(2, float64(numBitsForOpType))) numOpTypes := int(opTypeMax) require.LessOrEqual(t, numOpTypes, numOpTypesAllowed) } -func TestDiskHealthChecking_File_PackingAndUnpacking(t *testing.T) { - testCases := []struct { - desc string - delta time.Duration - writeSize int64 - opType OpType - wantDelta time.Duration - wantWriteSize int - }{ - // Write op with write size in bytes. - { - desc: "write, sized op", - delta: 3000 * time.Millisecond, - writeSize: 1024, // 1 KB. - opType: OpTypeWrite, - wantDelta: 3000 * time.Millisecond, - wantWriteSize: 1024, - }, - // Sync op. No write size. Max-ish delta that packing scheme can handle. - { - desc: "sync, no write size", - delta: 34 * time.Hour * 24 * 365, - writeSize: 0, - opType: OpTypeSync, - wantDelta: 34 * time.Hour * 24 * 365, - wantWriteSize: 0, - }, - // Delta is negative (e.g. due to clock sync). Set to - // zero. - { - desc: "delta negative", - delta: -5, - writeSize: 5120, // 5 KB - opType: OpTypeWrite, - wantDelta: 0, - wantWriteSize: 5120, - }, - // Write size in bytes is larger than can fit in 20 bits. - // Round down to max that can fit in 20 bits. - { - desc: "write size truncated", - delta: 231 * time.Millisecond, - writeSize: 2097152000, // too big! - opType: OpTypeWrite, - wantDelta: 231 * time.Millisecond, - wantWriteSize: 1073740800, // (2^20-1) * writeSizePrecision ~= a bit less than one GB - }, - // Write size in bytes is max representable less than the ceiling. - { - desc: "write size barely not truncated", - delta: 231 * time.Millisecond, - writeSize: 1073739776, // max representable less than the ceiling - opType: OpTypeWrite, - wantDelta: 231 * time.Millisecond, - wantWriteSize: 1073739776, // since can fit, unchanged - }, - } - for _, tc := range testCases { - t.Run(tc.desc, func(t *testing.T) { - packed := pack(tc.delta, tc.writeSize, tc.opType) - gotDelta, gotWriteSize, gotOpType := unpack(packed) - - require.Equal(t, tc.wantDelta, gotDelta) - require.Equal(t, tc.wantWriteSize, gotWriteSize) - require.Equal(t, tc.opType, gotOpType) - }) - } -} - func TestDiskHealthChecking_File_Underflow(t *testing.T) { f := &mockFile{} - hcFile := newDiskHealthCheckingFile(f, 1*time.Second, func(opType OpType, writeSizeInBytes int, duration time.Duration) { + hcFile := newDiskHealthCheckingFile(f, 1*time.Second, func(opType OpType, duration time.Duration) { // We expect to panic before sending the event. t.Fatalf("unexpected slow disk event") }) defer hcFile.Close() - t.Run("too large delta leads to panic", func(t *testing.T) { - // Given the packing scheme, 35 years of process uptime will lead to a delta - // that is too large to fit in the packed int64. - tCreate := time.Now().Add(-35 * time.Hour * 24 * 365) - hcFile.createTime = tCreate + // Set the file creation to the UNIX epoch, which is earlier than the max + // offset of the health check. + tEpoch := time.Unix(0, 0) + hcFile.createTime = tEpoch - // Assert that the time since tCreate (in milliseconds) is indeed greater - // than the max delta that can fit. - require.True(t, time.Since(tCreate).Milliseconds() > 1< 1<