diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go
index 6098dd8eeda3..7ed210d98fc2 100644
--- a/pkg/kv/kvserver/allocator/storepool/store_pool.go
+++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -1385,16 +1385,43 @@ func (sp *StorePool) GetLocalitiesPerReplica(
 	return localities
 }
 
+// getNodeLocalityWithString returns the locality information and its string
+// format for the given node.
+func (sp *StorePool) getNodeLocalityWithString(nodeID roachpb.NodeID) localityWithString {
+	nodeLocality := localityWithString{}
+	sp.localitiesMu.RLock()
+	defer sp.localitiesMu.RUnlock()
+	if locality, ok := sp.localitiesMu.nodeLocalities[nodeID]; ok {
+		nodeLocality = locality
+	}
+	// Return an empty localityWithString struct if nothing is found.
+	return nodeLocality
+}
+
 // GetNodeLocalityString returns the locality information for the given node
 // in its string format.
 func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string {
-	sp.localitiesMu.RLock()
-	defer sp.localitiesMu.RUnlock()
-	locality, ok := sp.localitiesMu.nodeLocalities[nodeID]
-	if !ok {
-		return ""
+	return sp.getNodeLocalityWithString(nodeID).str
+}
+
+// getNodeLocality returns the locality information for the given node.
+func (sp *StorePool) getNodeLocality(nodeID roachpb.NodeID) roachpb.Locality {
+	return sp.getNodeLocalityWithString(nodeID).locality
+}
+
+// IsCrossRegion compares the localities of the two given replicas' nodes,
+// looked up by node ID. It returns (bool, error) indicating whether the two
+// nodes are in different regions, along with any error that occurred during
+// the lookup process.
+func (sp *StorePool) IsCrossRegion(
+	firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
+) (bool, error) {
+	isCrossRegion, err := sp.getNodeLocality(firstReplica.NodeID).IsCrossRegion(
+		sp.getNodeLocality(secReplica.NodeID))
+	if err != nil {
+		return false, err
 	}
-	return locality.str
+	return isCrossRegion, nil
 }
 
 // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is
diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go
index 50c60fb5b43e..02dfda009154 100644
--- a/pkg/kv/kvserver/metrics.go
+++ b/pkg/kv/kvserver/metrics.go
@@ -946,6 +946,18 @@ var (
 		Measurement: "Snapshots",
 		Unit:        metric.Unit_COUNT,
 	}
+	metaRangeSnapshotCrossRegionSentBytes = metric.Metadata{
+		Name:        "range.snapshots.cross-region.sent-bytes",
+		Help:        "Number of snapshot bytes sent cross-region",
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
+	metaRangeSnapshotCrossRegionRcvdBytes = metric.Metadata{
+		Name:        "range.snapshots.cross-region.rcvd-bytes",
+		Help:        "Number of snapshot bytes received cross-region",
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
 	metaRangeSnapshotSendQueueLength = metric.Metadata{
 		Name:        "range.snapshots.send-queue",
 		Help:        "Number of snapshots queued to send",
@@ -2173,6 +2185,8 @@ type StoreMetrics struct {
 	RangeSnapshotRebalancingSentBytes *metric.Counter
 	RangeSnapshotRecvFailed           *metric.Counter
 	RangeSnapshotRecvUnusable         *metric.Counter
+	RangeSnapshotCrossRegionSentBytes *metric.Counter
+	RangeSnapshotCrossRegionRcvdBytes *metric.Counter
 
 	// Range snapshot queue metrics.
 	RangeSnapshotSendQueueLength      *metric.Gauge
@@ -2789,6 +2803,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 		RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes),
 		RangeSnapshotRecvFailed:           metric.NewCounter(metaRangeSnapshotRecvFailed),
 		RangeSnapshotRecvUnusable:         metric.NewCounter(metaRangeSnapshotRecvUnusable),
+		RangeSnapshotCrossRegionSentBytes: metric.NewCounter(metaRangeSnapshotCrossRegionSentBytes),
+		RangeSnapshotCrossRegionRcvdBytes: metric.NewCounter(metaRangeSnapshotCrossRegionRcvdBytes),
 		RangeSnapshotSendQueueLength:      metric.NewGauge(metaRangeSnapshotSendQueueLength),
 		RangeSnapshotRecvQueueLength:      metric.NewGauge(metaRangeSnapshotRecvQueueLength),
 		RangeSnapshotSendInProgress:       metric.NewGauge(metaRangeSnapshotSendInProgress),
diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index 293f6693d082..890192cd25b1 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -3151,6 +3151,9 @@ func (r *Replica) followerSendSnapshot(
 		r.store.metrics.DelegateSnapshotSendBytes.Inc(inc)
 	}
 	r.store.metrics.RangeSnapshotSentBytes.Inc(inc)
+	if r.store.shouldIncrementCrossRegionSnapshotMetrics(ctx, req.CoordinatorReplica, req.RecipientReplica) {
+		r.store.metrics.RangeSnapshotCrossRegionSentBytes.Inc(inc)
+	}
 
 	switch header.Priority {
 	case kvserverpb.SnapshotRequest_RECOVERY:
diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go
index c5a0570fa040..f39b17114901 100644
--- a/pkg/kv/kvserver/replica_learner_test.go
+++ b/pkg/kv/kvserver/replica_learner_test.go
@@ -1051,10 +1051,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
 	senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)
 
 	senderMapExpected := map[string]snapshotBytesMetrics{
-		".rebalancing": {sentBytes: 0, rcvdBytes: 0},
-		".recovery":    {sentBytes: snapshotLength, rcvdBytes: 0},
-		".unknown":     {sentBytes: 0, rcvdBytes: 0},
-		"":             {sentBytes: snapshotLength, rcvdBytes: 0},
+		".rebalancing":  {sentBytes: 0, rcvdBytes: 0},
+		".recovery":     {sentBytes: snapshotLength, rcvdBytes: 0},
+		".unknown":      {sentBytes: 0, rcvdBytes: 0},
+		"":              {sentBytes: snapshotLength, rcvdBytes: 0},
+		".cross-region": {sentBytes: 0, rcvdBytes: 0},
 	}
 	require.Equal(t, senderMapExpected, senderMapDelta)
 
@@ -1067,10 +1068,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
 	receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)
 
 	receiverMapExpected := map[string]snapshotBytesMetrics{
-		".rebalancing": {sentBytes: 0, rcvdBytes: 0},
-		".recovery":    {sentBytes: 0, rcvdBytes: snapshotLength},
-		".unknown":     {sentBytes: 0, rcvdBytes: 0},
-		"":             {sentBytes: 0, rcvdBytes: snapshotLength},
+		".rebalancing":  {sentBytes: 0, rcvdBytes: 0},
+		".recovery":     {sentBytes: 0, rcvdBytes: snapshotLength},
+		".unknown":      {sentBytes: 0, rcvdBytes: 0},
+		"":              {sentBytes: 0, rcvdBytes: snapshotLength},
+		".cross-region": {sentBytes: 0, rcvdBytes: 0},
 	}
 	require.Equal(t, receiverMapExpected, receiverMapDelta)
 }
@@ -2208,8 +2210,8 @@ func getSnapshotBytesMetrics(
 			rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), rcvdMetricStr),
 		}
 	}
