Skip to content

Commit

Permalink
kvserver: add cross-region snapshot byte metrics to StoreMetrics
Browse files Browse the repository at this point in the history
Previously, there were no metrics to observe cross-region snapshot traffic
between stores within a cluster.

To improve this issue, this commit adds two new store metrics -
`range.snapshots.cross-region.sent-bytes` and
`range.snapshots.cross-region.rcvd-bytes`. These metrics track the aggregate
snapshot bytes sent from and received at a store.

Fixes: #104124

Release note (ops change): Two new store metrics -
`range.snapshots.cross-region.sent-bytes` and
`range.snapshots.cross-region.rcvd-bytes` - are now added to track the aggregate
snapshot bytes sent from and received at a store. Note that these metrics
require nodes’ localities to include a “region” tier key. If a node lacks this
key but is involved in cross-region batch activities, an error message will be
logged.
  • Loading branch information
wenyihu6 committed Jun 2, 2023
1 parent bd4aa8d commit 3f1351e
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 69 deletions.
39 changes: 33 additions & 6 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1349,16 +1349,43 @@ func (sp *StorePool) GetLocalitiesPerReplica(
return localities
}

// getNodeLocalityWithString returns the locality information and the string
// format for the given node.
func (sp *StorePool) getNodeLocalityWithString(nodeID roachpb.NodeID) localityWithString {
nodeLocality := localityWithString{}
sp.localitiesMu.RLock()
defer sp.localitiesMu.RUnlock()
if locality, ok := sp.localitiesMu.nodeLocalities[nodeID]; ok {
nodeLocality = locality
}
// Return an empty localityWithString struct if nothing is found.
return nodeLocality
}

// GetNodeLocalityString returns the locality information for the given node
// in its string format.
func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string {
sp.localitiesMu.RLock()
defer sp.localitiesMu.RUnlock()
locality, ok := sp.localitiesMu.nodeLocalities[nodeID]
if !ok {
return ""
return sp.getNodeLocalityWithString(nodeID).str
}

// getNodeLocality returns the locality information for the given node.
func (sp *StorePool) getNodeLocality(nodeID roachpb.NodeID) roachpb.Locality {
return sp.getNodeLocalityWithString(nodeID).locality
}

// IsCrossRegion takes in two replicas and compares the locality of them based
// on their replica node IDs. It returns (bool, error) indicating whether the
// two replicas’ nodes are in different regions and if any errors occurred
// during the lookup process.
func (sp *StorePool) IsCrossRegion(
firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
) (bool, error) {
isCrossRegion, err := sp.getNodeLocality(firstReplica.NodeID).IsCrossRegion(
sp.getNodeLocality(secReplica.NodeID))
if err != nil {
return false, err
}
return locality.str
return isCrossRegion, nil
}

// IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,18 @@ var (
Measurement: "Snapshots",
Unit: metric.Unit_COUNT,
}
metaRangeSnapShotCrossRegionSentBytes = metric.Metadata{
Name: "range.snapshots.cross-region.sent-bytes",
Help: "Number of snapshot bytes sent cross region",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapShotCrossRegionRcvdBytes = metric.Metadata{
Name: "range.snapshots.cross-region.rcvd-bytes",
Help: "Number of snapshot bytes received cross region",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapshotSendQueueLength = metric.Metadata{
Name: "range.snapshots.send-queue",
Help: "Number of snapshots queued to send",
Expand Down Expand Up @@ -2181,6 +2193,8 @@ type StoreMetrics struct {
RangeSnapshotRebalancingSentBytes *metric.Counter
RangeSnapshotRecvFailed *metric.Counter
RangeSnapshotRecvUnusable *metric.Counter
RangeSnapShotCrossRegionSentBytes *metric.Counter
RangeSnapShotCrossRegionRcvdBytes *metric.Counter

// Range snapshot queue metrics.
RangeSnapshotSendQueueLength *metric.Gauge
Expand Down Expand Up @@ -2803,6 +2817,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes),
RangeSnapshotRecvFailed: metric.NewCounter(metaRangeSnapshotRecvFailed),
RangeSnapshotRecvUnusable: metric.NewCounter(metaRangeSnapshotRecvUnusable),
RangeSnapShotCrossRegionSentBytes: metric.NewCounter(metaRangeSnapShotCrossRegionSentBytes),
RangeSnapShotCrossRegionRcvdBytes: metric.NewCounter(metaRangeSnapShotCrossRegionRcvdBytes),
RangeSnapshotSendQueueLength: metric.NewGauge(metaRangeSnapshotSendQueueLength),
RangeSnapshotRecvQueueLength: metric.NewGauge(metaRangeSnapshotRecvQueueLength),
RangeSnapshotSendInProgress: metric.NewGauge(metaRangeSnapshotSendInProgress),
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3151,6 +3151,9 @@ func (r *Replica) followerSendSnapshot(
r.store.metrics.DelegateSnapshotSendBytes.Inc(inc)
}
r.store.metrics.RangeSnapshotSentBytes.Inc(inc)
if r.store.shouldIncrementCrossRegionSnapshotMetrics(ctx, req.CoordinatorReplica, req.RecipientReplica) {
r.store.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc)
}

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
Expand Down
185 changes: 122 additions & 63 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)

senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
".cross-region": {sentBytes: 0, rcvdBytes: 0},
}
require.Equal(t, senderMapExpected, senderMapDelta)

