From 8f1d48f4bd17312185baca524f93064ed02fc89d Mon Sep 17 00:00:00 2001
From: Leon Fattakhov
Date: Wed, 28 Sep 2022 13:37:14 -0400
Subject: [PATCH] metrics: expose pebble flush utilization
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Create a new `GaugeFloat64` metric for pebble’s flush utilization. This
metric is not cumulative; rather, it measures utilization over an
interval. The interval is determined by the `interval` parameter of the
`Node.startComputePeriodicMetrics` method.

Computing the metric over an interval requires storing the previous
value of the cumulative metric. To that end, a map from store pointers
to storage metrics pointers, `make(map[*kvserver.Store]*storage.Metrics)`,
is constructed and passed to `node.computeMetricsPeriodically`, which
has each store compute its metrics and then updates the previous
metrics stored in the map.

Refactor `store.go`'s metric calculation by separating
`ComputeMetrics(ctx context.Context, tick int) error` into two methods:

* `ComputeMetrics(ctx context.Context) error`
* `ComputeMetricsPeriodically(ctx context.Context, prevMetrics *storage.Metrics, tick int) (m storage.Metrics, err error)`

Both methods call `computeMetrics`, which contains the code common to
the two. Previously, retrieving instantaneous metrics required passing
a sentinel tick value such as `-1` or `0` to
`ComputeMetrics(ctx context.Context, tick int)`; now it is a plain call
to `ComputeMetrics(ctx context.Context)`.

The `store.ComputeMetricsPeriodically` method also returns the latest
storage metrics, which are used to update the store-to-metrics mapping
used to compute the metric delta over an interval.

Release note: None
---
 pkg/kv/kvserver/client_metrics_test.go        |  4 +-
 .../client_replica_raft_overload_test.go      |  4 +-
 pkg/kv/kvserver/client_replica_test.go        |  6 +-
 pkg/kv/kvserver/metrics.go                    | 10 ++++
 pkg/kv/kvserver/store.go                      | 57 ++++++++++++++-----
 pkg/server/node.go                            | 17 ++++--
 pkg/server/node_test.go                       |  2 +-
 pkg/server/status_test.go                     |  3 +-
 pkg/testutils/testcluster/testcluster.go      |  2 +-
 pkg/ts/catalog/chart_catalog.go               |  4 ++
 10 files changed, 80 insertions(+), 29 deletions(-)

diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go
index 2312ba1ad5fe..38be14e0ddea 100644
--- a/pkg/kv/kvserver/client_metrics_test.go
+++ b/pkg/kv/kvserver/client_metrics_test.go
@@ -137,7 +137,7 @@ func verifyStats(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int
 }
 
 func verifyStorageStats(t *testing.T, s *kvserver.Store) {
-	if err := s.ComputeMetrics(context.Background(), 0); err != nil {
+	if err := s.ComputeMetrics(context.Background()); err != nil {
 		t.Fatal(err)
 	}
 
@@ -417,7 +417,7 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) {
 	sinceExpBasedLeaseStart := timeutil.Since(timeutil.Unix(0, l.Start.WallTime))
 	for i := 0; i < tc.NumServers(); i++ {
 		s, _ := getFirstStoreReplica(t, tc.Server(i), keys.Meta1Prefix)
-		require.NoError(t, s.ComputeMetrics(ctx, 0))
+		require.NoError(t, s.ComputeMetrics(ctx))
 		maxBehind := time.Duration(s.Metrics().ClosedTimestampMaxBehindNanos.Value())
 		// We want to make sure that maxBehind ends up being much smaller than the
 		// start of an expiration based lease.
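To make the delta-over-interval arithmetic described in the commit message concrete, here is a minimal, self-contained Go sketch. The `flushStats` type and `utilizationOverInterval` function are hypothetical stand-ins for pebble's cumulative flush throughput counters, not code from this patch:

```go
package main

import (
	"fmt"
	"time"
)

// flushStats is a hypothetical stand-in for pebble's cumulative flush
// throughput counters (work vs. idle time spent by the flush loop).
type flushStats struct {
	WorkDuration time.Duration
	IdleDuration time.Duration
}

// utilizationOverInterval turns two cumulative samples into a
// non-cumulative utilization for the window between them: subtract the
// previous sample, then take work / (work + idle).
func utilizationOverInterval(prev, cur flushStats) float64 {
	work := cur.WorkDuration - prev.WorkDuration
	idle := cur.IdleDuration - prev.IdleDuration
	if work <= 0 {
		return 0
	}
	return float64(work) / float64(work+idle)
}

func main() {
	prev := flushStats{WorkDuration: 10 * time.Second, IdleDuration: 90 * time.Second}
	cur := flushStats{WorkDuration: 12 * time.Second, IdleDuration: 98 * time.Second}
	// 2s of flushing in a 10s window => 0.2 utilization.
	fmt.Println(utilizationOverInterval(prev, cur))
}
```

On the first tick there is no previous sample, which is why the patch treats a nil `prevMetrics` as "no subtraction" and reports the cumulative value for that one interval.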
diff --git a/pkg/kv/kvserver/client_replica_raft_overload_test.go b/pkg/kv/kvserver/client_replica_raft_overload_test.go
index 13babeacbd7c..678b567deb80 100644
--- a/pkg/kv/kvserver/client_replica_raft_overload_test.go
+++ b/pkg/kv/kvserver/client_replica_raft_overload_test.go
@@ -80,7 +80,7 @@ func TestReplicaRaftOverload(t *testing.T) {
 			// See: https://github.com/cockroachdb/cockroach/issues/84252
 			require.NoError(t, tc.Servers[0].DB().Put(ctx, tc.ScratchRange(t), "foo"))
 			s1 := tc.GetFirstStoreFromServer(t, 0)
-			require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */))
+			require.NoError(t, s1.ComputeMetrics(ctx))
 			if n := s1.Metrics().RaftPausedFollowerCount.Value(); n == 0 {
 				return errors.New("no paused followers")
 			}
@@ -95,7 +95,7 @@ func TestReplicaRaftOverload(t *testing.T) {
 		require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */))
 		testutils.SucceedsSoon(t, func() error {
 			s1 := tc.GetFirstStoreFromServer(t, 0)
-			require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */))
+			require.NoError(t, s1.ComputeMetrics(ctx))
 			if n := s1.Metrics().RaftPausedFollowerCount.Value(); n > 0 {
 				return errors.Errorf("%d paused followers", n)
 			}
diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go
index 4b50b3e397b7..68628d76b833 100644
--- a/pkg/kv/kvserver/client_replica_test.go
+++ b/pkg/kv/kvserver/client_replica_test.go
@@ -2024,7 +2024,7 @@ func TestLeaseMetricsOnSplitAndTransfer(t *testing.T) {
 		var expirationLeases int64
 		var epochLeases int64
 		for i := range tc.Servers {
-			if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background(), 0); err != nil {
+			if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background()); err != nil {
 				return err
 			}
 			metrics = tc.GetFirstStoreFromServer(t, i).Metrics()
@@ -4835,7 +4835,7 @@ func TestUninitializedMetric(t *testing.T) {
 	targetStore := tc.GetFirstStoreFromServer(t, 1)
 
 	// Force the store to compute the replica metrics
-	require.NoError(t, targetStore.ComputeMetrics(ctx, 0))
+	require.NoError(t, targetStore.ComputeMetrics(ctx))
 
 	// Blocked snapshot on the second server (1) should realize 1 uninitialized replica.
 	require.Equal(t, int64(1), targetStore.Metrics().UninitializedCount.Value())
@@ -4845,7 +4845,7 @@ func TestUninitializedMetric(t *testing.T) {
 	require.NoError(t, <-addReplicaErr)
 
 	// Again force the store to compute metrics, increment tick counter 0 -> 1
-	require.NoError(t, targetStore.ComputeMetrics(ctx, 1))
+	require.NoError(t, targetStore.ComputeMetrics(ctx))
 
 	// There should now be no uninitialized replicas in the recorded metrics
 	require.Equal(t, int64(0), targetStore.Metrics().UninitializedCount.Value())
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index abe976485fa1..fb1fd54795e9 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -1663,6 +1663,13 @@ Note that the measurement does not include the duration for replicating the eval
 		Measurement: "Occurrences",
 		Unit:        metric.Unit_COUNT,
 	}
+
+	metaStorageFlushUtilization = metric.Metadata{
+		Name:        "storage.flush.utilization",
+		Help:        "The percentage of time the storage engine is actively flushing memtables to disk.",
+		Measurement: "Flush Utilization",
+		Unit:        metric.Unit_PERCENT,
+	}
 )
 
 // StoreMetrics is the set of metrics for a given store.
@@ -1957,6 +1964,8 @@ type StoreMetrics struct {
 	// Replica batch evaluation metrics.
 	ReplicaReadBatchEvaluationLatency  *metric.Histogram
 	ReplicaWriteBatchEvaluationLatency *metric.Histogram
+
+	FlushUtilization *metric.GaugeFloat64
 }
 
 type tenantMetricsRef struct {
@@ -2494,6 +2503,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		ReplicaWriteBatchEvaluationLatency: metric.NewHistogram(
 			metaReplicaWriteBatchEvaluationLatency, histogramWindow, metric.IOLatencyBuckets,
 		),
+		FlushUtilization: metric.NewGaugeFloat64(metaStorageFlushUtilization),
 	}
 
 	{
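For reference, a short sketch of how the new gauge behaves, using only names that appear in the patch (`metric.Metadata`, `metric.NewGaugeFloat64`, `Update`) plus `Value`; the `pkg/util/metric` import path is assumed from the repository layout:

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

func main() {
	// Metadata mirroring the metaStorageFlushUtilization definition above.
	meta := metric.Metadata{
		Name:        "storage.flush.utilization",
		Help:        "The percentage of time the storage engine is actively flushing memtables to disk.",
		Measurement: "Flush Utilization",
		Unit:        metric.Unit_PERCENT,
	}
	gauge := metric.NewGaugeFloat64(meta)

	// Unlike a cumulative counter, the gauge is overwritten on every
	// periodic computation; it always reflects the most recent interval.
	gauge.Update(0.2)
	fmt.Println(gauge.Value()) // 0.2
}
```

A gauge (rather than a counter) fits here precisely because the value is already a per-interval ratio; there is nothing meaningful to accumulate.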
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 2bbdc0b27177..8e1db2082c3a 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -2354,7 +2354,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) {
 		// Metrics depend in part on the system config. Compute them as soon as we
 		// get the first system config, then periodically in the background
 		// (managed by the Node).
-		if err := s.ComputeMetrics(ctx, -1); err != nil {
+		if err := s.ComputeMetrics(ctx); err != nil {
 			log.Infof(ctx, "%s: failed initial metrics computation: %s", s, err)
 		}
 		log.Event(ctx, "computed initial metrics")
@@ -3315,29 +3315,25 @@ func (s *Store) checkpoint(ctx context.Context, tag string) (string, error) {
 	return checkpointDir, nil
 }
 
-// ComputeMetrics immediately computes the current value of store metrics which
-// cannot be computed incrementally. This method should be invoked periodically
-// by a higher-level system which records store metrics.
-//
-// The tick argument should increment across repeated calls to this
-// method. It is used to compute some metrics less frequently than others.
-func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
+// computeMetrics performs the metric computation common to
+// ComputeMetricsPeriodically and ComputeMetrics.
+func (s *Store) computeMetrics(ctx context.Context) (m storage.Metrics, err error) {
 	ctx = s.AnnotateCtx(ctx)
-	if err := s.updateCapacityGauges(ctx); err != nil {
-		return err
+	if err = s.updateCapacityGauges(ctx); err != nil {
+		return m, err
 	}
-	if err := s.updateReplicationGauges(ctx); err != nil {
-		return err
+	if err = s.updateReplicationGauges(ctx); err != nil {
+		return m, err
 	}
 
 	// Get the latest engine metrics.
-	m := s.engine.GetMetrics()
+	m = s.engine.GetMetrics()
 	s.metrics.updateEngineMetrics(m)
 
 	// Get engine Env stats.
 	envStats, err := s.engine.GetEnvStats()
 	if err != nil {
-		return err
+		return m, err
 	}
 	s.metrics.updateEnvStats(*envStats)
 
@@ -3349,6 +3345,29 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error {
 		s.metrics.RdbCheckpoints.Update(int64(len(dirs)))
 	}
 
+	return m, nil
+}
+
+// ComputeMetricsPeriodically computes metrics that need to be computed
+// periodically, in addition to the regular metrics.
+func (s *Store) ComputeMetricsPeriodically(
+	ctx context.Context, prevMetrics *storage.Metrics, tick int,
+) (m storage.Metrics, err error) {
+	m, err = s.computeMetrics(ctx)
+	if err != nil {
+		return m, err
+	}
+	wt := m.Flush.WriteThroughput
+
+	if prevMetrics != nil {
+		wt.Subtract(prevMetrics.Flush.WriteThroughput)
+	}
+	flushUtil := 0.0
+	if wt.WorkDuration > 0 {
+		flushUtil = float64(wt.WorkDuration) / float64(wt.WorkDuration+wt.IdleDuration)
+	}
+	s.metrics.FlushUtilization.Update(flushUtil)
+
 	// Log this metric infrequently (with current configurations,
 	// every 10 minutes).
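The refactor's shape is a shared private helper with two public entry points: an instantaneous one and a periodic one that layers interval bookkeeping on top and returns the latest cumulative sample. A toy sketch of that pattern follows; `Metrics`, `Store`, and `cumulativeFlushWork` here are simplified stand-ins, not the real `storage.Metrics` or `kvserver.Store`:

```go
package main

import (
	"context"
	"fmt"
)

// Metrics is a hypothetical stand-in for storage.Metrics.
type Metrics struct{ FlushWorkSeconds float64 }

type Store struct{ cumulativeFlushWork float64 }

// computeMetrics holds the logic shared by both entry points.
func (s *Store) computeMetrics(ctx context.Context) (Metrics, error) {
	return Metrics{FlushWorkSeconds: s.cumulativeFlushWork}, nil
}

// ComputeMetrics is the instantaneous entry point: no tick, no
// previous sample, result discarded.
func (s *Store) ComputeMetrics(ctx context.Context) error {
	_, err := s.computeMetrics(ctx)
	return err
}

// ComputeMetricsPeriodically adds interval bookkeeping and returns the
// latest cumulative sample so the caller can stash it for the next tick.
func (s *Store) ComputeMetricsPeriodically(
	ctx context.Context, prev *Metrics, tick int,
) (Metrics, error) {
	m, err := s.computeMetrics(ctx)
	if err != nil {
		return m, err
	}
	delta := m.FlushWorkSeconds
	if prev != nil {
		delta -= prev.FlushWorkSeconds
	}
	fmt.Printf("tick %d: flush work this interval: %.1fs\n", tick, delta)
	return m, nil
}

func main() {
	s := &Store{cumulativeFlushWork: 5}
	_ = s.ComputeMetrics(context.Background())
	m, _ := s.ComputeMetricsPeriodically(context.Background(), nil, 0)
	s.cumulativeFlushWork = 7
	_, _ = s.ComputeMetricsPeriodically(context.Background(), &m, 1) // prints 2.0s
}
```

This is why the sentinel tick values (`-1`, `0`) could be dropped: callers that only want a snapshot no longer participate in the tick-based machinery at all.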
	// Trigger on tick 1 instead of tick 0 so that
 	// non-periodic callers of this method don't trigger expensive
@@ -3370,7 +3389,15 @@
 		e.StoreId = int32(s.StoreID())
 		log.StructuredEvent(ctx, &e)
 	}
-	return nil
+	return m, nil
+}
+
+// ComputeMetrics immediately computes the current value of store metrics which
+// cannot be computed incrementally. This method should be invoked periodically
+// by a higher-level system which records store metrics.
+func (s *Store) ComputeMetrics(ctx context.Context) error {
+	_, err := s.computeMetrics(ctx)
+	return err
 }
 
 // ClusterNodeCount returns this store's view of the number of nodes in the
diff --git a/pkg/server/node.go b/pkg/server/node.go
index caccb33a1a61..7bbed0232836 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -742,11 +742,12 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
 	_ = stopper.RunAsyncTask(ctx, "compute-metrics", func(ctx context.Context) {
 		// Compute periodic stats at the same frequency as metrics are sampled.
 		ticker := time.NewTicker(interval)
+		previousMetrics := make(map[*kvserver.Store]*storage.Metrics)
 		defer ticker.Stop()
 		for tick := 0; ; tick++ {
 			select {
 			case <-ticker.C:
-				if err := n.computePeriodicMetrics(ctx, tick); err != nil {
+				if err := n.computeMetricsPeriodically(ctx, previousMetrics, tick); err != nil {
 					log.Errorf(ctx, "failed computing periodic metrics: %s", err)
 				}
 			case <-stopper.ShouldQuiesce():
@@ -756,12 +757,20 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time.
 	})
 }
 
-// computePeriodicMetrics instructs each store to compute the value of
+// computeMetricsPeriodically instructs each store to compute the value of
 // complicated metrics.
-func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error {
+func (n *Node) computeMetricsPeriodically(
+	ctx context.Context, storeToMetrics map[*kvserver.Store]*storage.Metrics, tick int,
+) error {
 	return n.stores.VisitStores(func(store *kvserver.Store) error {
-		if err := store.ComputeMetrics(ctx, tick); err != nil {
+		if newMetrics, err := store.ComputeMetricsPeriodically(ctx, storeToMetrics[store], tick); err != nil {
 			log.Warningf(ctx, "%s: unable to compute metrics: %s", store, err)
+		} else {
+			if storeToMetrics[store] == nil {
+				storeToMetrics[store] = &newMetrics
+			} else {
+				*storeToMetrics[store] = newMetrics
+			}
 		}
 		return nil
 	})
diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go
index a0ab4126395b..e20ae98cc6e1 100644
--- a/pkg/server/node_test.go
+++ b/pkg/server/node_test.go
@@ -491,7 +491,7 @@ func TestNodeStatusWritten(t *testing.T) {
 	// were multiple replicas, more care would need to be taken in the initial
 	// syncFeed().
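The map bookkeeping in `computeMetricsPeriodically` above inserts a pointer on a store's first visit and overwrites the pointed-to value on later visits. A minimal sketch of that pattern, with stand-in `Store` and `Metrics` types:

```go
package main

import "fmt"

type Store struct{ id int }
type Metrics struct{ flushWork float64 }

// updatePrev mirrors the bookkeeping in computeMetricsPeriodically:
// store a pointer on the first tick, then overwrite the pointed-to
// value in place on subsequent ticks.
func updatePrev(prev map[*Store]*Metrics, s *Store, latest Metrics) {
	if prev[s] == nil {
		prev[s] = &latest
	} else {
		*prev[s] = latest
	}
}

func main() {
	prev := make(map[*Store]*Metrics)
	s := &Store{id: 1}
	updatePrev(prev, s, Metrics{flushWork: 2})
	updatePrev(prev, s, Metrics{flushWork: 5})
	fmt.Println(prev[s].flushWork) // 5
}
```

Overwriting `*prev[s]` in place rather than reassigning the map entry keeps the steady-state path allocation-free, and a nil entry doubles as the "first tick, no previous sample" signal that `ComputeMetricsPeriodically` expects.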
	forceWriteStatus := func() {
-		if err := ts.node.computePeriodicMetrics(ctx, 0); err != nil {
+		if err := ts.node.computeMetricsPeriodically(ctx, map[*kvserver.Store]*storage.Metrics{}, 0); err != nil {
 			t.Fatalf("error publishing store statuses: %s", err)
 		}
diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go
index d97e521be982..dcf059f76bf0 100644
--- a/pkg/server/status_test.go
+++ b/pkg/server/status_test.go
@@ -50,6 +50,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
 	"github.com/cockroachdb/cockroach/pkg/sql/tests"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -344,7 +345,7 @@ func startServer(t *testing.T) *TestServer {
 	// Make sure the node status is available. This is done by forcing stores to
 	// publish their status, synchronizing to the event feed with a canary
 	// event, and then forcing the server to write summaries immediately.
-	if err := ts.node.computePeriodicMetrics(context.Background(), 0); err != nil {
+	if err := ts.node.computeMetricsPeriodically(context.Background(), map[*kvserver.Store]*storage.Metrics{}, 0); err != nil {
 		t.Fatalf("error publishing store statuses: %s", err)
 	}
diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go
index 38554ed4a2fc..3766be3e8745 100644
--- a/pkg/testutils/testcluster/testcluster.go
+++ b/pkg/testutils/testcluster/testcluster.go
@@ -1380,7 +1380,7 @@ func (tc *TestCluster) WaitForFullReplication() error {
 			if err := s.ForceReplicationScanAndProcess(); err != nil {
 				return err
 			}
-			if err := s.ComputeMetrics(context.TODO(), 0); err != nil {
+			if err := s.ComputeMetrics(context.TODO()); err != nil {
 				// This can sometimes fail since ComputeMetrics calls
 				// updateReplicationGauges which needs the system config gossiped.
 				log.Infof(context.TODO(), "%v", err)
diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go
index e4e060c66e93..2f3f33678dae 100644
--- a/pkg/ts/catalog/chart_catalog.go
+++ b/pkg/ts/catalog/chart_catalog.go
@@ -3072,6 +3072,10 @@ var charts = []sectionDescription{
 				"storage.l6-level-score",
 			},
 		},
+		{
+			Title:   "Flush Utilization",
+			Metrics: []string{"storage.flush.utilization"},
+		},
 	},
 },
 {