diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index f243945bcc04..3a2b23a78fe5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -13,7 +13,6 @@ package kvcoord import ( "context" "fmt" - "reflect" "runtime" "runtime/pprof" "strings" @@ -358,41 +357,42 @@ func makeDistSenderMetrics() DistSenderMetrics { return m } -// getDistSenderCounterMetrics fetches the count of each specified DisSender -// metric from the `metricNames` parameter and returns the result as a map. The -// keys in the map represent the metric metadata names, while the corresponding -// values indicate the count of each metric. If any of the specified metric -// cannot be found or is not a counter, the function will return an error. -// -// Assumption: 1. The metricNames parameter should consist of string literals -// that match the metadata names used for metric counters. 2. Each metric name -// provided in `metricNames` must exist, unique and be a counter type. -func (dm *DistSenderMetrics) getDistSenderCounterMetrics( - metricsName []string, -) (map[string]int64, error) { - metricCountMap := make(map[string]int64) - getFirstDistSenderMetric := func(metricName string) int64 { - metricsStruct := reflect.ValueOf(*dm) - for i := 0; i < metricsStruct.NumField(); i++ { - field := metricsStruct.Field(i) - switch t := field.Interface().(type) { - case *metric.Counter: - if t.Name == metricName { - return t.Count() - } - } - } - return -1 +// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates +// DistSenderMetrics for batch requests that have been divided and are currently +// forwarding to a specific replica for the corresponding range. The metrics +// being updated include 1. total byte count of replica-addressed batch requests +// processed 2. cross-region metrics, which monitor activities across different +// regions, and 3. cross-zone metrics, which monitor activities across different +// zones within the same region or in cases where region tiers are not +// configured. These metrics may include batches that were not successfully sent +// but were terminated at an early stage. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchRequestBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchRequestBytes.Inc(inc) } +} - for _, metricName := range metricsName { - count := getFirstDistSenderMetric(metricName) - if count == -1 { - return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) - } - metricCountMap[metricName] = count +// updateCrossLocalityMetricsOnReplicaAddressedBatchResponse updates +// DistSenderMetrics for batch responses that are received back from transport +// rpc. It updates based on the comparisonResult parameter determined during the +// initial batch requests check. The underlying assumption is that the response +// should match the cross-region or cross-zone nature of the requests. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchResponseBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchResponseBytes.Inc(inc) } - return metricCountMap, nil } // FirstRangeProvider is capable of providing DistSender with the descriptor of @@ -465,14 +465,6 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch request byte count to the test. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch response byte count to the test. - BatchResponseInterceptor func(br *kvpb.BatchResponse) - // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -630,14 +622,6 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch } - if cfg.TestingKnobs.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor - } - - if cfg.TestingKnobs.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor - } - return ds } @@ -2293,15 +2277,11 @@ func (ds *DistSender) sendToReplicas( (desc.Generation == 0 && routing.LeaseSeq() == 0), } - if ds.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor(ba) - } - shouldIncCrossRegion, shouldIncCrossZone := ds.checkAndUpdateBatchRequestMetrics(ctx, ba) + comparisonResult := ds.getLocalityComparison(ctx, ds.getNodeID(), ba.Replica.NodeID) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size())) + br, err = transport.SendNext(ctx, ba) - if ds.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor(br) - } - ds.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size())) ds.maybeIncrementErrCounters(br, err) if err != nil { @@ -2557,73 +2537,31 @@ func (ds *DistSender) sendToReplicas( } } -// isCrossRegionCrossZoneBatch returns (bool, bool) - indicating if the given -// batch request is cross-region and cross-zone respectively. -func (ds *DistSender) isCrossRegionCrossZoneBatch( - ctx context.Context, ba *kvpb.BatchRequest, -) (bool, bool) { - gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (ds *DistSender) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, +) roachpb.LocalityComparisonType { + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) - return false, false + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) + return roachpb.LocalityComparisonType_UNDEFINED } - destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) - return false, false + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) + return roachpb.LocalityComparisonType_UNDEFINED } - isCrossRegion, regionErr, isCrossZone, zoneErr := gatewayNodeDesc.Locality.IsCrossRegionCrossZone(destinationNodeDesc.Locality) + + comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "%v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "%v", zoneErr) - } - return isCrossRegion, isCrossZone -} - -// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a -// more meaningful way. Cross-region metrics monitor activities across different -// regions. Cross-zone metrics monitor cross-zone activities within the same -// region or in cases where region tiers are not configured. The check result is -// returned here to avoid redundant check for metrics updates after receiving -// batch responses. -func (ds *DistSender) checkAndUpdateBatchRequestMetrics( - ctx context.Context, ba *kvpb.BatchRequest, -) (shouldIncCrossRegion bool, shouldIncCrossZone bool) { - ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size())) - isCrossRegion, isCrossZone := ds.isCrossRegionCrossZoneBatch(ctx, ba) - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossRegion = true - } - } else { - if isCrossZone { - ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossZone = true - } - } - return shouldIncCrossRegion, shouldIncCrossZone -} - -// checkAndUpdateBatchResponseMetrics updates the batch response metrics based -// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These -// parameters are determined during the initial check for batch requests. The -// underlying assumption is that if requests were cross-region or cross-zone, -// the response should be as well. -func (ds *DistSender) checkAndUpdateBatchResponseMetrics( - br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool, -) { - ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size())) - if shouldIncCrossRegion { - ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - } - if shouldIncCrossZone { - ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } + return comparisonResult } // getCostControllerConfig returns the config for the tenant cost model. This diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index e8319fabc1a3..47e1eb88dc27 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5651,199 +5651,75 @@ func TestDistSenderRPCMetrics(t *testing.T) { require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1)) } -// getMapsDiff returns the difference between the values of corresponding -// metrics in two maps. Assumption: beforeMap and afterMap contain the same set -// of keys. -func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { - diffMap := make(map[string]int64) - for metricName, beforeValue := range beforeMap { - if v, ok := afterMap[metricName]; ok { - diffMap[metricName] = v - beforeValue - } - } - return diffMap -} - -// TestDistSenderBatchMetrics verifies that the DistSender.Send() -// correctly updates the cross-region, cross-zone byte count metrics. -func TestDistSenderBatchMetrics(t *testing.T) { +// TestDistSenderCrossLocalityMetrics verifies that +// updateCrossLocalityMetricsOnReplicaAddressedBatch{Request|Response} correctly +// updates cross-region, cross-zone byte count metrics for batch requests sent +// and batch responses received. +func TestDistSenderCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - - // The initial setup ensures the correct setup for three nodes (with different - // localities), single-range, three replicas (on different nodes). - clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - rangeDesc := testUserRangeDescriptor3Replicas - replicas := rangeDesc.InternalReplicas - - // The servers localities are configured so that the first batch request sent - // from server0 to server0 is same-region, same-zone. The second batch request - // sent from server0 to server1 is cross-region. The second batch request sent - // from server0 to server2 is cross-zone within the same region. - const numNodes = 3 - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - nodes := make([]roachpb.NodeDescriptor, 3) - for i := 0; i < numNodes; i++ { - nodes[i] = roachpb.NodeDescriptor{ - NodeID: roachpb.NodeID(i + 1 /* 0 is not a valid NodeID */), - Address: util.UnresolvedAddr{}, - Locality: serverLocality[i], - } - } - ns := &mockNodeStore{nodes: nodes} - - var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { - return ba.CreateReply(), nil - } - interceptedBatchRequestBytes, interceptedBatchResponseBytes := int64(-1), int64(-1) - cfg := DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: ns, - RPCContext: rpcContext, - RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), - TestingKnobs: ClientTestingKnobs{ - TransportFactory: adaptSimpleTransport(transportFn), - BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { - interceptedBatchRequestBytes = int64(ba.Size()) - }, - BatchResponseInterceptor: func(br *kvpb.BatchResponse) { - interceptedBatchResponseBytes = int64(br.Size()) - }, - }, - Settings: cluster.MakeTestingClusterSettings(), - } + defer log.Scope(t).Close(t) + const expectedInc = 10 - distSender := NewDistSender(cfg) metricsNames := []string{ "distsender.batch_requests.replica_addressed.bytes", - "distsender.batch_responses.replica_addressed.bytes", "distsender.batch_requests.cross_region.bytes", - "distsender.batch_responses.cross_region.bytes", "distsender.batch_requests.cross_zone.bytes", - "distsender.batch_responses.cross_zone.bytes"} - - getExpectedDelta := func( - isCrossRegion bool, isCrossZone bool, interceptedRequest int64, interceptedResponse int64, - ) map[string]int64 { - ternaryOp := func(b bool, num int64) (res int64) { - if b { - res = num - } - return res - } - - expectedDelta := make(map[string]int64) - expectedDelta[metricsNames[0]] = interceptedRequest - expectedDelta[metricsNames[1]] = interceptedResponse - expectedDelta[metricsNames[2]] = ternaryOp(isCrossRegion, interceptedRequest) - expectedDelta[metricsNames[3]] = ternaryOp(isCrossRegion, interceptedResponse) - expectedDelta[metricsNames[4]] = ternaryOp(isCrossZone, interceptedRequest) - expectedDelta[metricsNames[5]] = ternaryOp(isCrossZone, interceptedResponse) - return expectedDelta + "distsender.batch_responses.replica_addressed.bytes", + "distsender.batch_responses.cross_region.bytes", + "distsender.batch_responses.cross_zone.bytes", } - - sameRegionSameZoneRequest := int64(0) - sameRegionSameZoneResponse := int64(0) - for _, tc := range []struct { - toReplica int - isCrossRegion bool - isCrossZone bool + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool }{ - // First test sets replica[0] as leaseholder, enforcing a within-region, - // within-zone batch request / response. - {toReplica: 0, isCrossRegion: false, isCrossZone: false}, - // Second test sets replica[1] as leaseholder, enforcing a cross-region, - // batch request / response. Note that although the request is cross-zone, - // the cross-zone metrics is not expected to increment. - {toReplica: 1, isCrossRegion: true, isCrossZone: false}, - // Third test sets replica[2] as leaseholder, enforcing a within-region, - // cross-zone batch request / response. Cross-zone metrics is only expected - // to increment when it is cross-zone, same-region activities. - {toReplica: 2, isCrossRegion: false, isCrossZone: true}, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, } { - t.Run(fmt.Sprintf("isCrossRegion:%t-isCrossZone:%t", tc.isCrossRegion, tc.isCrossZone), func(t *testing.T) { - beforeMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeDistSenderMetrics() + beforeMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { - t.Fatal(err) + t.Error(err) } - - ba := &kvpb.BatchRequest{} - if tc.toReplica == 0 { - // Send a different request type for the first request to avoid having - // the same byte count for three requests and coincidental correct - // results. - get := &kvpb.GetRequest{} - get.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(get) + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(tc.crossLocalityType, expectedInc) } else { - put := &kvpb.PutRequest{} - put.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(put) - } - - ba.Header = kvpb.Header{ - // DistSender is set to be at the server0. - GatewayNodeID: 1, - } - distSender.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: rangeDesc, - Lease: roachpb.Lease{ - Replica: replicas[tc.toReplica], - }, - }) - - if _, err := distSender.Send(ctx, ba); err != nil { - t.Fatal(err) - } - - require.NotEqual(t, interceptedBatchRequestBytes, int64(-1), - "expected bytes not set correctly") - require.NotEqual(t, interceptedBatchResponseBytes, int64(-1), - "expected bytes not set correctly") - if tc.toReplica == 0 { - // Record the first batch request and response that was sent same - // region, same zone for future testing. - sameRegionSameZoneRequest = interceptedBatchRequestBytes - sameRegionSameZoneResponse = interceptedBatchResponseBytes + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(tc.crossLocalityType, expectedInc) } - expected := getExpectedDelta(tc.isCrossRegion, tc.isCrossZone, - interceptedBatchRequestBytes, interceptedBatchResponseBytes) - afterMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - diffMetrics := getMapsDiff(beforeMetrics, afterMetrics) + afterMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { t.Error(err) } - require.Equal(t, expected, diffMetrics) - }) - t.Run("SameRegionSameZone", func(t *testing.T) { - // Since the region and zone tiers are all configured in this test, we - // expect that the byte count of batch requests sent within the same - // region and same zone should equal to the total byte count of requests - // minus the combined byte count of cross-region and cross-zone requests - // metrics. Similar expectation for batch responses. - metrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - if err != nil { - t.Error(err) + metricsDiff := getMapsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc } - totalRequest := metrics["distsender.batch_requests.replica_addressed.bytes"] - totalResponse := metrics["distsender.batch_responses.replica_addressed.bytes"] - crossRegionRequest := metrics["distsender.batch_requests.cross_region.bytes"] - crossRegionResponse := metrics["distsender.batch_responses.cross_region.bytes"] - crossZoneRequest := metrics["distsender.batch_requests.cross_zone.bytes"] - crossZoneResponse := metrics["distsender.batch_responses.cross_zone.bytes"] - require.Equal(t, sameRegionSameZoneRequest, totalRequest-crossRegionRequest-crossZoneRequest) - require.Equal(t, sameRegionSameZoneResponse, totalResponse-crossRegionResponse-crossZoneResponse) + require.Equal(t, metricsDiff, expectedDiff) }) } } diff --git a/pkg/kv/kvclient/kvcoord/helpers_test.go b/pkg/kv/kvclient/kvcoord/helpers_test.go index 5df39946255a..65de340820e3 100644 --- a/pkg/kv/kvclient/kvcoord/helpers_test.go +++ b/pkg/kv/kvclient/kvcoord/helpers_test.go @@ -11,9 +11,12 @@ package kvcoord import ( + "reflect" "sort" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/errors" ) // asSortedSlice returns the set data in sorted order. @@ -61,3 +64,53 @@ func (tcf *TxnCoordSenderFactory) TestingSetMetrics(metrics TxnMetrics) { func (tcf *TxnCoordSenderFactory) TestingSetCommitWaitFilter(filter func()) { tcf.testingKnobs.CommitWaitFilter = filter } + +// getDistSenderCounterMetrics fetches the count of each specified DisSender +// metric from the `metricNames` parameter and returns the result as a map. The +// keys in the map represent the metric metadata names, while the corresponding +// values indicate the count of each metric. If any of the specified metric +// cannot be found or is not a counter, the function will return an error. +// +// Assumption: 1. The metricNames parameter should consist of string literals +// that match the metadata names used for metric counters. 2. Each metric name +// provided in `metricNames` must exist, unique and be a counter type. +func (dm *DistSenderMetrics) getDistSenderCounterMetrics( + metricsName []string, +) (map[string]int64, error) { + metricCountMap := make(map[string]int64) + getFirstDistSenderMetric := func(metricName string) int64 { + metricsStruct := reflect.ValueOf(*dm) + for i := 0; i < metricsStruct.NumField(); i++ { + field := metricsStruct.Field(i) + switch t := field.Interface().(type) { + case *metric.Counter: + if t.Name == metricName { + return t.Count() + } + } + } + return -1 + } + + for _, metricName := range metricsName { + count := getFirstDistSenderMetric(metricName) + if count == -1 { + return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) + } + metricCountMap[metricName] = count + } + return metricCountMap, nil +} + +// getMapsDiff returns the difference between the values of corresponding +// metrics in two maps. Assumption: beforeMap and afterMap contain the same set +// of keys. +func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { + diffMap := make(map[string]int64) + for metricName, beforeValue := range beforeMap { + if v, ok := afterMap[metricName]; ok { + diffMap[metricName] = v - beforeValue + } + } + return diffMap +} diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index dc7b88927ad7..58bb2efafe1d 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,19 +61,6 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - - // Currently, BatchRequestInterceptor and BatchResponseInterceptor only - // intercepts DistSender.Send() to pass the actual batch request and response - // byte count to the test. However, it can be easily extended to validate - // other properties of batch requests / response if required. - - // BatchRequestInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchRequest properties. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchResponseInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchResponse properties. - BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 6ab97df90bad..dcc62c68b103 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -1277,22 +1277,11 @@ func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { return sp.getNodeLocalityWithString(nodeID).str } -// getNodeLocality returns the locality information for the given node. -func (sp *StorePool) getNodeLocality(nodeID roachpb.NodeID) roachpb.Locality { +// GetNodeLocality returns the locality information for the given node. +func (sp *StorePool) GetNodeLocality(nodeID roachpb.NodeID) roachpb.Locality { return sp.getNodeLocalityWithString(nodeID).locality } -// IsCrossRegionCrossZone takes in two replicas and compares the locality of -// them based on their replica node IDs. It returns (bool, error, bool, error) -// where the boolean values indicate whether the two replicas' nodes are in -// different regions, different zones, along with any lookup errors. -func (sp *StorePool) IsCrossRegionCrossZone( - firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, -) (bool, error, bool, error) { - return sp.getNodeLocality(firstReplica.NodeID).IsCrossRegionCrossZone( - sp.getNodeLocality(secReplica.NodeID)) -} - // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is // live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate // to receive a replica. diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 8be761c78bb9..a82dec31be5f 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -3238,6 +3238,40 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { } } +// updateCrossLocalityMetricsOnSnapshotSent updates cross-locality related store +// metrics when outgoing snapshots are sent to the outgoingSnapshotStream. The +// metrics being updated include 1. cross-region metrics, which monitor +// activities across different regions, and 2. cross-zone metrics, which monitor +// activities across different zones within the same region or in cases where +// region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotSent( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionSentBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneSentBytes.Inc(inc) + } +} + +// updateCrossLocalityMetricsOnSnapshotRcvd updates cross-locality related store +// metrics when receiving SnapshotRequests through streaming and constructing +// incoming snapshots. The metrics being updated include 1. cross-region +// metrics, which monitor activities across different regions, and 2. cross-zone +// metrics, which monitor activities across different zones within the same +// region or in cases where region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) + } +} + func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) { sm.EncryptionAlgorithm.Update(int64(stats.EncryptionType)) } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 3b8921f6f7f7..06b90bdcf7c3 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3151,6 +3151,8 @@ func (r *Replica) followerSendSnapshot( sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + comparisonResult := r.store.getLocalityComparison(ctx, req.CoordinatorReplica.NodeID, + req.RecipientReplica.NodeID) recordBytesSent := func(inc int64) { // Only counts for delegated bytes if we are not self-delegating. @@ -3158,8 +3160,7 @@ func (r *Replica) followerSendSnapshot( r.store.metrics.DelegateSnapshotSendBytes.Inc(inc) } r.store.metrics.RangeSnapshotSentBytes.Inc(inc) - r.store.updateCrossLocalitySnapshotMetrics( - ctx, req.CoordinatorReplica, req.RecipientReplica, inc, true /* isSent */) + r.store.metrics.updateCrossLocalityMetricsOnSnapshotSent(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index f8bce0e375e6..ee1ea28e198c 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -973,60 +973,22 @@ func (s *Store) checkSnapshotOverlapLocked( return nil } -// shouldIncrementCrossLocalitySnapshotMetrics returns (bool, bool) - indicating -// if the two given replicas are cross-region and cross-zone respectively. -func (s *Store) shouldIncrementCrossLocalitySnapshotMetrics( - ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, -) (bool, bool) { - isCrossRegion, regionErr, isCrossZone, zoneErr := s.cfg.StorePool.IsCrossRegionCrossZone( - firstReplica, secReplica) +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (s *Store) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, +) roachpb.LocalityComparisonType { + firstLocality := s.cfg.StorePool.GetNodeLocality(fromNodeID) + secLocality := s.cfg.StorePool.GetNodeLocality(toNodeID) + comparisonResult, regionErr, zoneErr := firstLocality.CompareWithLocality(secLocality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross zone %v", zoneErr) - } - return isCrossRegion, isCrossZone -} - -// updateCrossLocalitySnapshotMetrics updates the snapshot metrics in a more -// meaningful way. Cross-region metrics monitor activities across different -// regions. Cross-zone metrics monitor any cross-zone activities within the same -// region or if the region tiers are not configured. -func (s *Store) updateCrossLocalitySnapshotMetrics( - ctx context.Context, - firstReplica roachpb.ReplicaDescriptor, - secReplica roachpb.ReplicaDescriptor, - inc int64, - isSent bool, -) { - isCrossRegion, isCrossZone := s.shouldIncrementCrossLocalitySnapshotMetrics(ctx, firstReplica, secReplica) - if isSent { - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - s.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc) - } - } else { - if isCrossZone { - s.metrics.RangeSnapShotCrossZoneSentBytes.Inc(inc) - } - } - } else { - // isReceived - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) - } - } else { - if isCrossZone { - s.metrics.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) - } - } + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } + return comparisonResult } // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. @@ -1143,10 +1105,12 @@ func (s *Store) receiveSnapshot( log.Infof(ctx, "accepted snapshot reservation for r%d", header.State.Desc.RangeID) } + comparisonResult := s.getLocalityComparison(ctx, + header.RaftMessageRequest.FromReplica.NodeID, header.RaftMessageRequest.ToReplica.NodeID) + recordBytesReceived := func(inc int64) { s.metrics.RangeSnapshotRcvdBytes.Inc(inc) - s.updateCrossLocalitySnapshotMetrics( - ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica, inc, false /* isSent */) + s.metrics.updateCrossLocalityMetricsOnSnapshotRcvd(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 02cf4c88d016..fbe8be7c0ae2 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -100,16 +100,6 @@ type StoreTestingKnobs struct { // no-op write, and the ForcedError field will be set. TestingPostApplyFilter kvserverbase.ReplicaApplyFilter - // TestingBatchRequestFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchRequestFilter func(*kvpb.BatchRequest) bool - - // TestingBatchResponseFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchResponseFilter func(*kvpb.BatchResponse) bool - // TestingResponseErrorEvent is called when an error is returned applying // a command. TestingResponseErrorEvent func(context.Context, *kvpb.BatchRequest, error) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index ee56469552e1..91d2b1427c19 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -675,14 +675,11 @@ func (l Locality) getFirstRegionFirstZone() ( return firstRegionValue, hasRegion, firstZoneKey, firstZoneValue, hasZone } -// IsCrossRegionCrossZone returns multiple values containing: -// 1. A boolean value indicating if this and the provided locality are -// cross-region. -// 2. Error indicating if either locality does not have a "region" tier key. -// 3. A boolean value indicating if this and the provided locality are -// cross-zone. -// 4. Error indicating if either locality does not have a "zone" tier key or if -// the first "zone" tier keys used by two localities are different. +// CompareWithLocality returns the comparison result between this and the +// provided other locality along with any lookup errors. Possible errors include +// 1. if either locality does not have a "region" tier key. 2. if either +// locality does not have a "zone" tier key or if the first "zone" tier keys +// used by two localities are different. // // Limitation: // - It is unfortunate that the tier key is hardcoded here. Ideally, we would @@ -699,14 +696,14 @@ func (l Locality) getFirstRegionFirstZone() ( // a single function to avoid overhead. If you are adding additional locality // tiers comparisons, it is recommended to handle them within one tier list // iteration. -func (l Locality) IsCrossRegionCrossZone( +func (l Locality) CompareWithLocality( other Locality, -) (isCrossRegion bool, regionErr error, isCrossZone bool, zoneErr error) { +) (_ LocalityComparisonType, regionErr error, zoneErr error) { firstRegionValue, hasRegion, firstZoneKey, firstZone, hasZone := l.getFirstRegionFirstZone() firstRegionValueOther, hasRegionOther, firstZoneKeyOther, firstZoneOther, hasZoneOther := other.getFirstRegionFirstZone() - isCrossRegion = firstRegionValue != firstRegionValueOther - isCrossZone = firstZone != firstZoneOther + isCrossRegion := firstRegionValue != firstRegionValueOther + isCrossZone := firstZone != firstZoneOther if !hasRegion || !hasRegionOther { isCrossRegion = false @@ -718,7 +715,15 @@ func (l Locality) IsCrossRegionCrossZone( zoneErr = errors.Errorf("localities must have a valid zone tier key for cross-zone comparison") } - return isCrossRegion, regionErr, isCrossZone, zoneErr + if isCrossRegion { + return LocalityComparisonType_CROSS_REGION, regionErr, zoneErr + } else { + if isCrossZone { + return LocalityComparisonType_SAME_REGION_CROSS_ZONE, regionErr, zoneErr + } else { + return LocalityComparisonType_SAME_REGION_SAME_ZONE, regionErr, zoneErr + } + } } // SharedPrefix returns the number of this locality's tiers which match those of diff --git a/pkg/roachpb/metadata.proto b/pkg/roachpb/metadata.proto index 9d363885977b..6bf392f31e8e 100644 --- a/pkg/roachpb/metadata.proto +++ b/pkg/roachpb/metadata.proto @@ -425,6 +425,22 @@ message Tier { optional string value = 2 [(gogoproto.nullable) = false]; } +// LocalityComparisonType represents different types of comparison results that +// indicate the relationship between two localities. +enum LocalityComparisonType { + // UNDEFINED represents an undefined comparison result, indicating error + // behavior. + UNDEFINED = 0; + // CROSS_REGION indicates that the two localities have different region tiers. + CROSS_REGION = 1; + // SAME_REGION_CROSS_ZONE indicates that the two localities have the same + // region tiers but different zone tiers. + SAME_REGION_CROSS_ZONE = 2; + // SAME_REGION_SAME_ZONE indicates that the two localities have same region + // and same zone tiers. + SAME_REGION_SAME_ZONE = 3; +} + message Version { option (gogoproto.equal) = true; option (gogoproto.goproto_stringer) = false; diff --git a/pkg/roachpb/metadata_test.go b/pkg/roachpb/metadata_test.go index de8940fbfaed..cc95cff2c4fa 100644 --- a/pkg/roachpb/metadata_test.go +++ b/pkg/roachpb/metadata_test.go @@ -221,7 +221,7 @@ func TestLocalityMatches(t *testing.T) { } } -func TestLocalityIsCrossRegionCrossZone(t *testing.T) { +func TestLocalityCompareWithLocality(t *testing.T) { regionErrStr := "localities must have a valid region tier key for cross-region comparison" zoneErrStr := "localities must have a valid zone tier key for cross-zone comparison" @@ -251,82 +251,81 @@ func TestLocalityIsCrossRegionCrossZone(t *testing.T) { for _, tc := range []struct { l string other string - isCrossRegion bool - isCrossZone bool + localityType LocalityComparisonType crossRegionErr string crossZoneErr string }{ // -------- Part 1: check for different zone tier alternatives -------- // Valid tier keys, same regions and same zones. {l: "region=us-west,zone=us-west-1", other: "region=us-west,zone=us-west-1", - isCrossRegion: false, isCrossZone: false, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: "", crossZoneErr: ""}, // Valid tier keys, different regions and different zones. {l: "region=us-west,zone=us-west-1", other: "region=us-east,zone=us-west-2", - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // Valid tier keys, different regions and different zones. {l: "region=us-west,availability-zone=us-west-1", other: "region=us-east,availability-zone=us-east-1", - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // Valid tier keys, same regions and different zones. {l: "region=us-west,az=us-west-1", other: "region=us-west,other-keys=us,az=us-east-1", - isCrossRegion: false, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: "", crossZoneErr: ""}, // Invalid zone tier key and different regions. {l: "region=us-west,availability-zone=us-west-1", other: "region=us-east,zone=us-east-1", - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // Valid zone tier key (edge case), different zones and regions. {l: "region=us-west,zone=us-west-1", other: "region=us-east,zone=us-west-2,az=us-west-1", - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // Missing zone tier key and different regions. {l: "region=us-west,zone=us-west-1", other: "region=us-east", - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // Different region and different zones with non-unique & invalid zone tier key. {l: "region=us-west,zone=us-west-1,az=us-west-2", other: "az=us-west-1,region=us-west,zone=us-west-1", - isCrossRegion: false, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: "", crossZoneErr: zoneErrStr}, // Different regions and different zones with non-unique & valid zone tier key. {l: "region=us-west,az=us-west-2,zone=us-west-1", other: "region=us-west,az=us-west-1", - isCrossRegion: false, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: "", crossZoneErr: ""}, // Invalid region tier key and different zones. {l: "country=us,zone=us-west-1", other: "country=us,zone=us-west-2", - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // Missing region tier key and different zones. {l: "az=us-west-1", other: "region=us-east,az=us-west-2", - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // Invalid region and zone tier key. {l: "invalid-key=us-west,zone=us-west-1", other: "region=us-east,invalid-key=us-west-1", - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // Invalid region and zone tier key. {l: "country=us,dc=us-west-2", other: "country=us,dc=us-west-2", - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // -------- Part 2: single region, single zone -------- // One: (both) Two: (region) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr(secRegionStr, ""), - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // One: (both) Two: (zone) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr("", secZoneStr), - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // One: (region) Two: (region) {l: makeLocalityStr(firstRegionStr, ""), other: makeLocalityStr(secRegionStr, ""), - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // One: (zone) Two: (zone) {l: makeLocalityStr("", firstZoneStr), other: makeLocalityStr("", secZoneStr), - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // One: (region) Two: (zone) {l: makeLocalityStr(firstRegionStr, ""), other: makeLocalityStr("", secZoneStr), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (both) Two: (both) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr(secRegionStr, secZoneStr), - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // One: (none) Two: (none) {l: makeLocalityStr("", ""), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (region) Two: (none) {l: makeLocalityStr(firstRegionStr, ""), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (zone) Two: (none) {l: makeLocalityStr("", firstZoneStr), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (both) Two: (none) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, } { t.Run(fmt.Sprintf("%s-crosslocality-%s", tc.l, tc.other), func(t *testing.T) { var l Locality @@ -334,24 +333,21 @@ func TestLocalityIsCrossRegionCrossZone(t *testing.T) { require.NoError(t, l.Set(tc.l)) require.NoError(t, other.Set(tc.other)) type localities struct { - isCrossRegion bool - isCrossZone bool + localityType LocalityComparisonType crossRegionErr string crossZoneErr string } - isCrossRegion, crossRegionErr, isCrossZone, crossZoneErr := l.IsCrossRegionCrossZone(other) - crossRegionErrStr := "" - if crossRegionErr != nil { - crossRegionErrStr = crossRegionErr.Error() + localityType, regionErr, zoneErr := l.CompareWithLocality(other) + regionErrStr := "" + if regionErr != nil { + regionErrStr = regionErr.Error() } - crossZoneErrStr := "" - if crossZoneErr != nil { - crossZoneErrStr = crossZoneErr.Error() + zoneErrStr := "" + if zoneErr != nil { + zoneErrStr = zoneErr.Error() } - actual := localities{isCrossRegion, isCrossZone, - crossRegionErrStr, crossZoneErrStr} - expected := localities{tc.isCrossRegion, tc.isCrossZone, - tc.crossRegionErr, tc.crossZoneErr} + actual := localities{localityType, regionErrStr, zoneErrStr} + expected := localities{tc.localityType, tc.crossRegionErr, tc.crossZoneErr} require.Equal(t, expected, actual) }) } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index d44fae2ab7d8..3083a532039d 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -417,6 +417,7 @@ go_test( "decommission_test.go", "drain_test.go", "graphite_test.go", + "helpers_test.go", "index_usage_stats_test.go", "init_handshake_test.go", "intent_test.go", diff --git a/pkg/server/helpers_test.go b/pkg/server/helpers_test.go new file mode 100644 index 000000000000..f5613344b0ad --- /dev/null +++ b/pkg/server/helpers_test.go @@ -0,0 +1,66 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "reflect" + + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/errors" +) + +// getNodeCounterMetrics fetches the count of each specified node metric from +// the `metricNames` parameter and returns the result as a map. The keys in the +// map represent the metric metadata names, while the corresponding values +// indicate the count of each metric. If any of the specified metric cannot be +// found or is not a counter, the function will return an error. +// +// Assumption: 1. The metricNames parameter should consist of string literals +// that match the metadata names used for metric counters. 2. Each metric name +// provided in `metricNames` must exist, unique and be a counter type. +func (nm nodeMetrics) getNodeCounterMetrics(metricsName []string) (map[string]int64, error) { + metricCountMap := make(map[string]int64) + getFirstNodeMetric := func(metricName string) int64 { + metricsStruct := reflect.ValueOf(nm) + for i := 0; i < metricsStruct.NumField(); i++ { + field := metricsStruct.Field(i) + switch t := field.Interface().(type) { + case *metric.Counter: + if t.Name == metricName { + return t.Count() + } + } + } + return -1 + } + + for _, metricName := range metricsName { + count := getFirstNodeMetric(metricName) + if count == -1 { + return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) + } + metricCountMap[metricName] = count + } + return metricCountMap, nil +} + +// getMapsDiff returns the difference between the values of corresponding +// metrics in two maps. +// Assumption: beforeMap and afterMap contain the same set of keys. +func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { + diffMap := make(map[string]int64) + for metricName, beforeValue := range beforeMap { + if v, ok := afterMap[metricName]; ok { + diffMap[metricName] = v - beforeValue + } + } + return diffMap +} diff --git a/pkg/server/node.go b/pkg/server/node.go index ad35e68ad0b5..5c2f822164c7 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -15,7 +15,6 @@ import ( "context" "fmt" "net" - "reflect" "sort" "strings" "sync" @@ -285,39 +284,40 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *kvpb.Error) { nm.Latency.RecordValue(d.Nanoseconds()) } -// getNodeCounterMetrics fetches the count of each specified node metric from -// the `metricNames` parameter and returns the result as a map. The keys in the -// map represent the metric metadata names, while the corresponding values -// indicate the count of each metric. If any of the specified metric cannot be -// found or is not a counter, the function will return an error. -// -// Assumption: 1. The metricNames parameter should consist of string literals -// that match the metadata names used for metric counters. 2. Each metric name -// provided in `metricNames` must exist, unique and be a counter type. -func (nm nodeMetrics) getNodeCounterMetrics(metricsName []string) (map[string]int64, error) { - metricCountMap := make(map[string]int64) - getFirstNodeMetric := func(metricName string) int64 { - metricsStruct := reflect.ValueOf(nm) - for i := 0; i < metricsStruct.NumField(); i++ { - field := metricsStruct.Field(i) - switch t := field.Interface().(type) { - case *metric.Counter: - if t.Name == metricName { - return t.Count() - } - } - } - return -1 - } - - for _, metricName := range metricsName { - count := getFirstNodeMetric(metricName) - if count == -1 { - return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) - } - metricCountMap[metricName] = count +// updateCrossLocalityMetricsOnBatchRequest updates nodeMetrics for batch +// requests processed on the node. The metrics being updated include 1. total +// byte count of batch requests processed 2. cross-region metrics, which monitor +// activities across different regions, and 3. cross-zone metrics, which monitor +// activities across different zones within the same region or in cases where +// region tiers are not configured. These metrics may include batches that were +// not successfully sent but were terminated at an early stage. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchRequestsBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchRequestBytes.Inc(inc) + } +} + +// updateCrossLocalityMetricsOnBatchResponse updates nodeMetrics for batch +// responses that are received back. It updates based on the comparisonResult +// parameter determined during the initial batch requests check. The underlying +// assumption is that the response should match the cross-region or cross-zone +// nature of the requests. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchResponsesBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchResponseBytes.Inc(inc) } - return metricCountMap, nil } // A Node manages a map of stores (by store ID) for which it serves @@ -1325,84 +1325,34 @@ func (n *Node) batchInternal( return br, nil } -// isCrossRegionCrossZoneBatch returns (bool, bool) - indicating if the given -// batch request is cross-region and cross-zone respectively. -func (n *Node) isCrossRegionCrossZoneBatch( - ctx context.Context, ba *kvpb.BatchRequest, -) (bool, bool) { +// getLocalityComparison takes gatewayNodeID as input and returns the locality +// comparison result between the gateway node and the current node. This result +// indicates whether the two nodes are located in different regions or zones. +func (n *Node) getLocalityComparison( + ctx context.Context, gatewayNodeID roachpb.NodeID, +) roachpb.LocalityComparisonType { gossip := n.storeCfg.Gossip if gossip == nil { log.VEventf(ctx, 2, "gossip is not configured") - return false, false + return roachpb.LocalityComparisonType_UNDEFINED } - gatewayNodeDesc, err := gossip.GetNodeDescriptor(ba.GatewayNodeID) + gatewayNodeDesc, err := gossip.GetNodeDescriptor(gatewayNodeID) if err != nil { log.VEventf(ctx, 2, - "failed to perform look up for node descriptor %+v", err) - return false, false + "failed to perform look up for node descriptor %v", err) + return roachpb.LocalityComparisonType_UNDEFINED } - isCrossRegion, regionErr, isCrossZone, zoneErr := n.Descriptor.Locality. - IsCrossRegionCrossZone(gatewayNodeDesc.Locality) + comparisonResult, regionErr, zoneErr := n.Descriptor.Locality.CompareWithLocality(gatewayNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "%v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "%v", zoneErr) - } - - return isCrossRegion, isCrossZone -} - -// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a -// more meaningful way. Cross-region metrics monitor activities across different -// regions. Cross-zone metrics monitor cross-zone activities within the same -// region or in cases where region tiers are not configured. The check result is -// returned here to avoid redundant check for metrics updates after receiving -// batch responses. -func (n *Node) checkAndUpdateBatchRequestMetrics( - ctx context.Context, ba *kvpb.BatchRequest, shouldIncrement bool, -) (shouldIncCrossRegion bool, shouldIncCrossZone bool) { - if !shouldIncrement { - return false, false - } - n.metrics.BatchRequestsBytes.Inc(int64(ba.Size())) - isCrossRegion, isCrossZone := n.isCrossRegionCrossZoneBatch(ctx, ba) - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - n.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossRegion = true - } - } else { - if isCrossZone { - n.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossZone = true - } + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } - return shouldIncCrossRegion, shouldIncCrossZone -} -// checkAndUpdateBatchResponseMetrics updates the batch response metrics based -// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These -// parameters are determined during the initial check for batch requests. The -// underlying assumption is that if requests were cross-region or cross-zone, -// the response should be as well. -func (n *Node) checkAndUpdateBatchResponseMetrics( - br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool, shouldIncrement bool, -) { - if !shouldIncrement { - return - } - n.metrics.BatchResponsesBytes.Inc(int64(br.Size())) - if shouldIncCrossRegion { - n.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - } - if shouldIncCrossZone { - n.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) - } + return comparisonResult } // incrementBatchCounters increments counters to track the batch and composite @@ -1419,20 +1369,14 @@ func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) { func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { n.incrementBatchCounters(args) - shouldIncrement := true - if fn := n.storeCfg.TestingKnobs.TestingBatchRequestFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(args) - } - shouldIncCrossRegion, shouldIncCrossZone := n.checkAndUpdateBatchRequestMetrics(ctx, args, shouldIncrement) - // NB: Node.Batch is called directly for "local" calls. We don't want to // carry the associated log tags forward as doing so makes adding additional // log tags more expensive and makes local calls differ from remote calls. ctx = n.storeCfg.AmbientCtx.ResetAndAnnotateCtx(ctx) + comparisonResult := n.getLocalityComparison(ctx, args.GatewayNodeID) + n.metrics.updateCrossLocalityMetricsOnBatchRequest(comparisonResult, int64(args.Size())) + tenantID, ok := roachpb.ClientTenantFromContext(ctx) if !ok { tenantID = roachpb.SystemTenantID @@ -1477,14 +1421,7 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR br.Error = kvpb.NewError(err) } - shouldIncrement = true - if fn := n.storeCfg.TestingKnobs.TestingBatchResponseFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(br) - } - n.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone, shouldIncrement) + n.metrics.updateCrossLocalityMetricsOnBatchResponse(comparisonResult, int64(br.Size())) if buildutil.CrdbTestBuild && br.Error != nil && n.testingErrorEvent != nil { n.testingErrorEvent(ctx, args, errors.DecodeError(ctx, br.Error.EncodedError)) } diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 09436a8c6ef9..4ea8481bb159 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -39,10 +39,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" - "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -760,177 +759,76 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) { require.GreaterOrEqual(t, n.metrics.MethodCounts[kvpb.Put].Count(), putCurr) } -// getNodesMetrics retrieves the count of each node metric specified in -// metricNames associated with the specified serverIdx and returns the result as -// a map, along with any lookup errors. -func getNodeCounterMetrics( - tc serverutils.TestClusterInterface, serverIdx int, metricsName []string, -) (map[string]int64, error) { - ts := tc.Server(serverIdx).(*TestServer) - metrics, err := ts.node.metrics.getNodeCounterMetrics(metricsName) - return metrics, err -} - -// getNodesMetricsDiff returns the difference between the values of -// corresponding metrics in two maps. -// Assumption: beforeMap and afterMap contain the same set of keys. -func getNodesMetricsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { - diffMap := make(map[string]int64) - for metricName, beforeValue := range beforeMap { - if v, ok := afterMap[metricName]; ok { - diffMap[metricName] = v - beforeValue - } - } - return diffMap -} - -// TestNodeBatchMetrics verifies that node.Batch() correctly updates the +// TestNodeCrossLocalityMetrics verifies that +// updateCrossLocalityMetricsOnBatch{Request|Response} correctly updates // cross-region, cross-zone byte count metrics for batch requests sent and batch // responses received. -func TestNodeBatchMetrics(t *testing.T) { +func TestNodeCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - - // The initial setup ensures the correct configuration for three nodes (with - // different localities), single-range, and three replicas (on different - // nodes). - const numNodes = 3 - zcfg := zonepb.DefaultZoneConfig() - zcfg.NumReplicas = proto.Int32(1) - - type InterceptedInfo struct { - syncutil.Mutex - BatchRequestSize int64 - BatchResponseSize int64 - } - - info := InterceptedInfo{} - requestFn := func(ba *kvpb.BatchRequest) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if ba != nil && ba.Txn != nil { - if baTxnName := ba.Txn.Name; baTxnName == "cross-locality-test" { - info.BatchRequestSize = int64(ba.Size()) - return true + const expectedInc = 10 + + metricsNames := []string{ + "batch_requests.bytes", + "batch_requests.cross_region.bytes", + "batch_requests.cross_zone.bytes", + "batch_responses.bytes", + "batch_responses.cross_region.bytes", + "batch_responses.cross_zone.bytes"} + for _, tc := range []struct { + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool + }{ + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, + } { + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeNodeMetrics(metric.NewRegistry(), 1) + beforeMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) } - } - return false - } - - responseFn := func(br *kvpb.BatchResponse) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if br != nil && br.Txn != nil { - if brTxnName := br.Txn.Name; brTxnName == "cross-locality-test" { - info.BatchResponseSize = int64(br.Size()) - return true + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnBatchRequest(tc.crossLocalityType, expectedInc) + } else { + metrics.updateCrossLocalityMetricsOnBatchResponse(tc.crossLocalityType, expectedInc) } - } - return false - } - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - serverArgs := make(map[int]base.TestServerArgs) - for i := 0; i < numNodes; i++ { - serverArgs[i] = base.TestServerArgs{ - Locality: serverLocality[i], - Knobs: base.TestingKnobs{ - Server: &TestingKnobs{ - DefaultZoneConfigOverride: &zcfg, - }, - Store: &kvserver.StoreTestingKnobs{ - TestingBatchRequestFilter: requestFn, - TestingBatchResponseFilter: responseFn, - }, - }, - } - } - - ctx := context.Background() - var clusterArgs = base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: serverArgs, - } - - tc := serverutils.StartNewTestCluster(t, numNodes, clusterArgs) - defer tc.Stopper().Stop(ctx) - - testKey := tc.ScratchRange(t) - desc := tc.LookupRangeOrFatal(t, testKey) - - metrics := []string{"batch_requests.bytes", "batch_responses.bytes", - "batch_requests.cross_region.bytes", "batch_responses.cross_region.bytes", - "batch_requests.cross_zone.bytes", "batch_responses.cross_zone.bytes"} - receiverBefore, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - - // sendBatchToServer is a testing helper that sends a batch request from - // server[serverIndex] to server[0] and returns the number of bytes a batch - // request sent and a batch response received. - sendFromServer := func(serverIndex int) (int64, int64) { - get := &kvpb.GetRequest{ - RequestHeader: kvpb.RequestHeader{Key: testKey}, - } - var ba kvpb.BatchRequest - ba.GatewayNodeID = tc.Server(serverIndex).NodeID() - ba.Add(get) - ba.RangeID = desc.RangeID - ba.Replica.StoreID = tc.Server(0).GetFirstStoreID() - txn := roachpb.MakeTransaction( - "cross-locality-test", testKey, 0, 0, - hlc.Timestamp{WallTime: 1}, 0, 0) - ba.Txn = &txn - _, err := tc.Server(0).(*TestServer).GetNode().Batch(ctx, &ba) - require.NoError(t, err) - info.Lock() - defer info.Unlock() - return info.BatchRequestSize, info.BatchResponseSize + afterMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) + } + metricsDiff := getMapsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc + } + require.Equal(t, metricsDiff, expectedDiff) + }) } - // The first batch request is sent from server0 to server0, enforcing a - // within-region, within-zone batch request / response. - firstBatchRequest, firstBatchResponse := sendFromServer(0) - // The second batch request is sent from server1 to server0, enforcing a - // cross-region batch request / response. - secBatchRequest, secBatchResponse := sendFromServer(1) - // The third batch request is sent from server2 to server0, enforcing a - // cross-zone, within-region batch request / response. - thirdBatchRequest, thirdBatchResponse := sendFromServer(2) - totalRequest := firstBatchRequest + secBatchRequest + thirdBatchRequest - totalResponse := firstBatchResponse + secBatchResponse + thirdBatchResponse - - t.Run("receiver", func(t *testing.T) { - receiverAfter, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - receiverDelta := getNodesMetricsDiff(receiverBefore, receiverAfter) - receiverExpected := map[string]int64{ - "batch_requests.bytes": firstBatchRequest + secBatchRequest + thirdBatchRequest, - "batch_responses.bytes": firstBatchResponse + secBatchResponse + thirdBatchResponse, - "batch_requests.cross_region.bytes": secBatchRequest, - "batch_responses.cross_region.bytes": secBatchResponse, - "batch_requests.cross_zone.bytes": thirdBatchRequest, - "batch_responses.cross_zone.bytes": thirdBatchResponse, - } - require.Equal(t, receiverExpected, receiverDelta) - require.Equal(t, firstBatchRequest, totalRequest-secBatchRequest-thirdBatchRequest) - require.Equal(t, firstBatchResponse, totalResponse-secBatchResponse-thirdBatchResponse) - }) } func TestGetTenantWeights(t *testing.T) {