Skip to content

Commit

Permalink
kv: refactor CompareWithLocality to use enum
Browse files Browse the repository at this point in the history
Prior to this commit, `CompareWithLocality` returned two boolean values
indicating whether two localities were cross-region and cross-zone. However,
this required callers to perform additional cross-comparison of these boolean
values to make meaningful metrics updates.

To simplify this, this commit introduces a new enum type
`LocalityComparisonType`. It provides four locality comparison results: cross
region, same region cross zone, same region same zone, and undefined
(indicating error behavior). This refactoring allow the caller to directly use
the comparison result without additional operations.

In addition, this commit also updates the logic to classify activities between
different regions as cross-regional, regardless of the zone tiers’
configuration. Initially, cross-region but same-zone tiers activities were
flagged as misconfiguration. After some discussion, we have decided that
regions should be non-overlapping. Hence, same zone tiers from different
regions should still be considered as different zones.

In addition, this commit also includes some refactoring of function
parameters.

Note that this commit does not change any existing functionality.

Part of: #103983

Release note: None
  • Loading branch information
wenyihu6 committed Jun 30, 2023
1 parent 5b9824f commit af9cbad
Show file tree
Hide file tree
Showing 16 changed files with 462 additions and 711 deletions.
170 changes: 54 additions & 116 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package kvcoord
import (
"context"
"fmt"
"reflect"
"runtime"
"runtime/pprof"
"strings"
Expand Down Expand Up @@ -358,41 +357,42 @@ func makeDistSenderMetrics() DistSenderMetrics {
return m
}

// getDistSenderCounterMetrics fetches the count of each specified DisSender
// metric from the `metricNames` parameter and returns the result as a map. The
// keys in the map represent the metric metadata names, while the corresponding
// values indicate the count of each metric. If any of the specified metric
// cannot be found or is not a counter, the function will return an error.
//
// Assumption: 1. The metricNames parameter should consist of string literals
// that match the metadata names used for metric counters. 2. Each metric name
// provided in `metricNames` must exist, unique and be a counter type.
func (dm *DistSenderMetrics) getDistSenderCounterMetrics(
metricsName []string,
) (map[string]int64, error) {
metricCountMap := make(map[string]int64)
getFirstDistSenderMetric := func(metricName string) int64 {
metricsStruct := reflect.ValueOf(*dm)
for i := 0; i < metricsStruct.NumField(); i++ {
field := metricsStruct.Field(i)
switch t := field.Interface().(type) {
case *metric.Counter:
if t.Name == metricName {
return t.Count()
}
}
}
return -1
// updateCrossLocalityMetricsOnReplicaAddressedBatchRequest updates
// DistSenderMetrics for batch requests that have been divided and are currently
// forwarding to a specific replica for the corresponding range. The metrics
// being updated include 1. total byte count of replica-addressed batch requests
// processed 2. cross-region metrics, which monitor activities across different
// regions, and 3. cross-zone metrics, which monitor activities across different
// zones within the same region or in cases where region tiers are not
// configured. These metrics may include batches that were not successfully sent
// but were terminated at an early stage.
func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(
comparisonResult roachpb.LocalityComparisonType, inc int64,
) {
dm.ReplicaAddressedBatchRequestBytes.Inc(inc)
switch comparisonResult {
case roachpb.LocalityComparisonType_CROSS_REGION:
dm.CrossRegionBatchRequestBytes.Inc(inc)
case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
dm.CrossZoneBatchRequestBytes.Inc(inc)
}
}

for _, metricName := range metricsName {
count := getFirstDistSenderMetric(metricName)
if count == -1 {
return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName)
}
metricCountMap[metricName] = count
// updateCrossLocalityMetricsOnReplicaAddressedBatchResponse updates
// DistSenderMetrics for batch responses that are received back from transport
// rpc. It updates based on the comparisonResult parameter determined during the
// initial batch requests check. The underlying assumption is that the response
// should match the cross-region or cross-zone nature of the requests.
func (dm *DistSenderMetrics) updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(
comparisonResult roachpb.LocalityComparisonType, inc int64,
) {
dm.ReplicaAddressedBatchResponseBytes.Inc(inc)
switch comparisonResult {
case roachpb.LocalityComparisonType_CROSS_REGION:
dm.CrossRegionBatchResponseBytes.Inc(inc)
case roachpb.LocalityComparisonType_SAME_REGION_CROSS_ZONE:
dm.CrossZoneBatchResponseBytes.Inc(inc)
}
return metricCountMap, nil
}

// FirstRangeProvider is capable of providing DistSender with the descriptor of
Expand Down Expand Up @@ -465,14 +465,6 @@ type DistSender struct {

onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// BatchRequestInterceptor intercepts DistSender.Send() to pass the actual
// batch request byte count to the test.
BatchRequestInterceptor func(ba *kvpb.BatchRequest)

// BatchRequestInterceptor intercepts DistSender.Send() to pass the actual
// batch response byte count to the test.
BatchResponseInterceptor func(br *kvpb.BatchResponse)

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
Expand Down Expand Up @@ -630,14 +622,6 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
}

if cfg.TestingKnobs.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor
}

if cfg.TestingKnobs.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor
}

return ds
}

