Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: expose pebble flush utilization #89459

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func verifyStats(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int
}

func verifyStorageStats(t *testing.T, s *kvserver.Store) {
if err := s.ComputeMetrics(context.Background(), 0); err != nil {
if err := s.ComputeMetrics(context.Background()); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -417,7 +417,7 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) {
sinceExpBasedLeaseStart := timeutil.Since(timeutil.Unix(0, l.Start.WallTime))
for i := 0; i < tc.NumServers(); i++ {
s, _ := getFirstStoreReplica(t, tc.Server(i), keys.Meta1Prefix)
require.NoError(t, s.ComputeMetrics(ctx, 0))
require.NoError(t, s.ComputeMetrics(ctx))
maxBehind := time.Duration(s.Metrics().ClosedTimestampMaxBehindNanos.Value())
// We want to make sure that maxBehind ends up being much smaller than the
// start of an expiration based lease.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_raft_overload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestReplicaRaftOverload(t *testing.T) {
// See: https://github.com/cockroachdb/cockroach/issues/84252
require.NoError(t, tc.Servers[0].DB().Put(ctx, tc.ScratchRange(t), "foo"))
s1 := tc.GetFirstStoreFromServer(t, 0)
require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */))
require.NoError(t, s1.ComputeMetrics(ctx))
if n := s1.Metrics().RaftPausedFollowerCount.Value(); n == 0 {
return errors.New("no paused followers")
}
Expand All @@ -95,7 +95,7 @@ func TestReplicaRaftOverload(t *testing.T) {
require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */))
testutils.SucceedsSoon(t, func() error {
s1 := tc.GetFirstStoreFromServer(t, 0)
require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */))
require.NoError(t, s1.ComputeMetrics(ctx))
if n := s1.Metrics().RaftPausedFollowerCount.Value(); n > 0 {
return errors.Errorf("%d paused followers", n)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ func TestLeaseMetricsOnSplitAndTransfer(t *testing.T) {
var expirationLeases int64
var epochLeases int64
for i := range tc.Servers {
if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background(), 0); err != nil {
if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background()); err != nil {
return err
}
metrics = tc.GetFirstStoreFromServer(t, i).Metrics()
Expand Down Expand Up @@ -4835,7 +4835,7 @@ func TestUninitializedMetric(t *testing.T) {
targetStore := tc.GetFirstStoreFromServer(t, 1)

// Force the store to compute the replica metrics
require.NoError(t, targetStore.ComputeMetrics(ctx, 0))
require.NoError(t, targetStore.ComputeMetrics(ctx))

// Blocked snapshot on the second server (1) should realize 1 uninitialized replica.
require.Equal(t, int64(1), targetStore.Metrics().UninitializedCount.Value())
Expand All @@ -4845,7 +4845,7 @@ func TestUninitializedMetric(t *testing.T) {
require.NoError(t, <-addReplicaErr)

// Again force the store to compute metrics, increment tick counter 0 -> 1
require.NoError(t, targetStore.ComputeMetrics(ctx, 1))
require.NoError(t, targetStore.ComputeMetrics(ctx))

// There should now be no uninitialized replicas in the recorded metrics
require.Equal(t, int64(0), targetStore.Metrics().UninitializedCount.Value())
Expand Down
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1663,6 +1663,13 @@ Note that the measurement does not include the duration for replicating the eval
Measurement: "Occurrences",
Unit: metric.Unit_COUNT,
}

metaStorageFlushUtilization = metric.Metadata{
Name: "storage.flush.utilization",
Help: "The percentage of time the storage engine is actively flushing memtables to disk.",
Measurement: "Flush Utilization",
Unit: metric.Unit_PERCENT,
}
)

// StoreMetrics is the set of metrics for a given store.
Expand Down Expand Up @@ -1957,6 +1964,8 @@ type StoreMetrics struct {
// Replica batch evaluation metrics.
ReplicaReadBatchEvaluationLatency *metric.Histogram
ReplicaWriteBatchEvaluationLatency *metric.Histogram

FlushUtilization *metric.GaugeFloat64
}

type tenantMetricsRef struct {
Expand Down Expand Up @@ -2494,6 +2503,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
ReplicaWriteBatchEvaluationLatency: metric.NewHistogram(
metaReplicaWriteBatchEvaluationLatency, histogramWindow, metric.IOLatencyBuckets,
),
FlushUtilization: metric.NewGaugeFloat64(metaStorageFlushUtilization),
}

{
Expand Down
57 changes: 42 additions & 15 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2354,7 +2354,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
// Metrics depend in part on the system config. Compute them as soon as we
// get the first system config, then periodically in the background
// (managed by the Node).
if err := s.ComputeMetrics(ctx, -1); err != nil {
if err := s.ComputeMetrics(ctx); err != nil {
log.Infof(ctx, "%s: failed initial metrics computation: %s", s, err)
}
log.Event(ctx, "computed initial metrics")
Expand Down Expand Up @@ -3315,29 +3315,25 @@ func (s *Store) checkpoint(ctx context.Context, tag string) (string, error) {
return checkpointDir, nil
}

// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
//
// The tick argument should increment across repeated calls to this
// method. It is used to compute some metrics less frequently than others.
func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
// computeMetrics is a common metric computation that is used by
// ComputeMetricsPeriodically and ComputeMetrics to compute metrics
func (s *Store) computeMetrics(ctx context.Context) (m storage.Metrics, err error) {
ctx = s.AnnotateCtx(ctx)
if err := s.updateCapacityGauges(ctx); err != nil {
return err
if err = s.updateCapacityGauges(ctx); err != nil {
return m, err
}
if err := s.updateReplicationGauges(ctx); err != nil {
return err
if err = s.updateReplicationGauges(ctx); err != nil {
return m, err
}

// Get the latest engine metrics.
m := s.engine.GetMetrics()
m = s.engine.GetMetrics()
s.metrics.updateEngineMetrics(m)

// Get engine Env stats.
envStats, err := s.engine.GetEnvStats()
if err != nil {
return err
return m, err
}
s.metrics.updateEnvStats(*envStats)

Expand All @@ -3349,6 +3345,29 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
s.metrics.RdbCheckpoints.Update(int64(len(dirs)))
}

return m, nil
}

// ComputeMetricsPeriodically computes metrics that need to be computed
// periodically along with the regular metrics
func (s *Store) ComputeMetricsPeriodically(
ctx context.Context, prevMetrics *storage.Metrics, tick int,
) (m storage.Metrics, err error) {
m, err = s.computeMetrics(ctx)
if err != nil {
return m, err
}
wt := m.Flush.WriteThroughput

if prevMetrics != nil {
wt.Subtract(prevMetrics.Flush.WriteThroughput)
}
flushUtil := 0.0
if wt.WorkDuration > 0 {
flushUtil = float64(wt.WorkDuration) / float64(wt.WorkDuration+wt.IdleDuration)
}
s.metrics.FlushUtilization.Update(flushUtil)

// Log this metric infrequently (with current configurations,
// every 10 minutes). Trigger on tick 1 instead of tick 0 so that
// non-periodic callers of this method don't trigger expensive
Expand All @@ -3370,7 +3389,15 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
e.StoreId = int32(s.StoreID())
log.StructuredEvent(ctx, &e)
}
return nil
return m, nil
}

// ComputeMetrics immediately computes the current value of store metrics which
// cannot be computed incrementally. This method should be invoked periodically
// by a higher-level system which records store metrics.
func (s *Store) ComputeMetrics(ctx context.Context) error {
	// Only the error matters here; the engine metrics snapshot returned by
	// computeMetrics is discarded.
	if _, err := s.computeMetrics(ctx); err != nil {
		return err
	}
	return nil
}

// ClusterNodeCount returns this store's view of the number of nodes in the
Expand Down
17 changes: 13 additions & 4 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,11 +742,12 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
_ = stopper.RunAsyncTask(ctx, "compute-metrics", func(ctx context.Context) {
// Compute periodic stats at the same frequency as metrics are sampled.
ticker := time.NewTicker(interval)
previousMetrics := make(map[*kvserver.Store]*storage.Metrics)
defer ticker.Stop()
for tick := 0; ; tick++ {
select {
case <-ticker.C:
if err := n.computePeriodicMetrics(ctx, tick); err != nil {
if err := n.computeMetricsPeriodically(ctx, previousMetrics, tick); err != nil {
log.Errorf(ctx, "failed computing periodic metrics: %s", err)
}
case <-stopper.ShouldQuiesce():
Expand All @@ -756,12 +757,20 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
})
}

// computePeriodicMetrics instructs each store to compute the value of
// computeMetricsPeriodically instructs each store to compute the value of
// complicated metrics.
func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error {
func (n *Node) computeMetricsPeriodically(
ctx context.Context, storeToMetrics map[*kvserver.Store]*storage.Metrics, tick int,
) error {
return n.stores.VisitStores(func(store *kvserver.Store) error {
if err := store.ComputeMetrics(ctx, tick); err != nil {
if newMetrics, err := store.ComputeMetricsPeriodically(ctx, storeToMetrics[store], tick); err != nil {
log.Warningf(ctx, "%s: unable to compute metrics: %s", store, err)
} else {
if storeToMetrics[store] == nil {
storeToMetrics[store] = &newMetrics
} else {
*storeToMetrics[store] = newMetrics
}
}
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ func TestNodeStatusWritten(t *testing.T) {
// were multiple replicas, more care would need to be taken in the initial
// syncFeed().
forceWriteStatus := func() {
if err := ts.node.computePeriodicMetrics(ctx, 0); err != nil {
if err := ts.node.computeMetricsPeriodically(ctx, map[*kvserver.Store]*storage.Metrics{}, 0); err != nil {
t.Fatalf("error publishing store statuses: %s", err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/server/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -344,7 +345,7 @@ func startServer(t *testing.T) *TestServer {
// Make sure the node status is available. This is done by forcing stores to
// publish their status, synchronizing to the event feed with a canary
// event, and then forcing the server to write summaries immediately.
if err := ts.node.computePeriodicMetrics(context.Background(), 0); err != nil {
if err := ts.node.computeMetricsPeriodically(context.Background(), map[*kvserver.Store]*storage.Metrics{}, 0); err != nil {
t.Fatalf("error publishing store statuses: %s", err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,7 @@ func (tc *TestCluster) WaitForFullReplication() error {
if err := s.ForceReplicationScanAndProcess(); err != nil {
return err
}
if err := s.ComputeMetrics(context.TODO(), 0); err != nil {
if err := s.ComputeMetrics(context.TODO()); err != nil {
// This can sometimes fail since ComputeMetrics calls
// updateReplicationGauges which needs the system config gossiped.
log.Infof(context.TODO(), "%v", err)
Expand Down
4 changes: 4 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3072,6 +3072,10 @@ var charts = []sectionDescription{
"storage.l6-level-score",
},
},
{
Title: "Flush Utilization",
Metrics: []string{"storage.flush.utilization"},
},
},
},
{
Expand Down