Skip to content

Commit

Permalink
kv: refactor metric updates test cases
Browse files Browse the repository at this point in the history
Previously, the metrics updates testing knobs were embedded within the logic of
`DistSender.Send()` and `Node.Batch()`, which could confuse the readers of the
code. This commit addresses the issue by removing end to end tests and replacing
them with more focused unit tests. In addition, this commit also includes some
refactoring of function parameters.

Note that this commit does not change any existing functionality.

Part of: #103983

Release note: None
  • Loading branch information
wenyihu6 committed Jun 22, 2023
1 parent cce98d1 commit da864f0
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 527 deletions.
128 changes: 55 additions & 73 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,46 @@ func (dm *DistSenderMetrics) getDistSenderCounterMetrics(
return metricCountMap, nil
}

// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates
// DistSenderMetrics for batch requests that have been divided and are currently
// forwarding to a specific replica for the corresponding range. The metrics
// being updated include 1. total byte count of replica-addressed batch requests
// processed 2. cross-region metrics, which monitor activities across different
// regions, and 3. cross-zone metrics, which monitor activities across different
// zones within the same region or in cases where region tiers are not
// configured. These metrics may include batches that were not successfully sent
// but were terminated at an early stage.
func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(
comparisonResult roachpb.LocalityComparisonType, inc int64,
) {
dm.ReplicaAddressedBatchRequestBytes.Inc(inc)
switch comparisonResult {
case roachpb.LocalityComparisonType_CROSS_REGION:
dm.CrossRegionBatchRequestBytes.Inc(inc)
case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
dm.CrossZoneBatchRequestBytes.Inc(inc)
case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE:
// No metrics or error reporting.
}
}

// updateCrossLocalityMetricsOnReplicaAddressedBatchResponse updates
// DistSenderMetrics for batch responses that are received back from transport
// rpc. It updates based on the comparisonResult parameter determined during the
// initial batch requests check. The underlying assumption is that the response
// should match the cross-region or cross-zone nature of the requests.
func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(
comparisonResult roachpb.LocalityComparisonType, inc int64,
) {
dm.ReplicaAddressedBatchResponseBytes.Inc(inc)
switch comparisonResult {
case roachpb.LocalityComparisonType_CROSS_REGION:
dm.CrossRegionBatchResponseBytes.Inc(inc)
case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
dm.CrossZoneBatchResponseBytes.Inc(inc)
}
}

// FirstRangeProvider is capable of providing DistSender with the descriptor of
// the first range in the cluster and notifying the DistSender when the first
// range in the cluster has changed.
Expand Down Expand Up @@ -465,14 +505,6 @@ type DistSender struct {

onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// BatchRequestInterceptor intercepts DistSender.Send() to pass the actual
// batch request byte count to the test.
BatchRequestInterceptor func(ba *kvpb.BatchRequest)

// BatchRequestInterceptor intercepts DistSender.Send() to pass the actual
// batch response byte count to the test.
BatchResponseInterceptor func(br *kvpb.BatchResponse)

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
Expand Down Expand Up @@ -630,14 +662,6 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
}

if cfg.TestingKnobs.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor
}

if cfg.TestingKnobs.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor
}

return ds
}

Expand Down Expand Up @@ -2293,15 +2317,11 @@ func (ds *DistSender) sendToReplicas(
(desc.Generation == 0 && routing.LeaseSeq() == 0),
}

if ds.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor(ba)
}
comparisonResult := ds.checkAndUpdateCrossLocalityBatchMetrics(ctx, ba)
comparisonResult := ds.getLocalityComparison(ctx, ds.getNodeID(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

br, err = transport.SendNext(ctx, ba)
if ds.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor(br)
}
ds.updateCrossLocalityBatchMetrics(br, comparisonResult)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

if err != nil {
Expand Down Expand Up @@ -2557,71 +2577,33 @@ func (ds *DistSender) sendToReplicas(
}
}

// getCrossLocalityComparison compares the localities of the current node and
// the destination range node to determine if the given batch request is
// cross-region and cross-zone.
func (ds *DistSender) getCrossLocalityComparison(
ctx context.Context, ba *kvpb.BatchRequest,
// getLocalityComparison takes two nodeIDs as input and returns the locality
// comparison result between their corresponding nodes. This result indicates
// whether the two nodes are located in different regions or zones.
func (ds *DistSender) getLocalityComparison(
ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID,
) roachpb.LocalityComparisonType {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID)
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err)
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID)
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err)
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}

comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality)
if regionErr != nil {
log.VEventf(ctx, 2, "unable to determine if batch is cross region %v", regionErr)
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "unable to determine if batch is cross zone %v", zoneErr)
}
return comparisonResult
}

// checkAndUpdateCrossLocalityBatchMetrics updates the batch requests metrics in
// a more meaningful way. Cross-region metrics monitor activities across
// different regions. Cross-zone metrics monitor cross-zone activities within
// the same region or in cases where region tiers are not configured. The
// locality comparison result is returned here to avoid redundant check for
// metrics updates after receiving batch responses.
func (ds *DistSender) checkAndUpdateCrossLocalityBatchMetrics(
ctx context.Context, ba *kvpb.BatchRequest,
) roachpb.LocalityComparisonType {
ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size()))
comparisonResult := ds.getCrossLocalityComparison(ctx, ba)
switch comparisonResult {
case roachpb.LocalityComparisonType_CROSS_REGION:
ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size()))
case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size()))
case roachpb.LocalityComparisonType_SAME_REGION_SAME_ZONE:
// No metrics or error reporting.
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr)
}
return comparisonResult
}

// updateCrossLocalityBatchMetrics updates the batch response metrics based on
// the comparisonResult parameter determined during the initial batch requests
// check. The underlying assumption is that the response should match the
// cross-region or cross-zone nature of the request.
func (ds *DistSender) updateCrossLocalityBatchMetrics(
br *kvpb.BatchResponse, comparisonResult roachpb.LocalityComparisonType,
) {
ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size()))
switch comparisonResult {
case roachpb.LocalityComparisonType_CROSS_REGION:
ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size()))
case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size()))
}
}

// getCostControllerConfig returns the config for the tenant cost model. This
// returns nil if no KV interceptors are associated with the DistSender, or the
// KV interceptor is not a multitenant.TenantSideCostController.
Expand Down
Loading

0 comments on commit da864f0

Please sign in to comment.