-	types := [4]string{".unknown", ".recovery", ".rebalancing", ""}
-	for _, v := range types {
+
+	for _, v := range [5]string{".unknown", ".recovery", ".rebalancing", ".cross-region", ""} {
 		metrics[v] = findSnapshotBytesMetrics(v)
 	}
 	return metrics
@@ -2294,11 +2296,12 @@ func getExpectedSnapshotSizeBytes(
 	return int64(b.Len()), err
 }
 
-// Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and
-// 'range.snapshots.rebalancing.sent-bytes' metrics. This test adds a new
-// replica to a cluster, and during the process, a learner snapshot is sent to
-// the new replica.
-func TestRebalancingSnapshotMetrics(t *testing.T) {
+// Tests the accuracy of the
+// 'range.snapshots.[rebalancing|cross-region].rcvd-bytes' and
+// 'range.snapshots.[rebalancing|cross-region].sent-bytes' metrics. This test
+// adds a new replica to a cluster, and during the process, a learner snapshot
+// is sent to the new replica.
+func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
@@ -2321,11 +2324,36 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
 		}
 	}
 
+	// Set up three nodes with different localities in a single-range cluster.
+	// Note that server[2]'s locality does not include a "region" tier key.
+	const numNodes = 3
+	serverArgs := make(map[int]base.TestServerArgs)
+	for i := 0; i < numNodes; i++ {
+		if i == 2 {
+			serverArgs[i] = base.TestServerArgs{
+				Locality: roachpb.Locality{
+					Tiers: []roachpb.Tier{{Key: "zone", Value: fmt.Sprintf("us-east-%va", i)}},
+				},
+				Knobs: knobs,
+			}
+		} else {
+			serverArgs[i] = base.TestServerArgs{
+				Locality: roachpb.Locality{
+					Tiers: []roachpb.Tier{{Key: "region", Value: fmt.Sprintf("us-east-%v", i)}},
+				},
+				Knobs: knobs,
+			}
+		}
+	}
+
 	ctx := context.Background()