Expand All @@ -1067,10 +1068,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
".cross-region": {sentBytes: 0, rcvdBytes: 0},
}
require.Equal(t, receiverMapExpected, receiverMapDelta)
}
Expand Down Expand Up @@ -2208,8 +2210,8 @@ func getSnapshotBytesMetrics(
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), rcvdMetricStr),
}
}
types := [4]string{".unknown", ".recovery", ".rebalancing", ""}
for _, v := range types {

for _, v := range [5]string{".unknown", ".recovery", ".rebalancing", ".cross-region", ""} {
metrics[v] = findSnapshotBytesMetrics(v)
}
return metrics
Expand Down Expand Up @@ -2294,11 +2296,13 @@ func getExpectedSnapshotSizeBytes(
return int64(b.Len()), err
}

// Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and
// 'range.snapshots.rebalancing.sent-bytes' metrics. This test adds a new
// replica to a cluster, and during the process, a learner snapshot is sent to
// the new replica.
func TestRebalancingSnapshotMetrics(t *testing.T) {
// This test verifies the accuracy of snapshot metrics -
// `range.snapshots.[rebalancing|cross-region].rcvd-bytes` and
// `range.snapshots.[rebalancing|cross-region].sent-bytes`. It involves adding
// two new replicas on different nodes within the cluster, resulting in two
// learner snapshots sent cross region. The test then compares the metrics prior
// to and after sending the snapshot to verify the accuracy.
func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -2312,7 +2316,7 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
// by unblocking the current goroutine when `HandleDelegatedSnapshot` is
// about to send the snapshot. In addition, it also blocks the new
// goroutine, which was created to send the snapshot, until the calculation
// is complete.
// is complete (more info below).
close(blockUntilSnapshotSendCh)
select {
case <-blockSnapshotSendCh:
Expand All @@ -2321,82 +2325,137 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
}
}

// The initial setup ensures the correct configuration for three nodes (with
// different localities), single-range. Note that server[2] is configured
// without the inclusion of a "region" tier key.
const numNodes = 3
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
if i == 2 {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "zone", Value: fmt.Sprintf("us-east-%va", i)}},
},
Knobs: knobs,
}
} else {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: fmt.Sprintf("us-east-%v", i)}},
},
Knobs: knobs,
}
}
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
tc := testcluster.StartTestCluster(
t, numNodes, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
ReplicationMode: base.ReplicationManual,
},
)

defer tc.Stopper().Stop(ctx)

// sendSnapshotFromServer is a testing helper function that sends a learner
// snapshot from sever[0] to server[serverIndex]. It adds a replica of the
// given key range on server[serverIndex] as a voter, resulting in a learner
// snapshot being sent. The function returns the expected snapshot length and
// the updated range descriptor.
sendSnapshotToServer := func(key roachpb.Key, serverIndex int) (roachpb.RangeDescriptor, int64) {
scratchStartKey := tc.ScratchRange(t)
// sendSnapshotFromServer is a testing helper that sends a learner snapshot
// from server[0] to server[serverIndex] and returns the expected size (in
// bytes) of the snapshot sent.
sendSnapshotToServer := func(serverIndex int, changeReplicaFn func(roachpb.Key, ...roachpb.ReplicationTarget) (roachpb.RangeDescriptor, error)) int64 {
blockUntilSnapshotSendCh = make(chan struct{})
blockSnapshotSendCh = make(chan struct{})
g := ctxgroup.WithContext(ctx)
rangeDesc := roachpb.RangeDescriptor{}
g.GoCtx(func(ctx context.Context) error {
// A new replica at servers[serverIndex] is now added to the cluster,
// resulting in a learner snapshot to be sent from servers[0] to
// servers[serverIndex]. This function is executed in a new goroutine to
// help us capture the expected snapshot bytes count accurately.
rangeDesc = tc.AddVotersOrFatal(t, key, tc.Target(serverIndex))
return nil
desc := tc.LookupRangeOrFatal(t, scratchStartKey)
desc, err := changeReplicaFn(scratchStartKey, tc.Target(serverIndex))
scratchStartKey = desc.StartKey.AsRawKey()
return err
})

// The current goroutine is blocked until the new goroutine, which has just
// been added, is about to send the snapshot (see the testing knob above).
// This allows us to calculate the snapshot bytes count accurately,
// accounting for any state changes that happen between calling
// AddVotersOrFatal and the snapshot being sent.
// changeReplicaFn and the snapshot being sent.
<-blockUntilSnapshotSendCh
store, repl := getFirstStoreReplica(t, tc.Server(0), key)
store, repl := getFirstStoreReplica(t, tc.Server(0), scratchStartKey)
snapshotLength, err := getExpectedSnapshotSizeBytes(ctx, store, repl, kvserverpb.SnapshotRequest_INITIAL)
require.NoError(t, err)

close(blockSnapshotSendCh)
// Wait the new goroutine (sending the snapshot) to complete before
// measuring the after-sending-snapshot metrics.
require.NoError(t, g.Wait())
return rangeDesc, snapshotLength
return snapshotLength
}

// Record the snapshot metrics before anything has been sent / received.
senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
scratchStartKey := tc.ScratchRange(t)
senderBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
firstReceiverBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
secReceiverBefore := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)

