Skip to content

Commit

Permalink
kvserver: add cross-region snapshot byte metrics to StoreMetrics
Browse files Browse the repository at this point in the history
Previously, there were no metrics to observe cross-region snapshot traffic in a
store.

To address this, this commit adds two new metrics to StoreMetrics:
RangeSnapShotCrossRegionSentBytes and RangeSnapShotCrossRegionRcvdBytes. These
metrics track the number of cross-region snapshot bytes sent and received by a
store.

Release note (ops change): Two new metrics, RangeSnapShotCrossRegionSentBytes
(range.snapshots.cross-region.sent-bytes) and RangeSnapShotCrossRegionRcvdBytes
(range.snapshots.cross-region.rcvd-bytes), have been added to StoreMetrics.

Fixes: #104124
  • Loading branch information
wenyihu6 committed May 30, 2023
1 parent b0db476 commit 6ba2566
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 33 deletions.
42 changes: 36 additions & 6 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1385,16 +1385,46 @@ func (sp *StorePool) GetLocalitiesPerReplica(
return localities
}

// GetNodeLocalityWithString looks up the cached locality entry (the locality
// together with its string form) for the given node. If the node has no entry,
// the zero-valued localityWithString is returned.
func (sp *StorePool) GetNodeLocalityWithString(nodeID roachpb.NodeID) localityWithString {
	sp.localitiesMu.RLock()
	defer sp.localitiesMu.RUnlock()

	if entry, found := sp.localitiesMu.nodeLocalities[nodeID]; found {
		return entry
	}
	// Nothing is recorded for this node: hand back an empty struct.
	return localityWithString{}
}