Expand Down Expand Up @@ -2293,15 +2277,11 @@ func (ds *DistSender) sendToReplicas(
(desc.Generation == 0 && routing.LeaseSeq() == 0),
}

if ds.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor(ba)
}
shouldIncCrossRegion, shouldIncCrossZone := ds.checkAndUpdateBatchRequestMetrics(ctx, ba)
comparisonResult := ds.getLocalityComparison(ctx, ds.getNodeID(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

br, err = transport.SendNext(ctx, ba)
if ds.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor(br)
}
ds.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

if err != nil {
Expand Down Expand Up @@ -2557,73 +2537,31 @@ func (ds *DistSender) sendToReplicas(
}
}

// isCrossRegionCrossZoneBatch returns (bool, bool) - indicating if the given
// batch request is cross-region and cross-zone respectively.
func (ds *DistSender) isCrossRegionCrossZoneBatch(
ctx context.Context, ba *kvpb.BatchRequest,
) (bool, bool) {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID)
// getLocalityComparison takes two nodeIDs as input and returns the locality
// comparison result between their corresponding nodes. This result indicates
// whether the two nodes are located in different regions or zones.
func (ds *DistSender) getLocalityComparison(
ctx context.Context, fromNodeID roachpb.NodeID, toNodeID roachpb.NodeID,
) roachpb.LocalityComparisonType {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(fromNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err)
return false, false
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID)
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(toNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err)
return false, false
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %+v", err)
return roachpb.LocalityComparisonType_UNDEFINED
}
isCrossRegion, regionErr, isCrossZone, zoneErr := gatewayNodeDesc.Locality.IsCrossRegionCrossZone(destinationNodeDesc.Locality)

comparisonResult, regionErr, zoneErr := gatewayNodeDesc.Locality.CompareWithLocality(destinationNodeDesc.Locality)
if regionErr != nil {
log.VEventf(ctx, 2, "%v", regionErr)
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross region %+v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "%v", zoneErr)
}
return isCrossRegion, isCrossZone
}

// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a
// more meaningful way. Cross-region metrics monitor activities across different
// regions. Cross-zone metrics monitor cross-zone activities within the same
// region or in cases where region tiers are not configured. The check result is
// returned here to avoid redundant check for metrics updates after receiving
// batch responses.
func (ds *DistSender) checkAndUpdateBatchRequestMetrics(
ctx context.Context, ba *kvpb.BatchRequest,
) (shouldIncCrossRegion bool, shouldIncCrossZone bool) {
ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size()))
isCrossRegion, isCrossZone := ds.isCrossRegionCrossZoneBatch(ctx, ba)
if isCrossRegion {
if !isCrossZone {
log.VEventf(ctx, 2, "unexpected: cross region but same zone")
} else {
ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size()))
shouldIncCrossRegion = true
}
} else {
if isCrossZone {
ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size()))
shouldIncCrossZone = true
}
}
return shouldIncCrossRegion, shouldIncCrossZone
}

// checkAndUpdateBatchResponseMetrics updates the batch response metrics based
// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These
// parameters are determined during the initial check for batch requests. The
// underlying assumption is that if requests were cross-region or cross-zone,
// the response should be as well.
func (ds *DistSender) checkAndUpdateBatchResponseMetrics(
br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool,
) {
ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size()))
if shouldIncCrossRegion {
ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size()))
}
if shouldIncCrossZone {
ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size()))
log.VEventf(ctx, 2, "unable to determine if the given nodes are cross zone %+v", zoneErr)
}
return comparisonResult
}

// getCostControllerConfig returns the config for the tenant cost model. This
Expand Down
Loading

0 comments on commit af9cbad

Please sign in to comment.