diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 80299b6344f6..44943b3584a6 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -967,8 +967,8 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { g, ctx := errgroup.WithContext(ctx) // Record the snapshot metrics before anything has been sent / received. - senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) - receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) + senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) + receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) // Add a new voting replica, but don't initialize it. Note that // `tc.AddNonVoters` will not return until the newly added non-voter is @@ -1044,36 +1044,34 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { require.NoError(t, g.Wait()) // Record the snapshot metrics for the sender after the raft snapshot was sent. - senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) + senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) // Asserts that the raft snapshot (aka recovery snapshot) bytes sent have been // recorded and that it was not double counted in a different metric. - senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter) + senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter) - senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0} - senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ - kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0}, - kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: snapshotLength, rcvdBytes: 0}, - kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + senderMapExpected := map[string]snapshotBytesMetrics{ + ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, + ".recovery": {sentBytes: snapshotLength, rcvdBytes: 0}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: snapshotLength, rcvdBytes: 0}, } - require.Equal(t, senderTotalExpected, senderTotalDelta) require.Equal(t, senderMapExpected, senderMapDelta) // Record the snapshot metrics for the receiver after the raft snapshot was // received. - receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) + receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) // Asserts that the raft snapshot (aka recovery snapshot) bytes received have // been recorded and that it was not double counted in a different metric. - receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter) + receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter) - receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength} - receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ - kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: 0}, - kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: snapshotLength}, - kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + receiverMapExpected := map[string]snapshotBytesMetrics{ + ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, + ".recovery": {sentBytes: 0, rcvdBytes: snapshotLength}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: 0, rcvdBytes: snapshotLength}, } - require.Equal(t, receiverTotalExpected, receiverTotalDelta) require.Equal(t, receiverMapExpected, receiverMapDelta) } @@ -2199,28 +2197,22 @@ type snapshotBytesMetrics struct { // and granularMetrics is the map mentioned above. func getSnapshotBytesMetrics( t *testing.T, tc *testcluster.TestCluster, serverIdx int, -) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) { - granularMetrics := make(map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) - - granularMetrics[kvserverpb.SnapshotRequest_UNKNOWN] = snapshotBytesMetrics{ - sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.sent-bytes"), - rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.unknown.rcvd-bytes"), - } - granularMetrics[kvserverpb.SnapshotRequest_RECOVERY] = snapshotBytesMetrics{ - sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.sent-bytes"), - rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.recovery.rcvd-bytes"), - } - granularMetrics[kvserverpb.SnapshotRequest_REBALANCE] = snapshotBytesMetrics{ - sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.sent-bytes"), - rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rebalancing.rcvd-bytes"), +) map[string]snapshotBytesMetrics { + metrics := make(map[string]snapshotBytesMetrics) + + findSnapshotBytesMetrics := func(metricName string) snapshotBytesMetrics { + sentMetricStr := fmt.Sprintf("range.snapshots%v.sent-bytes", metricName) + rcvdMetricStr := fmt.Sprintf("range.snapshots%v.rcvd-bytes", metricName) + return snapshotBytesMetrics{ + sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), sentMetricStr), + rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), rcvdMetricStr), + } } - - totalBytes := snapshotBytesMetrics{ - sentBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.sent-bytes"), - rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), "range.snapshots.rcvd-bytes"), + types := [4]string{".unknown", ".recovery", ".rebalancing", ""} + for _, v := range types { + metrics[v] = findSnapshotBytesMetrics(v) } - - return totalBytes, granularMetrics + return metrics } // getSnapshotMetricsDiff returns the delta between snapshot byte metrics @@ -2231,31 +2223,16 @@ func getSnapshotBytesMetrics( // sent/received, and granularMetrics is the map of snapshotBytesMetrics structs // containing deltas for each type of snapshot. func getSnapshotMetricsDiff( - beforeTotal snapshotBytesMetrics, - beforeMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics, - afterTotal snapshotBytesMetrics, - afterMap map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics, -) (snapshotBytesMetrics, map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics) { - diffTotal := snapshotBytesMetrics{ - sentBytes: afterTotal.sentBytes - beforeTotal.sentBytes, - rcvdBytes: afterTotal.rcvdBytes - beforeTotal.rcvdBytes, - } - diffMap := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ - kvserverpb.SnapshotRequest_REBALANCE: { - sentBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].sentBytes, - rcvdBytes: afterMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_REBALANCE].rcvdBytes, - }, - kvserverpb.SnapshotRequest_RECOVERY: { - sentBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].sentBytes, - rcvdBytes: afterMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_RECOVERY].rcvdBytes, - }, - kvserverpb.SnapshotRequest_UNKNOWN: { - sentBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].sentBytes, - rcvdBytes: afterMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes - beforeMap[kvserverpb.SnapshotRequest_UNKNOWN].rcvdBytes, - }, + beforeMap map[string]snapshotBytesMetrics, afterMap map[string]snapshotBytesMetrics, +) map[string]snapshotBytesMetrics { + diffMap := make(map[string]snapshotBytesMetrics) + for metricName, beforeValue := range beforeMap { + diffMap[metricName] = snapshotBytesMetrics{ + afterMap[metricName].sentBytes - beforeValue.sentBytes, + afterMap[metricName].rcvdBytes - beforeValue.rcvdBytes, + } } - - return diffTotal, diffMap + return diffMap } // This function returns the number of bytes sent for a snapshot. It follows the @@ -2351,8 +2328,8 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { scratchStartKey := tc.ScratchRange(t) // Record the snapshot metrics before anything has been sent / received. - senderTotalBefore, senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) - receiverTotalBefore, receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) + senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) + receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) error { @@ -2374,36 +2351,34 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { // Record the snapshot metrics for the sender after a voter has been added. A // learner snapshot should have been sent from the sender to the receiver. - senderTotalAfter, senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) + senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0) // Asserts that the learner snapshot (aka rebalancing snapshot) bytes sent // have been recorded and that it was not double counted in a different // metric. - senderTotalDelta, senderMapDelta := getSnapshotMetricsDiff(senderTotalBefore, senderMetricsMapBefore, senderTotalAfter, senderMetricsMapAfter) + senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter) - senderTotalExpected := snapshotBytesMetrics{sentBytes: snapshotLength, rcvdBytes: 0} - senderMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ - kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: snapshotLength, rcvdBytes: 0}, - kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0}, - kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + senderMapExpected := map[string]snapshotBytesMetrics{ + ".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0}, + ".recovery": {sentBytes: 0, rcvdBytes: 0}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: snapshotLength, rcvdBytes: 0}, } - require.Equal(t, senderTotalExpected, senderTotalDelta) require.Equal(t, senderMapExpected, senderMapDelta) // Record the snapshot metrics for the receiver after a voter has been added. - receiverTotalAfter, receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) + receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1) // Asserts that the learner snapshot (aka rebalancing snapshot) bytes received // have been recorded and that it was not double counted in a different // metric. - receiverTotalDelta, receiverMapDelta := getSnapshotMetricsDiff(receiverTotalBefore, receiverMetricsMapBefore, receiverTotalAfter, receiverMetricsMapAfter) + receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter) - receiverTotalExpected := snapshotBytesMetrics{sentBytes: 0, rcvdBytes: snapshotLength} - receiverMapExpected := map[kvserverpb.SnapshotRequest_Priority]snapshotBytesMetrics{ - kvserverpb.SnapshotRequest_REBALANCE: {sentBytes: 0, rcvdBytes: snapshotLength}, - kvserverpb.SnapshotRequest_RECOVERY: {sentBytes: 0, rcvdBytes: 0}, - kvserverpb.SnapshotRequest_UNKNOWN: {sentBytes: 0, rcvdBytes: 0}, + receiverMapExpected := map[string]snapshotBytesMetrics{ + ".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength}, + ".recovery": {sentBytes: 0, rcvdBytes: 0}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: 0, rcvdBytes: snapshotLength}, } - require.Equal(t, receiverTotalExpected, receiverTotalDelta) require.Equal(t, receiverMapExpected, receiverMapDelta) }