// GetNodeLocalityString returns the locality information for the given node
// in its string format.
func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string {
sp.localitiesMu.RLock()
defer sp.localitiesMu.RUnlock()
locality, ok := sp.localitiesMu.nodeLocalities[nodeID]
if !ok {
return ""
return sp.GetNodeLocalityWithString(nodeID).str
}

// GetNodeLocality returns the locality of the given node, as recorded in the
// store pool. It is a thin wrapper around GetNodeLocalityWithString that drops
// the string form.
func (sp *StorePool) GetNodeLocality(nodeID roachpb.NodeID) roachpb.Locality {
	entry := sp.GetNodeLocalityWithString(nodeID)
	return entry.locality
}

// IsCrossRegion takes in two replicas and compares the locality of them based
// on their replica node IDs. It returns (bool, error) indicating whether the
// two replicas' nodes are in different regions and if any errors occurred
// during the lookup process.
//
// The localities are fetched through GetNodeLocality, and the comparison
// itself is delegated to roachpb.Locality.IsCrossRegion. Any error from that
// comparison is returned unchanged together with false.
func (sp *StorePool) IsCrossRegion(
	firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
) (bool, error) {
	firstLocality := sp.GetNodeLocality(firstReplica.NodeID)
	secLocality := sp.GetNodeLocality(secReplica.NodeID)

	isCrossRegion, err := firstLocality.IsCrossRegion(secLocality)
	if err != nil {
		return false, err
	}
	return isCrossRegion, nil
}

// IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,18 @@ var (
Measurement: "Snapshots",
Unit: metric.Unit_COUNT,
}
// metaRangeSnapShotCrossRegionSentBytes is the metadata for the counter of
// snapshot bytes that were sent cross-region, measured in bytes.
// NOTE(review): "SnapShot" is capitalized differently from the neighboring
// metaRangeSnapshot* identifiers — confirm whether that is intentional.
metaRangeSnapShotCrossRegionSentBytes = metric.Metadata{
Name: "range.snapshots.cross-region.sent-bytes",
Help: "Number of snapshot bytes sent that were cross region",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// metaRangeSnapShotCrossRegionRcvdBytes is the metadata for the counter of
// snapshot bytes that were received cross-region, measured in bytes.
// NOTE(review): "SnapShot" is capitalized differently from the neighboring
// metaRangeSnapshot* identifiers — confirm whether that is intentional.
metaRangeSnapShotCrossRegionRcvdBytes = metric.Metadata{
Name: "range.snapshots.cross-region.rcvd-bytes",
Help: "Number of snapshot bytes received that were cross region",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapshotSendQueueLength = metric.Metadata{
Name: "range.snapshots.send-queue",
Help: "Number of snapshots queued to send",
Expand Down Expand Up @@ -2173,6 +2185,8 @@ type StoreMetrics struct {
RangeSnapshotRebalancingSentBytes *metric.Counter
RangeSnapshotRecvFailed *metric.Counter
RangeSnapshotRecvUnusable *metric.Counter
RangeSnapShotCrossRegionSentBytes *metric.Counter
RangeSnapShotCrossRegionRcvdBytes *metric.Counter

// Range snapshot queue metrics.
RangeSnapshotSendQueueLength *metric.Gauge
Expand Down Expand Up @@ -2789,6 +2803,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes),
RangeSnapshotRecvFailed: metric.NewCounter(metaRangeSnapshotRecvFailed),
RangeSnapshotRecvUnusable: metric.NewCounter(metaRangeSnapshotRecvUnusable),
RangeSnapShotCrossRegionSentBytes: metric.NewCounter(metaRangeSnapShotCrossRegionSentBytes),
RangeSnapShotCrossRegionRcvdBytes: metric.NewCounter(metaRangeSnapShotCrossRegionRcvdBytes),
RangeSnapshotSendQueueLength: metric.NewGauge(metaRangeSnapshotSendQueueLength),
RangeSnapshotRecvQueueLength: metric.NewGauge(metaRangeSnapshotRecvQueueLength),
RangeSnapshotSendInProgress: metric.NewGauge(metaRangeSnapshotSendInProgress),
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3151,7 +3151,11 @@ func (r *Replica) followerSendSnapshot(
r.store.metrics.DelegateSnapshotSendBytes.Inc(inc)
}
r.store.metrics.RangeSnapshotSentBytes.Inc(inc)

// Track cross-region traffic: compare the localities of the coordinator and
// recipient replicas' nodes. The lookup is best-effort; if it returns an error
// (for example, a locality has no "region" tier) the cross-region counter is
// left untouched and the error is not reported.
// NOTE(review): the comparison uses req.CoordinatorReplica rather than this
// (sending) replica — confirm that is the intended endpoint.
if isCrossRegion, err := r.store.cfg.StorePool.IsCrossRegion(
req.CoordinatorReplica, req.RecipientReplica); isCrossRegion && err == nil {
// Increment only when the snapshot was sent cross-region.
r.store.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc)
}
switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
r.store.metrics.RangeSnapshotRecoverySentBytes.Inc(inc)
Expand Down
94 changes: 68 additions & 26 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1051,10 +1051,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)

senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
".cross-region": {sentBytes: 0, rcvdBytes: 0},
}
require.Equal(t, senderMapExpected, senderMapDelta)

Expand All @@ -1067,10 +1068,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
".cross-region": {sentBytes: 0, rcvdBytes: 0},
}
require.Equal(t, receiverMapExpected, receiverMapDelta)
}
Expand Down Expand Up @@ -2208,7 +2210,7 @@ func getSnapshotBytesMetrics(
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), rcvdMetricStr),
}
}
types := [4]string{".unknown", ".recovery", ".rebalancing", ""}
types := [5]string{".unknown", ".recovery", ".rebalancing", ".cross-region", ""}
for _, v := range types {
metrics[v] = findSnapshotBytesMetrics(v)
}
Expand Down Expand Up @@ -2294,11 +2296,12 @@ func getExpectedSnapshotSizeBytes(
return int64(b.Len()), err
}

// Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and
// 'range.snapshots.rebalancing.sent-bytes' metrics. This test adds a new
// replica to a cluster, and during the process, a learner snapshot is sent to
// the new replica.
func TestRebalancingSnapshotMetrics(t *testing.T) {
// Tests the accuracy of the
// 'range.snapshots.[rebalancing|cross-region].rcvd-bytes' and
// 'range.snapshots.[rebalancing|cross-region].sent-bytes' metrics. This test
// adds a new replica to a cluster, and during the process, a learner snapshot
// is sent to the new replica.
func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -2318,21 +2321,45 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
}
}

// The initial setup ensures the correct configuration for three nodes (with
// different localities), single-range.
const numNodes = 3
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{
Key: "region", Value: fmt.Sprintf("us-east-%v", i),
},
},
},
Knobs: knobs,
}
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
tc := testcluster.StartTestCluster(
t, numNodes, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
ReplicationMode: base.ReplicationManual,
},
)

defer tc.Stopper().Stop(ctx)

// Note that a range is scratched off from servers[0].
scratchStartKey := tc.ScratchRange(t)

// Record the snapshot metrics before anything has been sent / received.
senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
dummyNodeMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)

g := ctxgroup.WithContext(ctx)
g.GoCtx(func(ctx context.Context) error {
// A new replica at servers[1] is now added to the cluster, resulting in a
// learner snapshot to be sent cross-region from servers[0] to servers[1].
_, err := tc.AddVoters(scratchStartKey, tc.Target(1))
return err
})
Expand All @@ -2359,10 +2386,11 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)

senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
".cross-region": {sentBytes: snapshotLength, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
}
require.Equal(t, senderMapExpected, senderMapDelta)

Expand All @@ -2375,10 +2403,24 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
".cross-region": {sentBytes: 0, rcvdBytes: snapshotLength},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
}
require.Equal(t, receiverMapExpected, receiverMapDelta)

// Asserts that the dummy node sends/receives no cross-region snapshots, and
// its metrics should remain unchanged.
dummyNodeMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)
dummyNodeMapDelta := getSnapshotMetricsDiff(dummyNodeMetricsMapBefore, dummyNodeMetricsMapAfter)
dummyNodeMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
".cross-region": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: 0},
}
require.Equal(t, dummyNodeMapExpected, dummyNodeMapDelta)
}
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,13 @@ func (s *Store) receiveSnapshot(
recordBytesReceived := func(inc int64) {
s.metrics.RangeSnapshotRcvdBytes.Inc(inc)

// Increment if the snapshot was from cross-region. The lookup is
// best-effort: when the locality comparison fails (for example, a locality
// has no "region" tier) the counter is left untouched. The error must be
// bound in this statement; discarding it with `_` and then testing `err`
// would consult an unrelated outer variable instead.
if isCrossRegion, err := s.cfg.StorePool.IsCrossRegion(
	header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica,
); isCrossRegion && err == nil {
	s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc)
}

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc)
Expand Down
16 changes: 16 additions & 0 deletions pkg/roachpb/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,22 @@ func (l Locality) Matches(filter Locality) (bool, Tier) {
return true, Tier{}
}

// IsCrossRegion reports whether this locality and the passed locality are in
// different regions. Both localities must have a tier whose key is "region";
// if either lacks one, it returns (false, error). Otherwise it returns
// (true, nil) when the two region values differ and (false, nil) when they
// are the same.
func (l Locality) IsCrossRegion(other Locality) (bool, error) {
	// It is unfortunate that the "region" tier key is hardcoded here. Ideally,
	// we would prefer a more robust way to determine node locality regions.
	const regionKey = "region"

	ownRegion, ownHasRegion := l.Find(regionKey)
	peerRegion, peerHasRegion := other.Find(regionKey)

	// Without a region tier on both sides there is nothing to compare.
	if !ownHasRegion || !peerHasRegion {
		return false, errors.Errorf("locality must have a tier with key region")
	}
	return ownRegion != peerRegion, nil
}

// SharedPrefix returns the number of this locality's tiers which match those of
// the passed locality.
func (l Locality) SharedPrefix(other Locality) int {
Expand Down

0 comments on commit 6ba2566

Please sign in to comment.