diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 711b0ab275da..7ce5ba1297aa 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -180,6 +180,18 @@ This counts the number of ranges with an active rangefeed that are performing ca Measurement: "Count", Unit: metric.Unit_COUNT, } + metaDistSenderCrossRegionBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.cross_region", + Help: `Total byte count of cross-region batch requests sent`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossRegionBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.cross_region", + Help: `Total byte count of cross-region batch responses received`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } ) // CanSendToFollower is used by the DistSender to determine if it needs to look @@ -241,44 +253,48 @@ func max(a, b int64) int64 { // DistSenderMetrics is the set of metrics for a given distributed sender. type DistSenderMetrics struct { - BatchCount *metric.Counter - PartialBatchCount *metric.Counter - AsyncSentCount *metric.Counter - AsyncThrottledCount *metric.Counter - SentCount *metric.Counter - LocalSentCount *metric.Counter - NextReplicaErrCount *metric.Counter - NotLeaseHolderErrCount *metric.Counter - InLeaseTransferBackoffs *metric.Counter - RangeLookups *metric.Counter - SlowRPCs *metric.Gauge - RangefeedRanges *metric.Gauge - RangefeedCatchupRanges *metric.Gauge - RangefeedErrorCatchup *metric.Counter - RangefeedRestartRanges *metric.Counter - RangefeedRestartStuck *metric.Counter - MethodCounts [kvpb.NumMethods]*metric.Counter - ErrCounts [kvpb.NumErrors]*metric.Counter + BatchCount *metric.Counter + PartialBatchCount *metric.Counter + CrossRegionBatchRequestBytes *metric.Counter + CrossRegionBatchResponseBytes *metric.Counter + AsyncSentCount *metric.Counter + AsyncThrottledCount *metric.Counter + SentCount *metric.Counter + LocalSentCount *metric.Counter + NextReplicaErrCount *metric.Counter + NotLeaseHolderErrCount *metric.Counter + InLeaseTransferBackoffs *metric.Counter + RangeLookups *metric.Counter + SlowRPCs *metric.Gauge + RangefeedRanges *metric.Gauge + RangefeedCatchupRanges *metric.Gauge + RangefeedErrorCatchup *metric.Counter + RangefeedRestartRanges *metric.Counter + RangefeedRestartStuck *metric.Counter + MethodCounts [kvpb.NumMethods]*metric.Counter + ErrCounts [kvpb.NumErrors]*metric.Counter } func makeDistSenderMetrics() DistSenderMetrics { m := DistSenderMetrics{ - BatchCount: metric.NewCounter(metaDistSenderBatchCount), - PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), - AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), - AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), - SentCount: metric.NewCounter(metaTransportSentCount), - LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), - NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), - NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), - InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), - RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), - SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), - RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), - RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + BatchCount: metric.NewCounter(metaDistSenderBatchCount), + PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), + CrossRegionBatchRequestBytes: metric.NewCounter(metaDistSenderCrossRegionBatchRequestBytes), + CrossRegionBatchResponseBytes: metric.NewCounter(metaDistSenderCrossRegionBatchResponseBytes), + AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), + AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), + SentCount: metric.NewCounter(metaTransportSentCount), + LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), + NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), + NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), + InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), + RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), + SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), + RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), + RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), + RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), + RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), + RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), } for i := range m.MethodCounts { method := kvpb.Method(i).String() @@ -367,6 +383,12 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + // BatchRequestInterceptor intercepts DistSender.Send() to validate BatchRequest properties. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchResponseInterceptor intercepts DistSender.Send() to validate BatchResponse properties. + BatchResponseInterceptor func(br *kvpb.BatchResponse) + // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -520,8 +542,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency } - if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil { - ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch + if cfg.TestingKnobs.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor + } + + if cfg.TestingKnobs.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor } return ds @@ -2173,9 +2199,27 @@ func (ds *DistSender) sendToReplicas( ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested || (desc.Generation == 0 && routing.LeaseSeq() == 0), } + + if ds.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor(ba) + } + + isCrossRegion, error := ds.maybeIncrementCrossRegionBatchMetrics(ba) + if error != nil { + log.Eventf(ctx, "%v", error) + } + br, err = transport.SendNext(ctx, ba) + + if ds.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor(br) + } ds.maybeIncrementErrCounters(br, err) + if isCrossRegion { + ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) + } + if err != nil { if grpcutil.IsAuthError(err) { // Authentication or authorization error. Propagate. @@ -2429,6 +2473,42 @@ func (ds *DistSender) sendToReplicas( } } +// maybeIncrementCrossRegionBatchMetrics updates DistSender metrics for +// cross-region batch requests. It takes a BatchRequest parameter to extract +// information about the gatewayNodeID, destinationNodeID, and the byte count of +// batch request. It returns (bool, error) indicating whether the batch request +// is cross-region and if any errors occurred during the process. +// +// DistSender first tries to obtain the localities of the nodes. If DistSender +// is unable to obtain the node descriptor for gateway or destination node, or +// if the locality of any node does not have a “region” key, the function +// returns (false, error). If no errors occured, it checks if the gateway and +// destination nodes are in different regions. If they are, it updates the +// cross-region metrics and returns (true, nil); otherwise, it returns (false, +// nil). +// +// isCrossRegion is returned here to avoid redundant checks for cross-region +// after receiving batch responses. +func (ds *DistSender) maybeIncrementCrossRegionBatchMetrics(ba *kvpb.BatchRequest) (bool, error) { + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) + if err != nil { + return false, err + } + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + if err != nil { + return false, err + } + isCrossRegion, err := gatewayNodeDesc.Locality.IsCrossRegion(destinationNodeDesc.Locality) + if err != nil { + return false, err + } + if isCrossRegion { + ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) + return true, nil + } + return false, nil +} + // getCostControllerConfig returns the config for the tenant cost model. This // returns nil if no KV interceptors are associated with the DistSender, or the // KV interceptor is not a multitenant.TenantSideCostController. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index b49f6bc18271..dc715412a7ba 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5651,6 +5651,124 @@ func TestDistSenderRPCMetrics(t *testing.T) { require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1)) } +// TestDistSenderCrossRegionBatchMetrics verifies that the DistSender.Send() +// correctly updates the cross-region byte count metrics for batches requests +// sent and batch responses received. +func TestDistSenderCrossRegionBatchMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + // The initial setup ensures the correct setup for three nodes (with different + // localities), single-range, three replicas (on different nodes), gossip, and + // DistSender. + clock := hlc.NewClockForTesting(nil) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + rangeDesc := testUserRangeDescriptor3Replicas + replicas := rangeDesc.InternalReplicas + + makeNewNodeWithLocality := func(nodeID roachpb.NodeID) roachpb.NodeDescriptor { + return roachpb.NodeDescriptor{ + NodeID: nodeID, + Address: util.UnresolvedAddr{}, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: fmt.Sprintf("us-east-%v", nodeID)}, + {Key: "zone", Value: fmt.Sprintf("us-east-%va", nodeID)}, + {Key: "az", Value: "a"}, + }, + }, + } + } + + ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ + makeNewNodeWithLocality(1), + makeNewNodeWithLocality(2), + makeNewNodeWithLocality(3), + }} + + var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { + return ba.CreateReply(), nil + } + expectedRequestBytes := -1 + expectedResponseBytes := -1 + cfg := DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: ns, + RPCContext: rpcContext, + RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(transportFn), + BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { + expectedRequestBytes = ba.Size() + }, + BatchResponseInterceptor: func(br *kvpb.BatchResponse) { + expectedResponseBytes = br.Size() + }, + }, + Settings: cluster.MakeTestingClusterSettings(), + } + + ba := &kvpb.BatchRequest{} + get := &kvpb.GetRequest{} + get.Key = rangeDesc.StartKey.AsRawKey() + ba.Add(get) + + ba.Header = kvpb.Header{ + GatewayNodeID: 1, + } + + // In the given setup, gateway is set to be on Node 1 which is where + // replicas[0] resides. The first test sets replica[1] as the leaseholder for + // the range, enforcing a cross-region batch request / response. It is + // expected that the metrics will be updated to reflect this cross-region + // scenario. + dsFirst := NewDistSender(cfg) + dsFirst.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: rangeDesc, + Lease: roachpb.Lease{ + Replica: replicas[1], + }, + }) + if _, err := dsFirst.Send(ctx, ba); err != nil { + t.Fatal(err) + } + if expectedRequestBytes == -1 || expectedResponseBytes == -1 { + t.Errorf("expected batch bytes not set correctly") + } + + requestBytesMetrics := dsFirst.metrics.CrossRegionBatchRequestBytes.Count() + responseBytesMetrics := dsFirst.metrics.CrossRegionBatchResponseBytes.Count() + require.Equal(t, int64(expectedRequestBytes), requestBytesMetrics, + fmt.Sprintf("expected cross-region bytes sent: %v but got %v", expectedRequestBytes, requestBytesMetrics)) + require.Equal(t, int64(expectedResponseBytes), responseBytesMetrics, + fmt.Sprintf("expected cross-region bytes received: %v but got %v", expectedResponseBytes, responseBytesMetrics)) + + // The second test sets replica[0] as the leaseholder for the range, enforcing + // a within-same-region batch request / response. In this case, the metrics + // are expected to remain unchanged as no cross-region activities are + // involved. + dsSec := NewDistSender(cfg) + dsSec.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: rangeDesc, + Lease: roachpb.Lease{ + Replica: replicas[0], + }, + }) + if _, err := dsSec.Send(ctx, ba); err != nil { + t.Fatal(err) + } + + requestBytesMetrics = dsSec.metrics.CrossRegionBatchRequestBytes.Count() + responseBytesMetrics = dsSec.metrics.CrossRegionBatchResponseBytes.Count() + require.Equal(t, int64(0), requestBytesMetrics, + fmt.Sprintf("expected cross-region bytes sent: 0 but got %v", requestBytesMetrics)) + require.Equal(t, int64(0), responseBytesMetrics, + fmt.Sprintf("expected cross-region bytes received: 0 but got %v", responseBytesMetrics)) +} + // TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff // ensures that a NLHE from an uninitialized replica, which points to a replica // that isn't part of the range, doesn't result in the dist sender getting diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 58bb2efafe1d..dc7b88927ad7 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,6 +61,19 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + + // Currently, BatchRequestInterceptor and BatchResponseInterceptor only + // intercepts DistSender.Send() to pass the actual batch request and response + // byte count to the test. However, it can be easily extended to validate + // other properties of batch requests / response if required. + + // BatchRequestInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchRequest properties. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchResponseInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchResponse properties. + BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} diff --git a/pkg/roachpb/metadata.go b/pkg/roachpb/metadata.go index 30a6d4b5fa13..d42b65d62eee 100644 --- a/pkg/roachpb/metadata.go +++ b/pkg/roachpb/metadata.go @@ -617,6 +617,22 @@ func (l Locality) Matches(filter Locality) (bool, Tier) { return true, Tier{} } +// IsCrossRegion checks if both this and passed locality has a tier with "region" +// as the key. If either locality does not have a region tier, it returns +// (false, error). Otherwise, it compares their region values and returns (true, +// nil) if they are different, and (false, nil) otherwise. +func (l Locality) IsCrossRegion(other Locality) (bool, error) { + // It is unfortunate that the "region" tier key is hardcoded here. Ideally, we + // would prefer a more robust way to determine node locality regions. + region, hasRegion := l.Find("region") + otherRegion, hasRegionOther := other.Find("region") + + if hasRegion && hasRegionOther { + return region != otherRegion, nil + } + return false, errors.Errorf("locality must have a tier with key region") +} + // SharedPrefix returns the number of this locality's tiers which match those of // the passed locality. func (l Locality) SharedPrefix(other Locality) int {