metrics: compute interval metrics without GetInternalIntervalMetrics
Remove the dependence on `GetInternalIntervalMetrics`, since that method
resets the interval after every call. This breaks down when two clients
use `GetInternalIntervalMetrics`: each call resets the other client's
interval. For example, if client A calls every 5 seconds and client B
calls every 10 seconds, client B ends up with metrics covering only a
5-second interval, because client A's calls keep resetting it.
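
To make the failure mode concrete, here is a minimal runnable Go sketch of a
reset-on-read counter shared by two pollers. The `resettableCounter` type is a
hypothetical stand-in for illustration, not a CockroachDB or Pebble API:

package main

import (
  "fmt"
  "sync"
  "time"
)

// resettableCounter stands in for a reset-on-read metrics source like
// GetInternalIntervalMetrics (hypothetical; not the real API).
type resettableCounter struct {
  mu  sync.Mutex
  val int64
}

func (c *resettableCounter) add(n int64) {
  c.mu.Lock()
  defer c.mu.Unlock()
  c.val += n
}

// read returns the value accumulated since the previous read and resets it.
func (c *resettableCounter) read() int64 {
  c.mu.Lock()
  defer c.mu.Unlock()
  v := c.val
  c.val = 0
  return v
}

func main() {
  c := &resettableCounter{}
  // Writer: one unit every 100ms, i.e. ~10 units per second.
  go func() {
    for {
      c.add(1)
      time.Sleep(100 * time.Millisecond)
    }
  }()
  // Client A: polls every 500ms, resetting the counter each time.
  go func() {
    for range time.Tick(500 * time.Millisecond) {
      c.read()
    }
  }()
  // Client B polls every second but observes only ~500ms worth of data,
  // because client A keeps resetting the shared interval.
  for i := 0; i < 3; i++ {
    time.Sleep(1 * time.Second)
    fmt.Printf("client B sees ~%d units (expected ~10)\n", c.read())
  }
}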

`admission.StoreMetrics` is updated to remove the embedded
`*pebble.InternalIntervalMetrics` field, since these metrics must now be
computed from the current metrics (`admission.StoreMetrics.Metrics`) and
the previous metrics; storing the previous metrics is left up to the
client. Removing the field was chosen over setting it to `nil`, as it
would be confusing to return a `StoreMetrics` and then require the
client to populate the field once it has computed the necessary metrics.
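
As an illustration of the pattern clients are now expected to follow, here is
a small sketch that keeps the previous cumulative snapshot and recovers the
interval value by subtraction. Only the `pebble.ThroughputMetric` fields and
`Subtract` method that appear in the diffs below are assumed; the tracker type
itself is hypothetical:

package admissionsketch // hypothetical package, for illustration only

import "github.com/cockroachdb/pebble"

// flushThroughputTracker holds the previous cumulative flush throughput and
// subtracts it from the latest snapshot to recover the interval value.
type flushThroughputTracker struct {
  prev pebble.ThroughputMetric // cumulative value at the last tick
}

// interval returns the throughput accrued since the previous call and
// advances the stored baseline.
func (t *flushThroughputTracker) interval(cur pebble.ThroughputMetric) pebble.ThroughputMetric {
  delta := cur
  delta.Subtract(t.prev) // delta = cur - prev, field by field
  t.prev = cur
  return delta
}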

For the admission control use case, the flush metrics are computed as
part of `ioLoadListener.adjustTokens`, with the previous metrics
initialized in `ioLoadListener.pebbleMetricsTick` and stored on
`ioLoadListenerState` as `cumFlushWriteThroughput`.
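
The quantity ultimately derived from that interval value in
`adjustTokensInner` is the flush utilization: the fraction of the interval the
flush loop spent doing work. A standalone sketch of the arithmetic, assuming
only the exported `pebble.ThroughputMetric` fields used in the diff below:

package main

import (
  "fmt"
  "time"

  "github.com/cockroachdb/pebble"
)

// intervalFlushUtilization mirrors the computation in adjustTokensInner:
// utilization is work time divided by total (work + idle) time.
func intervalFlushUtilization(wt pebble.ThroughputMetric) float64 {
  if wt.WorkDuration <= 0 {
    return 0
  }
  return float64(wt.WorkDuration) / float64(wt.WorkDuration+wt.IdleDuration)
}

func main() {
  // Example: over a 15s interval the flush loop worked 3s and idled 12s.
  wt := pebble.ThroughputMetric{
    Bytes:        64 << 20, // 64 MiB flushed in the interval
    WorkDuration: 3 * time.Second,
    IdleDuration: 12 * time.Second,
  }
  fmt.Printf("flush utilization: %.2f\n", intervalFlushUtilization(wt)) // 0.20
}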

Additionally, make a copy of `pebble.InternalIntervalMetrics` in order
to move away from using the `pebble` version of the interval metrics,
since these metrics are now the caller's responsibility to compute.
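
The copied definitions themselves are presumably in the one changed file whose
diff did not render here. Purely as a hypothetical illustration, a caller-owned
copy could look like the sketch below (names and layout are guesses; only the
field set is taken from the diffs):

package admissionsketch // hypothetical package, for illustration only

import "time"

// ThroughputMetric mirrors the exported fields of pebble.ThroughputMetric so
// the admission package can stop depending on pebble's interval types.
type ThroughputMetric struct {
  Bytes        int64         // bytes processed during the interval
  WorkDuration time.Duration // time spent doing work
  IdleDuration time.Duration // time spent idle
}

// IntervalMetrics is a caller-owned replacement for
// pebble.InternalIntervalMetrics; the caller fills it in by diffing two
// cumulative snapshots, as pebbleMetricsTick/adjustTokens do below.
type IntervalMetrics struct {
  Flush struct {
    WriteThroughput ThroughputMetric
  }
}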

Release note: None
coolcom200 committed Oct 6, 2022
1 parent b5e54ff commit 1e07034
Showing 7 changed files with 83 additions and 75 deletions.
10 changes: 4 additions & 6 deletions pkg/server/node.go
@@ -856,17 +856,15 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
   var metrics []admission.StoreMetrics
   _ = n.stores.VisitStores(func(store *kvserver.Store) error {
     m := store.Engine().GetMetrics()
-    im := store.Engine().GetInternalIntervalMetrics()
     diskStats := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
     if s, ok := storeIDToDiskStats[store.StoreID()]; ok {
       diskStats = s
     }
     metrics = append(metrics, admission.StoreMetrics{
-      StoreID:                 int32(store.StoreID()),
-      Metrics:                 m.Metrics,
-      WriteStallCount:         m.WriteStallCount,
-      InternalIntervalMetrics: im,
-      DiskStats:               diskStats})
+      StoreID:         int32(store.StoreID()),
+      Metrics:         m.Metrics,
+      WriteStallCount: m.WriteStallCount,
+      DiskStats:       diskStats})
     return nil
   })
   return metrics
6 changes: 0 additions & 6 deletions pkg/storage/engine.go
@@ -929,12 +929,6 @@ type Engine interface {
   // storage min version is at least the target version.
   MinVersionIsAtLeastTargetVersion(target roachpb.Version) (bool, error)
 
-  // GetInternalIntervalMetrics returns low-level metrics from Pebble, that
-  // are reset at every interval, where an interval is defined over successive
-  // calls to this method. Hence, this should be used with care, with only one
-  // caller, which is currently the admission control subsystem.
-  GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics
-
   // SetCompactionConcurrency is used to set the engine's compaction
   // concurrency. It returns the previous compaction concurrency.
   SetCompactionConcurrency(n uint64) uint64
5 changes: 0 additions & 5 deletions pkg/storage/pebble.go
@@ -1528,11 +1528,6 @@ func (p *Pebble) GetMetrics() Metrics {
   }
 }
 
-// GetInternalIntervalMetrics implements the Engine interface.
-func (p *Pebble) GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics {
-  return p.db.InternalIntervalMetrics()
-}
-
 // GetEncryptionRegistries implements the Engine interface.
 func (p *Pebble) GetEncryptionRegistries() (*EncryptionRegistries, error) {
   rv := &EncryptionRegistries{}
1 change: 0 additions & 1 deletion pkg/util/admission/granter.go
@@ -643,7 +643,6 @@ type StoreMetrics struct {
   StoreID int32
   *pebble.Metrics
   WriteStallCount int64
-  *pebble.InternalIntervalMetrics
   // Optional.
   DiskStats DiskStats
 }
33 changes: 24 additions & 9 deletions pkg/util/admission/io_load_listener.go
@@ -175,8 +175,9 @@ type ioLoadListenerState struct {
   // Gauge.
   curL0Bytes int64
   // Cumulative.
-  cumWriteStallCount int64
-  diskBW             struct {
+  cumWriteStallCount      int64
+  cumFlushWriteThroughput pebble.ThroughputMetric
+  diskBW                  struct {
     // Cumulative
     bytesRead    uint64
     bytesWritten uint64
@@ -309,10 +310,21 @@ func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, metrics StoreMetrics) {
     io.diskBW.bytesRead = metrics.DiskStats.BytesRead
     io.diskBW.bytesWritten = metrics.DiskStats.BytesWritten
     io.diskBW.incomingLSMBytes = cumLSMIncomingBytes
+    io.setCumulativeStoreInternalIntervalMetrics(metrics)
     io.copyAuxEtcFromPerWorkEstimator()
     return
   }
   io.adjustTokens(ctx, metrics)
+  io.setCumulativeStoreInternalIntervalMetrics(metrics)
 }
 
+func (io *ioLoadListener) setCumulativeStoreInternalIntervalMetrics(metrics StoreMetrics) {
+  wt := metrics.Flush.WriteThroughput
+  io.cumFlushWriteThroughput = pebble.ThroughputMetric{
+    Bytes:        wt.Bytes,
+    WorkDuration: wt.WorkDuration,
+    IdleDuration: wt.IdleDuration,
+  }
+}
+
 // allocateTokensTick gives out 1/ticksInAdjustmentInterval of the
@@ -380,10 +392,13 @@ func computeIntervalDiskLoadInfo(
 // 100+ms for all write traffic.
 func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics) {
   sas := io.kvRequester.getStoreAdmissionStats()
-  // Copy the cumulative disk banwidth values for later use.
+  // Copy the cumulative disk bandwidth values for later use.
   cumDiskBW := io.ioLoadListenerState.diskBW
+  wt := metrics.Flush.WriteThroughput
+  wt.Subtract(io.cumFlushWriteThroughput)
+
   res := io.adjustTokensInner(ctx, io.ioLoadListenerState,
-    metrics.Levels[0], metrics.WriteStallCount, metrics.InternalIntervalMetrics,
+    metrics.Levels[0], metrics.WriteStallCount, wt,
     L0FileCountOverloadThreshold.Get(&io.settings.SV),
     L0SubLevelCountOverloadThreshold.Get(&io.settings.SV),
     MinFlushUtilizationFraction.Get(&io.settings.SV),
@@ -480,7 +495,7 @@ func (*ioLoadListener) adjustTokensInner(
   prev ioLoadListenerState,
   l0Metrics pebble.LevelMetrics,
   cumWriteStallCount int64,
-  im *pebble.InternalIntervalMetrics,
+  flushWriteThroughput pebble.ThroughputMetric,
   threshNumFiles, threshNumSublevels int64,
   minFlushUtilTargetFraction float64,
 ) adjustTokensResult {
@@ -611,12 +626,12 @@ func (*ioLoadListener) adjustTokensInner(
   // Compute flush utilization for this interval. A very low flush utilization
   // will cause flush tokens to be unlimited.
   intFlushUtilization := float64(0)
-  if im.Flush.WriteThroughput.WorkDuration > 0 {
-    intFlushUtilization = float64(im.Flush.WriteThroughput.WorkDuration) /
-      float64(im.Flush.WriteThroughput.WorkDuration+im.Flush.WriteThroughput.IdleDuration)
+  if flushWriteThroughput.WorkDuration > 0 {
+    intFlushUtilization = float64(flushWriteThroughput.WorkDuration) /
+      float64(flushWriteThroughput.WorkDuration+flushWriteThroughput.IdleDuration)
   }
   // Compute flush tokens for this interval that would cause 100% utilization.
-  intFlushTokens := float64(im.Flush.WriteThroughput.PeakRate()) * adjustmentInterval
+  intFlushTokens := float64(flushWriteThroughput.PeakRate()) * adjustmentInterval
   intWriteStalls := cumWriteStallCount - prev.cumWriteStallCount
 
   // Ensure flushUtilTargetFraction is in the configured bounds. This also
43 changes: 25 additions & 18 deletions pkg/util/admission/io_load_listener_test.go
@@ -38,6 +38,9 @@ func TestIOLoadListener(t *testing.T) {
   req := &testRequesterForIOLL{}
   kvGranter := &testGranterWithIOTokens{}
   var ioll *ioLoadListener
+  var cumFlushBytes int64
+  var cumFlushWork, cumFlushIdle time.Duration
+
   ctx := context.Background()
   st := cluster.MakeTestingClusterSettings()
   datadriven.RunTest(t, testutils.TestDataPath(t, "io_load_listener"),
@@ -56,6 +59,11 @@ func TestIOLoadListener(t *testing.T) {
       // active.
       ioll.mu.Mutex = &syncutil.Mutex{}
       ioll.mu.kvGranter = kvGranter
+
+      // Reset the cumulative data.
+      cumFlushBytes = 0
+      cumFlushWork = time.Duration(0)
+      cumFlushIdle = time.Duration(0)
       return ""
 
     case "prep-admission-stats":
@@ -112,13 +120,15 @@ func TestIOLoadListener(t *testing.T) {
         d.ScanArgs(t, "flush-work-sec", &flushWorkSec)
         d.ScanArgs(t, "flush-idle-sec", &flushIdleSec)
       }
-      flushMetric := pebble.ThroughputMetric{
-        Bytes:        int64(flushBytes),
-        WorkDuration: time.Duration(flushWorkSec) * time.Second,
-        IdleDuration: time.Duration(flushIdleSec) * time.Second,
-      }
-      im := &pebble.InternalIntervalMetrics{}
-      im.Flush.WriteThroughput = flushMetric
+
+      cumFlushIdle += time.Duration(flushIdleSec) * time.Second
+      cumFlushWork += time.Duration(flushWorkSec) * time.Second
+      cumFlushBytes += int64(flushBytes)
+
+      metrics.Flush.WriteThroughput.Bytes = cumFlushBytes
+      metrics.Flush.WriteThroughput.IdleDuration = cumFlushIdle
+      metrics.Flush.WriteThroughput.WorkDuration = cumFlushWork
+
       var writeStallCount int
       if d.HasArg("write-stall-count") {
        d.ScanArgs(t, "write-stall-count", &writeStallCount)
@@ -151,10 +161,10 @@ func TestIOLoadListener(t *testing.T) {
      if d.HasArg("print-only-first-tick") {
        d.ScanArgs(t, "print-only-first-tick", &printOnlyFirstTick)
      }
+
      ioll.pebbleMetricsTick(ctx, StoreMetrics{
-       Metrics:                 &metrics,
-       WriteStallCount:         int64(writeStallCount),
-       InternalIntervalMetrics: im,
+       Metrics:         &metrics,
+       WriteStallCount: int64(writeStallCount),
        DiskStats: DiskStats{
          BytesRead:    uint64(bytesRead),
          BytesWritten: uint64(bytesWritten),
@@ -212,10 +222,8 @@ func TestIOLoadListenerOverflow(t *testing.T) {
     Sublevels: 100,
     NumFiles:  10000,
   }
-  ioll.pebbleMetricsTick(ctx,
-    StoreMetrics{Metrics: &m, InternalIntervalMetrics: &pebble.InternalIntervalMetrics{}})
-  ioll.pebbleMetricsTick(ctx,
-    StoreMetrics{Metrics: &m, InternalIntervalMetrics: &pebble.InternalIntervalMetrics{}})
+  ioll.pebbleMetricsTick(ctx, StoreMetrics{Metrics: &m})
+  ioll.pebbleMetricsTick(ctx, StoreMetrics{Metrics: &m})
   ioll.allocateTokensTick()
 }
 
@@ -257,7 +265,7 @@ func TestAdjustTokensInnerAndLogging(t *testing.T) {
   for _, tt := range tests {
     buf.Printf("%s:\n", tt.name)
     res := (*ioLoadListener)(nil).adjustTokensInner(
-      ctx, tt.prev, tt.l0Metrics, 12, &pebble.InternalIntervalMetrics{},
+      ctx, tt.prev, tt.l0Metrics, 12, pebble.ThroughputMetric{},
       100, 10, 0.50)
     buf.Printf("%s\n", res)
   }
@@ -303,9 +311,8 @@ func TestBadIOLoadListenerStats(t *testing.T) {
   for i := 0; i < 100; i++ {
     randomValues()
     ioll.pebbleMetricsTick(ctx, StoreMetrics{
-      Metrics:                 &m,
-      InternalIntervalMetrics: &pebble.InternalIntervalMetrics{},
-      DiskStats:               d,
+      Metrics:   &m,
+      DiskStats: d,
     })
     for j := 0; j < ticksInAdjustmentInterval; j++ {
       ioll.allocateTokensTick()