-	tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
-		ServerArgs:      base.TestServerArgs{Knobs: knobs},
-		ReplicationMode: base.ReplicationManual,
-	})
+	tc := testcluster.StartTestCluster(
+		t, numNodes, base.TestClusterArgs{
+			ServerArgsPerNode: serverArgs,
+			ReplicationMode:   base.ReplicationManual,
+		},
+	)
+
 	defer tc.Stopper().Stop(ctx)
 
 	// sendSnapshotFromServer is a testing helper function that sends a learner
@@ -2365,38 +2393,59 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
 	}
 
 	// Record the snapshot metrics before anything has been sent / received.
-	senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
-	receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
+	senderBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
+	firstReceiverBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
+	secReceiverBefore := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)
 
 	scratchStartKey := tc.ScratchRange(t)
-	// A learner snapshot should have been sent from the sender to the receiver.
-	_, snapshotLength := sendSnapshotToServer(scratchStartKey, 1)
-
-	// Record the snapshot metrics for the sender after a voter has been added.
-	senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0)
-	// Asserts that the learner snapshot (aka rebalancing snapshot) bytes sent
-	// have been recorded, and that it was not double counted in a different
-	// metric.
-	senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)
-	senderMapExpected := map[string]snapshotBytesMetrics{
-		".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0},
+
+	// Learner snapshots should have been sent from the sender (server[0]) to
+	// server[1] and server[2]; only the first is cross-region.
+	newRangeDesc, firstSnapshotLength := sendSnapshotToServer(scratchStartKey, 1)
+	_, secSnapshotLength := sendSnapshotToServer(newRangeDesc.StartKey.AsRawKey(), 2)
+	totalSnapshotLength := firstSnapshotLength + secSnapshotLength
+
+	// Compare the snapshot metrics for the sender after sending two snapshots to
+	// server[1] and server[2].
+	senderAfter := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
+	senderDelta := getSnapshotMetricsDiff(senderBefore, senderAfter)
+	senderExpected := map[string]snapshotBytesMetrics{
+		".rebalancing":  {sentBytes: totalSnapshotLength, rcvdBytes: 0},
 		".recovery":    {sentBytes: 0, rcvdBytes: 0},
 		".unknown":     {sentBytes: 0, rcvdBytes: 0},
-		"":             {sentBytes: snapshotLength, rcvdBytes: 0},
+		// The cross-region metrics should only reflect the first snapshot,
+		// since server[2]'s locality does not include a "region" tier key and
+		// the second snapshot is therefore not counted as cross-region.
+		".cross-region": {sentBytes: firstSnapshotLength, rcvdBytes: 0},
+		"":              {sentBytes: totalSnapshotLength, rcvdBytes: 0},
 	}
-	require.Equal(t, senderMapExpected, senderMapDelta)
+	require.Equal(t, senderExpected, senderDelta)
+
+	// Compare the snapshot metrics for server[1] after receiving the first
+	// snapshot, which is both a rebalancing and a cross-region snapshot.
+	firstReceiverAfter := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
+	firstReceiverDelta := getSnapshotMetricsDiff(firstReceiverBefore, firstReceiverAfter)
+	firstReceiverExpected := map[string]snapshotBytesMetrics{
+		".rebalancing":  {sentBytes: 0, rcvdBytes: firstSnapshotLength},
+		".recovery":     {sentBytes: 0, rcvdBytes: 0},
+		".unknown":      {sentBytes: 0, rcvdBytes: 0},
+		".cross-region": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
+		"":              {sentBytes: 0, rcvdBytes: firstSnapshotLength},
+	}
+	require.Equal(t, firstReceiverExpected, firstReceiverDelta)
 
-	// Record the snapshot metrics for the receiver after a voter has been added.
-	receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1)
-	// Asserts that the learner snapshot (aka rebalancing snapshot) bytes received
-	// have been recorded, and that it was not double counted in a different
-	// metric.
-	receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)
-	receiverMapExpected := map[string]snapshotBytesMetrics{
-		".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength},
+	// Compare the snapshot metrics for server[2] after receiving the second
+	// snapshot.
+	secReceiverAfter := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)
+	secReceiverDelta := getSnapshotMetricsDiff(secReceiverBefore, secReceiverAfter)
+	secReceiverExpected := map[string]snapshotBytesMetrics{
+		".rebalancing": {sentBytes: 0, rcvdBytes: secSnapshotLength},
 		".recovery":    {sentBytes: 0, rcvdBytes: 0},
 		".unknown":     {sentBytes: 0, rcvdBytes: 0},
-		"":             {sentBytes: 0, rcvdBytes: snapshotLength},
+		// The cross-region metrics should not be affected by the second
+		// snapshot, since server[2]'s locality does not include a "region"
+		// tier key.
+ ".cross-region": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: 0, rcvdBytes: secSnapshotLength}, } - require.Equal(t, receiverMapExpected, receiverMapDelta) + require.Equal(t, secReceiverExpected, secReceiverDelta) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 29923f245004..d9e196ce510a 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -972,6 +972,22 @@ func (s *Store) checkSnapshotOverlapLocked( return nil } +func (s *Store) shouldIncrementCrossRegionSnapshotMetrics( + ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, +) bool { + storePool := s.cfg.StorePool + if storePool == nil { + log.VEventf(ctx, 2, "store pool is nil") + return false + } + isCrossRegion, err := storePool.IsCrossRegion(firstReplica, secReplica) + if err != nil { + log.VEventf(ctx, 2, "%v", err) + return false + } + return isCrossRegion +} + // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, @@ -1089,6 +1105,11 @@ func (s *Store) receiveSnapshot( recordBytesReceived := func(inc int64) { s.metrics.RangeSnapshotRcvdBytes.Inc(inc) + if s.shouldIncrementCrossRegionSnapshotMetrics( + ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica) { + s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) + } + switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 30a6d4b5fa13..45a0201023ab 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -617,6 +617,22 @@ func (l Locality) Matches(filter Locality) (bool, Tier) { return true, Tier{} } +// IsCrossRegion checks if both this and passed locality has a tier with "region" +// as the key. If either locality does not have a region tier, it returns +// (false, error). Otherwise, it compares their region values and returns (true, +// nil) if they are different, and (false, nil) otherwise. +func (l Locality) IsCrossRegion(other Locality) (bool, error) { + // It is unfortunate that the "region" tier key is hardcoded here. Ideally, we + // would prefer a more robust way to determine node locality regions. + region, hasRegion := l.Find("region") + otherRegion, hasRegionInOther := other.Find("region") + + if hasRegion && hasRegionInOther { + return region != otherRegion, nil + } + return false, errors.Errorf("locality must have a region tier key for cross-region comparison") +} + // SharedPrefix returns the number of this locality's tiers which match those of // the passed locality. func (l Locality) SharedPrefix(other Locality) int {