From af9cbad86f3a76a29ed0bf6297366ca3a23a9328 Mon Sep 17 00:00:00 2001 From: Wenyi Date: Thu, 15 Jun 2023 04:24:45 -0400 Subject: [PATCH] kv: refactor CompareWithLocality to use enum MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Prior to this commit, `CompareWithLocality` returned two boolean values indicating whether two localities were cross-region and cross-zone. However, this required callers to perform additional cross-comparison of these boolean values to make meaningful metrics updates. To simplify this, this commit introduces a new enum type `LocalityComparisonType`. It provides four locality comparison results: cross region, same region cross zone, same region same zone, and undefined (indicating error behavior). This refactoring allow the caller to directly use the comparison result without additional operations. In addition, this commit also updates the logic to classify activities between different regions as cross-regional, regardless of the zone tiers’ configuration. Initially, cross-region but same-zone tiers activities were flagged as misconfiguration. After some discussion, we have decided that regions should be non-overlapping. Hence, same zone tiers from different regions should still be considered as different zones. In addition, this commit also includes some refactoring of function parameters. Note that this commit does not change any existing functionality. Part of: https://github.com/cockroachdb/cockroach/issues/103983 Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender.go | 170 +++++-------- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 224 ++++------------- pkg/kv/kvclient/kvcoord/helpers_test.go | 53 ++++ pkg/kv/kvclient/kvcoord/testing_knobs.go | 13 - .../allocator/storepool/store_pool.go | 15 +- pkg/kv/kvserver/metrics.go | 34 +++ pkg/kv/kvserver/replica_command.go | 5 +- pkg/kv/kvserver/store_snapshot.go | 68 ++---- pkg/kv/kvserver/testing_knobs.go | 10 - pkg/roachpb/metadata.go | 31 ++- pkg/roachpb/metadata.proto | 16 ++ pkg/roachpb/metadata_test.go | 74 +++--- pkg/server/BUILD.bazel | 1 + pkg/server/helpers_test.go | 66 +++++ pkg/server/node.go | 165 ++++--------- pkg/server/node_test.go | 228 +++++------------- 16 files changed, 462 insertions(+), 711 deletions(-) create mode 100644 pkg/server/helpers_test.go diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index f243945bcc04..3a2b23a78fe5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -13,7 +13,6 @@ package kvcoord import ( "context" "fmt" - "reflect" "runtime" "runtime/pprof" "strings" @@ -358,41 +357,42 @@ func makeDistSenderMetrics() DistSenderMetrics { return m } -// getDistSenderCounterMetrics fetches the count of each specified DisSender -// metric from the `metricNames` parameter and returns the result as a map. The -// keys in the map represent the metric metadata names, while the corresponding -// values indicate the count of each metric. If any of the specified metric -// cannot be found or is not a counter, the function will return an error. -// -// Assumption: 1. The metricNames parameter should consist of string literals -// that match the metadata names used for metric counters. 2. Each metric name -// provided in `metricNames` must exist, unique and be a counter type. -func (dm *DistSenderMetrics) getDistSenderCounterMetrics( - metricsName []string, -) (map[string]int64, error) { - metricCountMap := make(map[string]int64) - getFirstDistSenderMetric := func(metricName string) int64 { - metricsStruct := reflect.ValueOf(*dm) - for i := 0; i < metricsStruct.NumField(); i++ { - field := metricsStruct.Field(i) - switch t := field.Interface().(type) { - case *metric.Counter: - if t.Name == metricName { - return t.Count() - } - } - } - return -1 +// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates +// DistSenderMetrics for batch requests that have been divided and are currently +// forwarding to a specific replica for the corresponding range. The metrics +// being updated include 1. total byte count of replica-addressed batch requests +// processed 2. cross-region metrics, which monitor activities across different +// regions, and 3. cross-zone metrics, which monitor activities across different +// zones within the same region or in cases where region tiers are not +// configured. These metrics may include batches that were not successfully sent +// but were terminated at an early stage. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchRequestBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchRequestBytes.Inc(inc) } +} - for _, metricName := range metricsName { - count := getFirstDistSenderMetric(metricName) - if count == -1 { - return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) - } - metricCountMap[metricName] = count +// updateCrossLocalityMetricsOnReplicaAddressedBatchResponse updates +// DistSenderMetrics for batch responses that are received back from transport +// rpc. It updates based on the comparisonResult parameter determined during the +// initial batch requests check. The underlying assumption is that the response +// should match the cross-region or cross-zone nature of the requests. +func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + dm.ReplicaAddressedBatchResponseBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + dm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + dm.CrossZoneBatchResponseBytes.Inc(inc) } - return metricCountMap, nil } // FirstRangeProvider is capable of providing DistSender with the descriptor of @@ -465,14 +465,6 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch request byte count to the test. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual - // batch response byte count to the test. - BatchResponseInterceptor func(br *kvpb.BatchResponse) - // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -630,14 +622,6 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch } - if cfg.TestingKnobs.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor - } - - if cfg.TestingKnobs.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor - } - return ds } @@ -2293,15 +2277,11 @@ func (ds *DistSender) sendToReplicas( (desc.Generation == 0 && routing.LeaseSeq() == 0), } - if ds.BatchRequestInterceptor != nil { - ds.BatchRequestInterceptor(ba) - } - shouldIncCrossRegion, shouldIncCrossZone := ds.checkAndUpdateBatchRequestMetrics(ctx, ba) + comparisonResult := ds.getLocalityComparison(ctx, ds.getNodeID(), ba.Replica.NodeID) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size())) + br, err = transport.SendNext(ctx, ba) - if ds.BatchResponseInterceptor != nil { - ds.BatchResponseInterceptor(br) - } - ds.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone) + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size())) ds.maybeIncrementErrCounters(br, err) if err != nil { @@ -2557,73 +2537,31 @@ func (ds *DistSender) sendToReplicas( } } -// isCrossRegionCrossZoneBatch returns (bool, bool) - indicating if the given -// batch request is cross-region and cross-zone respectively. -func (ds *DistSender) isCrossRegionCrossZoneBatch( - ctx context.Context, ba *kvpb.BatchRequest, -) (bool, bool) { - gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (ds *DistSender) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, +) roachpb.LocalityComparisonType { + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) - return false, false + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) + return roachpb.LocalityComparisonType_UNDEFINED } - destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID) if err != nil { - log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) - return false, false + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err) + return roachpb.LocalityComparisonType_UNDEFINED } - isCrossRegion, regionErr, isCrossZone, zoneErr := gatewayNodeDesc.Locality.IsCrossRegionCrossZone(destinationNodeDesc.Locality) + + comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "%v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "%v", zoneErr) - } - return isCrossRegion, isCrossZone -} - -// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a -// more meaningful way. Cross-region metrics monitor activities across different -// regions. Cross-zone metrics monitor cross-zone activities within the same -// region or in cases where region tiers are not configured. The check result is -// returned here to avoid redundant check for metrics updates after receiving -// batch responses. -func (ds *DistSender) checkAndUpdateBatchRequestMetrics( - ctx context.Context, ba *kvpb.BatchRequest, -) (shouldIncCrossRegion bool, shouldIncCrossZone bool) { - ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size())) - isCrossRegion, isCrossZone := ds.isCrossRegionCrossZoneBatch(ctx, ba) - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossRegion = true - } - } else { - if isCrossZone { - ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossZone = true - } - } - return shouldIncCrossRegion, shouldIncCrossZone -} - -// checkAndUpdateBatchResponseMetrics updates the batch response metrics based -// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These -// parameters are determined during the initial check for batch requests. The -// underlying assumption is that if requests were cross-region or cross-zone, -// the response should be as well. -func (ds *DistSender) checkAndUpdateBatchResponseMetrics( - br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool, -) { - ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size())) - if shouldIncCrossRegion { - ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - } - if shouldIncCrossZone { - ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } + return comparisonResult } // getCostControllerConfig returns the config for the tenant cost model. This diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index e8319fabc1a3..47e1eb88dc27 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5651,199 +5651,75 @@ func TestDistSenderRPCMetrics(t *testing.T) { require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1)) } -// getMapsDiff returns the difference between the values of corresponding -// metrics in two maps. Assumption: beforeMap and afterMap contain the same set -// of keys. -func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { - diffMap := make(map[string]int64) - for metricName, beforeValue := range beforeMap { - if v, ok := afterMap[metricName]; ok { - diffMap[metricName] = v - beforeValue - } - } - return diffMap -} - -// TestDistSenderBatchMetrics verifies that the DistSender.Send() -// correctly updates the cross-region, cross-zone byte count metrics. -func TestDistSenderBatchMetrics(t *testing.T) { +// TestDistSenderCrossLocalityMetrics verifies that +// updateCrossLocalityMetricsOnReplicaAddressedBatch{Request|Response} correctly +// updates cross-region, cross-zone byte count metrics for batch requests sent +// and batch responses received. +func TestDistSenderCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() - ctx := context.Background() - stopper := stop.NewStopper() - defer stopper.Stop(ctx) - - // The initial setup ensures the correct setup for three nodes (with different - // localities), single-range, three replicas (on different nodes). - clock := hlc.NewClockForTesting(nil) - rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) - rangeDesc := testUserRangeDescriptor3Replicas - replicas := rangeDesc.InternalReplicas - - // The servers localities are configured so that the first batch request sent - // from server0 to server0 is same-region, same-zone. The second batch request - // sent from server0 to server1 is cross-region. The second batch request sent - // from server0 to server2 is cross-zone within the same region. - const numNodes = 3 - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - nodes := make([]roachpb.NodeDescriptor, 3) - for i := 0; i < numNodes; i++ { - nodes[i] = roachpb.NodeDescriptor{ - NodeID: roachpb.NodeID(i + 1 /* 0 is not a valid NodeID */), - Address: util.UnresolvedAddr{}, - Locality: serverLocality[i], - } - } - ns := &mockNodeStore{nodes: nodes} - - var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { - return ba.CreateReply(), nil - } - interceptedBatchRequestBytes, interceptedBatchResponseBytes := int64(-1), int64(-1) - cfg := DistSenderConfig{ - AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), - Clock: clock, - NodeDescs: ns, - RPCContext: rpcContext, - RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), - TestingKnobs: ClientTestingKnobs{ - TransportFactory: adaptSimpleTransport(transportFn), - BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { - interceptedBatchRequestBytes = int64(ba.Size()) - }, - BatchResponseInterceptor: func(br *kvpb.BatchResponse) { - interceptedBatchResponseBytes = int64(br.Size()) - }, - }, - Settings: cluster.MakeTestingClusterSettings(), - } + defer log.Scope(t).Close(t) + const expectedInc = 10 - distSender := NewDistSender(cfg) metricsNames := []string{ "distsender.batch_requests.replica_addressed.bytes", - "distsender.batch_responses.replica_addressed.bytes", "distsender.batch_requests.cross_region.bytes", - "distsender.batch_responses.cross_region.bytes", "distsender.batch_requests.cross_zone.bytes", - "distsender.batch_responses.cross_zone.bytes"} - - getExpectedDelta := func( - isCrossRegion bool, isCrossZone bool, interceptedRequest int64, interceptedResponse int64, - ) map[string]int64 { - ternaryOp := func(b bool, num int64) (res int64) { - if b { - res = num - } - return res - } - - expectedDelta := make(map[string]int64) - expectedDelta[metricsNames[0]] = interceptedRequest - expectedDelta[metricsNames[1]] = interceptedResponse - expectedDelta[metricsNames[2]] = ternaryOp(isCrossRegion, interceptedRequest) - expectedDelta[metricsNames[3]] = ternaryOp(isCrossRegion, interceptedResponse) - expectedDelta[metricsNames[4]] = ternaryOp(isCrossZone, interceptedRequest) - expectedDelta[metricsNames[5]] = ternaryOp(isCrossZone, interceptedResponse) - return expectedDelta + "distsender.batch_responses.replica_addressed.bytes", + "distsender.batch_responses.cross_region.bytes", + "distsender.batch_responses.cross_zone.bytes", } - - sameRegionSameZoneRequest := int64(0) - sameRegionSameZoneResponse := int64(0) - for _, tc := range []struct { - toReplica int - isCrossRegion bool - isCrossZone bool + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool }{ - // First test sets replica[0] as leaseholder, enforcing a within-region, - // within-zone batch request / response. - {toReplica: 0, isCrossRegion: false, isCrossZone: false}, - // Second test sets replica[1] as leaseholder, enforcing a cross-region, - // batch request / response. Note that although the request is cross-zone, - // the cross-zone metrics is not expected to increment. - {toReplica: 1, isCrossRegion: true, isCrossZone: false}, - // Third test sets replica[2] as leaseholder, enforcing a within-region, - // cross-zone batch request / response. Cross-zone metrics is only expected - // to increment when it is cross-zone, same-region activities. - {toReplica: 2, isCrossRegion: false, isCrossZone: true}, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, } { - t.Run(fmt.Sprintf("isCrossRegion:%t-isCrossZone:%t", tc.isCrossRegion, tc.isCrossZone), func(t *testing.T) { - beforeMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeDistSenderMetrics() + beforeMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { - t.Fatal(err) + t.Error(err) } - - ba := &kvpb.BatchRequest{} - if tc.toReplica == 0 { - // Send a different request type for the first request to avoid having - // the same byte count for three requests and coincidental correct - // results. - get := &kvpb.GetRequest{} - get.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(get) + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(tc.crossLocalityType, expectedInc) } else { - put := &kvpb.PutRequest{} - put.Key = rangeDesc.StartKey.AsRawKey() - ba.Add(put) - } - - ba.Header = kvpb.Header{ - // DistSender is set to be at the server0. - GatewayNodeID: 1, - } - distSender.rangeCache.Insert(ctx, roachpb.RangeInfo{ - Desc: rangeDesc, - Lease: roachpb.Lease{ - Replica: replicas[tc.toReplica], - }, - }) - - if _, err := distSender.Send(ctx, ba); err != nil { - t.Fatal(err) - } - - require.NotEqual(t, interceptedBatchRequestBytes, int64(-1), - "expected bytes not set correctly") - require.NotEqual(t, interceptedBatchResponseBytes, int64(-1), - "expected bytes not set correctly") - if tc.toReplica == 0 { - // Record the first batch request and response that was sent same - // region, same zone for future testing. - sameRegionSameZoneRequest = interceptedBatchRequestBytes - sameRegionSameZoneResponse = interceptedBatchResponseBytes + metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(tc.crossLocalityType, expectedInc) } - expected := getExpectedDelta(tc.isCrossRegion, tc.isCrossZone, - interceptedBatchRequestBytes, interceptedBatchResponseBytes) - afterMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - diffMetrics := getMapsDiff(beforeMetrics, afterMetrics) + afterMetrics, err := metrics.getDistSenderCounterMetrics(metricsNames) if err != nil { t.Error(err) } - require.Equal(t, expected, diffMetrics) - }) - t.Run("SameRegionSameZone", func(t *testing.T) { - // Since the region and zone tiers are all configured in this test, we - // expect that the byte count of batch requests sent within the same - // region and same zone should equal to the total byte count of requests - // minus the combined byte count of cross-region and cross-zone requests - // metrics. Similar expectation for batch responses. - metrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) - if err != nil { - t.Error(err) + metricsDiff := getMapsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc } - totalRequest := metrics["distsender.batch_requests.replica_addressed.bytes"] - totalResponse := metrics["distsender.batch_responses.replica_addressed.bytes"] - crossRegionRequest := metrics["distsender.batch_requests.cross_region.bytes"] - crossRegionResponse := metrics["distsender.batch_responses.cross_region.bytes"] - crossZoneRequest := metrics["distsender.batch_requests.cross_zone.bytes"] - crossZoneResponse := metrics["distsender.batch_responses.cross_zone.bytes"] - require.Equal(t, sameRegionSameZoneRequest, totalRequest-crossRegionRequest-crossZoneRequest) - require.Equal(t, sameRegionSameZoneResponse, totalResponse-crossRegionResponse-crossZoneResponse) + require.Equal(t, metricsDiff, expectedDiff) }) } } diff --git a/pkg/kv/kvclient/kvcoord/helpers_test.go b/pkg/kv/kvclient/kvcoord/helpers_test.go index 5df39946255a..65de340820e3 100644 --- a/pkg/kv/kvclient/kvcoord/helpers_test.go +++ b/pkg/kv/kvclient/kvcoord/helpers_test.go @@ -11,9 +11,12 @@ package kvcoord import ( + "reflect" "sort" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/errors" ) // asSortedSlice returns the set data in sorted order. @@ -61,3 +64,53 @@ func (tcf *TxnCoordSenderFactory) TestingSetMetrics(metrics TxnMetrics) { func (tcf *TxnCoordSenderFactory) TestingSetCommitWaitFilter(filter func()) { tcf.testingKnobs.CommitWaitFilter = filter } + +// getDistSenderCounterMetrics fetches the count of each specified DisSender +// metric from the `metricNames` parameter and returns the result as a map. The +// keys in the map represent the metric metadata names, while the corresponding +// values indicate the count of each metric. If any of the specified metric +// cannot be found or is not a counter, the function will return an error. +// +// Assumption: 1. The metricNames parameter should consist of string literals +// that match the metadata names used for metric counters. 2. Each metric name +// provided in `metricNames` must exist, unique and be a counter type. +func (dm *DistSenderMetrics) getDistSenderCounterMetrics( + metricsName []string, +) (map[string]int64, error) { + metricCountMap := make(map[string]int64) + getFirstDistSenderMetric := func(metricName string) int64 { + metricsStruct := reflect.ValueOf(*dm) + for i := 0; i < metricsStruct.NumField(); i++ { + field := metricsStruct.Field(i) + switch t := field.Interface().(type) { + case *metric.Counter: + if t.Name == metricName { + return t.Count() + } + } + } + return -1 + } + + for _, metricName := range metricsName { + count := getFirstDistSenderMetric(metricName) + if count == -1 { + return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) + } + metricCountMap[metricName] = count + } + return metricCountMap, nil +} + +// getMapsDiff returns the difference between the values of corresponding +// metrics in two maps. Assumption: beforeMap and afterMap contain the same set +// of keys. +func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { + diffMap := make(map[string]int64) + for metricName, beforeValue := range beforeMap { + if v, ok := afterMap[metricName]; ok { + diffMap[metricName] = v - beforeValue + } + } + return diffMap +} diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index dc7b88927ad7..58bb2efafe1d 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,19 +61,6 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error - - // Currently, BatchRequestInterceptor and BatchResponseInterceptor only - // intercepts DistSender.Send() to pass the actual batch request and response - // byte count to the test. However, it can be easily extended to validate - // other properties of batch requests / response if required. - - // BatchRequestInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchRequest properties. - BatchRequestInterceptor func(ba *kvpb.BatchRequest) - - // BatchResponseInterceptor is designed to intercept calls to DistSender - // function calls to validate BatchResponse properties. - BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/kv/kvserver/allocator/storepool/store_pool.go b/pkg/kv/kvserver/allocator/storepool/store_pool.go index 6ab97df90bad..dcc62c68b103 100644 --- a/pkg/kv/kvserver/allocator/storepool/store_pool.go +++ b/pkg/kv/kvserver/allocator/storepool/store_pool.go @@ -1277,22 +1277,11 @@ func (sp *StorePool) GetNodeLocalityString(nodeID roachpb.NodeID) string { return sp.getNodeLocalityWithString(nodeID).str } -// getNodeLocality returns the locality information for the given node. -func (sp *StorePool) getNodeLocality(nodeID roachpb.NodeID) roachpb.Locality { +// GetNodeLocality returns the locality information for the given node. +func (sp *StorePool) GetNodeLocality(nodeID roachpb.NodeID) roachpb.Locality { return sp.getNodeLocalityWithString(nodeID).locality } -// IsCrossRegionCrossZone takes in two replicas and compares the locality of -// them based on their replica node IDs. It returns (bool, error, bool, error) -// where the boolean values indicate whether the two replicas' nodes are in -// different regions, different zones, along with any lookup errors. -func (sp *StorePool) IsCrossRegionCrossZone( - firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, -) (bool, error, bool, error) { - return sp.getNodeLocality(firstReplica.NodeID).IsCrossRegionCrossZone( - sp.getNodeLocality(secReplica.NodeID)) -} - // IsStoreReadyForRoutineReplicaTransfer returns true iff the store's node is // live (as indicated by its `NodeLivenessStatus`) and thus a legal candidate // to receive a replica. diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 8be761c78bb9..a82dec31be5f 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -3238,6 +3238,40 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) { } } +// updateCrossLocalityMetricsOnSnapshotSent updates cross-locality related store +// metrics when outgoing snapshots are sent to the outgoingSnapshotStream. The +// metrics being updated include 1. cross-region metrics, which monitor +// activities across different regions, and 2. cross-zone metrics, which monitor +// activities across different zones within the same region or in cases where +// region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotSent( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionSentBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneSentBytes.Inc(inc) + } +} + +// updateCrossLocalityMetricsOnSnapshotRcvd updates cross-locality related store +// metrics when receiving SnapshotRequests through streaming and constructing +// incoming snapshots. The metrics being updated include 1. cross-region +// metrics, which monitor activities across different regions, and 2. cross-zone +// metrics, which monitor activities across different zones within the same +// region or in cases where region tiers are not configured. +func (sm *StoreMetrics) updateCrossLocalityMetricsOnSnapshotRcvd( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + sm.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + sm.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) + } +} + func (sm *StoreMetrics) updateEnvStats(stats storage.EnvStats) { sm.EncryptionAlgorithm.Update(int64(stats.EncryptionType)) } diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 3b8921f6f7f7..06b90bdcf7c3 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -3151,6 +3151,8 @@ func (r *Replica) followerSendSnapshot( sent := func() { r.store.metrics.RangeSnapshotsGenerated.Inc(1) } + comparisonResult := r.store.getLocalityComparison(ctx, req.CoordinatorReplica.NodeID, + req.RecipientReplica.NodeID) recordBytesSent := func(inc int64) { // Only counts for delegated bytes if we are not self-delegating. @@ -3158,8 +3160,7 @@ func (r *Replica) followerSendSnapshot( r.store.metrics.DelegateSnapshotSendBytes.Inc(inc) } r.store.metrics.RangeSnapshotSentBytes.Inc(inc) - r.store.updateCrossLocalitySnapshotMetrics( - ctx, req.CoordinatorReplica, req.RecipientReplica, inc, true /* isSent */) + r.store.metrics.updateCrossLocalityMetricsOnSnapshotSent(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index f8bce0e375e6..ee1ea28e198c 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -973,60 +973,22 @@ func (s *Store) checkSnapshotOverlapLocked( return nil } -// shouldIncrementCrossLocalitySnapshotMetrics returns (bool, bool) - indicating -// if the two given replicas are cross-region and cross-zone respectively. -func (s *Store) shouldIncrementCrossLocalitySnapshotMetrics( - ctx context.Context, firstReplica roachpb.ReplicaDescriptor, secReplica roachpb.ReplicaDescriptor, -) (bool, bool) { - isCrossRegion, regionErr, isCrossZone, zoneErr := s.cfg.StorePool.IsCrossRegionCrossZone( - firstReplica, secReplica) +// getLocalityComparison takes two nodeIDs as input and returns the locality +// comparison result between their corresponding nodes. This result indicates +// whether the two nodes are located in different regions or zones. +func (s *Store) getLocalityComparison( + ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID, +) roachpb.LocalityComparisonType { + firstLocality := s.cfg.StorePool.GetNodeLocality(fromNodeID) + secLocality := s.cfg.StorePool.GetNodeLocality(toNodeID) + comparisonResult, regionErr, zoneErr := firstLocality.CompareWithLocality(secLocality) if regionErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross region %v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "unable to determine if snapshot is cross zone %v", zoneErr) - } - return isCrossRegion, isCrossZone -} - -// updateCrossLocalitySnapshotMetrics updates the snapshot metrics in a more -// meaningful way. Cross-region metrics monitor activities across different -// regions. Cross-zone metrics monitor any cross-zone activities within the same -// region or if the region tiers are not configured. -func (s *Store) updateCrossLocalitySnapshotMetrics( - ctx context.Context, - firstReplica roachpb.ReplicaDescriptor, - secReplica roachpb.ReplicaDescriptor, - inc int64, - isSent bool, -) { - isCrossRegion, isCrossZone := s.shouldIncrementCrossLocalitySnapshotMetrics(ctx, firstReplica, secReplica) - if isSent { - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - s.metrics.RangeSnapShotCrossRegionSentBytes.Inc(inc) - } - } else { - if isCrossZone { - s.metrics.RangeSnapShotCrossZoneSentBytes.Inc(inc) - } - } - } else { - // isReceived - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - s.metrics.RangeSnapShotCrossRegionRcvdBytes.Inc(inc) - } - } else { - if isCrossZone { - s.metrics.RangeSnapShotCrossZoneRcvdBytes.Inc(inc) - } - } + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } + return comparisonResult } // receiveSnapshot receives an incoming snapshot via a pre-opened GRPC stream. @@ -1143,10 +1105,12 @@ func (s *Store) receiveSnapshot( log.Infof(ctx, "accepted snapshot reservation for r%d", header.State.Desc.RangeID) } + comparisonResult := s.getLocalityComparison(ctx, + header.RaftMessageRequest.FromReplica.NodeID, header.RaftMessageRequest.ToReplica.NodeID) + recordBytesReceived := func(inc int64) { s.metrics.RangeSnapshotRcvdBytes.Inc(inc) - s.updateCrossLocalitySnapshotMetrics( - ctx, header.RaftMessageRequest.FromReplica, header.RaftMessageRequest.ToReplica, inc, false /* isSent */) + s.metrics.updateCrossLocalityMetricsOnSnapshotRcvd(comparisonResult, inc) switch header.Priority { case kvserverpb.SnapshotRequest_RECOVERY: diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index 02cf4c88d016..fbe8be7c0ae2 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -100,16 +100,6 @@ type StoreTestingKnobs struct { // no-op write, and the ForcedError field will be set. TestingPostApplyFilter kvserverbase.ReplicaApplyFilter - // TestingBatchRequestFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchRequestFilter func(*kvpb.BatchRequest) bool - - // TestingBatchResponseFilter intercepts Node.Batch() to pass the actual batch - // request byte count to the test. A boolean value is returned here to filter - // out changes in node metrics that are irrelevant to the test. - TestingBatchResponseFilter func(*kvpb.BatchResponse) bool - // TestingResponseErrorEvent is called when an error is returned applying // a command. TestingResponseErrorEvent func(context.Context, *kvpb.BatchRequest, error) diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index ee56469552e1..91d2b1427c19 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -675,14 +675,11 @@ func (l Locality) getFirstRegionFirstZone() ( return firstRegionValue, hasRegion, firstZoneKey, firstZoneValue, hasZone } -// IsCrossRegionCrossZone returns multiple values containing: -// 1. A boolean value indicating if this and the provided locality are -// cross-region. -// 2. Error indicating if either locality does not have a "region" tier key. -// 3. A boolean value indicating if this and the provided locality are -// cross-zone. -// 4. Error indicating if either locality does not have a "zone" tier key or if -// the first "zone" tier keys used by two localities are different. +// CompareWithLocality returns the comparison result between this and the +// provided other locality along with any lookup errors. Possible errors include +// 1. if either locality does not have a "region" tier key. 2. if either +// locality does not have a "zone" tier key or if the first "zone" tier keys +// used by two localities are different. // // Limitation: // - It is unfortunate that the tier key is hardcoded here. Ideally, we would @@ -699,14 +696,14 @@ func (l Locality) getFirstRegionFirstZone() ( // a single function to avoid overhead. If you are adding additional locality // tiers comparisons, it is recommended to handle them within one tier list // iteration. -func (l Locality) IsCrossRegionCrossZone( +func (l Locality) CompareWithLocality( other Locality, -) (isCrossRegion bool, regionErr error, isCrossZone bool, zoneErr error) { +) (_ LocalityComparisonType, regionErr error, zoneErr error) { firstRegionValue, hasRegion, firstZoneKey, firstZone, hasZone := l.getFirstRegionFirstZone() firstRegionValueOther, hasRegionOther, firstZoneKeyOther, firstZoneOther, hasZoneOther := other.getFirstRegionFirstZone() - isCrossRegion = firstRegionValue != firstRegionValueOther - isCrossZone = firstZone != firstZoneOther + isCrossRegion := firstRegionValue != firstRegionValueOther + isCrossZone := firstZone != firstZoneOther if !hasRegion || !hasRegionOther { isCrossRegion = false @@ -718,7 +715,15 @@ func (l Locality) IsCrossRegionCrossZone( zoneErr = errors.Errorf("localities must have a valid zone tier key for cross-zone comparison") } - return isCrossRegion, regionErr, isCrossZone, zoneErr + if isCrossRegion { + return LocalityComparisonType_CROSS_REGION, regionErr, zoneErr + } else { + if isCrossZone { + return LocalityComparisonType_SAME_REGION_CROSS_ZONE, regionErr, zoneErr + } else { + return LocalityComparisonType_SAME_REGION_SAME_ZONE, regionErr, zoneErr + } + } } // SharedPrefix returns the number of this locality's tiers which match those of diff --git a/pkg/roachpb/metadata.proto b/pkg/roachpb/metadata.proto index 9d363885977b..6bf392f31e8e 100644 --- a/pkg/roachpb/metadata.proto +++ b/pkg/roachpb/metadata.proto @@ -425,6 +425,22 @@ message Tier { optional string value = 2 [(gogoproto.nullable) = false]; } +// LocalityComparisonType represents different types of comparison results that +// indicate the relationship between two localities. +enum LocalityComparisonType { + // UNDEFINED represents an undefined comparison result, indicating error + // behavior. + UNDEFINED = 0; + // CROSS_REGION indicates that the two localities have different region tiers. + CROSS_REGION = 1; + // SAME_REGION_CROSS_ZONE indicates that the two localities have the same + // region tiers but different zone tiers. + SAME_REGION_CROSS_ZONE = 2; + // SAME_REGION_SAME_ZONE indicates that the two localities have same region + // and same zone tiers. + SAME_REGION_SAME_ZONE = 3; +} + message Version { option (gogoproto.equal) = true; option (gogoproto.goproto_stringer) = false; diff --git a/pkg/roachpb/metadata_test.go b/pkg/roachpb/metadata_test.go index de8940fbfaed..cc95cff2c4fa 100644 --- a/pkg/roachpb/metadata_test.go +++ b/pkg/roachpb/metadata_test.go @@ -221,7 +221,7 @@ func TestLocalityMatches(t *testing.T) { } } -func TestLocalityIsCrossRegionCrossZone(t *testing.T) { +func TestLocalityCompareWithLocality(t *testing.T) { regionErrStr := "localities must have a valid region tier key for cross-region comparison" zoneErrStr := "localities must have a valid zone tier key for cross-zone comparison" @@ -251,82 +251,81 @@ func TestLocalityIsCrossRegionCrossZone(t *testing.T) { for _, tc := range []struct { l string other string - isCrossRegion bool - isCrossZone bool + localityType LocalityComparisonType crossRegionErr string crossZoneErr string }{ // -------- Part 1: check for different zone tier alternatives -------- // Valid tier keys, same regions and same zones. {l: "region=us-west,zone=us-west-1", other: "region=us-west,zone=us-west-1", - isCrossRegion: false, isCrossZone: false, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: "", crossZoneErr: ""}, // Valid tier keys, different regions and different zones. {l: "region=us-west,zone=us-west-1", other: "region=us-east,zone=us-west-2", - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // Valid tier keys, different regions and different zones. {l: "region=us-west,availability-zone=us-west-1", other: "region=us-east,availability-zone=us-east-1", - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // Valid tier keys, same regions and different zones. {l: "region=us-west,az=us-west-1", other: "region=us-west,other-keys=us,az=us-east-1", - isCrossRegion: false, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: "", crossZoneErr: ""}, // Invalid zone tier key and different regions. {l: "region=us-west,availability-zone=us-west-1", other: "region=us-east,zone=us-east-1", - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // Valid zone tier key (edge case), different zones and regions. {l: "region=us-west,zone=us-west-1", other: "region=us-east,zone=us-west-2,az=us-west-1", - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // Missing zone tier key and different regions. {l: "region=us-west,zone=us-west-1", other: "region=us-east", - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // Different region and different zones with non-unique & invalid zone tier key. {l: "region=us-west,zone=us-west-1,az=us-west-2", other: "az=us-west-1,region=us-west,zone=us-west-1", - isCrossRegion: false, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: "", crossZoneErr: zoneErrStr}, // Different regions and different zones with non-unique & valid zone tier key. {l: "region=us-west,az=us-west-2,zone=us-west-1", other: "region=us-west,az=us-west-1", - isCrossRegion: false, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: "", crossZoneErr: ""}, // Invalid region tier key and different zones. {l: "country=us,zone=us-west-1", other: "country=us,zone=us-west-2", - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // Missing region tier key and different zones. {l: "az=us-west-1", other: "region=us-east,az=us-west-2", - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // Invalid region and zone tier key. {l: "invalid-key=us-west,zone=us-west-1", other: "region=us-east,invalid-key=us-west-1", - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // Invalid region and zone tier key. {l: "country=us,dc=us-west-2", other: "country=us,dc=us-west-2", - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // -------- Part 2: single region, single zone -------- // One: (both) Two: (region) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr(secRegionStr, ""), - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // One: (both) Two: (zone) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr("", secZoneStr), - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // One: (region) Two: (region) {l: makeLocalityStr(firstRegionStr, ""), other: makeLocalityStr(secRegionStr, ""), - isCrossRegion: true, isCrossZone: false, crossRegionErr: "", crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: zoneErrStr}, // One: (zone) Two: (zone) {l: makeLocalityStr("", firstZoneStr), other: makeLocalityStr("", secZoneStr), - isCrossRegion: false, isCrossZone: true, crossRegionErr: regionErrStr, crossZoneErr: ""}, + localityType: LocalityComparisonType_SAME_REGION_CROSS_ZONE, crossRegionErr: regionErrStr, crossZoneErr: ""}, // One: (region) Two: (zone) {l: makeLocalityStr(firstRegionStr, ""), other: makeLocalityStr("", secZoneStr), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (both) Two: (both) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr(secRegionStr, secZoneStr), - isCrossRegion: true, isCrossZone: true, crossRegionErr: "", crossZoneErr: ""}, + localityType: LocalityComparisonType_CROSS_REGION, crossRegionErr: "", crossZoneErr: ""}, // One: (none) Two: (none) {l: makeLocalityStr("", ""), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (region) Two: (none) {l: makeLocalityStr(firstRegionStr, ""), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (zone) Two: (none) {l: makeLocalityStr("", firstZoneStr), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, // One: (both) Two: (none) {l: makeLocalityStr(firstRegionStr, firstZoneStr), other: makeLocalityStr("", ""), - isCrossRegion: false, isCrossZone: false, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, + localityType: LocalityComparisonType_SAME_REGION_SAME_ZONE, crossRegionErr: regionErrStr, crossZoneErr: zoneErrStr}, } { t.Run(fmt.Sprintf("%s-crosslocality-%s", tc.l, tc.other), func(t *testing.T) { var l Locality @@ -334,24 +333,21 @@ func TestLocalityIsCrossRegionCrossZone(t *testing.T) { require.NoError(t, l.Set(tc.l)) require.NoError(t, other.Set(tc.other)) type localities struct { - isCrossRegion bool - isCrossZone bool + localityType LocalityComparisonType crossRegionErr string crossZoneErr string } - isCrossRegion, crossRegionErr, isCrossZone, crossZoneErr := l.IsCrossRegionCrossZone(other) - crossRegionErrStr := "" - if crossRegionErr != nil { - crossRegionErrStr = crossRegionErr.Error() + localityType, regionErr, zoneErr := l.CompareWithLocality(other) + regionErrStr := "" + if regionErr != nil { + regionErrStr = regionErr.Error() } - crossZoneErrStr := "" - if crossZoneErr != nil { - crossZoneErrStr = crossZoneErr.Error() + zoneErrStr := "" + if zoneErr != nil { + zoneErrStr = zoneErr.Error() } - actual := localities{isCrossRegion, isCrossZone, - crossRegionErrStr, crossZoneErrStr} - expected := localities{tc.isCrossRegion, tc.isCrossZone, - tc.crossRegionErr, tc.crossZoneErr} + actual := localities{localityType, regionErrStr, zoneErrStr} + expected := localities{tc.localityType, tc.crossRegionErr, tc.crossZoneErr} require.Equal(t, expected, actual) }) } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index d44fae2ab7d8..3083a532039d 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -417,6 +417,7 @@ go_test( "decommission_test.go", "drain_test.go", "graphite_test.go", + "helpers_test.go", "index_usage_stats_test.go", "init_handshake_test.go", "intent_test.go", diff --git a/pkg/server/helpers_test.go b/pkg/server/helpers_test.go new file mode 100644 index 000000000000..f5613344b0ad --- /dev/null +++ b/pkg/server/helpers_test.go @@ -0,0 +1,66 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package server + +import ( + "reflect" + + "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/errors" +) + +// getNodeCounterMetrics fetches the count of each specified node metric from +// the `metricNames` parameter and returns the result as a map. The keys in the +// map represent the metric metadata names, while the corresponding values +// indicate the count of each metric. If any of the specified metric cannot be +// found or is not a counter, the function will return an error. +// +// Assumption: 1. The metricNames parameter should consist of string literals +// that match the metadata names used for metric counters. 2. Each metric name +// provided in `metricNames` must exist, unique and be a counter type. +func (nm nodeMetrics) getNodeCounterMetrics(metricsName []string) (map[string]int64, error) { + metricCountMap := make(map[string]int64) + getFirstNodeMetric := func(metricName string) int64 { + metricsStruct := reflect.ValueOf(nm) + for i := 0; i < metricsStruct.NumField(); i++ { + field := metricsStruct.Field(i) + switch t := field.Interface().(type) { + case *metric.Counter: + if t.Name == metricName { + return t.Count() + } + } + } + return -1 + } + + for _, metricName := range metricsName { + count := getFirstNodeMetric(metricName) + if count == -1 { + return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) + } + metricCountMap[metricName] = count + } + return metricCountMap, nil +} + +// getMapsDiff returns the difference between the values of corresponding +// metrics in two maps. +// Assumption: beforeMap and afterMap contain the same set of keys. +func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { + diffMap := make(map[string]int64) + for metricName, beforeValue := range beforeMap { + if v, ok := afterMap[metricName]; ok { + diffMap[metricName] = v - beforeValue + } + } + return diffMap +} diff --git a/pkg/server/node.go b/pkg/server/node.go index ad35e68ad0b5..5c2f822164c7 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -15,7 +15,6 @@ import ( "context" "fmt" "net" - "reflect" "sort" "strings" "sync" @@ -285,39 +284,40 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *kvpb.Error) { nm.Latency.RecordValue(d.Nanoseconds()) } -// getNodeCounterMetrics fetches the count of each specified node metric from -// the `metricNames` parameter and returns the result as a map. The keys in the -// map represent the metric metadata names, while the corresponding values -// indicate the count of each metric. If any of the specified metric cannot be -// found or is not a counter, the function will return an error. -// -// Assumption: 1. The metricNames parameter should consist of string literals -// that match the metadata names used for metric counters. 2. Each metric name -// provided in `metricNames` must exist, unique and be a counter type. -func (nm nodeMetrics) getNodeCounterMetrics(metricsName []string) (map[string]int64, error) { - metricCountMap := make(map[string]int64) - getFirstNodeMetric := func(metricName string) int64 { - metricsStruct := reflect.ValueOf(nm) - for i := 0; i < metricsStruct.NumField(); i++ { - field := metricsStruct.Field(i) - switch t := field.Interface().(type) { - case *metric.Counter: - if t.Name == metricName { - return t.Count() - } - } - } - return -1 - } - - for _, metricName := range metricsName { - count := getFirstNodeMetric(metricName) - if count == -1 { - return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) - } - metricCountMap[metricName] = count +// updateCrossLocalityMetricsOnBatchRequest updates nodeMetrics for batch +// requests processed on the node. The metrics being updated include 1. total +// byte count of batch requests processed 2. cross-region metrics, which monitor +// activities across different regions, and 3. cross-zone metrics, which monitor +// activities across different zones within the same region or in cases where +// region tiers are not configured. These metrics may include batches that were +// not successfully sent but were terminated at an early stage. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchRequest( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchRequestsBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchRequestBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchRequestBytes.Inc(inc) + } +} + +// updateCrossLocalityMetricsOnBatchResponse updates nodeMetrics for batch +// responses that are received back. It updates based on the comparisonResult +// parameter determined during the initial batch requests check. The underlying +// assumption is that the response should match the cross-region or cross-zone +// nature of the requests. +func (nm nodeMetrics) updateCrossLocalityMetricsOnBatchResponse( + comparisonResult roachpb.LocalityComparisonType, inc int64, +) { + nm.BatchResponsesBytes.Inc(inc) + switch comparisonResult { + case roachpb.LocalityComparisonType_CROSS_REGION: + nm.CrossRegionBatchResponseBytes.Inc(inc) + case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE: + nm.CrossZoneBatchResponseBytes.Inc(inc) } - return metricCountMap, nil } // A Node manages a map of stores (by store ID) for which it serves @@ -1325,84 +1325,34 @@ func (n *Node) batchInternal( return br, nil } -// isCrossRegionCrossZoneBatch returns (bool, bool) - indicating if the given -// batch request is cross-region and cross-zone respectively. -func (n *Node) isCrossRegionCrossZoneBatch( - ctx context.Context, ba *kvpb.BatchRequest, -) (bool, bool) { +// getLocalityComparison takes gatewayNodeID as input and returns the locality +// comparison result between the gateway node and the current node. This result +// indicates whether the two nodes are located in different regions or zones. +func (n *Node) getLocalityComparison( + ctx context.Context, gatewayNodeID roachpb.NodeID, +) roachpb.LocalityComparisonType { gossip := n.storeCfg.Gossip if gossip == nil { log.VEventf(ctx, 2, "gossip is not configured") - return false, false + return roachpb.LocalityComparisonType_UNDEFINED } - gatewayNodeDesc, err := gossip.GetNodeDescriptor(ba.GatewayNodeID) + gatewayNodeDesc, err := gossip.GetNodeDescriptor(gatewayNodeID) if err != nil { log.VEventf(ctx, 2, - "failed to perform look up for node descriptor %+v", err) - return false, false + "failed to perform look up for node descriptor %v", err) + return roachpb.LocalityComparisonType_UNDEFINED } - isCrossRegion, regionErr, isCrossZone, zoneErr := n.Descriptor.Locality. - IsCrossRegionCrossZone(gatewayNodeDesc.Locality) + comparisonResult, regionErr, zoneErr := n.Descriptor.Locality.CompareWithLocality(gatewayNodeDesc.Locality) if regionErr != nil { - log.VEventf(ctx, 2, "%v", regionErr) + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr) } if zoneErr != nil { - log.VEventf(ctx, 2, "%v", zoneErr) - } - - return isCrossRegion, isCrossZone -} - -// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a -// more meaningful way. Cross-region metrics monitor activities across different -// regions. Cross-zone metrics monitor cross-zone activities within the same -// region or in cases where region tiers are not configured. The check result is -// returned here to avoid redundant check for metrics updates after receiving -// batch responses. -func (n *Node) checkAndUpdateBatchRequestMetrics( - ctx context.Context, ba *kvpb.BatchRequest, shouldIncrement bool, -) (shouldIncCrossRegion bool, shouldIncCrossZone bool) { - if !shouldIncrement { - return false, false - } - n.metrics.BatchRequestsBytes.Inc(int64(ba.Size())) - isCrossRegion, isCrossZone := n.isCrossRegionCrossZoneBatch(ctx, ba) - if isCrossRegion { - if !isCrossZone { - log.VEventf(ctx, 2, "unexpected: cross region but same zone") - } else { - n.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossRegion = true - } - } else { - if isCrossZone { - n.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) - shouldIncCrossZone = true - } + log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr) } - return shouldIncCrossRegion, shouldIncCrossZone -} -// checkAndUpdateBatchResponseMetrics updates the batch response metrics based -// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These -// parameters are determined during the initial check for batch requests. The -// underlying assumption is that if requests were cross-region or cross-zone, -// the response should be as well. -func (n *Node) checkAndUpdateBatchResponseMetrics( - br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool, shouldIncrement bool, -) { - if !shouldIncrement { - return - } - n.metrics.BatchResponsesBytes.Inc(int64(br.Size())) - if shouldIncCrossRegion { - n.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) - } - if shouldIncCrossZone { - n.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) - } + return comparisonResult } // incrementBatchCounters increments counters to track the batch and composite @@ -1419,20 +1369,14 @@ func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) { func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { n.incrementBatchCounters(args) - shouldIncrement := true - if fn := n.storeCfg.TestingKnobs.TestingBatchRequestFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(args) - } - shouldIncCrossRegion, shouldIncCrossZone := n.checkAndUpdateBatchRequestMetrics(ctx, args, shouldIncrement) - // NB: Node.Batch is called directly for "local" calls. We don't want to // carry the associated log tags forward as doing so makes adding additional // log tags more expensive and makes local calls differ from remote calls. ctx = n.storeCfg.AmbientCtx.ResetAndAnnotateCtx(ctx) + comparisonResult := n.getLocalityComparison(ctx, args.GatewayNodeID) + n.metrics.updateCrossLocalityMetricsOnBatchRequest(comparisonResult, int64(args.Size())) + tenantID, ok := roachpb.ClientTenantFromContext(ctx) if !ok { tenantID = roachpb.SystemTenantID @@ -1477,14 +1421,7 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR br.Error = kvpb.NewError(err) } - shouldIncrement = true - if fn := n.storeCfg.TestingKnobs.TestingBatchResponseFilter; fn != nil { - // ShouldIncrement is always set to true in the production environment. The - // testing knob is used here to filter out metrics changes caused by batch - // requests that are irrelevant to our tests. - shouldIncrement = fn(br) - } - n.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone, shouldIncrement) + n.metrics.updateCrossLocalityMetricsOnBatchResponse(comparisonResult, int64(br.Size())) if buildutil.CrdbTestBuild && br.Error != nil && n.testingErrorEvent != nil { n.testingErrorEvent(ctx, args, errors.DecodeError(ctx, br.Error.EncodedError)) } diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go index 09436a8c6ef9..4ea8481bb159 100644 --- a/pkg/server/node_test.go +++ b/pkg/server/node_test.go @@ -39,10 +39,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" - "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" ) @@ -760,177 +759,76 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) { require.GreaterOrEqual(t, n.metrics.MethodCounts[kvpb.Put].Count(), putCurr) } -// getNodesMetrics retrieves the count of each node metric specified in -// metricNames associated with the specified serverIdx and returns the result as -// a map, along with any lookup errors. -func getNodeCounterMetrics( - tc serverutils.TestClusterInterface, serverIdx int, metricsName []string, -) (map[string]int64, error) { - ts := tc.Server(serverIdx).(*TestServer) - metrics, err := ts.node.metrics.getNodeCounterMetrics(metricsName) - return metrics, err -} - -// getNodesMetricsDiff returns the difference between the values of -// corresponding metrics in two maps. -// Assumption: beforeMap and afterMap contain the same set of keys. -func getNodesMetricsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { - diffMap := make(map[string]int64) - for metricName, beforeValue := range beforeMap { - if v, ok := afterMap[metricName]; ok { - diffMap[metricName] = v - beforeValue - } - } - return diffMap -} - -// TestNodeBatchMetrics verifies that node.Batch() correctly updates the +// TestNodeCrossLocalityMetrics verifies that +// updateCrossLocalityMetricsOnBatch{Request|Response} correctly updates // cross-region, cross-zone byte count metrics for batch requests sent and batch // responses received. -func TestNodeBatchMetrics(t *testing.T) { +func TestNodeCrossLocalityMetrics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - - // The initial setup ensures the correct configuration for three nodes (with - // different localities), single-range, and three replicas (on different - // nodes). - const numNodes = 3 - zcfg := zonepb.DefaultZoneConfig() - zcfg.NumReplicas = proto.Int32(1) - - type InterceptedInfo struct { - syncutil.Mutex - BatchRequestSize int64 - BatchResponseSize int64 - } - - info := InterceptedInfo{} - requestFn := func(ba *kvpb.BatchRequest) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if ba != nil && ba.Txn != nil { - if baTxnName := ba.Txn.Name; baTxnName == "cross-locality-test" { - info.BatchRequestSize = int64(ba.Size()) - return true + const expectedInc = 10 + + metricsNames := []string{ + "batch_requests.bytes", + "batch_requests.cross_region.bytes", + "batch_requests.cross_zone.bytes", + "batch_responses.bytes", + "batch_responses.cross_region.bytes", + "batch_responses.cross_zone.bytes"} + for _, tc := range []struct { + crossLocalityType roachpb.LocalityComparisonType + expectedMetricChange [6]int64 + forRequest bool + }{ + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{expectedInc, expectedInc, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, expectedInc, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{expectedInc, 0, 0, 0, 0, 0}, + forRequest: true, + }, + {crossLocalityType: roachpb.LocalityComparisonType_CROSS_REGION, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, expectedInc, 0}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, expectedInc}, + forRequest: false, + }, + {crossLocalityType: roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE, + expectedMetricChange: [6]int64{0, 0, 0, expectedInc, 0, 0}, + forRequest: false, + }, + } { + t.Run(fmt.Sprintf("%-v", tc.crossLocalityType), func(t *testing.T) { + metrics := makeNodeMetrics(metric.NewRegistry(), 1) + beforeMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) } - } - return false - } - - responseFn := func(br *kvpb.BatchResponse) bool { - // A boolean is returned here to filter out changes in node metrics caused - // by batch requests that are irrelevant to our test case. Most of these - // batch requests are part of the system config and are difficult to - // disable. - info.Lock() - defer info.Unlock() - if br != nil && br.Txn != nil { - if brTxnName := br.Txn.Name; brTxnName == "cross-locality-test" { - info.BatchResponseSize = int64(br.Size()) - return true + if tc.forRequest { + metrics.updateCrossLocalityMetricsOnBatchRequest(tc.crossLocalityType, expectedInc) + } else { + metrics.updateCrossLocalityMetricsOnBatchResponse(tc.crossLocalityType, expectedInc) } - } - return false - } - serverLocality := [numNodes]roachpb.Locality{ - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, - {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, - } - - serverArgs := make(map[int]base.TestServerArgs) - for i := 0; i < numNodes; i++ { - serverArgs[i] = base.TestServerArgs{ - Locality: serverLocality[i], - Knobs: base.TestingKnobs{ - Server: &TestingKnobs{ - DefaultZoneConfigOverride: &zcfg, - }, - Store: &kvserver.StoreTestingKnobs{ - TestingBatchRequestFilter: requestFn, - TestingBatchResponseFilter: responseFn, - }, - }, - } - } - - ctx := context.Background() - var clusterArgs = base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgsPerNode: serverArgs, - } - - tc := serverutils.StartNewTestCluster(t, numNodes, clusterArgs) - defer tc.Stopper().Stop(ctx) - - testKey := tc.ScratchRange(t) - desc := tc.LookupRangeOrFatal(t, testKey) - - metrics := []string{"batch_requests.bytes", "batch_responses.bytes", - "batch_requests.cross_region.bytes", "batch_responses.cross_region.bytes", - "batch_requests.cross_zone.bytes", "batch_responses.cross_zone.bytes"} - receiverBefore, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - - // sendBatchToServer is a testing helper that sends a batch request from - // server[serverIndex] to server[0] and returns the number of bytes a batch - // request sent and a batch response received. - sendFromServer := func(serverIndex int) (int64, int64) { - get := &kvpb.GetRequest{ - RequestHeader: kvpb.RequestHeader{Key: testKey}, - } - var ba kvpb.BatchRequest - ba.GatewayNodeID = tc.Server(serverIndex).NodeID() - ba.Add(get) - ba.RangeID = desc.RangeID - ba.Replica.StoreID = tc.Server(0).GetFirstStoreID() - txn := roachpb.MakeTransaction( - "cross-locality-test", testKey, 0, 0, - hlc.Timestamp{WallTime: 1}, 0, 0) - ba.Txn = &txn - _, err := tc.Server(0).(*TestServer).GetNode().Batch(ctx, &ba) - require.NoError(t, err) - info.Lock() - defer info.Unlock() - return info.BatchRequestSize, info.BatchResponseSize + afterMetrics, err := metrics.getNodeCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) + } + metricsDiff := getMapsDiff(beforeMetrics, afterMetrics) + expectedDiff := make(map[string]int64, 6) + for i, inc := range tc.expectedMetricChange { + expectedDiff[metricsNames[i]] = inc + } + require.Equal(t, metricsDiff, expectedDiff) + }) } - // The first batch request is sent from server0 to server0, enforcing a - // within-region, within-zone batch request / response. - firstBatchRequest, firstBatchResponse := sendFromServer(0) - // The second batch request is sent from server1 to server0, enforcing a - // cross-region batch request / response. - secBatchRequest, secBatchResponse := sendFromServer(1) - // The third batch request is sent from server2 to server0, enforcing a - // cross-zone, within-region batch request / response. - thirdBatchRequest, thirdBatchResponse := sendFromServer(2) - totalRequest := firstBatchRequest + secBatchRequest + thirdBatchRequest - totalResponse := firstBatchResponse + secBatchResponse + thirdBatchResponse - - t.Run("receiver", func(t *testing.T) { - receiverAfter, err := getNodeCounterMetrics(tc, 0, metrics) - if err != nil { - t.Error("failed to look up node metrics") - } - receiverDelta := getNodesMetricsDiff(receiverBefore, receiverAfter) - receiverExpected := map[string]int64{ - "batch_requests.bytes": firstBatchRequest + secBatchRequest + thirdBatchRequest, - "batch_responses.bytes": firstBatchResponse + secBatchResponse + thirdBatchResponse, - "batch_requests.cross_region.bytes": secBatchRequest, - "batch_responses.cross_region.bytes": secBatchResponse, - "batch_requests.cross_zone.bytes": thirdBatchRequest, - "batch_responses.cross_zone.bytes": thirdBatchResponse, - } - require.Equal(t, receiverExpected, receiverDelta) - require.Equal(t, firstBatchRequest, totalRequest-secBatchRequest-thirdBatchRequest) - require.Equal(t, firstBatchResponse, totalResponse-secBatchResponse-thirdBatchResponse) - }) } func TestGetTenantWeights(t *testing.T) {