Skip to content

Commit

Permalink
metrics: expose pebble flush utilization
Browse files Browse the repository at this point in the history
Create a new `GaugeFloat64` metric for pebble’s flush utilization. This
metric is not cumulative, rather, it is the metric over an interval.
This interval is determined by the `interval` parameter of the
`Node.startComputePeriodicMetrics` method.

In order to compute the metric over an interval the previous value of
the metric must be stored. As a result, a map is constructed that takes
a pointer to a store and maps it to a pointer to storage metrics:
`make(map[*kvserver.Store]*storage.Metrics)`. This map is passed to
`node.computeMetricsPeriodically` which gets the store to calculate its
metrics and then updates the previous metrics in the map.

Refactor `store.go`'s metric calculation by separating
`ComputeMetrics(ctx context.Context, tick int) error` into two methods:

* `ComputeMetrics(ctx context.Context) error`
* `ComputeMetricsPeriodically(ctx context.Context, prevMetrics
  *storage.Metrics, tick int) (m storage.Metrics, err error)`

Both methods call the `computeMetrics` which contains the common code
between the two calls. Before this, the process for retrieving metrics
instantaneous was to pass a tick value such as `-1` or `0` to the
`ComputeMetrics(ctx context.Context, tick int)` however it can be
done with a call to `ComputeMetrics(ctx context.Context)`

The `store.ComputeMetricsPeriodically` method will also return the
latest storage metrics. These metrics are used to update the mapping
between stores and metrics used for computing the metric delta over an
interval.

Release note: None
  • Loading branch information
coolcom200 committed Oct 12, 2022
1 parent 5515ce7 commit dd28d67
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 29 deletions.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func verifyStats(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int
}