// The first replica is added as a non-voter to help avoid issues in stress
// testing. A possible explanation is - if the first replica was added as a
// non-voter, it can be stuck in a state to receive the snapshot. This can
// cause failure to reach quorum during the second snapshot transfer.
firstSnapshotLength := sendSnapshotToServer(1, tc.AddNonVoters)
secSnapshotLength := sendSnapshotToServer(2, tc.AddVoters)
totalSnapshotLength := firstSnapshotLength + secSnapshotLength

// A learner snapshot should have been sent from the sender(server[0]) to the
// server[1] and server[2].
t.Run("sender", func(t *testing.T) {
// Compare the snapshot metrics for the sender after sending two snapshots to
// server[1] and server[2].
senderAfter := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
senderDelta := getSnapshotMetricsDiff(senderBefore, senderAfter)
senderExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: totalSnapshotLength, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
// Assert that the cross-region metrics should remain unchanged (since
// server[2]'s locality does not include a "region" tier key).
".cross-region": {sentBytes: firstSnapshotLength, rcvdBytes: 0},
"": {sentBytes: totalSnapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderExpected, senderDelta)
})

// A learner snapshot should have been sent from the sender to the receiver.
_, snapshotLength := sendSnapshotToServer(scratchStartKey, 1)
t.Run("first receiver", func(t *testing.T) {
// Compare the snapshot metrics for server[1] after receiving the first
// snapshot.
firstReceiverMetricsAfter := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
firstReceiverDelta := getSnapshotMetricsDiff(firstReceiverBefore, firstReceiverMetricsAfter)
firstReceiverExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
".cross-region": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
"": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
}
require.Equal(t, firstReceiverExpected, firstReceiverDelta)
})

// Record the snapshot metrics for the sender after a voter has been added.
senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0)
// Asserts that the learner snapshot (aka rebalancing snapshot) bytes sent
// have been recorded, and that it was not double counted in a different
// metric.
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)
senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderMapExpected, senderMapDelta)
t.Run("second receiver", func(t *testing.T) {
// Compare the snapshot metrics for server[2] after receiving the second
// snapshot.
secReceiverAfter := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)
secReceiverDelta := getSnapshotMetricsDiff(secReceiverBefore, secReceiverAfter)
secReceiverExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: secSnapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
// Assert that the cross-region metrics should remain unchanged (since
// server[2]'s locality does not include a "region" tier key).
".cross-region": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: secSnapshotLength},
}
require.Equal(t, secReceiverExpected, secReceiverDelta)
})

// Record the snapshot metrics for the receiver after a voter has been added.
receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1)
// Asserts that the learner snapshot (aka rebalancing snapshot) bytes received
// have been recorded, and that it was not double counted in a different
// metric.
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)
receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
}
require.Equal(t, receiverMapExpected, receiverMapDelta)
}
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,19 @@ func (s *Store) checkSnapshotOverlapLocked(
return nil
}

// shouldIncrementCrossRegionSnapshotMetrics returns true if the two replicas
// given are cross-region, and false otherwise.
func (s *Store) shouldIncrementCrossRegionSnapshotMetrics(
ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
) bool {
isCrossRegion, err := s.cfg.StorePool.IsCrossRegion(firstReplica, secReplica)
if err != nil {
log.VEventf(ctx, 2, "unable to determine if snapshot is cross region %v", err)
return false
}
return isCrossRegion
}

// receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream.
func (s *Store) receiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream,
Expand Down Expand Up @@ -1089,6 +1102,11 @@ func (s *Store) receiveSnapshot(
recordBytesReceived := func(inc int64) {
s.metrics.RangeSnapshotRcvdBytes.Inc(inc)

if s.shouldIncrementCrossRegionSnapshotMetrics(
ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica) {
s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc)
}

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc)
Expand Down
Loading

0 comments on commit 3f1351e

Please sign in to comment.