[WIP] admission: add support for disk bandwidth as a bottleneck resource #82813

Closed · wants to merge 3 commits
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -489,6 +489,12 @@ var (
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaRdbWriteStallNanos = metric.Metadata{
Name: "storage.write-stall-nanos",
Help: "Total write stall duration in nanos",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

// Disk health metrics.
metaDiskSlow = metric.Metadata{
@@ -1535,6 +1541,7 @@ type StoreMetrics struct {
RdbL0NumFiles *metric.Gauge
RdbBytesIngested [7]*metric.Gauge // idx = level
RdbWriteStalls *metric.Gauge
RdbWriteStallNanos *metric.Gauge

// Disk health metrics.
DiskSlow *metric.Gauge
@@ -1999,6 +2006,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RdbL0NumFiles: metric.NewGauge(metaRdbL0NumFiles),
RdbBytesIngested: rdbBytesIngested,
RdbWriteStalls: metric.NewGauge(metaRdbWriteStalls),
RdbWriteStallNanos: metric.NewGauge(metaRdbWriteStallNanos),

// Disk health metrics.
DiskSlow: metric.NewGauge(metaDiskSlow),
@@ -2252,6 +2260,7 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.RdbMarkedForCompactionFiles.Update(int64(m.Compact.MarkedFiles))
sm.RdbNumSSTables.Update(m.NumSSTables())
sm.RdbWriteStalls.Update(m.WriteStallCount)
sm.RdbWriteStallNanos.Update(m.WriteStallDuration.Nanoseconds())
sm.DiskSlow.Update(m.DiskSlowCount)
sm.DiskStalled.Update(m.DiskStallCount)

11 changes: 8 additions & 3 deletions pkg/kv/kvserver/store.go
@@ -3772,7 +3772,10 @@ func (n KVAdmissionControllerImpl) AdmitKVWork(
}
admissionEnabled := true
if ah.storeAdmissionQ != nil {
// TODO(sumeer): Plumb WriteBytes for ingest requests.
// TODO(sumeer): Plumb WriteBytes for ingest requests, and for all
// AddSSTableRequests, even if they are using IngestAsWrites. For the
// rest we don't know the size since we have not evaluated the request
// yet. It will be known when AdmittedWorkDone is called, so we can
// compensate then.
ah.storeWorkHandle, err = ah.storeAdmissionQ.Admit(
ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo})
if err != nil {
@@ -3803,8 +3806,10 @@ func (n KVAdmissionControllerImpl) AdmittedKVWorkDone(handle interface{}) {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
if ah.storeAdmissionQ != nil {
// TODO(sumeer): Plumb ingestedIntoL0Bytes and handle error return value.
_ = ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, 0)
// TODO(sumeer): Plumb bytes to populate StoreWorkDoneInfo and handle
// error return value.
_ = ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle,
admission.StoreWorkDoneInfo{ActualBytes: 0, ActualBytesIntoL0: 0})
}
}

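The two TODOs in this file describe an admit-then-compensate flow: work is admitted with an unknown (zero) size, and the actual bytes are reported once evaluation completes via StoreWorkDoneInfo. A minimal standalone sketch of that shape, using simplified stand-in types rather than the real admission package:

```go
package main

import "fmt"

// storeWorkDoneInfo is a simplified stand-in for admission.StoreWorkDoneInfo;
// the real type lives in pkg/util/admission.
type storeWorkDoneInfo struct {
	actualBytes       int64
	actualBytesIntoL0 int64
}

// storeWorkQueue is a toy queue that tracks accounted bytes so it can
// compensate once the true request size is known.
type storeWorkQueue struct {
	accountedBytes int64
}

// admit records an estimate at admission time; for most KV writes the
// estimate is 0 because the request has not been evaluated yet.
func (q *storeWorkQueue) admit(estimateBytes int64) int64 {
	q.accountedBytes += estimateBytes
	return estimateBytes
}

// admittedWorkDone is called after evaluation; it adds the difference between
// the actual bytes and the earlier estimate, i.e. the compensation step the
// TODO refers to.
func (q *storeWorkQueue) admittedWorkDone(estimateBytes int64, info storeWorkDoneInfo) {
	q.accountedBytes += info.actualBytes - estimateBytes
}

func main() {
	q := &storeWorkQueue{}
	est := q.admit(0) // size unknown before evaluation
	// After evaluation: 128 KiB written, 32 KiB of it ingested into L0.
	q.admittedWorkDone(est, storeWorkDoneInfo{actualBytes: 128 << 10, actualBytesIntoL0: 32 << 10})
	fmt.Println("accounted bytes:", q.accountedBytes)
}
```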
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
@@ -265,6 +265,7 @@ go_library(
"@com_github_gorilla_mux//:mux",
"@com_github_grpc_ecosystem_grpc_gateway//runtime:go_default_library",
"@com_github_grpc_ecosystem_grpc_gateway//utilities:go_default_library",
"@com_github_lufia_iostat//:iostat",
"@com_github_marusama_semaphore//:semaphore",
"@in_gopkg_yaml_v2//:yaml_v2",
"@io_etcd_go_etcd_raft_v3//:raft",
32 changes: 30 additions & 2 deletions pkg/server/node.go
@@ -47,6 +47,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -767,13 +768,40 @@ func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error {
})
}

var provisionedBandwidthHack = settings.RegisterIntSetting(
settings.SystemOnly, "hack.provisioned_bandwidth",
"hack that configures provisioned bandwidth for experiments",
20<<30).WithPublic()

// GetPebbleMetrics implements admission.PebbleMetricsProvider.
func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
driveStats, err := status.GetDriveStatsForAC(context.Background())
if err != nil {
panic(err)
}
// TODO: this summing is a hack for experimentation since we only
// expect one real drive (there may also be a boot drive, but it will see
// little IO). Eventually we need a way to map DriveStats.Name to
// each store.
var diskStats admission.DiskStats
for i := 0; i < len(driveStats); i++ {
log.Infof(context.Background(), "driveStats: %s w: %s r: %s", driveStats[i].Name,
humanizeutil.IBytes(driveStats[i].BytesWritten), humanizeutil.IBytes(driveStats[i].BytesRead))
diskStats.BytesRead += driveStats[i].BytesRead
diskStats.BytesWritten += driveStats[i].BytesWritten
}
diskStats.ProvisionedBandwidth = provisionedBandwidthHack.Get(&n.storeCfg.Settings.SV)
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
metrics = append(
metrics, admission.StoreMetrics{StoreID: int32(store.StoreID()), Metrics: m.Metrics})
im := store.Engine().GetInternalIntervalMetrics()
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
InternalIntervalMetrics: im,
DiskStats: diskStats,
})
return nil
})
return metrics
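The drive stats gathered above are cumulative OS counters, so a consumer has to difference two successive readings to get per-interval bandwidth and compare it against the configured provisioned value (the hack.provisioned_bandwidth setting, default 20<<30 bytes/sec). A rough sketch of that arithmetic with local stand-in types, not the real admission.DiskStats:

```go
package main

import (
	"fmt"
	"time"
)

// diskStats mirrors the shape of admission.DiskStats: cumulative bytes plus
// a configured provisioned bandwidth in bytes per second.
type diskStats struct {
	bytesRead            int64
	bytesWritten         int64
	provisionedBandwidth int64
}

// intervalUtilization differences two cumulative readings taken `elapsed`
// apart and returns the fraction of provisioned bandwidth that was used.
func intervalUtilization(prev, cur diskStats, elapsed time.Duration) float64 {
	usedBytes := (cur.bytesRead - prev.bytesRead) + (cur.bytesWritten - prev.bytesWritten)
	capacityBytes := float64(cur.provisionedBandwidth) * elapsed.Seconds()
	if capacityBytes <= 0 {
		return 0
	}
	return float64(usedBytes) / capacityBytes
}

func main() {
	prev := diskStats{bytesRead: 10 << 30, bytesWritten: 40 << 30, provisionedBandwidth: 20 << 30}
	cur := diskStats{bytesRead: 11 << 30, bytesWritten: 55 << 30, provisionedBandwidth: 20 << 30}
	// 16 GiB of IO over a 1s interval against 20 GiB/s provisioned ≈ 0.80.
	fmt.Printf("utilization: %.2f\n", intervalUtilization(prev, cur, time.Second))
}
```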
16 changes: 16 additions & 0 deletions pkg/server/status/disk_counters.go
@@ -45,3 +45,19 @@ func getDiskCounters(ctx context.Context) ([]diskStats, error) {

return output, nil
}

func GetDriveStatsForAC(ctx context.Context) ([]DriveStats, error) {
driveStats, err := disk.IOCountersWithContext(ctx)
if err != nil {
return nil, err
}
var stats []DriveStats
for _, counters := range driveStats {
stats = append(stats, DriveStats{
Name: counters.Name,
BytesRead: int64(counters.ReadBytes),
BytesWritten: int64(counters.WriteBytes),
})
}
return stats, nil
}
16 changes: 16 additions & 0 deletions pkg/server/status/disk_counters_darwin.go
@@ -42,3 +42,19 @@ func getDiskCounters(context.Context) ([]diskStats, error) {

return output, nil
}

func GetDriveStatsForAC(context.Context) ([]DriveStats, error) {
driveStats, err := iostat.ReadDriveStats()
if err != nil {
return nil, err
}
var stats []DriveStats
for i := range driveStats {
stats = append(stats, DriveStats{
Name: driveStats[i].Name,
BytesRead: driveStats[i].BytesRead,
BytesWritten: driveStats[i].BytesWritten,
})
}
return stats, nil
}
6 changes: 6 additions & 0 deletions pkg/server/status/runtime.go
@@ -708,3 +708,9 @@ func GetCPUTime(ctx context.Context) (userTimeMillis, sysTimeMillis int64, err error) {
}
return int64(cpuTime.User), int64(cpuTime.Sys), nil
}

type DriveStats struct {
Name string
BytesRead int64
BytesWritten int64
}
9 changes: 8 additions & 1 deletion pkg/storage/engine.go
@@ -978,6 +978,12 @@ type Engine interface {
// MinVersionIsAtLeastTargetVersion returns whether the engine's recorded
// storage min version is at least the target version.
MinVersionIsAtLeastTargetVersion(target roachpb.Version) (bool, error)

// GetInternalIntervalMetrics returns low-level metrics from Pebble that
// are reset at every interval, where an interval is defined over successive
// calls to this method. Hence, this should be used with care, with only one
// caller, which is currently the admission control subsystem.
GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics
}

// Batch is the interface for batch specific operations.
@@ -1016,7 +1022,8 @@ type Metrics struct {
//
// We do not split this metric across these two reasons, but they can be
// distinguished in the pebble logs.
WriteStallCount int64
WriteStallCount int64
WriteStallDuration time.Duration
// DiskSlowCount counts the number of times Pebble records disk slowness.
DiskSlowCount int64
// DiskStallCount counts the number of times Pebble observes slow writes
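Because the interval metrics reset on every call, a second caller would silently see deltas over a shorter, shifted window; the contract in the comment only works with a single periodic reader. A toy illustration of that reset-on-read behavior, with stand-in types rather than the real pebble.InternalIntervalMetrics (the field name here is illustrative, not the actual API):

```go
package main

import (
	"fmt"
	"time"
)

// intervalMetrics stands in for pebble.InternalIntervalMetrics.
type intervalMetrics struct {
	flushThroughputBytes int64
}

// fakeEngine stands in for storage.Engine: each call returns what accumulated
// since the previous call and then resets, which is why only one caller
// (currently admission control) may use it.
type fakeEngine struct {
	pending int64
}

func (e *fakeEngine) GetInternalIntervalMetrics() *intervalMetrics {
	m := &intervalMetrics{flushThroughputBytes: e.pending}
	e.pending = 0 // reset: the next call starts a fresh interval
	return m
}

func main() {
	e := &fakeEngine{pending: 64 << 20}
	// The single periodic reader defines the interval by how often it calls
	// the method; a second interleaved reader would steal part of the window.
	for i := 0; i < 2; i++ {
		m := e.GetInternalIntervalMetrics()
		fmt.Printf("interval %d: flushed %d bytes\n", i, m.flushThroughputBytes)
		time.Sleep(10 * time.Millisecond)
	}
}
```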
39 changes: 32 additions & 7 deletions pkg/storage/pebble.go
@@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
@@ -723,9 +724,11 @@ type Pebble struct {

// Stats updated by pebble.EventListener invocations, and returned in
// GetMetrics. Updated and retrieved atomically.
writeStallCount int64
diskSlowCount int64
diskStallCount int64
writeStallCount int64
writeStallDuration time.Duration
writeStallStartNanos int64
diskSlowCount int64
diskStallCount int64

// Relevant options copied over from pebble.Options.
fs vfs.FS
@@ -996,6 +999,22 @@ func (p *Pebble) makeMetricEtcEventListener(ctx context.Context) pebble.EventListener {
return pebble.EventListener{
WriteStallBegin: func(info pebble.WriteStallBeginInfo) {
atomic.AddInt64(&p.writeStallCount, 1)
startNanos := timeutil.Now().UnixNano()
atomic.StoreInt64(&p.writeStallStartNanos, startNanos)
},
WriteStallEnd: func() {
startNanos := atomic.SwapInt64(&p.writeStallStartNanos, 0)
if startNanos == 0 {
// Should not happen since these callbacks are registered when Pebble
// is opened, but just in case we miss the WriteStallBegin, let's not
// corrupt the metric.
return
}
stallDuration := timeutil.Now().UnixNano() - startNanos
if stallDuration < 0 {
return
}
atomic.AddInt64((*int64)(&p.writeStallDuration), stallDuration)
},
DiskSlow: func(info pebble.DiskSlowInfo) {
maxSyncDuration := maxSyncDurationDefault
@@ -1571,13 +1590,19 @@ func (p *Pebble) Flush() error {
func (p *Pebble) GetMetrics() Metrics {
m := p.db.Metrics()
return Metrics{
Metrics: m,
WriteStallCount: atomic.LoadInt64(&p.writeStallCount),
DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount),
DiskStallCount: atomic.LoadInt64(&p.diskStallCount),
Metrics: m,
WriteStallCount: atomic.LoadInt64(&p.writeStallCount),
WriteStallDuration: time.Duration(atomic.LoadInt64((*int64)(&p.writeStallDuration))),
DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount),
DiskStallCount: atomic.LoadInt64(&p.diskStallCount),
}
}

// GetInternalIntervalMetrics implements the Engine interface.
func (p *Pebble) GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics {
return p.db.InternalIntervalMetrics()
}

// GetEncryptionRegistries implements the Engine interface.
func (p *Pebble) GetEncryptionRegistries() (*EncryptionRegistries, error) {
rv := &EncryptionRegistries{}
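WriteStallDuration accumulates over the life of the process, so a reader that wants to know how stalled a particular window was has to difference two GetMetrics snapshots and divide by wall time. A small sketch under those assumptions, with a stand-in for the relevant Metrics fields:

```go
package main

import (
	"fmt"
	"time"
)

// metricsSnapshot is a stand-in for the write-stall fields of storage.Metrics.
type metricsSnapshot struct {
	writeStallCount    int64
	writeStallDuration time.Duration
}

// stallFraction returns the fraction of the interval spent in a write stall,
// given two cumulative snapshots taken `elapsed` apart.
func stallFraction(prev, cur metricsSnapshot, elapsed time.Duration) float64 {
	if elapsed <= 0 {
		return 0
	}
	return float64(cur.writeStallDuration-prev.writeStallDuration) / float64(elapsed)
}

func main() {
	prev := metricsSnapshot{writeStallCount: 3, writeStallDuration: 2 * time.Second}
	cur := metricsSnapshot{writeStallCount: 5, writeStallDuration: 5 * time.Second}
	// 3s of stall accumulated over a 10s interval => 0.30.
	fmt.Printf("stall fraction: %.2f\n", stallFraction(prev, cur, 10*time.Second))
}
```

Note that a stall in progress at snapshot time is not yet included, since the duration is only added in the WriteStallEnd callback.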
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "admission",
srcs = [
"disk_bandwidth.go",
"doc.go",
"granter.go",
"work_queue.go",
@@ -24,6 +25,7 @@ go_library(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_redact//:redact",
"@com_github_hdrhistogram_hdrhistogram_go//:hdrhistogram-go",
],
)
