Skip to content

Commit

Permalink
kvserver: add metrics to track snapshot queue size
Browse files Browse the repository at this point in the history
Previously, we had metrics to track the number of snapshots waiting in
the snapshot queue; however, snapshots may be of different sizes, so it
is also helpful to track the size of all snapshots in the queue. This
change adds the following metrics to track the total size of all
snapshots waiting in the queue:

    range.snapshots.send-queue-bytes
    range.snapshots.recv-queue-bytes

Informs: cockroachdb#85528
Release note (ops change): Added two new metrics,
range.snapshots.(send|recv)-queue-bytes, to track the total size of all
snapshots waiting in the snapshot queue.
  • Loading branch information
miraradeva committed Apr 11, 2023
1 parent 8e6f530 commit 9fc0ef6
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 16 deletions.
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,18 @@ var (
Measurement: "Snapshots",
Unit: metric.Unit_COUNT,
}
metaRangeSnapshotSendQueueSize = metric.Metadata{
Name: "range.snapshots.send-queue-bytes",
Help: "Total size of all snapshots in the snapshot send queue",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapshotRecvQueueSize = metric.Metadata{
Name: "range.snapshots.recv-queue-bytes",
Help: "Total size of all snapshots in the snapshot receive queue",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}

metaRangeRaftLeaderTransfers = metric.Metadata{
Name: "range.raftleadertransfers",
Expand Down Expand Up @@ -2046,6 +2058,8 @@ type StoreMetrics struct {
RangeSnapshotRecvInProgress *metric.Gauge
RangeSnapshotSendTotalInProgress *metric.Gauge
RangeSnapshotRecvTotalInProgress *metric.Gauge
RangeSnapshotSendQueueSize *metric.Gauge
RangeSnapshotRecvQueueSize *metric.Gauge

// Delegate snapshot metrics. These don't count self-delegated snapshots.
DelegateSnapshotSendBytes *metric.Counter
Expand Down Expand Up @@ -2645,6 +2659,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotRecvInProgress: metric.NewGauge(metaRangeSnapshotRecvInProgress),
RangeSnapshotSendTotalInProgress: metric.NewGauge(metaRangeSnapshotSendTotalInProgress),
RangeSnapshotRecvTotalInProgress: metric.NewGauge(metaRangeSnapshotRecvTotalInProgress),
RangeSnapshotSendQueueSize: metric.NewGauge(metaRangeSnapshotSendQueueSize),
RangeSnapshotRecvQueueSize: metric.NewGauge(metaRangeSnapshotRecvQueueSize),
RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),
RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries),
DelegateSnapshotSendBytes: metric.NewCounter(metaDelegateSnapshotSendBytes),
Expand Down
53 changes: 39 additions & 14 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,15 @@ var snapshotPrioritizationEnabled = settings.RegisterBoolSetting(
true,
)

// snapshotMetrics contains metrics on the number and size of snapshots in
// progress or in the snapshot queue.
type snapshotMetrics struct {
QueueLen *metric.Gauge
QueueSize *metric.Gauge
InProgress *metric.Gauge
TotalInProgress *metric.Gauge
}

// incomingSnapshotStream is the minimal interface on a GRPC stream required
// to receive a snapshot over the network.
type incomingSnapshotStream interface {
Expand Down Expand Up @@ -678,13 +687,19 @@ func (s *Store) reserveReceiveSnapshot(
ctx, sp := tracing.EnsureChildSpan(ctx, s.cfg.Tracer(), "reserveReceiveSnapshot")
defer sp.Finish()

return s.throttleSnapshot(ctx, s.snapshotApplyQueue,
int(header.SenderQueueName), header.SenderQueuePriority,
return s.throttleSnapshot(ctx,
s.snapshotApplyQueue,
int(header.SenderQueueName),
header.SenderQueuePriority,
-1,
header.RangeSize,
header.RaftMessageRequest.RangeID,
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvInProgress, s.metrics.RangeSnapshotRecvTotalInProgress,
snapshotMetrics{
s.metrics.RangeSnapshotRecvQueueLength,
s.metrics.RangeSnapshotRecvQueueSize,
s.metrics.RangeSnapshotRecvInProgress,
s.metrics.RangeSnapshotRecvTotalInProgress,
},
)
}

Expand All @@ -698,14 +713,19 @@ func (s *Store) reserveSendSnapshot(
fn()
}

return s.throttleSnapshot(ctx, s.snapshotSendQueue,
return s.throttleSnapshot(ctx,
s.snapshotSendQueue,
int(req.SenderQueueName),
req.SenderQueuePriority,
req.QueueOnDelegateLen,
rangeSize,
req.RangeID,
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendInProgress, s.metrics.RangeSnapshotSendTotalInProgress,
snapshotMetrics{
s.metrics.RangeSnapshotSendQueueLength,
s.metrics.RangeSnapshotSendQueueSize,
s.metrics.RangeSnapshotSendInProgress,
s.metrics.RangeSnapshotSendTotalInProgress,
},
)
}

Expand All @@ -720,7 +740,7 @@ func (s *Store) throttleSnapshot(
maxQueueLength int64,
rangeSize int64,
rangeID roachpb.RangeID,
waitingSnapshotMetric, inProgressSnapshotMetric, totalInProgressSnapshotMetric *metric.Gauge,
snapshotMetrics snapshotMetrics,
) (cleanup func(), funcErr error) {

tBegin := timeutil.Now()
Expand All @@ -742,8 +762,13 @@ func (s *Store) throttleSnapshot(
}
}()

waitingSnapshotMetric.Inc(1)
defer waitingSnapshotMetric.Dec(1)
// Total bytes of snapshots waiting in the snapshot queue
snapshotMetrics.QueueSize.Inc(rangeSize)
defer snapshotMetrics.QueueSize.Dec(rangeSize)
// Total number of snapshots waiting in the snapshot queue
snapshotMetrics.QueueLen.Inc(1)
defer snapshotMetrics.QueueLen.Dec(1)

queueCtx := ctx
if deadline, ok := queueCtx.Deadline(); ok {
// Enforce a more strict timeout for acquiring the snapshot reservation to
Expand Down Expand Up @@ -778,10 +803,10 @@ func (s *Store) throttleSnapshot(
}

// Counts non-empty in-progress snapshots.
inProgressSnapshotMetric.Inc(1)
snapshotMetrics.InProgress.Inc(1)
}
// Counts all in-progress snapshots.
totalInProgressSnapshotMetric.Inc(1)
snapshotMetrics.TotalInProgress.Inc(1)

// The choice here is essentially arbitrary, but with a default range size of 128mb-512mb and the
// Raft snapshot rate limiting of 32mb/s, we expect to spend less than 16s per snapshot.
Expand All @@ -804,10 +829,10 @@ func (s *Store) throttleSnapshot(
return func() {
s.metrics.ReservedReplicaCount.Dec(1)
s.metrics.Reserved.Dec(rangeSize)
totalInProgressSnapshotMetric.Dec(1)
snapshotMetrics.TotalInProgress.Dec(1)

if rangeSize != 0 || s.cfg.TestingKnobs.ThrottleEmptySnapshots {
inProgressSnapshotMetric.Dec(1)
snapshotMetrics.InProgress.Dec(1)
snapshotQueue.Release(permit)
}
}, nil
Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3157,7 +3157,7 @@ func TestReserveSnapshotThrottling(t *testing.T) {
s := tc.store

cleanupNonEmpty1, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
RangeSize: 1,
RangeSize: 10,
})
if err != nil {
t.Fatal(err)
Expand All @@ -3167,6 +3167,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand All @@ -3184,6 +3186,8 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(2), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand All @@ -3205,6 +3209,10 @@ func TestReserveSnapshotThrottling(t *testing.T) {
t.Errorf("unexpected snapshot queue length; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvQueueLength.Value())
}
if s.Metrics().RangeSnapshotRecvQueueSize.Value() != int64(10) {
t.Errorf("unexplected snapshot queue size; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvQueueSize.Value())
}
if s.Metrics().RangeSnapshotRecvInProgress.Value() != int64(1) {
t.Errorf("unexpected snapshots in progress; expected: %d, got: %d", 1,
s.Metrics().RangeSnapshotRecvInProgress.Value())
Expand All @@ -3216,14 +3224,16 @@ func TestReserveSnapshotThrottling(t *testing.T) {
}()

cleanupNonEmpty3, err := s.reserveReceiveSnapshot(ctx, &kvserverpb.SnapshotRequest_Header{
RangeSize: 1,
RangeSize: 10,
})
if err != nil {
t.Fatal(err)
}
atomic.StoreInt32(&boom, 1)
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueLength.Value(),
"unexpected snapshot queue length")
require.Equal(t, int64(0), s.Metrics().RangeSnapshotRecvQueueSize.Value(),
"unexpected snapshot queue size")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvInProgress.Value(),
"unexpected snapshots in progress")
require.Equal(t, int64(1), s.Metrics().RangeSnapshotRecvTotalInProgress.Value(),
Expand Down
7 changes: 7 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,13 @@ var charts = []sectionDescription{
"range.snapshots.delegate.sent-bytes",
},
},
{
Title: "Snapshot Queue Bytes",
Metrics: []string{
"range.snapshots.send-queue-bytes",
"range.snapshots.recv-queue-bytes",
},
},
},
},
{
Expand Down

0 comments on commit 9fc0ef6

Please sign in to comment.