diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 6098dd8eeda3..c342520c3888 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -1385,16 +1385,46 @@ func (sp *StorePool) GetLocalitiesPerReplica( return localities } +// GetNodeLocalityWithString returns the locality information and its string +// format for the given node. +func (sp *StorePool) GetNodeLocalityWithString(nodeID roachpb.NodeID) localityWithString { + nodeLocality := localityWithString{ + str: "", + locality: roachpb.Locality{}, + } + sp.localitiesMu.RLock() + defer sp.localitiesMu.RUnlock() + if locality, ok := sp.localitiesMu.nodeLocalities[nodeID]; ok { + nodeLocality = locality + } + // Return an empty localityWithString struct if nothing is found. + return nodeLocality +} + // GetNodeLocalityString returns the locality information for the given node // in its string format. func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { - sp.localitiesMu.RLock() - defer sp.localitiesMu.RUnlock() - locality, ok := sp.localitiesMu.nodeLocalities[nodeID] - if !ok { - return "" + return sp.GetNodeLocalityWithString(nodeID).str +} + +// GetNodeLocality returns the locality information for the given node. +func (sp *StorePool) GetNodeLocality(nodeID roachpb.NodeID) roachpb.Locality { + return sp.GetNodeLocalityWithString(nodeID).locality +} + +// IsCrossRegion takes in two replicas and compares the locality of them based +// on their replica node IDs. It returns (bool, error) indicating whether the +// two replicas’ nodes are in different regions and if any errors occurred +// during the lookup process. +func (sp *StorePool) IsCrossRegion( + firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, +) (bool, error) { + isCrossRegion, err := sp.GetNodeLocality(firstReplica.NodeID).IsCrossRegion( + sp.GetNodeLocality(secReplica.NodeID)) + if err != nil { + return false, err } - return locality.str + return isCrossRegion, nil } // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 50c60fb5b43e..cf487d261ab8 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -946,6 +946,18 @@ var ( Measurement: "Snapshots", Unit: metric.Unit_COUNT, } + metaRangeSnapShotCrossRegionSentBytes = metric.Metadata{ + Name: "range.snapshots.cross-region.sent-bytes", + Help: "Number of snapshot bytes sent that were cross region", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaRangeSnapShotCrossRegionRcvdBytes = metric.Metadata{ + Name: "range.snapshots.cross-region.rcvd-bytes", + Help: "Number of snapshot bytes received that were cross region", + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } metaRangeSnapshotSendQueueLength = metric.Metadata{ Name: "range.snapshots.send-queue", Help: "Number of snapshots queued to send", @@ -2173,6 +2185,8 @@ type StoreMetrics struct { RangeSnapshotRebalancingSentBytes *metric.Counter RangeSnapshotRecvFailed *metric.Counter RangeSnapshotRecvUnusable *metric.Counter + RangeSnapShotCrossRegionSentBytes *metric.Counter + RangeSnapShotCrossRegionRcvdBytes *metric.Counter // Range snapshot queue metrics. RangeSnapshotSendQueueLength *metric.Gauge @@ -2789,6 +2803,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes), RangeSnapshotRecvFailed: metric.NewCounter(metaRangeSnapshotRecvFailed), RangeSnapshotRecvUnusable: metric.NewCounter(metaRangeSnapshotRecvUnusable), + RangeSnapShotCrossRegionSentBytes: metric.NewCounter(metaRangeSnapShotCrossRegionSentBytes), + RangeSnapShotCrossRegionRcvdBytes: metric.NewCounter(metaRangeSnapShotCrossRegionRcvdBytes), RangeSnapshotSendQueueLength: metric.NewGauge(metaRangeSnapshotSendQueueLength), RangeSnapshotRecvQueueLength: metric.NewGauge(metaRangeSnapshotRecvQueueLength), RangeSnapshotSendInProgress: metric.NewGauge(metaRangeSnapshotSendInProgress), diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 293f6693d082..f7cf971ab258 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3151,7 +3151,11 @@ func (r *Replica) followerSendSnapshot( r.store.metrics.DelegateSnapshotSendBytes.Inc(inc) } r.store.metrics.RangeSnapshotSentBytes.Inc(inc) - + if isCrossRegion, err := r.store.cfg.StorePool.IsCrossRegion( + req.CoordinatorReplica, req.RecipientReplica); isCrossRegion && err == nil { + // Increment if the snapshot was sent cross-region. + r.store.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc) + } switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: r.store.metrics.RangeSnapshotRecoverySentBytes.Inc(inc) diff --git a/pkg/kv/kvserver/replica_learner_test.go b/pkg/kv/kvserver/replica_learner_test.go index 44943b3584a6..691286f9b2ce 100644 --- a/pkg/kv/kvserver/replica_learner_test.go +++ b/pkg/kv/kvserver/replica_learner_test.go @@ -1051,10 +1051,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter) senderMapExpected := map[string]snapshotBytesMetrics{ - ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, - ".recovery": {sentBytes: snapshotLength, rcvdBytes: 0}, - ".unknown": {sentBytes: 0, rcvdBytes: 0}, - "": {sentBytes: snapshotLength, rcvdBytes: 0}, + ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, + ".recovery": {sentBytes: snapshotLength, rcvdBytes: 0}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: snapshotLength, rcvdBytes: 0}, + ".cross-region": {sentBytes: 0, rcvdBytes: 0}, } require.Equal(t, senderMapExpected, senderMapDelta) @@ -1067,10 +1068,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) { receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter) receiverMapExpected := map[string]snapshotBytesMetrics{ - ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, - ".recovery": {sentBytes: 0, rcvdBytes: snapshotLength}, - ".unknown": {sentBytes: 0, rcvdBytes: 0}, - "": {sentBytes: 0, rcvdBytes: snapshotLength}, + ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, + ".recovery": {sentBytes: 0, rcvdBytes: snapshotLength}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: 0, rcvdBytes: snapshotLength}, + ".cross-region": {sentBytes: 0, rcvdBytes: 0}, } require.Equal(t, receiverMapExpected, receiverMapDelta) } @@ -2208,7 +2210,7 @@ func getSnapshotBytesMetrics( rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), rcvdMetricStr), } } - types := [4]string{".unknown", ".recovery", ".rebalancing", ""} + types := [5]string{".unknown", ".recovery", ".rebalancing", ".cross-region", ""} for _, v := range types { metrics[v] = findSnapshotBytesMetrics(v) } @@ -2294,11 +2296,12 @@ func getExpectedSnapshotSizeBytes( return int64(b.Len()), err } -// Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and -// 'range.snapshots.rebalancing.sent-bytes' metrics. This test adds a new -// replica to a cluster, and during the process, a learner snapshot is sent to -// the new replica. -func TestRebalancingSnapshotMetrics(t *testing.T) { +// Tests the accuracy of the +// 'range.snapshots.[rebalancing|cross-region].rcvd-bytes' and +// 'range.snapshots.[rebalancing|cross-region].sent-bytes' metrics. This test +// adds a new replica to a cluster, and during the process, a learner snapshot +// is sent to the new replica. +func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2318,21 +2321,45 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { } } + // The initial setup ensures the correct configuration for three nodes (with + // different localities), single-range. + const numNodes = 3 + serverArgs := make(map[int]base.TestServerArgs) + for i := 0; i < numNodes; i++ { + serverArgs[i] = base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + { + Key: "region", Value: fmt.Sprintf("us-east-%v", i), + }, + }, + }, + Knobs: knobs, + } + } + ctx := context.Background() - tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{Knobs: knobs}, - ReplicationMode: base.ReplicationManual, - }) + tc := testcluster.StartTestCluster( + t, numNodes, base.TestClusterArgs{ + ServerArgsPerNode: serverArgs, + ReplicationMode: base.ReplicationManual, + }, + ) + defer tc.Stopper().Stop(ctx) + // Note that a range is scratched off from servers[0]. scratchStartKey := tc.ScratchRange(t) // Record the snapshot metrics before anything has been sent / received. senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */) receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */) + dummyNodeMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */) g := ctxgroup.WithContext(ctx) g.GoCtx(func(ctx context.Context) error { + // A new replica at servers[1] is now added to the cluster, resulting in a + // learner snapshot to be sent cross-region from servers[0] to servers[1]. _, err := tc.AddVoters(scratchStartKey, tc.Target(1)) return err }) @@ -2359,10 +2386,11 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter) senderMapExpected := map[string]snapshotBytesMetrics{ - ".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0}, - ".recovery": {sentBytes: 0, rcvdBytes: 0}, - ".unknown": {sentBytes: 0, rcvdBytes: 0}, - "": {sentBytes: snapshotLength, rcvdBytes: 0}, + ".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0}, + ".recovery": {sentBytes: 0, rcvdBytes: 0}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + ".cross-region": {sentBytes: snapshotLength, rcvdBytes: 0}, + "": {sentBytes: snapshotLength, rcvdBytes: 0}, } require.Equal(t, senderMapExpected, senderMapDelta) @@ -2375,10 +2403,24 @@ func TestRebalancingSnapshotMetrics(t *testing.T) { receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter) receiverMapExpected := map[string]snapshotBytesMetrics{ - ".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength}, - ".recovery": {sentBytes: 0, rcvdBytes: 0}, - ".unknown": {sentBytes: 0, rcvdBytes: 0}, - "": {sentBytes: 0, rcvdBytes: snapshotLength}, + ".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength}, + ".recovery": {sentBytes: 0, rcvdBytes: 0}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + ".cross-region": {sentBytes: 0, rcvdBytes: snapshotLength}, + "": {sentBytes: 0, rcvdBytes: snapshotLength}, } require.Equal(t, receiverMapExpected, receiverMapDelta) + + // Asserts that the dummy node sends/receives no cross-region snapshots, and + // its metrics should remain unchanged. + dummyNodeMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */) + dummyNodeMapDelta := getSnapshotMetricsDiff(dummyNodeMetricsMapBefore, dummyNodeMetricsMapAfter) + dummyNodeMapExpected := map[string]snapshotBytesMetrics{ + ".rebalancing": {sentBytes: 0, rcvdBytes: 0}, + ".recovery": {sentBytes: 0, rcvdBytes: 0}, + ".unknown": {sentBytes: 0, rcvdBytes: 0}, + ".cross-region": {sentBytes: 0, rcvdBytes: 0}, + "": {sentBytes: 0, rcvdBytes: 0}, + } + require.Equal(t, dummyNodeMapExpected, dummyNodeMapDelta) } diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 29923f245004..f43be725c839 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -1089,6 +1089,13 @@ func (s *Store) receiveSnapshot( recordBytesReceived := func(inc int64) { s.metrics.RangeSnapshotRcvdBytes.Inc(inc) + if isCrossRegion, _ := s.cfg.StorePool.IsCrossRegion( + // Increment if the snapshot was from cross-region. + header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica, + ); isCrossRegion && err == nil { + s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) + } + switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 30a6d4b5fa13..d42b65d62eee 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -617,6 +617,22 @@ func (l Locality) Matches(filter Locality) (bool, Tier) { return true, Tier{} } +// IsCrossRegion checks if both this and passed locality has a tier with "region" +// as the key. If either locality does not have a region tier, it returns +// (false, error). Otherwise, it compares their region values and returns (true, +// nil) if they are different, and (false, nil) otherwise. +func (l Locality) IsCrossRegion(other Locality) (bool, error) { + // It is unfortunate that the "region" tier key is hardcoded here. Ideally, we + // would prefer a more robust way to determine node locality regions. + region, hasRegion := l.Find("region") + otherRegion, hasRegionOther := other.Find("region") + + if hasRegion && hasRegionOther { + return region != otherRegion, nil + } + return false, errors.Errorf("locality must have a tier with key region") +} + // SharedPrefix returns the number of this locality's tiers which match those of // the passed locality. func (l Locality) SharedPrefix(other Locality) int {