Skip to content

Commit

Permalink
kvserver: implement granular metrics for snapshot bytes sent/rcvd
Browse files Browse the repository at this point in the history
This commit adds the following new metrics:
- `range.snapshots.unknown.rcvd-bytes`
- `range.snapshots.unknown.sent-bytes`
- `range.snapshots.rebalancing.rcvd-bytes`
- `range.snapshots.rebalancing.sent-bytes`
- `range.snapshots.recovery.rcvd-bytes`
- `range.snapshots.recovery.sent-bytes`
These metrics track the bytes sent/received for each type of
snapshot (rebalance, recovery, and unknown snapshots).
For each direction, the sum of the three new metrics should equal the
existing `range.snapshots.(sent|rcvd)-bytes` metric, which tracks the
total number of snapshot bytes sent or received.

Additionally, this commit changes the `snapshotStrategy` interface
such that the `Receive` and `Send` methods take a
`snapshotRecordMetrics` function as a parameter rather than a `*metric.Counter`.

Finally, this commit adds a new SendSnapshot TestingKnob that hooks
into the send snapshot flow after a DelegateSnapshotRequest is
handled but before any throttling or sending logic takes place.

Resolves cockroachdb#81047
Release Note: None
  • Loading branch information
KnightAsterial committed May 31, 2022
1 parent fa277e9 commit c8017e1
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 22 deletions.
48 changes: 48 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,42 @@ var (
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
// Metadata for the per-type snapshot byte counters. There is one
// received/sent pair for each snapshot type: unknown, rebalancing, and
// recovery.
//
// NOTE(review): every entry pairs Measurement "Bytes" with
// metric.Unit_COUNT, matching the declaration immediately above this group.
// Confirm whether a byte-specific unit is available and intended here.

// Unknown snapshots: bytes received.
metaRangeSnapshotUnknownRcvdBytes = metric.Metadata{
Name: "range.snapshots.unknown.rcvd-bytes",
Help: "Number of unknown snapshot bytes received",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
// Unknown snapshots: bytes sent.
metaRangeSnapshotUnknownSentBytes = metric.Metadata{
Name: "range.snapshots.unknown.sent-bytes",
Help: "Number of unknown snapshot bytes sent",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
// Rebalancing snapshots: bytes received.
metaRangeSnapshotRebalancingRcvdBytes = metric.Metadata{
Name: "range.snapshots.rebalancing.rcvd-bytes",
Help: "Number of rebalancing snapshot bytes received",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
// Rebalancing snapshots: bytes sent.
metaRangeSnapshotRebalancingSentBytes = metric.Metadata{
Name: "range.snapshots.rebalancing.sent-bytes",
Help: "Number of rebalancing snapshot bytes sent",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
// Recovery snapshots: bytes received.
metaRangeSnapshotRecoveryRcvdBytes = metric.Metadata{
Name: "range.snapshots.recovery.rcvd-bytes",
Help: "Number of recovery snapshot bytes received",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
// Recovery snapshots: bytes sent.
metaRangeSnapshotRecoverySentBytes = metric.Metadata{
Name: "range.snapshots.recovery.sent-bytes",
Help: "Number of recovery snapshot bytes sent",
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaRangeRaftLeaderTransfers = metric.Metadata{
Name: "range.raftleadertransfers",
Help: "Number of raft leader transfers",
Expand Down Expand Up @@ -1522,6 +1558,12 @@ type StoreMetrics struct {
RangeSnapshotsAppliedByNonVoters *metric.Counter
RangeSnapshotRcvdBytes *metric.Counter
RangeSnapshotSentBytes *metric.Counter
RangeSnapshotUnknownRcvdBytes *metric.Counter
RangeSnapshotUnknownSentBytes *metric.Counter
RangeSnapshotRecoveryRcvdBytes *metric.Counter
RangeSnapshotRecoverySentBytes *metric.Counter
RangeSnapshotRebalancingRcvdBytes *metric.Counter
RangeSnapshotRebalancingSentBytes *metric.Counter

// Raft processing metrics.
RaftTicks *metric.Counter
Expand Down Expand Up @@ -1977,6 +2019,12 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotsAppliedByNonVoters: metric.NewCounter(metaRangeSnapshotsAppliedByNonVoter),
RangeSnapshotRcvdBytes: metric.NewCounter(metaRangeSnapshotRcvdBytes),
RangeSnapshotSentBytes: metric.NewCounter(metaRangeSnapshotSentBytes),
RangeSnapshotUnknownRcvdBytes: metric.NewCounter(metaRangeSnapshotUnknownRcvdBytes),
RangeSnapshotUnknownSentBytes: metric.NewCounter(metaRangeSnapshotUnknownSentBytes),
RangeSnapshotRecoveryRcvdBytes: metric.NewCounter(metaRangeSnapshotRecoveryRcvdBytes),
RangeSnapshotRecoverySentBytes: metric.NewCounter(metaRangeSnapshotRecoverySentBytes),
RangeSnapshotRebalancingRcvdBytes: metric.NewCounter(metaRangeSnapshotRebalancingRcvdBytes),
RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes),
RangeRaftLeaderTransfers: metric.NewCounter(metaRangeRaftLeaderTransfers),
RangeLossOfQuorumRecoveries: metric.NewCounter(metaRangeLossOfQuorumRecoveries),

Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -775,7 +774,7 @@ func (t *RaftTransport) SendSnapshot(
snap *OutgoingSnapshot,
newBatch func() storage.Batch,
sent func(),
bytesSentCounter *metric.Counter,
recordBytesSent snapshotRecordMetrics,
) error {
nodeID := header.RaftMessageRequest.ToReplica.NodeID

Expand All @@ -794,7 +793,7 @@ func (t *RaftTransport) SendSnapshot(
log.Warningf(ctx, "failed to close snapshot stream: %+v", err)
}
}()
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, bytesSentCounter)
return sendSnapshot(ctx, t.st, stream, storePool, header, snap, newBatch, sent, recordBytesSent)
}

// DelegateSnapshot creates a rpc stream between the leaseholder and the
Expand Down
17 changes: 16 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2767,6 +2767,21 @@ func (r *Replica) followerSendSnapshot(
r.store.metrics.RangeSnapshotsGenerated.Inc(1)
}

// recordBytesSent adds inc to the total sent-bytes counter and to the
// per-type sent-bytes counter selected by the snapshot's priority.
recordBytesSent := func(inc int64) {
	r.store.metrics.RangeSnapshotSentBytes.Inc(inc)

	// Any priority other than RECOVERY or REBALANCE is accounted as UNKNOWN.
	byType := r.store.metrics.RangeSnapshotUnknownSentBytes
	switch header.Priority {
	case kvserverpb.SnapshotRequest_RECOVERY:
		byType = r.store.metrics.RangeSnapshotRecoverySentBytes
	case kvserverpb.SnapshotRequest_REBALANCE:
		byType = r.store.metrics.RangeSnapshotRebalancingSentBytes
	}
	byType.Inc(inc)
}

err = contextutil.RunWithTimeout(
ctx, "send-snapshot", sendSnapshotTimeout, func(ctx context.Context) error {
return r.store.cfg.Transport.SendSnapshot(
Expand All @@ -2776,7 +2791,7 @@ func (r *Replica) followerSendSnapshot(
snap,
newBatchFn,
sent,
r.store.metrics.RangeSnapshotSentBytes,
recordBytesSent,
)
},
)
Expand Down
Loading

0 comments on commit c8017e1

Please sign in to comment.