storage: support disk stall tracing
Currently, in the event of a disk stall we don't have visibility into
the sequence of disk events that led up to the failure, because disk
metrics are only exported every 10s. This commit adds support for
storing a history of disk events from the previous 30s and, in the
event of a stall, logging a trace.

Fixes: #120506.
Informs: #89786.

Epic: None.
Release note: None.
CheranMahalingam committed Apr 4, 2024
1 parent 43109d5 commit 8426ef6
Showing 15 changed files with 490 additions and 84 deletions.
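
Conceptually, the change keeps a fixed-size ring buffer of timestamped disk-stat samples — at the 100ms polling interval, 300 entries cover the 30s window — and renders that history as a trace when a stall is detected. Below is a minimal, self-contained sketch of that idea; the names (tracer, traceEvent, the Stats fields) are illustrative stand-ins, not the monitorTracer code added in pkg/storage/disk.

```go
package main

import (
	"fmt"
	"strings"
	"time"
)

// Stats is a stand-in for the disk.Stats snapshot collected on each poll.
type Stats struct {
	ReadsCount  int64
	WritesCount int64
}

// traceEvent pairs a stat snapshot with the time it was measured and any
// collection error.
type traceEvent struct {
	time  time.Time
	stats Stats
	err   error
}

// tracer retains the most recent `capacity` events in a ring buffer.
type tracer struct {
	events []traceEvent
	start  int // index of the oldest retained event
	size   int // number of valid events
}

func newTracer(capacity int) *tracer {
	return &tracer{events: make([]traceEvent, capacity)}
}

// RecordEvent appends an event, overwriting the oldest one once the buffer
// is full.
func (t *tracer) RecordEvent(e traceEvent) {
	idx := (t.start + t.size) % len(t.events)
	t.events[idx] = e
	if t.size < len(t.events) {
		t.size++
	} else {
		t.start = (t.start + 1) % len(t.events) // drop the oldest event
	}
}

// String renders the retained history, oldest first, for logging on a stall.
func (t *tracer) String() string {
	var sb strings.Builder
	for i := 0; i < t.size; i++ {
		e := t.events[(t.start+i)%len(t.events)]
		fmt.Fprintf(&sb, "%s reads=%d writes=%d err=%v\n",
			e.time.Format(time.RFC3339Nano), e.stats.ReadsCount, e.stats.WritesCount, e.err)
	}
	return sb.String()
}

func main() {
	// A 30s trace window at a 100ms polling interval needs 300 slots.
	tr := newTracer(300)
	now := time.Now()
	for i := 0; i < 3; i++ {
		tr.RecordEvent(traceEvent{
			time:  now.Add(time.Duration(i*100) * time.Millisecond),
			stats: Stats{ReadsCount: int64(i + 1), WritesCount: int64(2 * i)},
		})
	}
	fmt.Print(tr.String())
}
```
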
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/metrics.go
@@ -2425,7 +2425,7 @@ Note that the measurement does not include the duration for replicating the eval
Measurement: "Time",
Help: "Weighted time spent reading from or writing to the store's disk since this process started (as reported by the OS)",
}
metaIopsInProgress = metric.Metadata{
metaDiskIopsInProgress = metric.Metadata{
Name: "storage.disk.iopsinprogress",
Unit: metric.Unit_COUNT,
Measurement: "Operations",
@@ -2840,7 +2840,7 @@ type StoreMetrics struct {
DiskWriteTime *metric.Gauge
DiskIOTime *metric.Gauge
DiskWeightedIOTime *metric.Gauge
IopsInProgress *metric.Gauge
DiskIopsInProgress *metric.Gauge
}

type tenantMetricsRef struct {
@@ -3592,7 +3592,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
DiskWriteTime: metric.NewGauge(metaDiskWriteTime),
DiskIOTime: metric.NewGauge(metaDiskIOTime),
DiskWeightedIOTime: metric.NewGauge(metaDiskWeightedIOTime),
IopsInProgress: metric.NewGauge(metaIopsInProgress),
DiskIopsInProgress: metric.NewGauge(metaDiskIopsInProgress),

// Estimated MVCC stats in split.
SplitsWithEstimatedStats: metric.NewCounter(metaSplitEstimatedStats),
@@ -3815,7 +3815,7 @@ func (sm *StoreMetrics) updateDiskStats(stats disk.Stats) {
sm.DiskWriteTime.Update(int64(stats.WritesDuration))
sm.DiskIOTime.Update(int64(stats.CumulativeDuration))
sm.DiskWeightedIOTime.Update(int64(stats.WeightedIODuration))
sm.IopsInProgress.Update(int64(stats.InProgressCount))
sm.DiskIopsInProgress.Update(int64(stats.InProgressCount))
}

func (sm *StoreMetrics) handleMetricsResult(ctx context.Context, metric result.Metrics) {
5 changes: 5 additions & 0 deletions pkg/server/config.go
@@ -820,6 +820,10 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
return Engines{}, errors.Errorf("%f%% of %s's total free space is only %s bytes, which is below the minimum requirement of %s",
spec.Size.Percent, spec.Path, humanizeutil.IBytes(sizeInBytes), humanizeutil.IBytes(base.MinimumStoreSize))
}
monitor, err := cfg.DiskMonitorManager.Monitor(spec.Path)
if err != nil {
return Engines{}, errors.Wrap(err, "creating disk monitor")
}

detail(redact.Sprintf("store %d: max size %s, max open file limit %d", i, humanizeutil.IBytes(sizeInBytes), openFileLimitPerStore))

@@ -834,6 +838,7 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) {
addCfgOpt(storage.SharedStorage(sharedStorage))
}
addCfgOpt(storage.SecondaryCache(storage.SecondaryCacheBytes(cfg.SecondaryCache, du)))
addCfgOpt(storage.DiskMonitor(monitor))
// If the spec contains Pebble options, set those too.
if spec.PebbleOptions != "" {
addCfgOpt(storage.PebbleOptions(spec.PebbleOptions, &pebble.ParseHooks{
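
The hunk above gives each store a disk monitor for its path and hands it to the engine via storage.DiskMonitor. A rough sketch of that wiring pattern, using only the disk package API visible in this diff; openStore and engineConfig are hypothetical stand-ins for the real server plumbing.

```go
package diskmonitorexample

import (
	"github.com/cockroachdb/cockroach/pkg/storage/disk"
	"github.com/cockroachdb/errors"
)

// engineConfig is a hypothetical stand-in for the engine configuration that
// ends up carrying the per-store disk monitor.
type engineConfig struct {
	path    string
	monitor *disk.Monitor
}

// openStore attaches a disk monitor for the store's path so the engine can
// observe frequently polled disk stats and dump a trace on a stall.
func openStore(mgr *disk.MonitorManager, path string) (*engineConfig, error) {
	monitor, err := mgr.Monitor(path)
	if err != nil {
		return nil, errors.Wrap(err, "creating disk monitor")
	}
	return &engineConfig{path: path, monitor: monitor}, nil
}

// closeStore releases the monitor; the manager stops polling the underlying
// device once its last monitor is closed.
func (c *engineConfig) closeStore() {
	c.monitor.Close()
}
```
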
1 change: 1 addition & 0 deletions pkg/storage/BUILD.bazel
@@ -59,6 +59,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog/fetchpb",
"//pkg/storage/disk",
"//pkg/storage/enginepb",
"//pkg/storage/fs",
"//pkg/storage/pebbleiter",
14 changes: 4 additions & 10 deletions pkg/storage/disk/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"linux_parse.go",
"monitor.go",
"monitor_tracer.go",
"platform_default.go",
"platform_linux.go",
"stats.go",
@@ -35,22 +36,15 @@ go_test(
srcs = [
"linux_parse_test.go",
"monitor_test.go",
"monitor_tracer_test.go",
],
data = glob(["testdata/**"]),
embed = [":disk"],
deps = [
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/syncutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_pebble//vfs",
"@com_github_stretchr_testify//require",
] + select({
"@io_bazel_rules_go//go/platform:android": [
"@com_github_cockroachdb_datadriven//:datadriven",
],
"@io_bazel_rules_go//go/platform:linux": [
"@com_github_cockroachdb_datadriven//:datadriven",
],
"//conditions:default": [],
}),
],
)
4 changes: 2 additions & 2 deletions pkg/storage/disk/linux_parse.go
@@ -50,7 +50,7 @@ import (
// 12 I/Os currently in progress
// 13 time spent doing I/Os (ms)
// 14 weighted time spent doing I/Os (ms)
func parseDiskStats(contents []byte, disks []*monitoredDisk) error {
func parseDiskStats(contents []byte, disks []*monitoredDisk, measuredAt time.Time) error {
for lineNum := 0; len(contents) > 0; lineNum++ {
lineBytes, rest := splitLine(contents)
line := unsafe.String(&lineBytes[0], len(lineBytes))
@@ -153,7 +153,7 @@
} else if ok {
stats.FlushesDuration = time.Duration(millis) * time.Millisecond
}
disks[diskIdx].recordStats(stats)
disks[diskIdx].recordStats(measuredAt, stats)
}
return nil
}
9 changes: 7 additions & 2 deletions pkg/storage/disk/linux_parse_test.go
@@ -53,7 +53,8 @@ func TestLinux_CollectDiskStats(t *testing.T) {
v, err = strconv.ParseUint(cmdArg.Vals[1], 10, 32)
require.NoError(t, err)
deviceID.minor = uint32(v)
disks = append(disks, &monitoredDisk{deviceID: deviceID})
tracer := newMonitorTracer(1000)
disks = append(disks, &monitoredDisk{deviceID: deviceID, tracer: tracer})
}
slices.SortFunc(disks, func(a, b *monitoredDisk) int { return compareDeviceIDs(a.deviceID, b.deviceID) })

@@ -69,11 +70,15 @@
return err.Error()
}
for i := range disks {
monitor := Monitor{disks[i]}
stats, err := monitor.CumulativeStats()
require.NoError(t, err)

if i > 0 {
fmt.Fprintln(&buf)
}
fmt.Fprintf(&buf, "%s: ", disks[i].deviceID)
fmt.Fprint(&buf, disks[i].stats.lastMeasurement.String())
fmt.Fprint(&buf, stats.String())
}
return buf.String()
default:
75 changes: 41 additions & 34 deletions pkg/storage/disk/monitor.go
@@ -23,6 +23,7 @@ import (
)

var defaultDiskStatsPollingInterval = envutil.EnvOrDefaultDuration("COCKROACH_DISK_STATS_POLLING_INTERVAL", 100*time.Millisecond)
var defaultDiskTracePeriod = envutil.EnvOrDefaultDuration("COCKROACH_DISK_TRACE_PERIOD", 30*time.Second)

// DeviceID uniquely identifies block devices.
type DeviceID struct {
@@ -79,7 +80,11 @@ func (m *MonitorManager) Monitor(path string) (*Monitor, error) {
}

if disk == nil {
disk = &monitoredDisk{manager: m, deviceID: dev}
disk = &monitoredDisk{
manager: m,
tracer: newMonitorTracer(int(defaultDiskTracePeriod / defaultDiskStatsPollingInterval)),
deviceID: dev,
}
m.mu.disks = append(m.mu.disks, disk)

// The design maintains the invariant that the disk stat polling loop
@@ -153,9 +158,11 @@ func (m *MonitorManager) monitorDisks(collector statsCollector, stop chan struct

if err := collector.collect(disks); err != nil {
for i := range disks {
disks[i].stats.Lock()
disks[i].stats.err = err
disks[i].stats.Unlock()
disks[i].tracer.RecordEvent(traceEvent{
time: timeutil.Now(),
stats: Stats{},
err: err,
})
}
}
}
@@ -164,6 +171,7 @@

type monitoredDisk struct {
manager *MonitorManager
tracer *monitorTracer
deviceID DeviceID
// Tracks the number of Monitors observing stats on this disk. Once
// the count is zero, the MonitorManager no longer needs to collect stats
@@ -172,28 +180,19 @@ type monitoredDisk struct {
// for ensuring that the monitoredDisk is a singleton which relies on refCount
// being modified atomically.
refCount int

stats struct {
syncutil.Mutex
err error
lastMeasurement Stats
}
}

func (m *monitoredDisk) recordStats(stats Stats) {
m.stats.Lock()
defer m.stats.Unlock()
m.stats.lastMeasurement = stats
m.stats.err = nil
func (m *monitoredDisk) recordStats(t time.Time, stats Stats) {
m.tracer.RecordEvent(traceEvent{
time: t,
stats: stats,
err: nil,
})
}

// Monitor provides statistics for an individual disk.
type Monitor struct {
*monitoredDisk

// prevIncrement and prevIncrementAt are used to compute incremental stats.
prevIncrement Stats
prevIncrementAt time.Time
}

func (m *Monitor) Close() {
@@ -205,26 +204,34 @@ func (m *Monitor) Close() {

// CumulativeStats returns the most-recent stats observed.
func (m *Monitor) CumulativeStats() (Stats, error) {
m.stats.Lock()
defer m.stats.Unlock()
if m.stats.err != nil {
return Stats{}, m.stats.err
if event, err := m.tracer.Latest(); err != nil {
return Stats{}, err
} else if event.err != nil {
return Stats{}, event.err
} else {
return event.stats, nil
}
return m.stats.lastMeasurement, nil
}

// IncrementalStats computes the change in stats since the last time IncrementalStats
// was invoked for this monitor. The first time IncrementalStats is invoked, it returns
// an empty struct.
func (m *Monitor) IncrementalStats() (Stats, error) {
stats, err := m.CumulativeStats()
// IncrementalStats computes the change in stats over a period, delta.
func (m *Monitor) IncrementalStats(delta time.Duration) (Stats, error) {
event, err := m.tracer.Latest()
if err != nil {
return Stats{}, err
}
if m.prevIncrementAt.IsZero() {
m.prevIncrementAt = timeutil.Now()
m.prevIncrement = stats
return Stats{}, nil
if event.err != nil {
return Stats{}, event.err
}
return stats.delta(&m.prevIncrement), nil
prevEvent, err := m.tracer.Find(timeutil.Now().Add(-delta))
if err != nil {
return Stats{}, err
}
if prevEvent.err != nil {
return Stats{}, prevEvent.err
}
return event.stats.delta(&prevEvent.stats), nil
}

func (m *Monitor) LogTrace() string {
return m.tracer.String()
}
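
A sketch of how a caller might use the reworked Monitor API above: CumulativeStats returns the latest sample, IncrementalStats(delta) the change over a recent window, and LogTrace the retained history. The ticker loop and stall channel below are illustrative, not the actual kvserver integration.

```go
package diskmonitorexample

import (
	"fmt"
	"time"

	"github.com/cockroachdb/cockroach/pkg/storage/disk"
)

// reportDiskActivity periodically reports incremental disk stats and, when a
// suspected stall is signalled, logs the retained trace of recent disk events.
func reportDiskActivity(m *disk.Monitor, stalled <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Change in stats over the last 10s of recorded samples.
			delta, err := m.IncrementalStats(10 * time.Second)
			if err != nil {
				fmt.Println("disk stats unavailable:", err)
				continue
			}
			fmt.Printf("reads=%d in-progress=%d\n", delta.ReadsCount, delta.InProgressCount)
		case <-stalled:
			// On a stall, dump the per-poll history kept by the tracer.
			fmt.Println(m.LogTrace())
			return
		}
	}
}
```
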
50 changes: 23 additions & 27 deletions pkg/storage/disk/monitor_test.go
@@ -16,7 +16,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
)
@@ -73,11 +72,11 @@ func TestMonitor_Close(t *testing.T) {
monitor2 := Monitor{monitoredDisk: testDisk}

monitor1.Close()
require.Equal(t, testDisk.refCount, 1)
require.Equal(t, 1, testDisk.refCount)

monitor1.Close()
// Subsequent calls to a closed monitor should not reduce refCount.
require.Equal(t, testDisk.refCount, 1)
require.Equal(t, 1, testDisk.refCount)

go monitor2.Close()
// If there are no monitors, stop the stat polling loop.
@@ -86,40 +85,37 @@
case <-time.After(time.Second):
t.Fatal("Failed to receive stop signal")
}
require.Equal(t, testDisk.refCount, 0)
require.Equal(t, 0, testDisk.refCount)
}

func TestMonitor_IncrementalStats(t *testing.T) {
testDisk := &monitoredDisk{
stats: struct {
syncutil.Mutex
err error
lastMeasurement Stats
}{
lastMeasurement: Stats{
ReadsCount: 1,
InProgressCount: 3,
},
},
now := time.Now()
testDisk := monitoredDisk{
tracer: newMonitorTracer(3),
}
monitor := Monitor{monitoredDisk: testDisk}

// First attempt at getting incremental stats should return empty stats.
stats, err := monitor.IncrementalStats()
require.NoError(t, err)
require.Equal(t, stats, Stats{})

testDisk.stats.lastMeasurement = Stats{
monitor := Monitor{monitoredDisk: &testDisk}

testDisk.recordStats(now.Add(-10*time.Second), Stats{
ReadsCount: 1,
InProgressCount: 3,
})
// First attempt at getting incremental stats should fail since no data was
// collected at the specified time.
stats, err := monitor.IncrementalStats(time.Minute)
require.Error(t, err)
require.Equal(t, Stats{}, stats)

testDisk.recordStats(now, Stats{
ReadsCount: 2,
InProgressCount: 2,
}
})
wantIncremental := Stats{
ReadsCount: 1,
// InProgressCount is a gauge so the increment should not be computed.
InProgressCount: 2,
}

stats, err = monitor.IncrementalStats()
// Tracer should compute diff using data recorded over 10 seconds ago.
stats, err = monitor.IncrementalStats(5 * time.Second)
require.NoError(t, err)
require.Equal(t, stats, wantIncremental)
require.Equal(t, wantIncremental, stats)
}