func verifyStorageStats(t *testing.T, s *kvserver.Store) {
if err := s.ComputeMetrics(context.Background(), 0); err != nil {
if err := s.ComputeMetrics(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -417,7 +417,7 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) {
sinceExpBasedLeaseStart := timeutil.Since(timeutil.Unix(0, l.Start.WallTime))
for i := 0; i < tc.NumServers(); i++ {
s, _ := getFirstStoreReplica(t, tc.Server(i), keys.Meta1Prefix)
require.NoError(t, s.ComputeMetrics(ctx, 0))
require.NoError(t, s.ComputeMetrics(ctx))
maxBehind := time.Duration(s.Metrics().ClosedTimestampMaxBehindNanos.Value())
// We want to make sure that maxBehind ends up being much smaller than the
// start of an expiration based lease.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_raft_overload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestReplicaRaftOverload(t *testing.T) {
// See: https://github.com/cockroachdb/cockroach/issues/84252
require.NoError(t, tc.Servers[0].DB().Put(ctx, tc.ScratchRange(t), "foo"))
s1 := tc.GetFirstStoreFromServer(t, 0)
require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */))
require.NoError(t, s1.ComputeMetrics(ctx))
if n := s1.Metrics().RaftPausedFollowerCount.Value(); n == 0 {
return errors.New("no paused followers")
}
Expand All @@ -95,7 +95,7 @@ func TestReplicaRaftOverload(t *testing.T) {
require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */))
testutils.SucceedsSoon(t, func() error {
s1 := tc.GetFirstStoreFromServer(t, 0)
require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */))
require.NoError(t, s1.ComputeMetrics(ctx))
if n := s1.Metrics().RaftPausedFollowerCount.Value(); n > 0 {
return errors.Errorf("%d paused followers", n)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ func TestLeaseMetricsOnSplitAndTransfer(t *testing.T) {
var expirationLeases int64
var epochLeases int64
for i := range tc.Servers {
if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background(), 0); err != nil {
if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background()); err != nil {
return err
}
metrics = tc.GetFirstStoreFromServer(t, i).Metrics()
Expand Down Expand Up @@ -4835,7 +4835,7 @@ func TestUninitializedMetric(t *testing.T) {
targetStore := tc.GetFirstStoreFromServer(t, 1)

// Force the store to compute the replica metrics
require.NoError(t, targetStore.ComputeMetrics(ctx, 0))
require.NoError(t, targetStore.ComputeMetrics(ctx))

// Blocked snapshot on the second server (1) should realize 1 uninitialized replica.
require.Equal(t, int64(1), targetStore.Metrics().UninitializedCount.Value())
Expand All @@ -4845,7 +4845,7 @@ func TestUninitializedMetric(t *testing.T) {
require.NoError(t, <-addReplicaErr)

// Again force the store to compute metrics, increment tick counter 0 -> 1
require.NoError(t, targetStore.ComputeMetrics(ctx, 1))
require.NoError(t, targetStore.ComputeMetrics(ctx))

// There should now be no uninitialized replicas in the recorded metrics
require.Equal(t, int64(0), targetStore.Metrics().UninitializedCount.Value())
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,13 @@ Note that the measurement does not include the duration for replicating the eval
Measurement: "Occurrences",
Unit: metric.Unit_COUNT,
}

metaPebbleFlushUtilization = metric.Metadata{
Name: "pebble.flush.utilization",
Help: "The percentage of time spent flushing in the pebble flush loop",
Measurement: "Flush Utilization",
Unit: metric.Unit_PERCENT,
}
)

// StoreMetrics is the set of metrics for a given store.
Expand Down Expand Up @@ -1957,6 +1964,8 @@ type StoreMetrics struct {
// Replica batch evaluation metrics.
ReplicaReadBatchEvaluationLatency *metric.Histogram
ReplicaWriteBatchEvaluationLatency *metric.Histogram

FlushUtilization *metric.GaugeFloat64
}

type tenantMetricsRef struct {
Expand Down Expand Up @@ -2494,6 +2503,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
ReplicaWriteBatchEvaluationLatency: metric.NewHistogram(
metaReplicaWriteBatchEvaluationLatency, histogramWindow, metric.IOLatencyBuckets,
),
FlushUtilization: metric.NewGaugeFloat64(metaPebbleFlushUtilization),
}

{
Expand Down
57 changes: 42 additions & 15 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2354,7 +2354,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
// Metrics depend in part on the system config. Compute them as soon as we
// get the first system config, then periodically in the background
// (managed by the Node).
if err := s.ComputeMetrics(ctx, -1); err != nil {
if err := s.ComputeMetrics(ctx); err != nil {
log.Infof(ctx, "%s: failed initial metrics computation: %s", s, err)
}
log.Event(ctx, "computed initial metrics")
Expand Down Expand Up @@ -3315,29 +3315,25 @@ func (s *Store) checkpoint(ctx context.Context, tag string) (string, error) {
return checkpointDir, nil
}

// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
//
// The tick argument should increment across repeated calls to this
// method. It is used to compute some metrics less frequently than others.
func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
// computeMetrics is a common metric computation that is used by
// ComputeMetricsPeriodically and ComputeMetrics to compute metrics
func (s *Store) computeMetrics(ctx context.Context) (m storage.Metrics, err error) {
ctx = s.AnnotateCtx(ctx)
if err := s.updateCapacityGauges(ctx); err != nil {
return err
if err = s.updateCapacityGauges(ctx); err != nil {
return m, err
}
if err := s.updateReplicationGauges(ctx); err != nil {
return err
if err = s.updateReplicationGauges(ctx); err != nil {
return m, err
}

// Get the latest engine metrics.
m := s.engine.GetMetrics()
m = s.engine.GetMetrics()
s.metrics.updateEngineMetrics(m)

// Get engine Env stats.
envStats, err := s.engine.GetEnvStats()
if err != nil {
return err
return m, err
}
s.metrics.updateEnvStats(*envStats)

Expand All @@ -3349,6 +3345,29 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
s.metrics.RdbCheckpoints.Update(int64(len(dirs)))
}

return m, nil
}

// ComputeMetricsPeriodically computes metrics that need to be computed
// periodically along with the regular metrics
func (s *Store) ComputeMetricsPeriodically(
ctx context.Context, prevMetrics *storage.Metrics, tick int,
) (m storage.Metrics, err error) {
m, err = s.computeMetrics(ctx)
if err != nil {
return m, err
}
wt := m.Flush.WriteThroughput

if prevMetrics != nil {
wt.Subtract(prevMetrics.Flush.WriteThroughput)
}
flushUtil := 0.0
if wt.WorkDuration > 0 {
flushUtil = float64(wt.WorkDuration) / float64(wt.WorkDuration+wt.IdleDuration)
}
s.metrics.FlushUtilization.Update(flushUtil)

// Log this metric infrequently (with current configurations,
// every 10 minutes). Trigger on tick 1 instead of tick 0 so that
// non-periodic callers of this method don't trigger expensive
Expand All @@ -3370,7 +3389,15 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
e.StoreId = int32(s.StoreID())
log.StructuredEvent(ctx, &e)
}
return nil
return m, nil
}

// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
func (s *Store) ComputeMetrics(ctx context.Context) error {
_, err := s.computeMetrics(ctx)
return err
}

// ClusterNodeCount returns this store's view of the number of nodes in the
Expand Down
17 changes: 13 additions & 4 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,11 +742,12 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
_ = stopper.RunAsyncTask(ctx, "compute-metrics", func(ctx context.Context) {
// Compute periodic stats at the same frequency as metrics are sampled.
ticker := time.NewTicker(interval)
previousMetrics := make(map[*kvserver.Store]*storage.Metrics)
defer ticker.Stop()
for tick := 0; ; tick++ {
select {
case <-ticker.C:
if err := n.computePeriodicMetrics(ctx, tick); err != nil {
if err := n.computeMetricsPeriodically(ctx, previousMetrics, tick); err != nil {
log.Errorf(ctx, "failed computing periodic metrics: %s", err)
}
case <-stopper.ShouldQuiesce():
Expand All @@ -756,12 +757,20 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
})
}

// computePeriodicMetrics instructs each store to compute the value of
// computeMetricsPeriodically instructs each store to compute the value of
// complicated metrics.
func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error {
func (n *Node) computeMetricsPeriodically(
ctx context.Context, storeToMetrics map[*kvserver.Store]*storage.Metrics, tick int,
) error {
return n.stores.VisitStores(func(store *kvserver.Store) error {
if err := store.ComputeMetrics(ctx, tick); err != nil {
if newMetrics, err := store.ComputeMetricsPeriodically(ctx, storeToMetrics[store], tick); err != nil {
log.Warningf(ctx, "%s: unable to compute metrics: %s", store, err)
} else {
if storeToMetrics[store] == nil {
storeToMetrics[store] = &newMetrics
} else {
*storeToMetrics[store] = newMetrics
}
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func TestNodeStatusWritten(t *testing.T) {
// were multiple replicas, more care would need to be taken in the initial
// syncFeed().
forceWriteStatus := func() {
if err := ts.node.computePeriodicMetrics(ctx, 0); err != nil {
if err := ts.node.computeMetricsPeriodically(ctx, map[*kvserver.Store]*storage.Metrics{}, 0); err != nil {
t.Fatalf("error publishing store statuses: %s", err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/server/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -344,7 +345,7 @@ func startServer(t *testing.T) *TestServer {
// Make sure the node status is available. This is done by forcing stores to
// publish their status, synchronizing to the event feed with a canary
// event, and then forcing the server to write summaries immediately.
if err := ts.node.computePeriodicMetrics(context.Background(), 0); err != nil {
if err := ts.node.computeMetricsPeriodically(context.Background(), map[*kvserver.Store]*storage.Metrics{}, 0); err != nil {
t.Fatalf("error publishing store statuses: %s", err)
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ type Pebble struct {
diskSlowCount int64
diskStallCount int64

onMetricCallback pebble.MetricCallbacks

// Relevant options copied over from pebble.Options.
fs vfs.FS
unencryptedFS vfs.FS
Expand Down Expand Up @@ -931,6 +933,14 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
return nil, err
}

cfg.Opts.OnMetrics = pebble.MetricCallbacks{
LogWriterFsyncLatency: func(duration int64) {
if p.onMetricCallback.LogWriterFsyncLatency != nil {
p.onMetricCallback.LogWriterFsyncLatency(duration)
}
},
}

db, err := pebble.Open(cfg.StorageConfig.Dir, cfg.Opts)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1693,6 +1703,11 @@ func (p *Pebble) RegisterFlushCompletedCallback(cb func()) {
p.mu.Unlock()
}

// RegisterMetricCallbacks implements the Engine interface
func (p *Pebble) RegisterMetricCallbacks(callbacks pebble.MetricCallbacks) {
p.onMetricCallback = callbacks
}

// Remove implements the FS interface.
func (p *Pebble) Remove(filename string) error {
return p.fs.Remove(filename)
Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,7 @@ func (tc *TestCluster) WaitForFullReplication() error {
if err := s.ForceReplicationScanAndProcess(); err != nil {
return err
}
if err := s.ComputeMetrics(context.TODO(), 0); err != nil {
if err := s.ComputeMetrics(context.TODO()); err != nil {
// This can sometimes fail since ComputeMetrics calls
// updateReplicationGauges which needs the system config gossiped.
log.Infof(context.TODO(), "%v", err)
Expand Down

0 comments on commit dd28d67

Please sign in to comment.