
kvclient: add cross-region byte count metrics to DistSender
Previously, there were no metrics to observe cross-region traffic in batch
requests / responses. This made it difficult to evaluate the cross-region
traffic handled by individual nodes.

To address this, this commit adds two new metrics to DistSenderMetrics –
CrossRegionBatchRequestBytes and CrossRegionBatchResponseBytes. These metrics
track the byte count for batch requests sent and responses received in
cross-region batches within DistSender.

Note that DistSender resides on the gateway node that receives SQL queries, so
DistSender metrics are updated when DistSender sends cross-region batches to
another node.

Part of: cockroachdb#103983
Release note (ops change): Two new metrics, CrossRegionBatchRequestBytes and
CrossRegionBatchResponseBytes, have been added to the DistSender metrics.
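
For operators, one quick way to spot-check the new counters is to read a node's
metrics endpoint. The Go sketch below is illustrative only and not part of this
commit; it assumes a locally reachable node serving Prometheus-style metrics at
/_status/vars on port 8080, and that the metric names
distsender.batch_requests.cross_region / distsender.batch_responses.cross_region
are exported with dots replaced by underscores.

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Hypothetical node address; adjust for the target cluster.
	resp, err := http.Get("http://localhost:8080/_status/vars")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Print only the cross-region DistSender byte counters.
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.HasPrefix(line, "distsender_batch_requests_cross_region") ||
			strings.HasPrefix(line, "distsender_batch_responses_cross_region") {
			fmt.Println(line)
		}
	}
	if err := scanner.Err(); err != nil {
		panic(err)
	}
}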
wenyihu6 committed May 30, 2023
1 parent 48c2bc6 commit cdfa67c
Showing 4 changed files with 263 additions and 36 deletions.
152 changes: 116 additions & 36 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -180,6 +180,18 @@ This counts the number of ranges with an active rangefeed that are performing ca
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
metaDistSenderCrossRegionBatchRequestBytes = metric.Metadata{
Name: "distsender.batch_requests.cross_region",
Help: `Total byte count of cross-region batch requests sent`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderCrossRegionBatchResponseBytes = metric.Metadata{
Name: "distsender.batch_responses.cross_region",
Help: `Total byte count of cross-region batch responses received`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
)

// CanSendToFollower is used by the DistSender to determine if it needs to look
@@ -241,44 +253,48 @@ func max(a, b int64) int64 {

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
CrossRegionBatchRequestBytes *metric.Counter
CrossRegionBatchResponseBytes *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
m := DistSenderMetrics{
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
CrossRegionBatchRequestBytes: metric.NewCounter(metaDistSenderCrossRegionBatchRequestBytes),
CrossRegionBatchResponseBytes: metric.NewCounter(metaDistSenderCrossRegionBatchResponseBytes),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
}
for i := range m.MethodCounts {
method := kvpb.Method(i).String()
@@ -367,6 +383,12 @@ type DistSender struct {

onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// BatchRequestInterceptor intercepts DistSender.Send() to validate BatchRequest properties.
BatchRequestInterceptor func(ba *kvpb.BatchRequest)

// BatchResponseInterceptor intercepts DistSender.Send() to validate BatchResponse properties.
BatchResponseInterceptor func(br *kvpb.BatchResponse)

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
@@ -520,8 +542,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
}

if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
if cfg.TestingKnobs.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor
}

if cfg.TestingKnobs.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor
}

return ds
@@ -2173,9 +2199,27 @@ func (ds *DistSender) sendToReplicas(
ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested ||
(desc.Generation == 0 && routing.LeaseSeq() == 0),
}

if ds.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor(ba)
}

isCrossRegion, crossRegionErr := ds.maybeIncrementCrossRegionBatchMetrics(ba)
if crossRegionErr != nil {
log.Eventf(ctx, "%v", crossRegionErr)
}

br, err = transport.SendNext(ctx, ba)

if ds.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor(br)
}
ds.maybeIncrementErrCounters(br, err)

if isCrossRegion && br != nil {
ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size()))
}

if err != nil {
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
@@ -2429,6 +2473,42 @@ func (ds *DistSender) sendToReplicas(
}
}

// maybeIncrementCrossRegionBatchMetrics updates DistSender metrics for
// cross-region batch requests. It takes a BatchRequest parameter to extract
// the gateway node ID, the destination node ID, and the byte count of the
// batch request. It returns (bool, error), indicating whether the batch
// request is cross-region and whether any error occurred in the process.
//
// DistSender first tries to obtain the localities of the two nodes. If it is
// unable to obtain the node descriptor for the gateway or destination node, or
// if the locality of either node does not have a "region" key, the function
// returns (false, error). If no error occurred, it checks whether the gateway
// and destination nodes are in different regions. If they are, it updates the
// cross-region metrics and returns (true, nil); otherwise, it returns (false,
// nil).
//
// isCrossRegion is returned to avoid a redundant cross-region check when the
// batch response is received.
func (ds *DistSender) maybeIncrementCrossRegionBatchMetrics(ba *kvpb.BatchRequest) (bool, error) {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID)
if err != nil {
return false, err
}
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID)
if err != nil {
return false, err
}
isCrossRegion, err := gatewayNodeDesc.Locality.IsCrossRegion(destinationNodeDesc.Locality)
if err != nil {
return false, err
}
if isCrossRegion {
ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size()))
return true, nil
}
return false, nil
}

// getCostControllerConfig returns the config for the tenant cost model. This
// returns nil if no KV interceptors are associated with the DistSender, or the
// KV interceptor is not a multitenant.TenantSideCostController.
118 changes: 118 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -5651,6 +5651,124 @@ func TestDistSenderRPCMetrics(t *testing.T) {
require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1))
}

// TestDistSenderCrossRegionBatchMetrics verifies that DistSender.Send()
// correctly updates the cross-region byte count metrics for batch requests
// sent and batch responses received.
func TestDistSenderCrossRegionBatchMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// The setup creates three nodes (with different localities), a single range
// with three replicas (one on each node), gossip, and a DistSender.
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rangeDesc := testUserRangeDescriptor3Replicas
replicas := rangeDesc.InternalReplicas

makeNewNodeWithLocality := func(nodeID roachpb.NodeID) roachpb.NodeDescriptor {
return roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.UnresolvedAddr{},
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: fmt.Sprintf("us-east-%v", nodeID)},
{Key: "zone", Value: fmt.Sprintf("us-east-%va", nodeID)},
{Key: "az", Value: "a"},
},
},
}
}

ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{
makeNewNodeWithLocality(1),
makeNewNodeWithLocality(2),
makeNewNodeWithLocality(3),
}}

var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
return ba.CreateReply(), nil
}
expectedRequestBytes := -1
expectedResponseBytes := -1
cfg := DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: ns,
RPCContext: rpcContext,
RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc),
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
BatchRequestInterceptor: func(ba *kvpb.BatchRequest) {
expectedRequestBytes = ba.Size()
},
BatchResponseInterceptor: func(br *kvpb.BatchResponse) {
expectedResponseBytes = br.Size()
},
},
Settings: cluster.MakeTestingClusterSettings(),
}

ba := &kvpb.BatchRequest{}
get := &kvpb.GetRequest{}
get.Key = rangeDesc.StartKey.AsRawKey()
ba.Add(get)

ba.Header = kvpb.Header{
GatewayNodeID: 1,
}

// In this setup, the gateway is node 1, which is where replicas[0] resides.
// The first case sets replicas[1] as the leaseholder for the range, forcing a
// cross-region batch request / response. The metrics are expected to be
// updated to reflect this cross-region traffic.
dsFirst := NewDistSender(cfg)
dsFirst.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: rangeDesc,
Lease: roachpb.Lease{
Replica: replicas[1],
},
})
if _, err := dsFirst.Send(ctx, ba); err != nil {
t.Fatal(err)
}
if expectedRequestBytes == -1 || expectedResponseBytes == -1 {
t.Errorf("expected batch bytes not set correctly")
}

requestBytesMetrics := dsFirst.metrics.CrossRegionBatchRequestBytes.Count()
responseBytesMetrics := dsFirst.metrics.CrossRegionBatchResponseBytes.Count()
require.Equal(t, int64(expectedRequestBytes), requestBytesMetrics,
fmt.Sprintf("expected cross-region bytes sent: %v but got %v", expectedRequestBytes, requestBytesMetrics))
require.Equal(t, int64(expectedResponseBytes), responseBytesMetrics,
fmt.Sprintf("expected cross-region bytes received: %v but got %v", expectedResponseBytes, responseBytesMetrics))

// The second case sets replicas[0] as the leaseholder for the range, so the
// batch request / response stays within the same region. The metrics are
// expected to remain unchanged since no cross-region traffic is involved.
dsSec := NewDistSender(cfg)
dsSec.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: rangeDesc,
Lease: roachpb.Lease{
Replica: replicas[0],
},
})
if _, err := dsSec.Send(ctx, ba); err != nil {
t.Fatal(err)
}

requestBytesMetrics = dsSec.metrics.CrossRegionBatchRequestBytes.Count()
responseBytesMetrics = dsSec.metrics.CrossRegionBatchResponseBytes.Count()
require.Equal(t, int64(0), requestBytesMetrics,
fmt.Sprintf("expected cross-region bytes sent: 0 but got %v", requestBytesMetrics))
require.Equal(t, int64(0), responseBytesMetrics,
fmt.Sprintf("expected cross-region bytes received: 0 but got %v", responseBytesMetrics))
}

// TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff
// ensures that a NLHE from an uninitialized replica, which points to a replica
// that isn't part of the range, doesn't result in the dist sender getting
13 changes: 13 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
@@ -61,6 +61,19 @@ type ClientTestingKnobs struct {
// error which, if non-nil, becomes the result of the batch. Otherwise, execution
// continues.
OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// Currently, BatchRequestInterceptor and BatchResponseInterceptor only
// intercept DistSender.Send() to pass the actual batch request and response
// byte counts to the test. However, they can easily be extended to validate
// other properties of batch requests / responses if required.

// BatchRequestInterceptor intercepts DistSender function calls to validate
// BatchRequest properties.
BatchRequestInterceptor func(ba *kvpb.BatchRequest)

// BatchResponseInterceptor intercepts DistSender function calls to validate
// BatchResponse properties.
BatchResponseInterceptor func(br *kvpb.BatchResponse)
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
16 changes: 16 additions & 0 deletions pkg/roachpb/metadata.go
@@ -617,6 +617,22 @@ func (l Locality) Matches(filter Locality) (bool, Tier) {
return true, Tier{}
}

// IsCrossRegion checks whether both this locality and the passed locality have
// a tier with "region" as the key. If either locality does not have a region
// tier, it returns (false, error). Otherwise, it compares their region values
// and returns (true, nil) if they differ, and (false, nil) otherwise.
func (l Locality) IsCrossRegion(other Locality) (bool, error) {
// It is unfortunate that the "region" tier key is hardcoded here. Ideally, we
// would prefer a more robust way to determine node locality regions.
region, hasRegion := l.Find("region")
otherRegion, hasRegionOther := other.Find("region")

if hasRegion && hasRegionOther {
return region != otherRegion, nil
}
return false, errors.Errorf("locality must have a tier with key region")
}
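
The following standalone Go sketch (not part of this diff) illustrates how
IsCrossRegion behaves for a few locality pairs; it assumes only the roachpb
package from this repository.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

func main() {
	east := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "us-east-1"}}}
	west := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "us-west-1"}}}
	noRegion := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "zone", Value: "a"}}}

	// Different regions: counts as cross-region traffic.
	crossRegion, err := east.IsCrossRegion(west)
	fmt.Println(crossRegion, err) // true <nil>

	// Same region: not cross-region.
	sameRegion, err := east.IsCrossRegion(east)
	fmt.Println(sameRegion, err) // false <nil>

	// Missing region tier: an error, which the DistSender caller only logs.
	_, err = east.IsCrossRegion(noRegion)
	fmt.Println(err) // locality must have a tier with key region
}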

// SharedPrefix returns the number of this locality's tiers which match those of
// the passed locality.
func (l Locality) SharedPrefix(other Locality) int {
