metrics: compute interval metrics without GetInternalIntervalMetrics
Remove the dependence on `GetInternalIntervalMetrics`, as this method
resets the interval following each call. This causes problems if two
clients call `GetInternalIntervalMetrics`, since each call resets the
other client's interval. For example, if Client A calls every 5 seconds
and Client B calls every 10 seconds, Client B ends up getting metrics
computed over a 5-second interval, because `GetInternalIntervalMetrics`
resets the interval on each call.

`admission.StoreMetrics` is updated to remove the `InternalIntervalMetrics`
field, as these metrics now need to be computed from the current metrics
(`admission.StoreMetrics.Metrics`) and the previous metrics. Storing the
previous metrics is left up to the client. Removing the field was chosen
over setting it to `nil`, since it would be confusing to return
`StoreMetrics` and then require the client to populate the field once it
has computed the necessary metrics.
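
A minimal sketch of the new client-side contract (the package name and the
`flushDeltaSince` helper are illustrative, not part of this change): the
client keeps the previous cumulative `pebble.Metrics` and subtracts it from
the current snapshot on each poll.

```go
package admissionexample

import "github.com/cockroachdb/pebble"

// prev holds the last cumulative metrics seen by this client; retaining it
// is now the client's responsibility rather than the engine's.
var prev *pebble.Metrics

// flushDeltaSince returns the flush write throughput accumulated since the
// previous poll. Hypothetical helper, for illustration only.
func flushDeltaSince(cur *pebble.Metrics) pebble.ThroughputMetric {
	delta := cur.Flush.WriteThroughput
	if prev != nil {
		// Subtracting the previous cumulative value yields the per-interval
		// value, mirroring computeIntervalMetrics in this commit.
		delta.Subtract(prev.Flush.WriteThroughput)
	}
	prev = cur
	return delta
}
```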

For the admission control use case, the last metrics are initialized at the
start of `SetPebbleMetricsProvider`. `SetPebbleMetricsProvider` also starts
a goroutine that retrieves and processes new metrics. This goroutine has
been updated to compute the `IntervalMetrics` between the previous metrics
and the current metrics, and to then update the previous metrics to the
current metrics. To pass the computed metrics along, a wrapper struct is
used:

```go
type StoreAndIntervalMetrics struct {
	*StoreMetrics
	*IntervalMetrics
}
```

Additionally, a copy of `pebble.InternalIntervalMetrics` is introduced in
the admission package (as `IntervalMetrics`), in order to move away from
using the `pebble` version of the interval metrics, since computing these
metrics is now the caller's responsibility.
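
Because the caller now computes the delta itself, it also has to deep-clone
the WAL sync-latency histogram before subtracting, so the cumulative
histogram held by pebble is not mutated. A minimal sketch of that clone,
assuming the `hdrhistogram-go` Export/Import round trip used by this commit
(the helper name is illustrative):

```go
package admissionexample

import "github.com/HdrHistogram/hdrhistogram-go"

// cloneHistogram returns a deep copy of h via an Export/Import round trip,
// so the caller can subtract a previous snapshot from the copy without
// touching the cumulative histogram owned by pebble. Returns nil if h is
// nil (no fsyncs were recorded).
func cloneHistogram(h *hdrhistogram.Histogram) *hdrhistogram.Histogram {
	if h == nil {
		return nil
	}
	return hdrhistogram.Import(h.Export())
}
```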

Release note: None
coolcom200 committed Oct 4, 2022
1 parent b5e54ff commit c72f41d
Showing 9 changed files with 114 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -12,6 +12,7 @@ require (
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/BurntSushi/toml v0.4.1
github.com/HdrHistogram/hdrhistogram-go v1.1.2
github.com/Masterminds/semver/v3 v3.1.1
github.com/MichaelTJones/walk v0.0.0-20161122175330-4748e29d5718
github.com/PuerkitoBio/goquery v1.5.1
@@ -192,7 +193,6 @@ require (
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/Masterminds/goutils v1.1.0 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/sprig v2.22.0+incompatible // indirect
10 changes: 4 additions & 6 deletions pkg/server/node.go
@@ -856,17 +856,15 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics {
var metrics []admission.StoreMetrics
_ = n.stores.VisitStores(func(store *kvserver.Store) error {
m := store.Engine().GetMetrics()
im := store.Engine().GetInternalIntervalMetrics()
diskStats := admission.DiskStats{ProvisionedBandwidth: clusterProvisionedBandwidth}
if s, ok := storeIDToDiskStats[store.StoreID()]; ok {
diskStats = s
}
metrics = append(metrics, admission.StoreMetrics{
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
InternalIntervalMetrics: im,
DiskStats: diskStats})
StoreID: int32(store.StoreID()),
Metrics: m.Metrics,
WriteStallCount: m.WriteStallCount,
DiskStats: diskStats})
return nil
})
return metrics
6 changes: 0 additions & 6 deletions pkg/storage/engine.go
@@ -929,12 +929,6 @@ type Engine interface {
// storage min version is at least the target version.
MinVersionIsAtLeastTargetVersion(target roachpb.Version) (bool, error)

// GetInternalIntervalMetrics returns low-level metrics from Pebble, that
// are reset at every interval, where an interval is defined over successive
// calls to this method. Hence, this should be used with care, with only one
// caller, which is currently the admission control subsystem.
GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics

// SetCompactionConcurrency is used to set the engine's compaction
// concurrency. It returns the previous compaction concurrency.
SetCompactionConcurrency(n uint64) uint64
5 changes: 0 additions & 5 deletions pkg/storage/pebble.go
@@ -1528,11 +1528,6 @@ func (p *Pebble) GetMetrics() Metrics {
}
}

// GetInternalIntervalMetrics implements the Engine interface.
func (p *Pebble) GetInternalIntervalMetrics() *pebble.InternalIntervalMetrics {
return p.db.InternalIntervalMetrics()
}

// GetEncryptionRegistries implements the Engine interface.
func (p *Pebble) GetEncryptionRegistries() (*EncryptionRegistries, error) {
rv := &EncryptionRegistries{}
2 changes: 2 additions & 0 deletions pkg/util/admission/BUILD.bazel
@@ -38,7 +38,9 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//record",
"@com_github_cockroachdb_redact//:redact",
"@com_github_hdrhistogram_hdrhistogram_go//:hdrhistogram-go",
],
)

46 changes: 42 additions & 4 deletions pkg/util/admission/grant_coordinator.go
@@ -15,6 +15,7 @@ import (
"time"
"unsafe"

"github.com/HdrHistogram/hdrhistogram-go"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -24,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/redact"
)

@@ -77,6 +80,11 @@ func (sgc *StoreGrantCoordinators) SetPebbleMetricsProvider(
sgc.pebbleMetricsProvider = pmp
sgc.closeCh = make(chan struct{})
metrics := sgc.pebbleMetricsProvider.GetPebbleMetrics()
var lastMetrics []*pebble.Metrics
storeMetricsWithDiff := StoreAndIntervalMetrics{
StoreMetrics: nil,
IntervalMetrics: &IntervalMetrics{},
}
for _, m := range metrics {
gc := sgc.initGrantCoordinator(m.StoreID)
// Defensive call to LoadAndStore even though Store ought to be sufficient
@@ -86,7 +94,9 @@
if !loaded {
sgc.numStores++
}
gc.pebbleMetricsTick(startupCtx, m)
lastMetrics = append(lastMetrics, m.Metrics)
storeMetricsWithDiff.StoreMetrics = &m
gc.pebbleMetricsTick(startupCtx, storeMetricsWithDiff)
gc.allocateIOTokensTick()
}
if sgc.disableTickerForTesting {
@@ -109,10 +119,13 @@
log.Warningf(ctx,
"expected %d store metrics and found %d metrics", sgc.numStores, len(metrics))
}
for _, m := range metrics {
for idx, m := range metrics {
if unsafeGc, ok := sgc.gcMap.Load(int64(m.StoreID)); ok {
gc := (*GrantCoordinator)(unsafeGc)
gc.pebbleMetricsTick(ctx, m)
storeMetricsWithDiff.StoreMetrics = &m
storeMetricsWithDiff.IntervalMetrics = computeIntervalMetrics(m.Metrics, lastMetrics[idx])
lastMetrics[idx] = m.Metrics
gc.pebbleMetricsTick(ctx, storeMetricsWithDiff)
iotc.UpdateIOThreshold(roachpb.StoreID(m.StoreID), gc.ioLoadListener.ioThreshold)
} else {
log.Warningf(ctx,
@@ -135,6 +148,31 @@
}()
}

func computeIntervalMetrics(m1, m2 *pebble.Metrics) *IntervalMetrics {
// Copy the relevant metrics, ensuring to perform a deep clone of the histogram
// to avoid mutating it.
logDelta := m1.LogWriter
if m1.LogWriter.SyncLatencyMicros != nil {
logDelta.SyncLatencyMicros = hdrhistogram.Import(m1.LogWriter.SyncLatencyMicros.Export())
}
flushDelta := m1.Flush.WriteThroughput

// Subtract the cumulative metrics at the time of the last InternalIntervalMetrics call,
// if any, in order to compute the delta.
if m2 != nil {
logDelta.Subtract(&m2.LogWriter)
flushDelta.Subtract(m2.Flush.WriteThroughput)
}

iim := &IntervalMetrics{}
iim.LogWriter.PendingBufferUtilization = logDelta.PendingBufferLen.Mean() / record.CapAllocatedBlocks
iim.LogWriter.SyncQueueUtilization = logDelta.SyncQueueLen.Mean() / record.SyncConcurrency
iim.LogWriter.SyncLatencyMicros = logDelta.SyncLatencyMicros
iim.LogWriter.WriteThroughput = logDelta.WriteThroughput
iim.Flush.WriteThroughput = flushDelta
return iim
}

func (sgc *StoreGrantCoordinators) initGrantCoordinator(storeID int32) *GrantCoordinator {
coord := &GrantCoordinator{
settings: sgc.settings,
@@ -612,7 +650,7 @@ func NewGrantCoordinatorSQL(
// pebbleMetricsTick is called every adjustmentInterval seconds and passes
// through to the ioLoadListener, so that it can adjust the plan for future IO
// token allocations.
func (coord *GrantCoordinator) pebbleMetricsTick(ctx context.Context, m StoreMetrics) {
func (coord *GrantCoordinator) pebbleMetricsTick(ctx context.Context, m StoreAndIntervalMetrics) {
coord.ioLoadListener.pebbleMetricsTick(ctx, m)
}

42 changes: 41 additions & 1 deletion pkg/util/admission/granter.go
@@ -13,6 +13,7 @@ package admission
import (
"time"

"github.com/HdrHistogram/hdrhistogram-go"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
@@ -643,11 +644,50 @@ type StoreMetrics struct {
StoreID int32
*pebble.Metrics
WriteStallCount int64
*pebble.InternalIntervalMetrics
// Optional.
DiskStats DiskStats
}

// IntervalMetrics exposes metrics about internal subsystems, that can
// be useful for deep observability purposes, and for higher-level admission
// control systems that are trying to estimate the capacity of the DB.
//
// These represent the metrics over the interval of time from the last call to
// retrieve these metrics. These are not cumulative, unlike Metrics.
type IntervalMetrics struct {
// LogWriter metrics.
LogWriter struct {
// WriteThroughput is the WAL throughput.
WriteThroughput pebble.ThroughputMetric
// PendingBufferUtilization is the utilization of the WAL writer's
// finite-sized pending blocks buffer. It provides an additional signal
// regarding how close to "full" the WAL writer is. The value is in the
// interval [0,1].
PendingBufferUtilization float64
// SyncQueueUtilization is the utilization of the WAL writer's
// finite-sized queue of work that is waiting to sync. The value is in the
// interval [0,1].
SyncQueueUtilization float64
// SyncLatencyMicros is a distribution of the fsync latency observed by
// the WAL writer. It can be nil if there were no fsyncs.
SyncLatencyMicros *hdrhistogram.Histogram
}
// Flush loop metrics.
Flush struct {
// WriteThroughput is the flushing throughput.
WriteThroughput pebble.ThroughputMetric
}
// NB: the LogWriter throughput and the Flush throughput are not directly
// comparable because the former does not compress, unlike the latter.
}

// StoreAndIntervalMetrics is a wrapper around StoreMetrics that also contains
// the IntervalMetrics
type StoreAndIntervalMetrics struct {
*StoreMetrics
*IntervalMetrics
}

// DiskStats provide low-level stats about the disk resources used for a
// store. We assume that the disk is not shared across multiple stores.
// However, transient and moderate usage that is not due to the store is
8 changes: 4 additions & 4 deletions pkg/util/admission/io_load_listener.go
@@ -281,7 +281,7 @@ func cumLSMWriteAndIngestedBytes(

// pebbleMetricsTicks is called every adjustmentInterval seconds, and decides
// the token allocations until the next call.
func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, metrics StoreMetrics) {
func (io *ioLoadListener) pebbleMetricsTick(ctx context.Context, metrics StoreAndIntervalMetrics) {
ctx = logtags.AddTag(ctx, "s", io.storeID)
m := metrics.Metrics
if !io.statsInitialized {
@@ -378,12 +378,12 @@ func computeIntervalDiskLoadInfo(
// inability to flush fast enough can result in write stalls due to high
// memtable counts, which we want to avoid as it can cause latency hiccups of
// 100+ms for all write traffic.
func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreMetrics) {
func (io *ioLoadListener) adjustTokens(ctx context.Context, metrics StoreAndIntervalMetrics) {
sas := io.kvRequester.getStoreAdmissionStats()
// Copy the cumulative disk banwidth values for later use.
cumDiskBW := io.ioLoadListenerState.diskBW
res := io.adjustTokensInner(ctx, io.ioLoadListenerState,
metrics.Levels[0], metrics.WriteStallCount, metrics.InternalIntervalMetrics,
metrics.Levels[0], metrics.WriteStallCount, metrics.IntervalMetrics,
L0FileCountOverloadThreshold.Get(&io.settings.SV),
L0SubLevelCountOverloadThreshold.Get(&io.settings.SV),
MinFlushUtilizationFraction.Get(&io.settings.SV),
@@ -480,7 +480,7 @@ func (*ioLoadListener) adjustTokensInner(
prev ioLoadListenerState,
l0Metrics pebble.LevelMetrics,
cumWriteStallCount int64,
im *pebble.InternalIntervalMetrics,
im *IntervalMetrics,
threshNumFiles, threshNumSublevels int64,
minFlushUtilTargetFraction float64,
) adjustTokensResult {
36 changes: 20 additions & 16 deletions pkg/util/admission/io_load_listener_test.go
@@ -117,7 +117,7 @@ func TestIOLoadListener(t *testing.T) {
WorkDuration: time.Duration(flushWorkSec) * time.Second,
IdleDuration: time.Duration(flushIdleSec) * time.Second,
}
im := &pebble.InternalIntervalMetrics{}
im := &IntervalMetrics{}
im.Flush.WriteThroughput = flushMetric
var writeStallCount int
if d.HasArg("write-stall-count") {
@@ -151,15 +151,17 @@
if d.HasArg("print-only-first-tick") {
d.ScanArgs(t, "print-only-first-tick", &printOnlyFirstTick)
}
ioll.pebbleMetricsTick(ctx, StoreMetrics{
Metrics: &metrics,
WriteStallCount: int64(writeStallCount),
InternalIntervalMetrics: im,
DiskStats: DiskStats{
BytesRead: uint64(bytesRead),
BytesWritten: uint64(bytesWritten),
ProvisionedBandwidth: int64(provisionedBandwidth),
ioll.pebbleMetricsTick(ctx, StoreAndIntervalMetrics{
StoreMetrics: &StoreMetrics{
Metrics: &metrics,
WriteStallCount: int64(writeStallCount),
DiskStats: DiskStats{
BytesRead: uint64(bytesRead),
BytesWritten: uint64(bytesWritten),
ProvisionedBandwidth: int64(provisionedBandwidth),
},
},
IntervalMetrics: im,
})
var buf strings.Builder
// Do the ticks until just before next adjustment.
@@ -213,9 +215,9 @@ func TestIOLoadListenerOverflow(t *testing.T) {
NumFiles: 10000,
}
ioll.pebbleMetricsTick(ctx,
StoreMetrics{Metrics: &m, InternalIntervalMetrics: &pebble.InternalIntervalMetrics{}})
StoreAndIntervalMetrics{StoreMetrics: &StoreMetrics{Metrics: &m}, IntervalMetrics: &IntervalMetrics{}})
ioll.pebbleMetricsTick(ctx,
StoreMetrics{Metrics: &m, InternalIntervalMetrics: &pebble.InternalIntervalMetrics{}})
StoreAndIntervalMetrics{StoreMetrics: &StoreMetrics{Metrics: &m}, IntervalMetrics: &IntervalMetrics{}})
ioll.allocateTokensTick()
}

@@ -257,7 +259,7 @@ func TestAdjustTokensInnerAndLogging(t *testing.T) {
for _, tt := range tests {
buf.Printf("%s:\n", tt.name)
res := (*ioLoadListener)(nil).adjustTokensInner(
ctx, tt.prev, tt.l0Metrics, 12, &pebble.InternalIntervalMetrics{},
ctx, tt.prev, tt.l0Metrics, 12, &IntervalMetrics{},
100, 10, 0.50)
buf.Printf("%s\n", res)
}
@@ -302,10 +304,12 @@ func TestBadIOLoadListenerStats(t *testing.T) {
ioll.mu.kvGranter = kvGranter
for i := 0; i < 100; i++ {
randomValues()
ioll.pebbleMetricsTick(ctx, StoreMetrics{
Metrics: &m,
InternalIntervalMetrics: &pebble.InternalIntervalMetrics{},
DiskStats: d,
ioll.pebbleMetricsTick(ctx, StoreAndIntervalMetrics{
StoreMetrics: &StoreMetrics{
Metrics: &m,
DiskStats: d,
},
IntervalMetrics: &IntervalMetrics{},
})
for j := 0; j < ticksInAdjustmentInterval; j++ {
ioll.allocateTokensTick()
