diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 496f9ccce673..d9ea8aae8584 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -899,6 +899,18 @@ var ( Measurement: "Snapshots", Unit: metric.Unit_COUNT, } + metaRangeSnapshotSendQueueSize = metric.Metadata{ + Name: "range.snapshots.send-queue-bytes", + Help: "Total size of all snapshots in the snapshot send queue", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRangeSnapshotRecvQueueSize = metric.Metadata{ + Name: "range.snapshots.recv-queue-bytes", + Help: "Total size of all snapshots in the snapshot receive queue", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } metaRangeRaftLeaderTransfers = metric.Metadata{ Name: "range.raftleadertransfers", @@ -2046,6 +2058,8 @@ type StoreMetrics struct { RangeSnapshotRecvInProgress *metric.Gauge RangeSnapshotSendTotalInProgress *metric.Gauge RangeSnapshotRecvTotalInProgress *metric.Gauge + RangeSnapshotSendQueueSize *metric.Gauge + RangeSnapshotRecvQueueSize *metric.Gauge // Delegate snapshot metrics. These don't count self-delegated snapshots. DelegateSnapshotSendBytes *metric.Counter @@ -2645,6 +2659,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotRecvInProgress: metric.NewGauge(metaRangeSnapshotRecvInProgress), RangeSnapshotSendTotalInProgress: metric.NewGauge(metaRangeSnapshotSendTotalInProgress), RangeSnapshotRecvTotalInProgress: metric.NewGauge(metaRangeSnapshotRecvTotalInProgress), + RangeSnapshotSendQueueSize: metric.NewGauge(metaRangeSnapshotSendQueueSize), + RangeSnapshotRecvQueueSize: metric.NewGauge(metaRangeSnapshotRecvQueueSize), RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers), RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries), DelegateSnapshotSendBytes: metric.NewCounter(metaDelegateSnapshotSendBytes), diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 72f68241c430..335d9df08e9b 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -77,6 +77,15 @@ var snapshotPrioritizationEnabled = settings.RegisterBoolSetting( true, ) +// snapshotMetrics contains metrics on the number and size of snapshots in +// progress or in the snapshot queue. +type snapshotMetrics struct { + QueueLen *metric.Gauge + QueueSize *metric.Gauge + InProgress *metric.Gauge + TotalInProgress *metric.Gauge +} + // incomingSnapshotStream is the minimal interface on a GRPC stream required // to receive a snapshot over the network. type incomingSnapshotStream interface { @@ -678,13 +687,19 @@ func (s *Store) reserveReceiveSnapshot( ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveReceiveSnapshot") defer sp.Finish() - return s.throttleSnapshot(ctx, s.snapshotApplyQueue, - int(header.SenderQueueName), header.SenderQueuePriority, + return s.throttleSnapshot(ctx, + s.snapshotApplyQueue, + int(header.SenderQueueName), + header.SenderQueuePriority, -1, header.RangeSize, header.RaftMessageRequest.RangeID, - s.metrics.RangeSnapshotRecvQueueLength, - s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress, + snapshotMetrics{ + s.metrics.RangeSnapshotRecvQueueLength, + s.metrics.RangeSnapshotRecvQueueSize, + s.metrics.RangeSnapshotRecvInProgress, + s.metrics.RangeSnapshotRecvTotalInProgress, + }, ) } @@ -698,14 +713,19 @@ func (s *Store) reserveSendSnapshot( fn() } - return s.throttleSnapshot(ctx, s.snapshotSendQueue, + return s.throttleSnapshot(ctx, + s.snapshotSendQueue, int(req.SenderQueueName), req.SenderQueuePriority, req.QueueOnDelegateLen, rangeSize, req.RangeID, - s.metrics.RangeSnapshotSendQueueLength, - s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress, + snapshotMetrics{ + s.metrics.RangeSnapshotSendQueueLength, + s.metrics.RangeSnapshotSendQueueSize, + s.metrics.RangeSnapshotSendInProgress, + s.metrics.RangeSnapshotSendTotalInProgress, + }, ) } @@ -720,7 +740,7 @@ func (s *Store) throttleSnapshot( maxQueueLength int64, rangeSize int64, rangeID roachpb.RangeID, - waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge, + snapshotMetrics snapshotMetrics, ) (cleanup func(), funcErr error) { tBegin := timeutil.Now() @@ -742,8 +762,13 @@ func (s *Store) throttleSnapshot( } }() - waitingSnapshotMetric.Inc(1) - defer waitingSnapshotMetric.Dec(1) + // Total bytes of snapshots waiting in the snapshot queue + snapshotMetrics.QueueSize.Inc(rangeSize) + defer snapshotMetrics.QueueSize.Dec(rangeSize) + // Total number of snapshots waiting in the snapshot queue + snapshotMetrics.QueueLen.Inc(1) + defer snapshotMetrics.QueueLen.Dec(1) + queueCtx := ctx if deadline, ok := queueCtx.Deadline(); ok { // Enforce a more strict timeout for acquiring the snapshot reservation to @@ -778,10 +803,10 @@ func (s *Store) throttleSnapshot( } // Counts non-empty in-progress snapshots. - inProgressSnapshotMetric.Inc(1) + snapshotMetrics.InProgress.Inc(1) } // Counts all in-progress snapshots. - totalInProgressSnapshotMetric.Inc(1) + snapshotMetrics.TotalInProgress.Inc(1) // The choice here is essentially arbitrary, but with a default range size of 128mb-512mb and the // Raft snapshot rate limiting of 32mb/s, we expect to spend less than 16s per snapshot. @@ -804,10 +829,10 @@ func (s *Store) throttleSnapshot( return func() { s.metrics.ReservedReplicaCount.Dec(1) s.metrics.Reserved.Dec(rangeSize) - totalInProgressSnapshotMetric.Dec(1) + snapshotMetrics.TotalInProgress.Dec(1) if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots { - inProgressSnapshotMetric.Dec(1) + snapshotMetrics.InProgress.Dec(1) snapshotQueue.Release(permit) } }, nil diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 086efddccf2f..fe80c9015a35 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -3157,7 +3157,7 @@ func TestReserveSnapshotThrottling(t *testing.T) { s := tc.store cleanupNonEmpty1, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{ - RangeSize: 1, + RangeSize: 10, }) if err != nil { t.Fatal(err) @@ -3167,6 +3167,8 @@ func TestReserveSnapshotThrottling(t *testing.T) { } require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(), "unexpected snapshot queue length") + require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(), + "unexpected snapshot queue size") require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(), "unexpected snapshots in progress") require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(), @@ -3184,6 +3186,8 @@ func TestReserveSnapshotThrottling(t *testing.T) { } require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(), "unexpected snapshot queue length") + require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(), + "unexpected snapshot queue size") require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(), "unexpected snapshots in progress") require.Equal(t, int64(2), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(), @@ -3205,6 +3209,10 @@ func TestReserveSnapshotThrottling(t *testing.T) { t.Errorf("unexpected snapshot queue length; expected: %d, got: %d", 1, s.Metrics().RangeSnapshotRecvQueueLength.Value()) } + if s.Metrics().RangeSnapshotRecvQueueSize.Value() != int64(10) { + t.Errorf("unexplected snapshot queue size; expected: %d, got: %d", 1, + s.Metrics().RangeSnapshotRecvQueueSize.Value()) + } if s.Metrics().RangeSnapshotRecvInProgress.Value() != int64(1) { t.Errorf("unexpected snapshots in progress; expected: %d, got: %d", 1, s.Metrics().RangeSnapshotRecvInProgress.Value()) @@ -3216,7 +3224,7 @@ func TestReserveSnapshotThrottling(t *testing.T) { }() cleanupNonEmpty3, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{ - RangeSize: 1, + RangeSize: 10, }) if err != nil { t.Fatal(err) @@ -3224,6 +3232,8 @@ func TestReserveSnapshotThrottling(t *testing.T) { atomic.StoreInt32(&boom, 1) require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(), "unexpected snapshot queue length") + require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(), + "unexpected snapshot queue size") require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(), "unexpected snapshots in progress") require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(), diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 342df09ab129..f3cb6436dd92 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -666,6 +666,13 @@ var charts = []sectionDescription{ "range.snapshots.delegate.sent-bytes", }, }, + { + Title: "Snapshot Queue Bytes", + Metrics: []string{ + "range.snapshots.send-queue-bytes", + "range.snapshots.recv-queue-bytes", + }, + }, }, }, {