From da864f092e152cee8d651c664162242079b7a7f4 Mon Sep 17 00:00:00 2001 From: Wenyi Date: Tue, 20 Jun 2023 16:06:07 -0400 Subject: [PATCH] kv: refactor metric updates test cases Previously, the metrics updates testing knobs were embedded within the logic of `DistSender.Send()` and `Node.Batch()`, which could confuse the readers of the code. This commit addresses the issue by removing end to end tests and replacing them with more focused unit tests. In addition, this commit also includes some refactoring of function parameters. Note that this commit does not change any existing functionality. Part of: https://github.com/cockroachdb/cockroach/issues/103983 Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 128 +++++------- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 211 +++++-------------- pkg/kv/kvclient/kvcoord/testing_knobs.go | 13 -- pkg/kv/kvserver/metrics.go | 38 ++++ pkg/kv/kvserver/replica_command.go | 6 +- pkg/kv/kvserver/store_snapshot.go | 56 ++--- pkg/kv/kvserver/testing_knobs.go | 10 - pkg/server/node.go | 123 +++++------ pkg/server/node_test.go | 215 ++++++-------------- 9 files changed, 273 insertions(+), 527 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index e962a94fbc71..55ceb25a5715 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -395,6 +395,46 @@ func (dm *DistSenderMetrics) getDistSenderCounterMetrics( return metricCountMap, nil } +// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates +// DistSenderMetrics for batch requests that have been divided and are currently +// forwarding to a specific replica for the corresponding range. The metrics +// being updated include 1. total byte count of replica-addressed batch requests +// processed 2. cross-region metrics, which monitor activities across different +// regions, and 3. cross-zone metrics, which monitor activities across different +// zones within the same region or in cases where region tiers are not +// configured. These metrics may include batches that were not successfully sent +// but were terminated at an early stage. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchRequestBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + +// updateCrossLocalityMetricsOnReplicaAddressedBatchResponse updates +// DistSenderMetrics for batch responses that are received back from transport +// rpc. It updates based on the comparisonResult parameter determined during the +// initial batch requests check. The underlying assumption is that the response +// should match the cross-region or cross-zone nature of the requests. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchResponseBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchResponseBytes.Inc(inc) + } +} + // FirstRangeProvider is capable of providing DistSender with the descriptor of // the first range in the cluster and notifying the DistSender when the first // range in the cluster has changed. @@ -465,14 +505,6 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch request byte count to the test. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch response byte count to the test. - BatchResponseInterceptor func(br *kvpb.BatchResponse) - // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -630,14 +662,6 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch } - if cfg.TestingKnobs.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor - } - - if cfg.TestingKnobs.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor - } - return ds } @@ -2293,15 +2317,11 @@ func (ds *DistSender) sendToReplicas( (desc.Generation == 0 && routing.LeaseSeq() == 0), } - if ds.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor(ba) - } - comparisonResult := ds.checkAndUpdateCrossLocalityBatchMetrics(ctx, ba) + comparisonResult := ds.getLocalityComparison(ctx, ds.getNodeID(), ba.Replica.NodeID) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size())) + br, err = transport.SendNext(ctx, ba) - if ds.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor(br) - } - ds.updateCrossLocalityBatchMetrics(br, comparisonResult) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size())) ds.maybeIncrementErrCounters(br, err) if err != nil { @@ -2557,71 +2577,33 @@ func (ds *DistSender) sendToReplicas( } } -// getCrossLocalityComparison compares the localities of the current node and -// the destination range node to determine if the given batch request is -// cross-region and cross-zone. -func (ds *DistSender) getCrossLocalityComparison( - ctx context.Context, ba *kvpb.BatchRequest, +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (ds *DistSender) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, ) roachpb.LocalityComparisonType { - gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) return roachpb.LocalityComparisonType_UNDEFINED } - destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) return roachpb.LocalityComparisonType_UNDEFINED } comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross zone %v", zoneErr) - } - return comparisonResult -} - -// checkAndUpdateCrossLocalityBatchMetrics updates the batch requests metrics in -// a more meaningful way. Cross-region metrics monitor activities across -// different regions. Cross-zone metrics monitor cross-zone activities within -// the same region or in cases where region tiers are not configured. The -// locality comparison result is returned here to avoid redundant check for -// metrics updates after receiving batch responses. -func (ds *DistSender) checkAndUpdateCrossLocalityBatchMetrics( - ctx context.Context, ba *kvpb.BatchRequest, -) roachpb.LocalityComparisonType { - ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size())) - comparisonResult := ds.getCrossLocalityComparison(ctx, ba) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } return comparisonResult } -// updateCrossLocalityBatchMetrics updates the batch response metrics based on -// the comparisonResult parameter determined during the initial batch requests -// check. The underlying assumption is that the response should match the -// cross-region or cross-zone nature of the request. -func (ds *DistSender) updateCrossLocalityBatchMetrics( - br *kvpb.BatchResponse, comparisonResult roachpb.LocalityComparisonType, -) { - ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size())) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) - } -} - // getCostControllerConfig returns the config for the tenant cost model. This // returns nil if no KV interceptors are associated with the DistSender, or the // KV interceptor is not a multitenant.TenantSideCostController. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index e8319fabc1a3..c61c4636589e 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5664,186 +5664,75 @@ func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[stri return diffMap } -// TestDistSenderBatchMetrics verifies that the DistSender.Send() -// correctly updates the cross-region, cross-zone byte count metrics. -func TestDistSenderBatchMetrics(t *testing.T) { +// TestDistSenderCrossLocalityMetrics verifies that +// updateCrossLocalityMetricsOnReplicaAddressedBatch{Request|Response} correctly +// updates cross-region, cross-zone byte count metrics for batch requests sent +// and batch responses received. +func TestDistSenderCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - - // The initial setup ensures the correct setup for three nodes (with different - // localities), single-range, three replicas (on different nodes). - clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - rangeDesc := testUserRangeDescriptor3Replicas - replicas := rangeDesc.InternalReplicas - - // The servers localities are configured so that the first batch request sent - // from server0 to server0 is same-region, same-zone. The second batch request - // sent from server0 to server1 is cross-region. The second batch request sent - // from server0 to server2 is cross-zone within the same region. - const numNodes = 3 - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - nodes := make([]roachpb.NodeDescriptor, 3) - for i := 0; i < numNodes; i++ { - nodes[i] = roachpb.NodeDescriptor{ - NodeID: roachpb.NodeID(i + 1 /* 0 is not a valid NodeID */), - Address: util.UnresolvedAddr{}, - Locality: serverLocality[i], - } - } - ns := &mockNodeStore{nodes: nodes} - - var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { - return ba.CreateReply(), nil - } - interceptedBatchRequestBytes, interceptedBatchResponseBytes := int64(-1), int64(-1) - cfg := DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: ns, - RPCContext: rpcContext, - RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), - TestingKnobs: ClientTestingKnobs{ - TransportFactory: adaptSimpleTransport(transportFn), - BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { - interceptedBatchRequestBytes = int64(ba.Size()) - }, - BatchResponseInterceptor: func(br *kvpb.BatchResponse) { - interceptedBatchResponseBytes = int64(br.Size()) - }, - }, - Settings: cluster.MakeTestingClusterSettings(), - } + defer log.Scope(t).Close(t) + const expectedInc = 10 - distSender := NewDistSender(cfg) metricsNames := []string{ "distsender.batch_requests.replica_addressed.bytes", - "distsender.batch_responses.replica_addressed.bytes", "distsender.batch_requests.cross_region.bytes", - "distsender.batch_responses.cross_region.bytes", "distsender.batch_requests.cross_zone.bytes", - "distsender.batch_responses.cross_zone.bytes"} - - getExpectedDelta := func( - isCrossRegion bool, isCrossZone bool, interceptedRequest int64, interceptedResponse int64, - ) map[string]int64 { - ternaryOp := func(b bool, num int64) (res int64) { - if b { - res = num - } - return res - } - - expectedDelta := make(map[string]int64) - expectedDelta[metricsNames[0]] = interceptedRequest - expectedDelta[metricsNames[1]] = interceptedResponse - expectedDelta[metricsNames[2]] = ternaryOp(isCrossRegion, interceptedRequest) - expectedDelta[metricsNames[3]] = ternaryOp(isCrossRegion, interceptedResponse) - expectedDelta[metricsNames[4]] = ternaryOp(isCrossZone, interceptedRequest) - expectedDelta[metricsNames[5]] = ternaryOp(isCrossZone, interceptedResponse) - return expectedDelta + "distsender.batch_responses.replica_addressed.bytes", + "distsender.batch_responses.cross_region.bytes", + "distsender.batch_responses.cross_zone.bytes", } - - sameRegionSameZoneRequest := int64(0) - sameRegionSameZoneResponse := int64(0) - for _, tc := range []struct { - toReplica int - isCrossRegion bool - isCrossZone bool + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool }{ - // First test sets replica[0] as leaseholder, enforcing a within-region, - // within-zone batch request / response. - {toReplica: 0, isCrossRegion: false, isCrossZone: false}, - // Second test sets replica[1] as leaseholder, enforcing a cross-region, - // batch request / response. Note that although the request is cross-zone, - // the cross-zone metrics is not expected to increment. - {toReplica: 1, isCrossRegion: true, isCrossZone: false}, - // Third test sets replica[2] as leaseholder, enforcing a within-region, - // cross-zone batch request / response. Cross-zone metrics is only expected - // to increment when it is cross-zone, same-region activities. - {toReplica: 2, isCrossRegion: false, isCrossZone: true}, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, } { - t.Run(fmt.Sprintf("isCrossRegion:%t-isCrossZone:%t", tc.isCrossRegion, tc.isCrossZone), func(t *testing.T) { - beforeMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeDistSenderMetrics() + beforeMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { - t.Fatal(err) + t.Error(err) } - - ba := &kvpb.BatchRequest{} - if tc.toReplica == 0 { - // Send a different request type for the first request to avoid having - // the same byte count for three requests and coincidental correct - // results. - get := &kvpb.GetRequest{} - get.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(get) + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(tc.crossLocalityType, expectedInc) } else { - put := &kvpb.PutRequest{} - put.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(put) - } - - ba.Header = kvpb.Header{ - // DistSender is set to be at the server0. - GatewayNodeID: 1, + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(tc.crossLocalityType, expectedInc) } - distSender.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: rangeDesc, - Lease: roachpb.Lease{ - Replica: replicas[tc.toReplica], - }, - }) - if _, err := distSender.Send(ctx, ba); err != nil { - t.Fatal(err) - } - - require.NotEqual(t, interceptedBatchRequestBytes, int64(-1), - "expected bytes not set correctly") - require.NotEqual(t, interceptedBatchResponseBytes, int64(-1), - "expected bytes not set correctly") - if tc.toReplica == 0 { - // Record the first batch request and response that was sent same - // region, same zone for future testing. - sameRegionSameZoneRequest = interceptedBatchRequestBytes - sameRegionSameZoneResponse = interceptedBatchResponseBytes - } - - expected := getExpectedDelta(tc.isCrossRegion, tc.isCrossZone, - interceptedBatchRequestBytes, interceptedBatchResponseBytes) - afterMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - diffMetrics := getMapsDiff(beforeMetrics, afterMetrics) + afterMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { t.Error(err) } - require.Equal(t, expected, diffMetrics) - }) - t.Run("SameRegionSameZone", func(t *testing.T) { - // Since the region and zone tiers are all configured in this test, we - // expect that the byte count of batch requests sent within the same - // region and same zone should equal to the total byte count of requests - // minus the combined byte count of cross-region and cross-zone requests - // metrics. Similar expectation for batch responses. - metrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - if err != nil { - t.Error(err) + metricsDiff := getMapsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc } - totalRequest := metrics["distsender.batch_requests.replica_addressed.bytes"] - totalResponse := metrics["distsender.batch_responses.replica_addressed.bytes"] - crossRegionRequest := metrics["distsender.batch_requests.cross_region.bytes"] - crossRegionResponse := metrics["distsender.batch_responses.cross_region.bytes"] - crossZoneRequest := metrics["distsender.batch_requests.cross_zone.bytes"] - crossZoneResponse := metrics["distsender.batch_responses.cross_zone.bytes"] - require.Equal(t, sameRegionSameZoneRequest, totalRequest-crossRegionRequest-crossZoneRequest) - require.Equal(t, sameRegionSameZoneResponse, totalResponse-crossRegionResponse-crossZoneResponse) + require.Equal(t, metricsDiff, expectedDiff) }) } } diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index dc7b88927ad7..58bb2efafe1d 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,19 +61,6 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - - // Currently, BatchRequestInterceptor and BatchResponseInterceptor only - // intercepts DistSender.Send() to pass the actual batch request and response - // byte count to the test. However, it can be easily extended to validate - // other properties of batch requests / response if required. - - // BatchRequestInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchRequest properties. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchResponseInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchResponse properties. - BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index e0a323a80816..cf258e7ce05e 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -3202,6 +3202,44 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { } } +// updateCrossLocalityMetricsOnSnapshotSent updates cross-locality related store +// metrics when outgoing snapshots are sent to the outgoingSnapshotStream. The +// metrics being updated include 1. cross-region metrics, which monitor +// activities across different regions, and 2. cross-zone metrics, which monitor +// activities across different zones within the same region or in cases where +// region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotSent( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionSentBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneSentBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + +// updateCrossLocalityMetricsOnSnapshotRcvd updates cross-locality related store +// metrics when receiving SnapshotRequests through streaming and constructing +// incoming snapshots. The metrics being updated include 1. cross-region +// metrics, which monitor activities across different regions, and 2. cross-zone +// metrics, which monitor activities across different zones within the same +// region or in cases where region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) { sm.EncryptionAlgorithm.Update(int64(stats.EncryptionType)) } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index af2b6fef7a73..8ba5d78d19ae 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3146,14 +3146,16 @@ func (r *Replica) followerSendSnapshot( r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + comparisonResult := r.store.getLocalityComparison(ctx, req.CoordinatorReplica.NodeID, + req.RecipientReplica.NodeID) + recordBytesSent := func(inc int64) { // Only counts for delegated bytes if we are not self-delegating. if r.NodeID() != req.CoordinatorReplica.NodeID { r.store.metrics.DelegateSnapshotSendBytes.Inc(inc) } r.store.metrics.RangeSnapshotSentBytes.Inc(inc) - r.store.checkAndUpdateCrossLocalitySnapshotMetrics( - ctx, req.CoordinatorReplica, req.RecipientReplica, inc, true /* isSent */) + r.store.metrics.updateCrossLocalityMetricsOnSnapshotSent(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index d2ac642b3418..ee1ea28e198c 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -973,56 +973,24 @@ func (s *Store) checkSnapshotOverlapLocked( return nil } -// getCrossLocalityComparison compares the localities of the two given replicas. -func (s *Store) getCrossLocalityComparison( - ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (s *Store) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, ) roachpb.LocalityComparisonType { - firstLocality := s.cfg.StorePool.GetNodeLocality(firstReplica.NodeID) - secLocality := s.cfg.StorePool.GetNodeLocality(secReplica.NodeID) + firstLocality := s.cfg.StorePool.GetNodeLocality(fromNodeID) + secLocality := s.cfg.StorePool.GetNodeLocality(toNodeID) comparisonResult, regionErr, zoneErr := firstLocality.CompareWithLocality(secLocality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross zone %v", zoneErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } return comparisonResult } -// checkAndUpdateCrossLocalitySnapshotMetrics updates the snapshot metrics in -// a more meaningful way. Cross-region metrics monitor activities across -// different regions. Cross-zone metrics monitor cross-zone activities within -// the same region or in cases where region tiers are not configured. -func (s *Store) checkAndUpdateCrossLocalitySnapshotMetrics( - ctx context.Context, - firstReplica roachpb.ReplicaDescriptor, - secReplica roachpb.ReplicaDescriptor, - inc int64, - isSent bool, -) { - comparisonResult := s.getCrossLocalityComparison(ctx, firstReplica, secReplica) - if isSent { - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - s.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - s.metrics.RangeSnapShotCrossZoneSentBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. - } - } else { - // isReceived - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - s.metrics.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. - } - } -} - // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. func (s *Store) receiveSnapshot( ctx context.Context, header *kvserverpb.SnapshotRequest_Header, stream incomingSnapshotStream, @@ -1137,10 +1105,12 @@ func (s *Store) receiveSnapshot( log.Infof(ctx, "accepted snapshot reservation for r%d", header.State.Desc.RangeID) } + comparisonResult := s.getLocalityComparison(ctx, + header.RaftMessageRequest.FromReplica.NodeID, header.RaftMessageRequest.ToReplica.NodeID) + recordBytesReceived := func(inc int64) { s.metrics.RangeSnapshotRcvdBytes.Inc(inc) - s.checkAndUpdateCrossLocalitySnapshotMetrics( - ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica, inc, false /* isSent */) + s.metrics.updateCrossLocalityMetricsOnSnapshotRcvd(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 02cf4c88d016..fbe8be7c0ae2 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -100,16 +100,6 @@ type StoreTestingKnobs struct { // no-op write, and the ForcedError field will be set. TestingPostApplyFilter kvserverbase.ReplicaApplyFilter - // TestingBatchRequestFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchRequestFilter func(*kvpb.BatchRequest) bool - - // TestingBatchResponseFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchResponseFilter func(*kvpb.BatchResponse) bool - // TestingResponseErrorEvent is called when an error is returned applying // a command. TestingResponseErrorEvent func(context.Context, *kvpb.BatchRequest, error) diff --git a/pkg/server/node.go b/pkg/server/node.go index 11fa232a403d..3e6139b4ea4f 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -320,6 +320,44 @@ func (nm nodeMetrics) getNodeCounterMetrics(metricsName []string) (map[string]in return metricCountMap, nil } +// updateCrossLocalityMetricsOnBatchRequest updates nodeMetrics for batch +// requests processed on the node. The metrics being updated include 1. total +// byte count of batch requests processed 2. cross-region metrics, which monitor +// activities across different regions, and 3. cross-zone metrics, which monitor +// activities across different zones within the same region or in cases where +// region tiers are not configured. These metrics may include batches that were +// not successfully sent but were terminated at an early stage. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchRequestsBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: + // No metrics or error reporting. + } +} + +// updateCrossLocalityMetricsOnBatchResponse updates nodeMetrics for batch +// responses that are received back. It updates based on the comparisonResult +// parameter determined during the initial batch requests check. The underlying +// assumption is that the response should match the cross-region or cross-zone +// nature of the requests. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchResponsesBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchResponseBytes.Inc(inc) + } +} + // A Node manages a map of stores (by store ID) for which it serves // traffic. A node is the top-level data structure. There is one node // instance per process. A node accepts incoming RPCs and services @@ -1325,11 +1363,11 @@ func (n *Node) batchInternal( return br, nil } -// getCrossLocalityComparison compares the localities of the gateway node and -// the current node to determine if the given batch request is cross-region and -// cross-zone. -func (n *Node) getCrossLocalityComparison( - ctx context.Context, ba *kvpb.BatchRequest, +// getLocalityComparison takes gatewayNodeID as input and returns the locality +// comparison result between the gateway node and the current node. This result +// indicates whether the two nodes are located in different regions or zones. +func (n *Node) getLocalityComparison( + ctx context.Context, gatewayNodeID roachpb.NodeID, ) roachpb.LocalityComparisonType { gossip := n.storeCfg.Gossip if gossip == nil { @@ -1337,7 +1375,7 @@ func (n *Node) getCrossLocalityComparison( return roachpb.LocalityComparisonType_UNDEFINED } - gatewayNodeDesc, err := gossip.GetNodeDescriptor(ba.GatewayNodeID) + gatewayNodeDesc, err := gossip.GetNodeDescriptor(gatewayNodeID) if err != nil { log.VEventf(ctx, 2, "failed to perform look up for node descriptor %v", err) @@ -1346,63 +1384,15 @@ func (n *Node) getCrossLocalityComparison( comparisonResult, regionErr, zoneErr := n.Descriptor.Locality.CompareWithLocality(gatewayNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if batch is cross zone %v", zoneErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } return comparisonResult } -// checkAndUpdateCrossLocalityBatchMetrics updates the batch requests metrics in -// a more meaningful way. Cross-region metrics monitor activities across -// different regions. Cross-zone metrics monitor cross-zone activities within -// the same region or in cases where region tiers are not configured. The -// locality comparison result is returned here to avoid redundant check for -// metrics updates after receiving batch responses. -func (n *Node) checkAndUpdateCrossLocalityBatchMetrics( - ctx context.Context, ba *kvpb.BatchRequest, shouldIncrement bool, -) roachpb.LocalityComparisonType { - if !shouldIncrement { - // shouldIncrement is set to false using testing knob in specific tests to - // filter out metrics changes caused by irrelevant batch requests. - return roachpb.LocalityComparisonType_UNDEFINED - } - n.metrics.BatchRequestsBytes.Inc(int64(ba.Size())) - comparisonResult := n.getCrossLocalityComparison(ctx, ba) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - n.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - n.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE: - // No metrics or error reporting. - } - return comparisonResult -} - -// updateCrossLocalityBatchMetrics updates the batch response metrics based on -// the comparisonResult parameter determined during the initial batch requests -// check. The underlying assumption is that the response should match the -// cross-region or cross-zone nature of the request. -func (n *Node) updateCrossLocalityBatchMetrics( - br *kvpb.BatchResponse, comparisonResult roachpb.LocalityComparisonType, shouldIncrement bool, -) { - if !shouldIncrement { - // shouldIncrement is set to false using testing knob in specific tests to - // filter out metrics changes caused by irrelevant batch requests. - return - } - n.metrics.BatchResponsesBytes.Inc(int64(br.Size())) - switch comparisonResult { - case roachpb.LocalityComparisonType_CROSS_REGION: - n.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: - n.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) - } -} - // incrementBatchCounters increments counters to track the batch and composite // request methods. func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) { @@ -1417,20 +1407,14 @@ func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) { func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { n.incrementBatchCounters(args) - shouldIncrement := true - if fn := n.storeCfg.TestingKnobs.TestingBatchRequestFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(args) - } - comparisonResult := n.checkAndUpdateCrossLocalityBatchMetrics(ctx, args, shouldIncrement) - // NB: Node.Batch is called directly for "local" calls. We don't want to // carry the associated log tags forward as doing so makes adding additional // log tags more expensive and makes local calls differ from remote calls. ctx = n.storeCfg.AmbientCtx.ResetAndAnnotateCtx(ctx) + comparisonResult := n.getLocalityComparison(ctx, args.GatewayNodeID) + n.metrics.updateCrossLocalityMetricsOnBatchRequest(comparisonResult, int64(args.Size())) + tenantID, ok := roachpb.ClientTenantFromContext(ctx) if !ok { tenantID = roachpb.SystemTenantID @@ -1475,14 +1459,7 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR br.Error = kvpb.NewError(err) } - shouldIncrement = true - if fn := n.storeCfg.TestingKnobs.TestingBatchResponseFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(br) - } - n.updateCrossLocalityBatchMetrics(br, comparisonResult, shouldIncrement) + n.metrics.updateCrossLocalityMetricsOnBatchResponse(comparisonResult, int64(br.Size())) if buildutil.CrdbTestBuild && br.Error != nil && n.testingErrorEvent != nil { n.testingErrorEvent(ctx, args, errors.DecodeError(ctx, br.Error.EncodedError)) } diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 09436a8c6ef9..0372c5bd910a 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -39,10 +39,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" - "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -760,17 +759,6 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) { require.GreaterOrEqual(t, n.metrics.MethodCounts[kvpb.Put].Count(), putCurr) } -// getNodesMetrics retrieves the count of each node metric specified in -// metricNames associated with the specified serverIdx and returns the result as -// a map, along with any lookup errors. -func getNodeCounterMetrics( - tc serverutils.TestClusterInterface, serverIdx int, metricsName []string, -) (map[string]int64, error) { - ts := tc.Server(serverIdx).(*TestServer) - metrics, err := ts.node.metrics.getNodeCounterMetrics(metricsName) - return metrics, err -} - // getNodesMetricsDiff returns the difference between the values of // corresponding metrics in two maps. // Assumption: beforeMap and afterMap contain the same set of keys. @@ -784,153 +772,76 @@ func getNodesMetricsDiff(beforeMap map[string]int64, afterMap map[string]int64) return diffMap } -// TestNodeBatchMetrics verifies that node.Batch() correctly updates the +// TestNodeCrossLocalityMetrics verifies that +// updateCrossLocalityMetricsOnBatch{Request|Response} correctly updates // cross-region, cross-zone byte count metrics for batch requests sent and batch // responses received. -func TestNodeBatchMetrics(t *testing.T) { +func TestNodeCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - - // The initial setup ensures the correct configuration for three nodes (with - // different localities), single-range, and three replicas (on different - // nodes). - const numNodes = 3 - zcfg := zonepb.DefaultZoneConfig() - zcfg.NumReplicas = proto.Int32(1) - - type InterceptedInfo struct { - syncutil.Mutex - BatchRequestSize int64 - BatchResponseSize int64 - } - - info := InterceptedInfo{} - requestFn := func(ba *kvpb.BatchRequest) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if ba != nil && ba.Txn != nil { - if baTxnName := ba.Txn.Name; baTxnName == "cross-locality-test" { - info.BatchRequestSize = int64(ba.Size()) - return true + const expectedInc = 10 + + metricsNames := []string{ + "batch_requests.bytes", + "batch_requests.cross_region.bytes", + "batch_requests.cross_zone.bytes", + "batch_responses.bytes", + "batch_responses.cross_region.bytes", + "batch_responses.cross_zone.bytes"} + for _, tc := range []struct { + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool + }{ + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, + } { + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeNodeMetrics(metric.NewRegistry(), 1) + beforeMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) } - } - return false - } - - responseFn := func(br *kvpb.BatchResponse) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if br != nil && br.Txn != nil { - if brTxnName := br.Txn.Name; brTxnName == "cross-locality-test" { - info.BatchResponseSize = int64(br.Size()) - return true + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnBatchRequest(tc.crossLocalityType, expectedInc) + } else { + metrics.updateCrossLocalityMetricsOnBatchResponse(tc.crossLocalityType, expectedInc) } - } - return false - } - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - serverArgs := make(map[int]base.TestServerArgs) - for i := 0; i < numNodes; i++ { - serverArgs[i] = base.TestServerArgs{ - Locality: serverLocality[i], - Knobs: base.TestingKnobs{ - Server: &TestingKnobs{ - DefaultZoneConfigOverride: &zcfg, - }, - Store: &kvserver.StoreTestingKnobs{ - TestingBatchRequestFilter: requestFn, - TestingBatchResponseFilter: responseFn, - }, - }, - } - } - - ctx := context.Background() - var clusterArgs = base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: serverArgs, - } - - tc := serverutils.StartNewTestCluster(t, numNodes, clusterArgs) - defer tc.Stopper().Stop(ctx) - - testKey := tc.ScratchRange(t) - desc := tc.LookupRangeOrFatal(t, testKey) - - metrics := []string{"batch_requests.bytes", "batch_responses.bytes", - "batch_requests.cross_region.bytes", "batch_responses.cross_region.bytes", - "batch_requests.cross_zone.bytes", "batch_responses.cross_zone.bytes"} - receiverBefore, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - - // sendBatchToServer is a testing helper that sends a batch request from - // server[serverIndex] to server[0] and returns the number of bytes a batch - // request sent and a batch response received. - sendFromServer := func(serverIndex int) (int64, int64) { - get := &kvpb.GetRequest{ - RequestHeader: kvpb.RequestHeader{Key: testKey}, - } - var ba kvpb.BatchRequest - ba.GatewayNodeID = tc.Server(serverIndex).NodeID() - ba.Add(get) - ba.RangeID = desc.RangeID - ba.Replica.StoreID = tc.Server(0).GetFirstStoreID() - txn := roachpb.MakeTransaction( - "cross-locality-test", testKey, 0, 0, - hlc.Timestamp{WallTime: 1}, 0, 0) - ba.Txn = &txn - _, err := tc.Server(0).(*TestServer).GetNode().Batch(ctx, &ba) - require.NoError(t, err) - info.Lock() - defer info.Unlock() - return info.BatchRequestSize, info.BatchResponseSize + afterMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) + } + metricsDiff := getNodesMetricsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc + } + require.Equal(t, metricsDiff, expectedDiff) + }) } - // The first batch request is sent from server0 to server0, enforcing a - // within-region, within-zone batch request / response. - firstBatchRequest, firstBatchResponse := sendFromServer(0) - // The second batch request is sent from server1 to server0, enforcing a - // cross-region batch request / response. - secBatchRequest, secBatchResponse := sendFromServer(1) - // The third batch request is sent from server2 to server0, enforcing a - // cross-zone, within-region batch request / response. - thirdBatchRequest, thirdBatchResponse := sendFromServer(2) - totalRequest := firstBatchRequest + secBatchRequest + thirdBatchRequest - totalResponse := firstBatchResponse + secBatchResponse + thirdBatchResponse - - t.Run("receiver", func(t *testing.T) { - receiverAfter, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - receiverDelta := getNodesMetricsDiff(receiverBefore, receiverAfter) - receiverExpected := map[string]int64{ - "batch_requests.bytes": firstBatchRequest + secBatchRequest + thirdBatchRequest, - "batch_responses.bytes": firstBatchResponse + secBatchResponse + thirdBatchResponse, - "batch_requests.cross_region.bytes": secBatchRequest, - "batch_responses.cross_region.bytes": secBatchResponse, - "batch_requests.cross_zone.bytes": thirdBatchRequest, - "batch_responses.cross_zone.bytes": thirdBatchResponse, - } - require.Equal(t, receiverExpected, receiverDelta) - require.Equal(t, firstBatchRequest, totalRequest-secBatchRequest-thirdBatchRequest) - require.Equal(t, firstBatchResponse, totalResponse-secBatchResponse-thirdBatchResponse) - }) } func TestGetTenantWeights(t *testing.T) {