diff --git a/pkg/kv/kvserver/client_metrics_test.go b/pkg/kv/kvserver/client_metrics_test.go index 2312ba1ad5fe..38be14e0ddea 100644 --- a/pkg/kv/kvserver/client_metrics_test.go +++ b/pkg/kv/kvserver/client_metrics_test.go @@ -137,7 +137,7 @@ func verifyStats(t *testing.T, tc *testcluster.TestCluster, storeIdxSlice ...int } func verifyStorageStats(t *testing.T, s *kvserver.Store) { - if err := s.ComputeMetrics(context.Background(), 0); err != nil { + if err := s.ComputeMetrics(context.Background()); err != nil { t.Fatal(err) } @@ -417,7 +417,7 @@ func TestStoreMaxBehindNanosOnlyTracksEpochBasedLeases(t *testing.T) { sinceExpBasedLeaseStart := timeutil.Since(timeutil.Unix(0, l.Start.WallTime)) for i := 0; i < tc.NumServers(); i++ { s, _ := getFirstStoreReplica(t, tc.Server(i), keys.Meta1Prefix) - require.NoError(t, s.ComputeMetrics(ctx, 0)) + require.NoError(t, s.ComputeMetrics(ctx)) maxBehind := time.Duration(s.Metrics().ClosedTimestampMaxBehindNanos.Value()) // We want to make sure that maxBehind ends up being much smaller than the // start of an expiration based lease. diff --git a/pkg/kv/kvserver/client_replica_raft_overload_test.go b/pkg/kv/kvserver/client_replica_raft_overload_test.go index 13babeacbd7c..678b567deb80 100644 --- a/pkg/kv/kvserver/client_replica_raft_overload_test.go +++ b/pkg/kv/kvserver/client_replica_raft_overload_test.go @@ -80,7 +80,7 @@ func TestReplicaRaftOverload(t *testing.T) { // See: https://github.com/cockroachdb/cockroach/issues/84252 require.NoError(t, tc.Servers[0].DB().Put(ctx, tc.ScratchRange(t), "foo")) s1 := tc.GetFirstStoreFromServer(t, 0) - require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */)) + require.NoError(t, s1.ComputeMetrics(ctx)) if n := s1.Metrics().RaftPausedFollowerCount.Value(); n == 0 { return errors.New("no paused followers") } @@ -95,7 +95,7 @@ func TestReplicaRaftOverload(t *testing.T) { require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */)) testutils.SucceedsSoon(t, func() error { s1 := tc.GetFirstStoreFromServer(t, 0) - require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */)) + require.NoError(t, s1.ComputeMetrics(ctx)) if n := s1.Metrics().RaftPausedFollowerCount.Value(); n > 0 { return errors.Errorf("%d paused followers", n) } diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 4b50b3e397b7..68628d76b833 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2024,7 +2024,7 @@ func TestLeaseMetricsOnSplitAndTransfer(t *testing.T) { var expirationLeases int64 var epochLeases int64 for i := range tc.Servers { - if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background(), 0); err != nil { + if err := tc.GetFirstStoreFromServer(t, i).ComputeMetrics(context.Background()); err != nil { return err } metrics = tc.GetFirstStoreFromServer(t, i).Metrics() @@ -4835,7 +4835,7 @@ func TestUninitializedMetric(t *testing.T) { targetStore := tc.GetFirstStoreFromServer(t, 1) // Force the store to compute the replica metrics - require.NoError(t, targetStore.ComputeMetrics(ctx, 0)) + require.NoError(t, targetStore.ComputeMetrics(ctx)) // Blocked snapshot on the second server (1) should realize 1 uninitialized replica. require.Equal(t, int64(1), targetStore.Metrics().UninitializedCount.Value()) @@ -4845,7 +4845,7 @@ func TestUninitializedMetric(t *testing.T) { require.NoError(t, <-addReplicaErr) // Again force the store to compute metrics, increment tick counter 0 -> 1 - require.NoError(t, targetStore.ComputeMetrics(ctx, 1)) + require.NoError(t, targetStore.ComputeMetrics(ctx)) // There should now be no uninitialized replicas in the recorded metrics require.Equal(t, int64(0), targetStore.Metrics().UninitializedCount.Value()) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index abe976485fa1..99d4498534d2 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1663,6 +1663,13 @@ Note that the measurement does not include the duration for replicating the eval Measurement: "Occurrences", Unit: metric.Unit_COUNT, } + + metaPebbleFlushUtilization = metric.Metadata{ + Name: "pebble.flush.utilization", + Help: "The percentage of time spent flushing in the pebble flush loop", + Measurement: "Flush Utilization", + Unit: metric.Unit_PERCENT, + } ) // StoreMetrics is the set of metrics for a given store. @@ -1957,6 +1964,8 @@ type StoreMetrics struct { // Replica batch evaluation metrics. ReplicaReadBatchEvaluationLatency *metric.Histogram ReplicaWriteBatchEvaluationLatency *metric.Histogram + + FlushUtilization *metric.GaugeFloat64 } type tenantMetricsRef struct { @@ -2494,6 +2503,7 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { ReplicaWriteBatchEvaluationLatency: metric.NewHistogram( metaReplicaWriteBatchEvaluationLatency, histogramWindow, metric.IOLatencyBuckets, ), + FlushUtilization: metric.NewGaugeFloat64(metaPebbleFlushUtilization), } { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 2bbdc0b27177..8e1db2082c3a 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2354,7 +2354,7 @@ func (s *Store) systemGossipUpdate(sysCfg *config.SystemConfig) { // Metrics depend in part on the system config. Compute them as soon as we // get the first system config, then periodically in the background // (managed by the Node). - if err := s.ComputeMetrics(ctx, -1); err != nil { + if err := s.ComputeMetrics(ctx); err != nil { log.Infof(ctx, "%s: failed initial metrics computation: %s", s, err) } log.Event(ctx, "computed initial metrics") @@ -3315,29 +3315,25 @@ func (s *Store) checkpoint(ctx context.Context, tag string) (string, error) { return checkpointDir, nil } -// ComputeMetrics immediately computes the current value of store metrics which -// cannot be computed incrementally. This method should be invoked periodically -// by a higher-level system which records store metrics. -// -// The tick argument should increment across repeated calls to this -// method. It is used to compute some metrics less frequently than others. -func (s *Store) ComputeMetrics(ctx context.Context, tick int) error { +// computeMetrics is a common metric computation that is used by +// ComputeMetricsPeriodically and ComputeMetrics to compute metrics +func (s *Store) computeMetrics(ctx context.Context) (m storage.Metrics, err error) { ctx = s.AnnotateCtx(ctx) - if err := s.updateCapacityGauges(ctx); err != nil { - return err + if err = s.updateCapacityGauges(ctx); err != nil { + return m, err } - if err := s.updateReplicationGauges(ctx); err != nil { - return err + if err = s.updateReplicationGauges(ctx); err != nil { + return m, err } // Get the latest engine metrics. - m := s.engine.GetMetrics() + m = s.engine.GetMetrics() s.metrics.updateEngineMetrics(m) // Get engine Env stats. envStats, err := s.engine.GetEnvStats() if err != nil { - return err + return m, err } s.metrics.updateEnvStats(*envStats) @@ -3349,6 +3345,29 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error { s.metrics.RdbCheckpoints.Update(int64(len(dirs))) } + return m, nil +} + +// ComputeMetricsPeriodically computes metrics that need to be computed +// periodically along with the regular metrics +func (s *Store) ComputeMetricsPeriodically( + ctx context.Context, prevMetrics *storage.Metrics, tick int, +) (m storage.Metrics, err error) { + m, err = s.computeMetrics(ctx) + if err != nil { + return m, err + } + wt := m.Flush.WriteThroughput + + if prevMetrics != nil { + wt.Subtract(prevMetrics.Flush.WriteThroughput) + } + flushUtil := 0.0 + if wt.WorkDuration > 0 { + flushUtil = float64(wt.WorkDuration) / float64(wt.WorkDuration+wt.IdleDuration) + } + s.metrics.FlushUtilization.Update(flushUtil) + // Log this metric infrequently (with current configurations, // every 10 minutes). Trigger on tick 1 instead of tick 0 so that // non-periodic callers of this method don't trigger expensive @@ -3370,7 +3389,15 @@ func (s *Store) ComputeMetrics(ctx context.Context, tick int) error { e.StoreId = int32(s.StoreID()) log.StructuredEvent(ctx, &e) } - return nil + return m, nil +} + +// ComputeMetrics immediately computes the current value of store metrics which +// cannot be computed incrementally. This method should be invoked periodically +// by a higher-level system which records store metrics. +func (s *Store) ComputeMetrics(ctx context.Context) error { + _, err := s.computeMetrics(ctx) + return err } // ClusterNodeCount returns this store's view of the number of nodes in the diff --git a/pkg/server/node.go b/pkg/server/node.go index caccb33a1a61..7bbed0232836 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -742,11 +742,12 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time. _ = stopper.RunAsyncTask(ctx, "compute-metrics", func(ctx context.Context) { // Compute periodic stats at the same frequency as metrics are sampled. ticker := time.NewTicker(interval) + previousMetrics := make(map[*kvserver.Store]*storage.Metrics) defer ticker.Stop() for tick := 0; ; tick++ { select { case <-ticker.C: - if err := n.computePeriodicMetrics(ctx, tick); err != nil { + if err := n.computeMetricsPeriodically(ctx, previousMetrics, tick); err != nil { log.Errorf(ctx, "failed computing periodic metrics: %s", err) } case <-stopper.ShouldQuiesce(): @@ -756,12 +757,20 @@ func (n *Node) startComputePeriodicMetrics(stopper *stop.Stopper, interval time. }) } -// computePeriodicMetrics instructs each store to compute the value of +// computeMetricsPeriodically instructs each store to compute the value of // complicated metrics. -func (n *Node) computePeriodicMetrics(ctx context.Context, tick int) error { +func (n *Node) computeMetricsPeriodically( + ctx context.Context, storeToMetrics map[*kvserver.Store]*storage.Metrics, tick int, +) error { return n.stores.VisitStores(func(store *kvserver.Store) error { - if err := store.ComputeMetrics(ctx, tick); err != nil { + if newMetrics, err := store.ComputeMetricsPeriodically(ctx, storeToMetrics[store], tick); err != nil { log.Warningf(ctx, "%s: unable to compute metrics: %s", store, err) + } else { + if storeToMetrics[store] == nil { + storeToMetrics[store] = &newMetrics + } else { + *storeToMetrics[store] = newMetrics + } } return nil }) diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index a0ab4126395b..e20ae98cc6e1 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -491,7 +491,7 @@ func TestNodeStatusWritten(t *testing.T) { // were multiple replicas, more care would need to be taken in the initial // syncFeed(). forceWriteStatus := func() { - if err := ts.node.computePeriodicMetrics(ctx, 0); err != nil { + if err := ts.node.computeMetricsPeriodically(ctx, map[*kvserver.Store]*storage.Metrics{}, 0); err != nil { t.Fatalf("error publishing store statuses: %s", err) } diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index d97e521be982..dcf059f76bf0 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -50,6 +50,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -344,7 +345,7 @@ func startServer(t *testing.T) *TestServer { // Make sure the node status is available. This is done by forcing stores to // publish their status, synchronizing to the event feed with a canary // event, and then forcing the server to write summaries immediately. - if err := ts.node.computePeriodicMetrics(context.Background(), 0); err != nil { + if err := ts.node.computeMetricsPeriodically(context.Background(), map[*kvserver.Store]*storage.Metrics{}, 0); err != nil { t.Fatalf("error publishing store statuses: %s", err) } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 1b1a59489e16..7bec37509064 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -682,6 +682,8 @@ type Pebble struct { diskSlowCount int64 diskStallCount int64 + onMetricCallback pebble.MetricCallbacks + // Relevant options copied over from pebble.Options. fs vfs.FS unencryptedFS vfs.FS @@ -931,6 +933,14 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { return nil, err } + cfg.Opts.OnMetrics = pebble.MetricCallbacks{ + LogWriterFsyncLatency: func(duration int64) { + if p.onMetricCallback.LogWriterFsyncLatency != nil { + p.onMetricCallback.LogWriterFsyncLatency(duration) + } + }, + } + db, err := pebble.Open(cfg.StorageConfig.Dir, cfg.Opts) if err != nil { return nil, err @@ -1693,6 +1703,11 @@ func (p *Pebble) RegisterFlushCompletedCallback(cb func()) { p.mu.Unlock() } +// RegisterMetricCallbacks implements the Engine interface +func (p *Pebble) RegisterMetricCallbacks(callbacks pebble.MetricCallbacks) { + p.onMetricCallback = callbacks +} + // Remove implements the FS interface. func (p *Pebble) Remove(filename string) error { return p.fs.Remove(filename) diff --git a/pkg/testutils/testcluster/testcluster.go b/pkg/testutils/testcluster/testcluster.go index 38554ed4a2fc..3766be3e8745 100644 --- a/pkg/testutils/testcluster/testcluster.go +++ b/pkg/testutils/testcluster/testcluster.go @@ -1380,7 +1380,7 @@ func (tc *TestCluster) WaitForFullReplication() error { if err := s.ForceReplicationScanAndProcess(); err != nil { return err } - if err := s.ComputeMetrics(context.TODO(), 0); err != nil { + if err := s.ComputeMetrics(context.TODO()); err != nil { // This can sometimes fail since ComputeMetrics calls // updateReplicationGauges which needs the system config gossiped. log.Infof(context.TODO(), "%v", err)