diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e962a94fbc71..0d7b2cd062de 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -395,6 +395,45 @@ func (dm *DistSenderMetrics) getDistSenderCounterMetrics( return metricCountMap, nil } +// updateMetricsOnReplicaAddressedBatchRequest updates DistSenderMetrics for +// batch requests that have been divided and are currently forwarding to a +// specific replica for the corresponding range. The metrics being updated +// include 1. cross-region metrics, which monitor activities across different +// regions, and 2. cross-zone metrics, which monitor activities across different +// zones within the same region or in cases where region tiers are not +// configured. These metrics may include batches that were not successfully sent +// but were terminated at an early stage. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchRequestBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + +// updateMetricsOnReplicaAddressedBatchResponse updates DistSenderMetrics for +// batch responses that are received back from transport rpc. It updates based +// on the comparisonResult parameter determined during the initial batch +// requests check. The underlying assumption is that the response should match +// the cross-region or cross-zone nature of the requests. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchResponseBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchResponseBytes.Inc(inc) + } +} + // FirstRangeProvider is capable of providing DistSender with the descriptor of // the first range in the cluster and notifying the DistSender when the first // range in the cluster has changed. @@ -465,14 +504,6 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch request byte count to the test. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch response byte count to the test. - BatchResponseInterceptor func(br *kvpb.BatchResponse) - // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -630,14 +661,6 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch } - if cfg.TestingKnobs.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor - } - - if cfg.TestingKnobs.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor - } - return ds } @@ -2293,15 +2316,11 @@ func (ds *DistSender) sendToReplicas( (desc.Generation == 0 && routing.LeaseSeq() == 0), } - if ds.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor(ba) - } - comparisonResult := ds.checkAndUpdateCrossLocalityBatchMetrics(ctx, ba) + comparisonResult := ds.getLocalityComparison(ctx, ds.getNodeID(), ba.Replica.NodeID) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size())) + br, err = transport.SendNext(ctx, ba) - if ds.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor(br) - } - ds.updateCrossLocalityBatchMetrics(br, comparisonResult) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size())) ds.maybeIncrementErrCounters(br, err) if err != nil { @@ -2557,71 +2576,33 @@ func (ds *DistSender) sendToReplicas( } } -// getCrossLocalityComparison compares the localities of the current node and -// the destination range node to determine if the given batch request is -// cross-region and cross-zone. -func (ds *DistSender) getCrossLocalityComparison( - ctx context.Context, ba *kvpb.BatchRequest, +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (ds *DistSender) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, ) roachpb.LocalityComparisonType { - gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) return roachpb.LocalityComparisonType_UNDEFINED } - destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) return roachpb.LocalityComparisonType_UNDEFINED } comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross zone %v", zoneErr) - } - return comparisonResult -} - -// checkAndUpdateCrossLocalityBatchMetrics updates the batch requests metrics in -// a more meaningful way. Cross-region metrics monitor activities across -// different regions. Cross-zone metrics monitor cross-zone activities within -// the same region or in cases where region tiers are not configured. The -// locality comparison result is returned here to avoid redundant check for -// metrics updates after receiving batch responses. -func (ds *DistSender) checkAndUpdateCrossLocalityBatchMetrics( - ctx context.Context, ba *kvpb.BatchRequest, -) roachpb.LocalityComparisonType { - ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size())) - comparisonResult := ds.getCrossLocalityComparison(ctx, ba) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } return comparisonResult } -// updateCrossLocalityBatchMetrics updates the batch response metrics based on -// the comparisonResult parameter determined during the initial batch requests -// check. The underlying assumption is that the response should match the -// cross-region or cross-zone nature of the request. -func (ds *DistSender) updateCrossLocalityBatchMetrics( - br *kvpb.BatchResponse, comparisonResult roachpb.LocalityComparisonType, -) { - ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size())) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) - } -} - // getCostControllerConfig returns the config for the tenant cost model. This // returns nil if no KV interceptors are associated with the DistSender, or the // KV interceptor is not a multitenant.TenantSideCostController. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index e8319fabc1a3..86cb8e8c5d9f 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5664,186 +5664,75 @@ func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[stri return diffMap } -// TestDistSenderBatchMetrics verifies that the DistSender.Send() -// correctly updates the cross-region, cross-zone byte count metrics. -func TestDistSenderBatchMetrics(t *testing.T) { +// TestDistSenderCrossLocalityMetrics verifies that +// updateMetricsOnReplicaAddressedBatch{Request|Response} correctly updates +// cross-region, cross-zone byte count metrics for batch requests sent and batch +// responses received. +func TestDistSenderCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - - // The initial setup ensures the correct setup for three nodes (with different - // localities), single-range, three replicas (on different nodes). - clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - rangeDesc := testUserRangeDescriptor3Replicas - replicas := rangeDesc.InternalReplicas - - // The servers localities are configured so that the first batch request sent - // from server0 to server0 is same-region, same-zone. The second batch request - // sent from server0 to server1 is cross-region. The second batch request sent - // from server0 to server2 is cross-zone within the same region. - const numNodes = 3 - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - nodes := make([]roachpb.NodeDescriptor, 3) - for i := 0; i < numNodes; i++ { - nodes[i] = roachpb.NodeDescriptor{ - NodeID: roachpb.NodeID(i + 1 /* 0 is not a valid NodeID */), - Address: util.UnresolvedAddr{}, - Locality: serverLocality[i], - } - } - ns := &mockNodeStore{nodes: nodes} - - var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { - return ba.CreateReply(), nil - } - interceptedBatchRequestBytes, interceptedBatchResponseBytes := int64(-1), int64(-1) - cfg := DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: ns, - RPCContext: rpcContext, - RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), - TestingKnobs: ClientTestingKnobs{ - TransportFactory: adaptSimpleTransport(transportFn), - BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { - interceptedBatchRequestBytes = int64(ba.Size()) - }, - BatchResponseInterceptor: func(br *kvpb.BatchResponse) { - interceptedBatchResponseBytes = int64(br.Size()) - }, - }, - Settings: cluster.MakeTestingClusterSettings(), - } + defer log.Scope(t).Close(t) + const expectedInc = 10 - distSender := NewDistSender(cfg) metricsNames := []string{ "distsender.batch_requests.replica_addressed.bytes", - "distsender.batch_responses.replica_addressed.bytes", "distsender.batch_requests.cross_region.bytes", - "distsender.batch_responses.cross_region.bytes", "distsender.batch_requests.cross_zone.bytes", - "distsender.batch_responses.cross_zone.bytes"} - - getExpectedDelta := func( - isCrossRegion bool, isCrossZone bool, interceptedRequest int64, interceptedResponse int64, - ) map[string]int64 { - ternaryOp := func(b bool, num int64) (res int64) { - if b { - res = num - } - return res - } - - expectedDelta := make(map[string]int64) - expectedDelta[metricsNames[0]] = interceptedRequest - expectedDelta[metricsNames[1]] = interceptedResponse - expectedDelta[metricsNames[2]] = ternaryOp(isCrossRegion, interceptedRequest) - expectedDelta[metricsNames[3]] = ternaryOp(isCrossRegion, interceptedResponse) - expectedDelta[metricsNames[4]] = ternaryOp(isCrossZone, interceptedRequest) - expectedDelta[metricsNames[5]] = ternaryOp(isCrossZone, interceptedResponse) - return expectedDelta + "distsender.batch_responses.replica_addressed.bytes", + "distsender.batch_responses.cross_region.bytes", + "distsender.batch_responses.cross_zone.bytes", } - - sameRegionSameZoneRequest := int64(0) - sameRegionSameZoneResponse := int64(0) - for _, tc := range []struct { - toReplica int - isCrossRegion bool - isCrossZone bool + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool }{ - // First test sets replica[0] as leaseholder, enforcing a within-region, - // within-zone batch request / response. - {toReplica: 0, isCrossRegion: false, isCrossZone: false}, - // Second test sets replica[1] as leaseholder, enforcing a cross-region, - // batch request / response. Note that although the request is cross-zone, - // the cross-zone metrics is not expected to increment. - {toReplica: 1, isCrossRegion: true, isCrossZone: false}, - // Third test sets replica[2] as leaseholder, enforcing a within-region, - // cross-zone batch request / response. Cross-zone metrics is only expected - // to increment when it is cross-zone, same-region activities. - {toReplica: 2, isCrossRegion: false, isCrossZone: true}, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, } { - t.Run(fmt.Sprintf("isCrossRegion:%t-isCrossZone:%t", tc.isCrossRegion, tc.isCrossZone), func(t *testing.T) { - beforeMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeDistSenderMetrics() + beforeMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { - t.Fatal(err) + t.Error(err) } - - ba := &kvpb.BatchRequest{} - if tc.toReplica == 0 { - // Send a different request type for the first request to avoid having - // the same byte count for three requests and coincidental correct - // results. - get := &kvpb.GetRequest{} - get.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(get) + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(tc.crossLocalityType, expectedInc) } else { - put := &kvpb.PutRequest{} - put.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(put) - } - - ba.Header = kvpb.Header{ - // DistSender is set to be at the server0. - GatewayNodeID: 1, + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(tc.crossLocalityType, expectedInc) } - distSender.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: rangeDesc, - Lease: roachpb.Lease{ - Replica: replicas[tc.toReplica], - }, - }) - if _, err := distSender.Send(ctx, ba); err != nil { - t.Fatal(err) - } - - require.NotEqual(t, interceptedBatchRequestBytes, int64(-1), - "expected bytes not set correctly") - require.NotEqual(t, interceptedBatchResponseBytes, int64(-1), - "expected bytes not set correctly") - if tc.toReplica == 0 { - // Record the first batch request and response that was sent same - // region, same zone for future testing. - sameRegionSameZoneRequest = interceptedBatchRequestBytes - sameRegionSameZoneResponse = interceptedBatchResponseBytes - } - - expected := getExpectedDelta(tc.isCrossRegion, tc.isCrossZone, - interceptedBatchRequestBytes, interceptedBatchResponseBytes) - afterMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - diffMetrics := getMapsDiff(beforeMetrics, afterMetrics) + afterMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { t.Error(err) } - require.Equal(t, expected, diffMetrics) - }) - t.Run("SameRegionSameZone", func(t *testing.T) { - // Since the region and zone tiers are all configured in this test, we - // expect that the byte count of batch requests sent within the same - // region and same zone should equal to the total byte count of requests - // minus the combined byte count of cross-region and cross-zone requests - // metrics. Similar expectation for batch responses. - metrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - if err != nil { - t.Error(err) + metricsDiff := getMapsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc } - totalRequest := metrics["distsender.batch_requests.replica_addressed.bytes"] - totalResponse := metrics["distsender.batch_responses.replica_addressed.bytes"] - crossRegionRequest := metrics["distsender.batch_requests.cross_region.bytes"] - crossRegionResponse := metrics["distsender.batch_responses.cross_region.bytes"] - crossZoneRequest := metrics["distsender.batch_requests.cross_zone.bytes"] - crossZoneResponse := metrics["distsender.batch_responses.cross_zone.bytes"] - require.Equal(t, sameRegionSameZoneRequest, totalRequest-crossRegionRequest-crossZoneRequest) - require.Equal(t, sameRegionSameZoneResponse, totalResponse-crossRegionResponse-crossZoneResponse) + require.Equal(t, metricsDiff, expectedDiff) }) } } diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index dc7b88927ad7..58bb2efafe1d 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,19 +61,6 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - - // Currently, BatchRequestInterceptor and BatchResponseInterceptor only - // intercepts DistSender.Send() to pass the actual batch request and response - // byte count to the test. However, it can be easily extended to validate - // other properties of batch requests / response if required. - - // BatchRequestInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchRequest properties. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchResponseInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchResponse properties. - BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index e0a323a80816..cf258e7ce05e 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -3202,6 +3202,44 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { } } +// updateCrossLocalityMetricsOnSnapshotSent updates cross-locality related store +// metrics when outgoing snapshots are sent to the outgoingSnapshotStream. The +// metrics being updated include 1. cross-region metrics, which monitor +// activities across different regions, and 2. cross-zone metrics, which monitor +// activities across different zones within the same region or in cases where +// region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotSent( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionSentBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneSentBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + +// updateCrossLocalityMetricsOnSnapshotRcvd updates cross-locality related store +// metrics when receiving SnapshotRequests through streaming and constructing +// incoming snapshots. The metrics being updated include 1. cross-region +// metrics, which monitor activities across different regions, and 2. cross-zone +// metrics, which monitor activities across different zones within the same +// region or in cases where region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) { sm.EncryptionAlgorithm.Update(int64(stats.EncryptionType)) } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index af2b6fef7a73..8ba5d78d19ae 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3146,14 +3146,16 @@ func (r *Replica) followerSendSnapshot( r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + comparisonResult := r.store.getLocalityComparison(ctx, req.CoordinatorReplica.NodeID, + req.RecipientReplica.NodeID) + recordBytesSent := func(inc int64) { // Only counts for delegated bytes if we are not self-delegating. if r.NodeID() != req.CoordinatorReplica.NodeID { r.store.metrics.DelegateSnapshotSendBytes.Inc(inc) } r.store.metrics.RangeSnapshotSentBytes.Inc(inc) - r.store.checkAndUpdateCrossLocalitySnapshotMetrics( - ctx, req.CoordinatorReplica, req.RecipientReplica, inc, true /* isSent */) + r.store.metrics.updateCrossLocalityMetricsOnSnapshotSent(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index d2ac642b3418..ee1ea28e198c 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -973,56 +973,24 @@ func (s *Store) checkSnapshotOverlapLocked( return nil } -// getCrossLocalityComparison compares the localities of the two given replicas. -func (s *Store) getCrossLocalityComparison( - ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (s *Store) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, ) roachpb.LocalityComparisonType { - firstLocality := s.cfg.StorePool.GetNodeLocality(firstReplica.NodeID) - secLocality := s.cfg.StorePool.GetNodeLocality(secReplica.NodeID) + firstLocality := s.cfg.StorePool.GetNodeLocality(fromNodeID) + secLocality := s.cfg.StorePool.GetNodeLocality(toNodeID) comparisonResult, regionErr, zoneErr := firstLocality.CompareWithLocality(secLocality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross zone %v", zoneErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } return comparisonResult } -// checkAndUpdateCrossLocalitySnapshotMetrics updates the snapshot metrics in -// a more meaningful way. Cross-region metrics monitor activities across -// different regions. Cross-zone metrics monitor cross-zone activities within -// the same region or in cases where region tiers are not configured. -func (s *Store) checkAndUpdateCrossLocalitySnapshotMetrics( - ctx context.Context, - firstReplica roachpb.ReplicaDescriptor, - secReplica roachpb.ReplicaDescriptor, - inc int64, - isSent bool, -) { - comparisonResult := s.getCrossLocalityComparison(ctx, firstReplica, secReplica) - if isSent { - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - s.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - s.metrics.RangeSnapShotCrossZoneSentBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. - } - } else { - // isReceived - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - s.metrics.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. - } - } -} - // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, @@ -1137,10 +1105,12 @@ func (s *Store) receiveSnapshot( log.Infof(ctx, "accepted snapshot reservation for r%d", header.State.Desc.RangeID) } + comparisonResult := s.getLocalityComparison(ctx, + header.RaftMessageRequest.FromReplica.NodeID, header.RaftMessageRequest.ToReplica.NodeID) + recordBytesReceived := func(inc int64) { s.metrics.RangeSnapshotRcvdBytes.Inc(inc) - s.checkAndUpdateCrossLocalitySnapshotMetrics( - ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica, inc, false /* isSent */) + s.metrics.updateCrossLocalityMetricsOnSnapshotRcvd(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 02cf4c88d016..fbe8be7c0ae2 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -100,16 +100,6 @@ type StoreTestingKnobs struct { // no-op write, and the ForcedError field will be set. TestingPostApplyFilter kvserverbase.ReplicaApplyFilter - // TestingBatchRequestFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchRequestFilter func(*kvpb.BatchRequest) bool - - // TestingBatchResponseFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchResponseFilter func(*kvpb.BatchResponse) bool - // TestingResponseErrorEvent is called when an error is returned applying // a command. TestingResponseErrorEvent func(context.Context, *kvpb.BatchRequest, error) diff --git a/pkg/server/node.go b/pkg/server/node.go index 11fa232a403d..5ded46a0cf78 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -320,6 +320,44 @@ func (nm nodeMetrics) getNodeCounterMetrics(metricsName []string) (map[string]in return metricCountMap, nil } +// updateCrossLocalityMetricsOnBatchRequest updates nodeMetrics for batch +// requests processed on the node. The metrics being updated include 1. +// cross-region metrics, which monitor activities across different regions, and +// 2. cross-zone metrics, which monitor activities across different zones within +// the same region or in cases where region tiers are not configured. These +// metrics may include batches that were not successfully sent but were +// terminated at an early stage. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchRequestsBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + +// updateCrossLocalityMetricsOnBatchResponse updates nodeMetrics for batch +// responses that are received back. It updates based on the comparisonResult +// parameter determined during the initial batch requests check. The underlying +// assumption is that the response should match the cross-region or cross-zone +// nature of the requests. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchResponsesBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchResponseBytes.Inc(inc) + } +} + // A Node manages a map of stores (by store ID) for which it serves // traffic. A node is the top-level data structure. There is one node // instance per process. A node accepts incoming RPCs and services @@ -1325,11 +1363,11 @@ func (n *Node) batchInternal( return br, nil } -// getCrossLocalityComparison compares the localities of the gateway node and -// the current node to determine if the given batch request is cross-region and -// cross-zone. -func (n *Node) getCrossLocalityComparison( - ctx context.Context, ba *kvpb.BatchRequest, +// getLocalityComparison takes gatewayNodeID as input and returns the locality +// comparison result between the gateway node and the current node. This result +// indicates whether the two nodes are located in different regions or zones. +func (n *Node) getLocalityComparison( + ctx context.Context, gatewayNodeID roachpb.NodeID, ) roachpb.LocalityComparisonType { gossip := n.storeCfg.Gossip if gossip == nil { @@ -1337,7 +1375,7 @@ func (n *Node) getCrossLocalityComparison( return roachpb.LocalityComparisonType_UNDEFINED } - gatewayNodeDesc, err := gossip.GetNodeDescriptor(ba.GatewayNodeID) + gatewayNodeDesc, err := gossip.GetNodeDescriptor(gatewayNodeID) if err != nil { log.VEventf(ctx, 2, "failed to perform look up for node descriptor %v", err) @@ -1346,63 +1384,15 @@ func (n *Node) getCrossLocalityComparison( comparisonResult, regionErr, zoneErr := n.Descriptor.Locality.CompareWithLocality(gatewayNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross zone %v", zoneErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } return comparisonResult } -// checkAndUpdateCrossLocalityBatchMetrics updates the batch requests metrics in -// a more meaningful way. Cross-region metrics monitor activities across -// different regions. Cross-zone metrics monitor cross-zone activities within -// the same region or in cases where region tiers are not configured. The -// locality comparison result is returned here to avoid redundant check for -// metrics updates after receiving batch responses. -func (n *Node) checkAndUpdateCrossLocalityBatchMetrics( - ctx context.Context, ba *kvpb.BatchRequest, shouldIncrement bool, -) roachpb.LocalityComparisonType { - if !shouldIncrement { - // shouldIncrement is set to false using testing knob in specific tests to - // filter out metrics changes caused by irrelevant batch requests. - return roachpb.LocalityComparisonType_UNDEFINED - } - n.metrics.BatchRequestsBytes.Inc(int64(ba.Size())) - comparisonResult := n.getCrossLocalityComparison(ctx, ba) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - n.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - n.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. - } - return comparisonResult -} - -// updateCrossLocalityBatchMetrics updates the batch response metrics based on -// the comparisonResult parameter determined during the initial batch requests -// check. The underlying assumption is that the response should match the -// cross-region or cross-zone nature of the request. -func (n *Node) updateCrossLocalityBatchMetrics( - br *kvpb.BatchResponse, comparisonResult roachpb.LocalityComparisonType, shouldIncrement bool, -) { - if !shouldIncrement { - // shouldIncrement is set to false using testing knob in specific tests to - // filter out metrics changes caused by irrelevant batch requests. - return - } - n.metrics.BatchResponsesBytes.Inc(int64(br.Size())) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - n.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - n.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) - } -} - // incrementBatchCounters increments counters to track the batch and composite // request methods. func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) { @@ -1417,20 +1407,14 @@ func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) { func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { n.incrementBatchCounters(args) - shouldIncrement := true - if fn := n.storeCfg.TestingKnobs.TestingBatchRequestFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(args) - } - comparisonResult := n.checkAndUpdateCrossLocalityBatchMetrics(ctx, args, shouldIncrement) - // NB: Node.Batch is called directly for "local" calls. We don't want to // carry the associated log tags forward as doing so makes adding additional // log tags more expensive and makes local calls differ from remote calls. ctx = n.storeCfg.AmbientCtx.ResetAndAnnotateCtx(ctx) + comparisonResult := n.getLocalityComparison(ctx, args.GatewayNodeID) + n.metrics.updateCrossLocalityMetricsOnBatchRequest(comparisonResult, int64(args.Size())) + tenantID, ok := roachpb.ClientTenantFromContext(ctx) if !ok { tenantID = roachpb.SystemTenantID @@ -1475,14 +1459,7 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR br.Error = kvpb.NewError(err) } - shouldIncrement = true - if fn := n.storeCfg.TestingKnobs.TestingBatchResponseFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(br) - } - n.updateCrossLocalityBatchMetrics(br, comparisonResult, shouldIncrement) + n.metrics.updateCrossLocalityMetricsOnBatchResponse(comparisonResult, int64(br.Size())) if buildutil.CrdbTestBuild && br.Error != nil && n.testingErrorEvent != nil { n.testingErrorEvent(ctx, args, errors.DecodeError(ctx, br.Error.EncodedError)) } diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 09436a8c6ef9..0372c5bd910a 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -39,10 +39,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" - "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -760,17 +759,6 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) { require.GreaterOrEqual(t, n.metrics.MethodCounts[kvpb.Put].Count(), putCurr) } -// getNodesMetrics retrieves the count of each node metric specified in -// metricNames associated with the specified serverIdx and returns the result as -// a map, along with any lookup errors. -func getNodeCounterMetrics( - tc serverutils.TestClusterInterface, serverIdx int, metricsName []string, -) (map[string]int64, error) { - ts := tc.Server(serverIdx).(*TestServer) - metrics, err := ts.node.metrics.getNodeCounterMetrics(metricsName) - return metrics, err -} - // getNodesMetricsDiff returns the difference between the values of // corresponding metrics in two maps. // Assumption: beforeMap and afterMap contain the same set of keys. @@ -784,153 +772,76 @@ func getNodesMetricsDiff(beforeMap map[string]int64, afterMap map[string]int64) return diffMap } -// TestNodeBatchMetrics verifies that node.Batch() correctly updates the +// TestNodeCrossLocalityMetrics verifies that +// updateCrossLocalityMetricsOnBatch{Request|Response} correctly updates // cross-region, cross-zone byte count metrics for batch requests sent and batch // responses received. -func TestNodeBatchMetrics(t *testing.T) { +func TestNodeCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - - // The initial setup ensures the correct configuration for three nodes (with - // different localities), single-range, and three replicas (on different - // nodes). - const numNodes = 3 - zcfg := zonepb.DefaultZoneConfig() - zcfg.NumReplicas = proto.Int32(1) - - type InterceptedInfo struct { - syncutil.Mutex - BatchRequestSize int64 - BatchResponseSize int64 - } - - info := InterceptedInfo{} - requestFn := func(ba *kvpb.BatchRequest) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if ba != nil && ba.Txn != nil { - if baTxnName := ba.Txn.Name; baTxnName == "cross-locality-test" { - info.BatchRequestSize = int64(ba.Size()) - return true + const expectedInc = 10 + + metricsNames := []string{ + "batch_requests.bytes", + "batch_requests.cross_region.bytes", + "batch_requests.cross_zone.bytes", + "batch_responses.bytes", + "batch_responses.cross_region.bytes", + "batch_responses.cross_zone.bytes"} + for _, tc := range []struct { + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool + }{ + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, + } { + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeNodeMetrics(metric.NewRegistry(), 1) + beforeMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) } - } - return false - } - - responseFn := func(br *kvpb.BatchResponse) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if br != nil && br.Txn != nil { - if brTxnName := br.Txn.Name; brTxnName == "cross-locality-test" { - info.BatchResponseSize = int64(br.Size()) - return true + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnBatchRequest(tc.crossLocalityType, expectedInc) + } else { + metrics.updateCrossLocalityMetricsOnBatchResponse(tc.crossLocalityType, expectedInc) } - } - return false - } - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - serverArgs := make(map[int]base.TestServerArgs) - for i := 0; i < numNodes; i++ { - serverArgs[i] = base.TestServerArgs{ - Locality: serverLocality[i], - Knobs: base.TestingKnobs{ - Server: &TestingKnobs{ - DefaultZoneConfigOverride: &zcfg, - }, - Store: &kvserver.StoreTestingKnobs{ - TestingBatchRequestFilter: requestFn, - TestingBatchResponseFilter: responseFn, - }, - }, - } - } - - ctx := context.Background() - var clusterArgs = base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: serverArgs, - } - - tc := serverutils.StartNewTestCluster(t, numNodes, clusterArgs) - defer tc.Stopper().Stop(ctx) - - testKey := tc.ScratchRange(t) - desc := tc.LookupRangeOrFatal(t, testKey) - - metrics := []string{"batch_requests.bytes", "batch_responses.bytes", - "batch_requests.cross_region.bytes", "batch_responses.cross_region.bytes", - "batch_requests.cross_zone.bytes", "batch_responses.cross_zone.bytes"} - receiverBefore, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - - // sendBatchToServer is a testing helper that sends a batch request from - // server[serverIndex] to server[0] and returns the number of bytes a batch - // request sent and a batch response received. - sendFromServer := func(serverIndex int) (int64, int64) { - get := &kvpb.GetRequest{ - RequestHeader: kvpb.RequestHeader{Key: testKey}, - } - var ba kvpb.BatchRequest - ba.GatewayNodeID = tc.Server(serverIndex).NodeID() - ba.Add(get) - ba.RangeID = desc.RangeID - ba.Replica.StoreID = tc.Server(0).GetFirstStoreID() - txn := roachpb.MakeTransaction( - "cross-locality-test", testKey, 0, 0, - hlc.Timestamp{WallTime: 1}, 0, 0) - ba.Txn = &txn - _, err := tc.Server(0).(*TestServer).GetNode().Batch(ctx, &ba) - require.NoError(t, err) - info.Lock() - defer info.Unlock() - return info.BatchRequestSize, info.BatchResponseSize + afterMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) + } + metricsDiff := getNodesMetricsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc + } + require.Equal(t, metricsDiff, expectedDiff) + }) } - // The first batch request is sent from server0 to server0, enforcing a - // within-region, within-zone batch request / response. - firstBatchRequest, firstBatchResponse := sendFromServer(0) - // The second batch request is sent from server1 to server0, enforcing a - // cross-region batch request / response. - secBatchRequest, secBatchResponse := sendFromServer(1) - // The third batch request is sent from server2 to server0, enforcing a - // cross-zone, within-region batch request / response. - thirdBatchRequest, thirdBatchResponse := sendFromServer(2) - totalRequest := firstBatchRequest + secBatchRequest + thirdBatchRequest - totalResponse := firstBatchResponse + secBatchResponse + thirdBatchResponse - - t.Run("receiver", func(t *testing.T) { - receiverAfter, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - receiverDelta := getNodesMetricsDiff(receiverBefore, receiverAfter) - receiverExpected := map[string]int64{ - "batch_requests.bytes": firstBatchRequest + secBatchRequest + thirdBatchRequest, - "batch_responses.bytes": firstBatchResponse + secBatchResponse + thirdBatchResponse, - "batch_requests.cross_region.bytes": secBatchRequest, - "batch_responses.cross_region.bytes": secBatchResponse, - "batch_requests.cross_zone.bytes": thirdBatchRequest, - "batch_responses.cross_zone.bytes": thirdBatchResponse, - } - require.Equal(t, receiverExpected, receiverDelta) - require.Equal(t, firstBatchRequest, totalRequest-secBatchRequest-thirdBatchRequest) - require.Equal(t, firstBatchResponse, totalResponse-secBatchResponse-thirdBatchResponse) - }) } func TestGetTenantWeights(t *testing.T) {