kvserver: add cross-region snapshot byte metrics to StoreMetrics
Previously, there were no metrics to observe cross-region snapshot traffic
between stores within a cluster.

To address this, this commit adds two new store metrics:
`range.snapshots.cross-region.sent-bytes` and
`range.snapshots.cross-region.rcvd-bytes`. These metrics track the aggregate
cross-region snapshot bytes sent from and received at each store.

Fixes: #104124

Release note (ops change): Two new store metrics,
`range.snapshots.cross-region.sent-bytes` and
`range.snapshots.cross-region.rcvd-bytes`, have been added to track the
aggregate cross-region snapshot bytes sent from and received at each store.
Note that these metrics require nodes' localities to include a "region" tier
key. If a node lacks this key but is involved in cross-region snapshot
activity, an error message will be logged.
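
As an illustration of the locality requirement described above, the sketch below (not part of the commit) exercises the new `Locality.IsCrossRegion` helper added to `pkg/roachpb/metadata.go` in this change. The `roachpb.Locality` and `roachpb.Tier` types and the helper itself come from the diff below; the region and zone values and the `main` wrapper are made up for illustration.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	east := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east1"}}}
	west := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "us-west1"}}}
	// A locality with only a "zone" tier, i.e. no "region" key.
	zoneOnly := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "zone", Value: "us-east1-a"}}}

	// Different "region" values: snapshot bytes between these nodes would be
	// counted as cross-region.
	fmt.Println(east.IsCrossRegion(west)) // true <nil>

	// Same "region" value: not cross-region, no counter increment.
	fmt.Println(east.IsCrossRegion(east)) // false <nil>

	// Missing "region" tier on either side: the comparison returns an error,
	// which the store logs instead of incrementing the cross-region metrics.
	fmt.Println(zoneOnly.IsCrossRegion(east)) // false plus a non-nil error
}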
wenyihu6 committed Jun 1, 2023
1 parent 90b27e9 commit b685267
Showing 6 changed files with 182 additions and 50 deletions.
39 changes: 33 additions & 6 deletions pkg/kv/kvserver/allocator/storepool/store_pool.go
@@ -1385,16 +1385,43 @@ func (sp *StorePool) GetLocalitiesPerReplica(
return localities
}

// getNodeLocalityWithString returns the locality information and the string
// format for the given node.
func (sp *StorePool) getNodeLocalityWithString(nodeID roachpb.NodeID) localityWithString {
nodeLocality := localityWithString{}
sp.localitiesMu.RLock()
defer sp.localitiesMu.RUnlock()
if locality, ok := sp.localitiesMu.nodeLocalities[nodeID]; ok {
nodeLocality = locality
}
// Return an empty localityWithString struct if nothing is found.
return nodeLocality
}

// GetNodeLocalityString returns the locality information for the given node
// in its string format.
func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string {
-sp.localitiesMu.RLock()
-defer sp.localitiesMu.RUnlock()
-locality, ok := sp.localitiesMu.nodeLocalities[nodeID]
-if !ok {
-return ""
return sp.getNodeLocalityWithString(nodeID).str
}

// getNodeLocality returns the locality information for the given node.
func (sp *StorePool) getNodeLocality(nodeID roachpb.NodeID) roachpb.Locality {
return sp.getNodeLocalityWithString(nodeID).locality
}

// IsCrossRegion takes in two replicas and compares their localities based on
// their replica node IDs. It returns (bool, error) indicating whether the two
// replicas' nodes are in different regions and whether any error occurred
// during the lookup process.
func (sp *StorePool) IsCrossRegion(
firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
) (bool, error) {
isCrossRegion, err := sp.getNodeLocality(firstReplica.NodeID).IsCrossRegion(
sp.getNodeLocality(secReplica.NodeID))
if err != nil {
return false, err
}
-return locality.str
return isCrossRegion, nil
}

// IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -946,6 +946,18 @@
Measurement: "Snapshots",
Unit: metric.Unit_COUNT,
}
metaRangeSnapShotCrossRegionSentBytes = metric.Metadata{
Name: "range.snapshots.cross-region.sent-bytes",
Help: "Number of snapshot bytes sent cross region",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapShotCrossRegionRcvdBytes = metric.Metadata{
Name: "range.snapshots.cross-region.rcvd-bytes",
Help: "Number of snapshot bytes received cross region",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaRangeSnapshotSendQueueLength = metric.Metadata{
Name: "range.snapshots.send-queue",
Help: "Number of snapshots queued to send",
@@ -2173,6 +2185,8 @@ type StoreMetrics struct {
RangeSnapshotRebalancingSentBytes *metric.Counter
RangeSnapshotRecvFailed *metric.Counter
RangeSnapshotRecvUnusable *metric.Counter
RangeSnapShotCrossRegionSentBytes *metric.Counter
RangeSnapShotCrossRegionRcvdBytes *metric.Counter

// Range snapshot queue metrics.
RangeSnapshotSendQueueLength *metric.Gauge
@@ -2789,6 +2803,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
RangeSnapshotRebalancingSentBytes: metric.NewCounter(metaRangeSnapshotRebalancingSentBytes),
RangeSnapshotRecvFailed: metric.NewCounter(metaRangeSnapshotRecvFailed),
RangeSnapshotRecvUnusable: metric.NewCounter(metaRangeSnapshotRecvUnusable),
RangeSnapShotCrossRegionSentBytes: metric.NewCounter(metaRangeSnapShotCrossRegionSentBytes),
RangeSnapShotCrossRegionRcvdBytes: metric.NewCounter(metaRangeSnapShotCrossRegionRcvdBytes),
RangeSnapshotSendQueueLength: metric.NewGauge(metaRangeSnapshotSendQueueLength),
RangeSnapshotRecvQueueLength: metric.NewGauge(metaRangeSnapshotRecvQueueLength),
RangeSnapshotSendInProgress: metric.NewGauge(metaRangeSnapshotSendInProgress),
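As background for the metrics.go changes above, the following sketch (not part of the commit) shows the metadata-plus-counter pattern the two new counters follow: declare a `metric.Metadata`, wrap it with `metric.NewCounter`, and bump it with `Inc` on the relevant code path. The import path `pkg/util/metric` and the `Count` accessor are assumptions based on CockroachDB's metric package; the metadata values here are illustrative, not the real definitions.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

// Illustrative metadata, mirroring the shape of
// metaRangeSnapShotCrossRegionSentBytes in the hunk above.
var metaExampleCrossRegionSentBytes = metric.Metadata{
	Name:        "example.snapshots.cross-region.sent-bytes",
	Help:        "Number of example snapshot bytes sent cross region",
	Measurement: "Bytes",
	Unit:        metric.Unit_BYTES,
}

func main() {
	// In the real code the counter lives in StoreMetrics and is incremented
	// from the snapshot send/receive paths; here it is created and bumped
	// directly.
	sent := metric.NewCounter(metaExampleCrossRegionSentBytes)
	sent.Inc(512)                 // e.g. one 512-byte batch of a cross-region snapshot
	fmt.Println(sent.Count())     // Count is assumed to be the read accessor
}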
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/replica_command.go
@@ -3151,6 +3151,9 @@ func (r *Replica) followerSendSnapshot(
r.store.metrics.DelegateSnapshotSendBytes.Inc(inc)
}
r.store.metrics.RangeSnapshotSentBytes.Inc(inc)
if r.store.shouldIncrementCrossRegionSnapshotMetrics(ctx, req.CoordinatorReplica, req.RecipientReplica) {
r.store.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc)
}

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
137 changes: 93 additions & 44 deletions pkg/kv/kvserver/replica_learner_test.go
@@ -1051,10 +1051,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)

senderMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: snapshotLength, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: snapshotLength, rcvdBytes: 0},
".cross-region": {sentBytes: 0, rcvdBytes: 0},
}
require.Equal(t, senderMapExpected, senderMapDelta)

@@ -1067,10 +1068,11 @@ func testRaftSnapshotsToNonVoters(t *testing.T, drainReceivingNode bool) {
receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)

receiverMapExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: snapshotLength},
".unknown": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: snapshotLength},
".cross-region": {sentBytes: 0, rcvdBytes: 0},
}
require.Equal(t, receiverMapExpected, receiverMapDelta)
}
@@ -2208,8 +2210,8 @@ func getSnapshotBytesMetrics(
rcvdBytes: getFirstStoreMetric(t, tc.Server(serverIdx), rcvdMetricStr),
}
}
-types := [4]string{".unknown", ".recovery", ".rebalancing", ""}
-for _, v := range types {

for _, v := range [5]string{".unknown", ".recovery", ".rebalancing", ".cross-region", ""} {
metrics[v] = findSnapshotBytesMetrics(v)
}
return metrics
@@ -2294,11 +2296,12 @@ func getExpectedSnapshotSizeBytes(
return int64(b.Len()), err
}

-// Tests the accuracy of the 'range.snapshots.rebalancing.rcvd-bytes' and
-// 'range.snapshots.rebalancing.sent-bytes' metrics. This test adds a new
-// replica to a cluster, and during the process, a learner snapshot is sent to
-// the new replica.
-func TestRebalancingSnapshotMetrics(t *testing.T) {
// Tests the accuracy of the
// 'range.snapshots.[rebalancing|cross-region].rcvd-bytes' and
// 'range.snapshots.[rebalancing|cross-region].sent-bytes' metrics. This test
// adds a new replica to a cluster, and during the process, a learner snapshot
// is sent to the new replica.
func TestRebalancingAndCrossRegionSnapshotMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

@@ -2321,11 +2324,36 @@ func TestRebalancingSnapshotMetrics(t *testing.T) {
}
}

// The initial setup configures a three-node cluster (with different
// localities) and a single range.
const numNodes = 3
serverArgs := make(map[int]base.TestServerArgs)
for i := 0; i < numNodes; i++ {
if i == 2 {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "zone", Value: fmt.Sprintf("us-east-%va", i)}},
},
Knobs: knobs,
}
} else {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: fmt.Sprintf("us-east-%v", i)}},
},
Knobs: knobs,
}
}
}

ctx := context.Background()
-tc := testcluster.StartTestCluster(t, 2, base.TestClusterArgs{
-ServerArgs: base.TestServerArgs{Knobs: knobs},
-ReplicationMode: base.ReplicationManual,
-})
tc := testcluster.StartTestCluster(
t, numNodes, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
ReplicationMode: base.ReplicationManual,
},
)

defer tc.Stopper().Stop(ctx)

// sendSnapshotFromServer is a testing helper function that sends a learner
@@ -2365,38 +2393,59 @@
}

// Record the snapshot metrics before anything has been sent / received.
-senderMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
-receiverMetricsMapBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
senderBefore := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
firstReceiverBefore := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
secReceiverBefore := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)
scratchStartKey := tc.ScratchRange(t)

-// A learner snapshot should have been sent from the sender to the receiver.
-_, snapshotLength := sendSnapshotToServer(scratchStartKey, 1)

-// Record the snapshot metrics for the sender after a voter has been added.
-senderMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 0)
-// Asserts that the learner snapshot (aka rebalancing snapshot) bytes sent
-// have been recorded, and that it was not double counted in a different
-// metric.
-senderMapDelta := getSnapshotMetricsDiff(senderMetricsMapBefore, senderMetricsMapAfter)
-senderMapExpected := map[string]snapshotBytesMetrics{
-".rebalancing": {sentBytes: snapshotLength, rcvdBytes: 0},
// A cross-region learner snapshot should have been sent from the
// sender (server[0]) to server[1] and server[2].
newRangeDesc, firstSnapshotLength := sendSnapshotToServer(scratchStartKey, 1)
_, secSnapshotLength := sendSnapshotToServer(newRangeDesc.StartKey.AsRawKey(), 2)
totalSnapshotLength := firstSnapshotLength + secSnapshotLength

// Compare the snapshot metrics for the sender after sending two snapshots to
// server[1] and server[2].
senderAfter := getSnapshotBytesMetrics(t, tc, 0 /* serverIdx */)
senderDelta := getSnapshotMetricsDiff(senderBefore, senderAfter)
senderExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: totalSnapshotLength, rcvdBytes: 0},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
-"": {sentBytes: snapshotLength, rcvdBytes: 0},
// Assert that the cross-region metrics should not be affected by the second
// snapshot (since server[2]'s locality does not include a "region" tier
// key).
".cross-region": {sentBytes: firstSnapshotLength, rcvdBytes: 0},
"": {sentBytes: totalSnapshotLength, rcvdBytes: 0},
}
-require.Equal(t, senderMapExpected, senderMapDelta)
require.Equal(t, senderExpected, senderDelta)

// Compare the snapshot metrics for server[1] after receiving the first
// cross-region and rebalancing snapshot.
firstReceiverMetricsAfter := getSnapshotBytesMetrics(t, tc, 1 /* serverIdx */)
firstReceiverDelta := getSnapshotMetricsDiff(firstReceiverBefore, firstReceiverMetricsAfter)
firstReceiverExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
".cross-region": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
"": {sentBytes: 0, rcvdBytes: firstSnapshotLength},
}
require.Equal(t, firstReceiverExpected, firstReceiverDelta)

-// Record the snapshot metrics for the receiver after a voter has been added.
-receiverMetricsMapAfter := getSnapshotBytesMetrics(t, tc, 1)
-// Asserts that the learner snapshot (aka rebalancing snapshot) bytes received
-// have been recorded, and that it was not double counted in a different
-// metric.
-receiverMapDelta := getSnapshotMetricsDiff(receiverMetricsMapBefore, receiverMetricsMapAfter)
-receiverMapExpected := map[string]snapshotBytesMetrics{
-".rebalancing": {sentBytes: 0, rcvdBytes: snapshotLength},
// Compare the snapshot metrics for server[2] after receiving the second
// snapshot.
secReceiverAfter := getSnapshotBytesMetrics(t, tc, 2 /* serverIdx */)
secReceiverDelta := getSnapshotMetricsDiff(secReceiverBefore, secReceiverAfter)
secReceiverExpected := map[string]snapshotBytesMetrics{
".rebalancing": {sentBytes: 0, rcvdBytes: secSnapshotLength},
".recovery": {sentBytes: 0, rcvdBytes: 0},
".unknown": {sentBytes: 0, rcvdBytes: 0},
-"": {sentBytes: 0, rcvdBytes: snapshotLength},
// Assert that the cross-region metrics should not be affected by the second
// snapshot (since server[2]'s locality does not include a "region" tier
// key).
".cross-region": {sentBytes: 0, rcvdBytes: 0},
"": {sentBytes: 0, rcvdBytes: secSnapshotLength},
}
-require.Equal(t, receiverMapExpected, receiverMapDelta)
require.Equal(t, secReceiverExpected, secReceiverDelta)
}
21 changes: 21 additions & 0 deletions pkg/kv/kvserver/store_snapshot.go
@@ -972,6 +972,22 @@ func (s *Store) checkSnapshotOverlapLocked(
return nil
}

func (s *Store) shouldIncrementCrossRegionSnapshotMetrics(
ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor,
) bool {
storePool := s.cfg.StorePool
if storePool == nil {
log.VEventf(ctx, 2, "store pool is nil")
return false
}
isCrossRegion, err := storePool.IsCrossRegion(firstReplica, secReplica)
if err != nil {
log.VEventf(ctx, 2, "%v", err)
return false
}
return isCrossRegion
}

// receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream.
func (s *Store) receiveSnapshot(
ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream,
@@ -1089,6 +1105,11 @@ func (s *Store) receiveSnapshot(
recordBytesReceived := func(inc int64) {
s.metrics.RangeSnapshotRcvdBytes.Inc(inc)

if s.shouldIncrementCrossRegionSnapshotMetrics(
ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica) {
s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc)
}

switch header.Priority {
case kvserverpb.SnapshotRequest_RECOVERY:
s.metrics.RangeSnapshotRecoveryRcvdBytes.Inc(inc)
16 changes: 16 additions & 0 deletions pkg/roachpb/metadata.go
@@ -617,6 +617,22 @@ func (l Locality) Matches(filter Locality) (bool, Tier) {
return true, Tier{}
}

// IsCrossRegion checks whether both this locality and the passed locality have
// a tier with "region" as the key. If either locality does not have a region
// tier, it returns (false, error). Otherwise, it compares their region values
// and returns (true, nil) if they differ, and (false, nil) otherwise.
func (l Locality) IsCrossRegion(other Locality) (bool, error) {
// It is unfortunate that the "region" tier key is hardcoded here. Ideally, we
// would prefer a more robust way to determine node locality regions.
region, hasRegion := l.Find("region")
otherRegion, hasRegionInOther := other.Find("region")

if hasRegion && hasRegionInOther {
return region != otherRegion, nil
}
return false, errors.Errorf("locality must have a region tier key for cross-region comparison")
}

// SharedPrefix returns the number of this locality's tiers which match those of
// the passed locality.
func (l Locality) SharedPrefix(other Locality) int {
