Skip to content

Commit

Permalink
kvserver: refactor getSnapshotBytesMetrics
Browse files Browse the repository at this point in the history
This commit refactors `getSnapshotBytesMetrics` in `replica_learner_test`
to return a `map[string]snapshotBytesMetrics` instead of
`map[SnapShotRequest_Priority]snapshotBytesMetrics`. This allows us to include
and compare different types of snapshot metrics, removing the constraint of
being limited to `SnapShotRequest_Priority`. This commit does not change any
existing functionality, and the main purpose is to make future commits cleaner.

Part of: #104124
Release note: none
  • Loading branch information
wenyihu6 committed May 30, 2023
1 parent 48c2bc6 commit b0db476
Showing 1 changed file with 55 additions and 80 deletions.
135 changes: 55 additions & 80 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -967,8 +967,8 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
g, ctx := errgroup.WithContext(ctx)

// Record the snapshot metrics before anything has been sent / received.
senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)

// Add a new voting replica, but don't initialize it. Note that
// `tc.AddNonVoters` will not return until the newly added non-voter is
Expand Down Expand Up @@ -1044,36 +1044,34 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
require.NoError(t, g.Wait())

// Record the snapshot metrics for the sender after the raft snapshot was sent.
senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0)
senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0)

// Asserts that the raft snapshot (aka recovery snapshot) bytes sent have been
// recorded and that it was not double counted in a different metric.
senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter)
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)

senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0}
senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{
kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0},
kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: snapshotLength, rcvdBytes: 0},
kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0},
senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderTotalExpected, senderTotalDelta)
require.Equal(t, senderMapExpected, senderMapDelta)

// Record the snapshot metrics for the receiver after the raft snapshot was
// received.
receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1)
receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1)

// Asserts that the raft snapshot (aka recovery snapshot) bytes received have
// been recorded and that it was not double counted in a different metric.
receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter)
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength}
receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{
kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0},
kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: snapshotLength},
kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0},
receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
}
require.Equal(t, receiverTotalExpected, receiverTotalDelta)
require.Equal(t, receiverMapExpected, receiverMapDelta)
}

Expand Down Expand Up @@ -2199,28 +2197,22 @@ type snapshotBytesMetrics struct {
// and granularMetrics is the map mentioned above.
func getSnapshotBytesMetrics(
t *testing.T, tc *testcluster.TestCluster, serverIdx int,
) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) {
granularMetrics := make(map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics)

granularMetrics[kvserverpb.SnapshotRequest_UNKNOWN] = snapshotBytesMetrics{
sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.sent-bytes"),
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.rcvd-bytes"),
}
granularMetrics[kvserverpb.SnapshotRequest_RECOVERY] = snapshotBytesMetrics{
sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.sent-bytes"),
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.rcvd-bytes"),
}
granularMetrics[kvserverpb.SnapshotRequest_REBALANCE] = snapshotBytesMetrics{
sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.sent-bytes"),
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.rcvd-bytes"),
) map[string]snapshotBytesMetrics {
metrics := make(map[string]snapshotBytesMetrics)

findSnapshotBytesMetrics := func(metricName string) snapshotBytesMetrics {
sentMetricStr := fmt.Sprintf("range.snapshots%v.sent-bytes", metricName)
rcvdMetricStr := fmt.Sprintf("range.snapshots%v.rcvd-bytes", metricName)
return snapshotBytesMetrics{
sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), sentMetricStr),
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), rcvdMetricStr),
}
}

totalBytes := snapshotBytesMetrics{
sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.sent-bytes"),
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rcvd-bytes"),
types := [4]string{".unknown", ".recovery", ".rebalancing", ""}
for _, v := range types {
metrics[v] = findSnapshotBytesMetrics(v)
}

return totalBytes, granularMetrics
return metrics
}

// getSnapshotMetricsDiff returns the delta between snapshot byte metrics
Expand All @@ -2231,31 +2223,16 @@ func getSnapshotBytesMetrics(
// sent/received, and granularMetrics is the map of snapshotBytesMetrics structs
// containing deltas for each type of snapshot.
func getSnapshotMetricsDiff(
beforeTotal snapshotBytesMetrics,
beforeMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics,
afterTotal snapshotBytesMetrics,
afterMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics,
) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) {
diffTotal := snapshotBytesMetrics{
sentBytes: afterTotal.sentBytes - beforeTotal.sentBytes,
rcvdBytes: afterTotal.rcvdBytes - beforeTotal.rcvdBytes,
}
diffMap := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{
kvserverpb.SnapshotRequest_REBALANCE: {
sentBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes,
rcvdBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes,
},
kvserverpb.SnapshotRequest_RECOVERY: {
sentBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes,
rcvdBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes,
},
kvserverpb.SnapshotRequest_UNKNOWN: {
sentBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes,
rcvdBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes,
},
beforeMap map[string]snapshotBytesMetrics, afterMap map[string]snapshotBytesMetrics,
) map[string]snapshotBytesMetrics {
diffMap := make(map[string]snapshotBytesMetrics)
for metricName, beforeValue := range beforeMap {
diffMap[metricName] = snapshotBytesMetrics{
afterMap[metricName].sentBytes - beforeValue.sentBytes,
afterMap[metricName].rcvdBytes - beforeValue.rcvdBytes,
}
}

return diffTotal, diffMap
return diffMap
}

// This function returns the number of bytes sent for a snapshot. It follows the
Expand Down Expand Up @@ -2351,8 +2328,8 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
scratchStartKey := tc.ScratchRange(t)

// Record the snapshot metrics before anything has been sent / received.
senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
Expand All @@ -2374,36 +2351,34 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {

// Record the snapshot metrics for the sender after a voter has been added. A
// learner snapshot should have been sent from the sender to the receiver.
senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0)
senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0)

// Asserts that the learner snapshot (aka rebalancing snapshot) bytes sent
// have been recorded and that it was not double counted in a different
// metric.
senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter)
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)

senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0}
senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{
kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: snapshotLength, rcvdBytes: 0},
kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0},
kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0},
senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderTotalExpected, senderTotalDelta)
require.Equal(t, senderMapExpected, senderMapDelta)

// Record the snapshot metrics for the receiver after a voter has been added.
receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1)
receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1)

// Asserts that the learner snapshot (aka rebalancing snapshot) bytes received
// have been recorded and that it was not double counted in a different
// metric.
receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter)
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength}
receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{
kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: snapshotLength},
kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0},
kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0},
receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
}
require.Equal(t, receiverTotalExpected, receiverTotalDelta)
require.Equal(t, receiverMapExpected, receiverMapDelta)
}

0 comments on commit b0db476

Please sign in to comment.