Skip to content

Commit

Permalink
server: add cross-region byte count metrics to Node
Browse files Browse the repository at this point in the history
Previously, we introduced two new metrics to DistSender to help improve
observability of cross-region traffic on nodes. However, the issue of limited
observability on the receiver node still exists.

To solve this issue, this commit introduces two new metrics to NodeMetrics -
CrossRegionBatchRequestBytes and CrossRegionBatchResponseBytes. These metrics
track the byte count for batch requests sent and responses received in
cross-region batches within the receiver node.

Note that the node represents the destination range node, so node metrics are
updated when the node receives cross-region batches.

Fixes: cockroachdb#103983
Release note (ops change): Two new metrics - CrossRegionBatchRequestBytes,
CrossRegionBatchResponseBytes - are now added to Node metrics.
  • Loading branch information
wenyihu6 committed May 30, 2023
1 parent cdfa67c commit 2bd4d1d
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 10 deletions.
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
Expand Down
81 changes: 71 additions & 10 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@ This metric is thus not an indicator of KV health.`,
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}

metaCrossRegionBatchRequest = metric.Metadata{
Name: "batch_requests.cross_region",
Help: `Total byte count of cross-region batch requests sent`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaCrossRegionBatchResponse = metric.Metadata{
Name: "batch_responses.cross_region",
Help: `Total byte count of cross-region batch requests received`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
)

// Cluster settings.
Expand Down Expand Up @@ -182,8 +195,10 @@ type nodeMetrics struct {
Err *metric.Counter
DiskStalls *metric.Counter

BatchCount *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
BatchCount *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
CrossRegionBatchRequestBytes *metric.Counter
CrossRegionBatchResponseBytes *metric.Counter
}

func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMetrics {
Expand All @@ -194,10 +209,12 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMe
Duration: histogramWindow,
Buckets: metric.IOLatencyBuckets,
}),
Success: metric.NewCounter(metaExecSuccess),
Err: metric.NewCounter(metaExecError),
DiskStalls: metric.NewCounter(metaDiskStalls),
BatchCount: metric.NewCounter(metaInternalBatchRPCCount),
Success: metric.NewCounter(metaExecSuccess),
Err: metric.NewCounter(metaExecError),
DiskStalls: metric.NewCounter(metaDiskStalls),
BatchCount: metric.NewCounter(metaInternalBatchRPCCount),
CrossRegionBatchRequestBytes: metric.NewCounter(metaCrossRegionBatchRequest),
CrossRegionBatchResponseBytes: metric.NewCounter(metaCrossRegionBatchResponse),
}

for i := range nm.MethodCounts {
Expand Down Expand Up @@ -1235,19 +1252,60 @@ func (n *Node) batchInternal(
return br, nil
}

// incrementBatchCounters increments counters to track the batch and composite
// request methods.
func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) {
// The incrementBatchCounters function updates node metrics for batch counts,
// composite request methods, and cross-region batches.
//
// It takes a BatchRequest parameter to extract information about the
// gatewayNodeID and the byte count of the batch request. It returns (bool,
// error) indicating whether the batch request is cross-region and if any errors
// occurred during the process.
//
// Note that we are already at the destination range node here. Node first tries
// to obtain a gateway node descriptor through gossip. If it fails to obtain
// gossip or the gateway node descriptor, or if the locality of any node does
// not have a “region” key, the function returns (false, error). If no errors
// occurred, it checks if the gateway and destination nodes are in different
// regions. If they are, it updates the cross-region metrics and returns (true,
// nil); otherwise, it returns (false, nil).
//
// isCrossRegion is returned here to avoid redundant checks for cross-region
// after receiving batch responses.
func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) (bool, error) {
n.metrics.BatchCount.Inc(1)
for _, ru := range ba.Requests {
m := ru.GetInner().Method()
n.metrics.MethodCounts[m].Inc(1)
}

gossip := n.storeCfg.Gossip
if gossip == nil {
return false, errors.New("gossip is not configured")
}

gatewayNodeDesc, err := gossip.GetNodeDescriptor(ba.GatewayNodeID)
if err != nil {
return false, err
}

isCrossRegion, err := n.Descriptor.Locality.IsCrossRegion(gatewayNodeDesc.Locality)
if err != nil {
return false, err
}

if isCrossRegion {
n.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size()))
return true, nil
}

return false, nil
}

// Batch implements the kvpb.InternalServer interface.
func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
n.incrementBatchCounters(args)
isCrossRegionBatchRequest, error := n.incrementBatchCounters(args)
if error != nil {
log.Eventf(ctx, "%v", error)
}

// NB: Node.Batch is called directly for "local" calls. We don't want to
// carry the associated log tags forward as doing so makes adding additional
Expand Down Expand Up @@ -1297,6 +1355,9 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR
}
br.Error = kvpb.NewError(err)
}
if isCrossRegionBatchRequest {
n.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size()))
}
if buildutil.CrdbTestBuild && br.Error != nil && n.testingErrorEvent != nil {
n.testingErrorEvent(ctx, args, errors.DecodeError(ctx, br.Error.EncodedError))
}
Expand Down
121 changes: 121 additions & 0 deletions pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
Expand All @@ -41,6 +43,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -758,6 +761,124 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) {
require.GreaterOrEqual(t, n.metrics.MethodCounts[kvpb.Put].Count(), putCurr)
}

// TestNodeCrossRegionBatchMetrics verifies that node.Batch() correctly updates
// the cross-region byte count metrics for sent batch requests and received
// batch responses.
func TestNodeCrossRegionBatchMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// The initial setup ensures the correct configuration for three nodes (with
// different localities), single-range, and three replicas (on different
// nodes). The zone configuration is also set up so that the leaseholder for
// the range resides on server[1].
const numNodes = 3
const leaseServerIdx = 1
serverArgs := make(map[int]base.TestServerArgs)
regions := [numNodes]int{1, 2, 3}
zcfg := zonepb.DefaultZoneConfig()
zcfg.NumReplicas = proto.Int32(1)

for i := 0; i < numNodes; i++ {
serverArgs[i] = base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{
Key: "region", Value: fmt.Sprintf("us-east-%v", regions[i]),
},
},
},
Knobs: base.TestingKnobs{
Server: &TestingKnobs{
DefaultZoneConfigOverride: &zcfg,
},
},
}
}

ctx := context.Background()
var clusterArgs = base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: serverArgs,
}

tc := serverutils.StartNewTestCluster(t, numNodes, clusterArgs)
defer tc.Stopper().Stop(ctx)

testKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, testKey, tc.Target(1))
tc.AddVotersOrFatal(t, testKey, tc.Target(2))
desc := tc.LookupRangeOrFatal(t, testKey)
tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(leaseServerIdx))

testutils.SucceedsSoon(t, func() (err error) {
for i := 0; i < tc.NumServers(); i++ {
err = tc.Server(i).GetStores().(*kvserver.Stores).VisitStores(func(store *kvserver.Store) error {
return store.ForceReplicationScanAndProcess()
})
}
return err
})

leaseHolder, _ := tc.FindRangeLeaseHolder(desc, nil)
actualLeaseHolderNodeID := leaseHolder.NodeID
expectedLeaseHolderNodeID := tc.Server(leaseServerIdx).NodeID()

// Make sure the leaseholder of the range is now at server[1].
require.Equal(t, expectedLeaseHolderNodeID, actualLeaseHolderNodeID,
fmt.Sprintf("expected leaseholder to be at %v but is at %v", expectedLeaseHolderNodeID, actualLeaseHolderNodeID),
)

type metricsPair struct {
request int64
response int64
}

// Record the node metrics before sending the requests for comparison.
serversMetricsBeforeRequest := make(map[int]metricsPair)
for i := 0; i < tc.NumServers(); i++ {
n := tc.Server(i).(*TestServer).GetNode()
serversMetricsBeforeRequest[i] = metricsPair{
request: n.metrics.CrossRegionBatchRequestBytes.Count(),
response: n.metrics.CrossRegionBatchResponseBytes.Count(),
}
}

// The request is sent from server[0] to the leaseholder (server[1]) to
// enforce a cross-region batch request / response. It is expected that the
// receiver node metrics, server[1], will be updated, while the node metrics
// at server[0] and server[2] remain unchanged.
get := &kvpb.GetRequest{
RequestHeader: kvpb.RequestHeader{Key: testKey},
}

if _, error := kv.SendWrappedWith(
context.Background(), tc.Server(0).DistSenderI().(*kvcoord.DistSender), kvpb.Header{RoutingPolicy: kvpb.RoutingPolicy_LEASEHOLDER}, get,
); error != nil {
t.Fatal(error)
}

for i := 0; i < tc.NumServers(); i++ {
n := tc.Server(i).(*TestServer).GetNode()
requestDiff := n.metrics.CrossRegionBatchRequestBytes.Count() - serversMetricsBeforeRequest[i].request
responseDiff := n.metrics.CrossRegionBatchResponseBytes.Count() - serversMetricsBeforeRequest[i].response

if i == leaseServerIdx {
// Nodes with the leaseholder receives cross-region batches and should be updated.
require.Greater(t, requestDiff, int64(0),
fmt.Sprintf("expected cross-region bytes sent to be > 0 but got %v", requestDiff))
require.Greater(t, responseDiff, int64(0),
fmt.Sprintf("expected cross-region bytes received be > 0 but got %v", responseDiff))
} else {
// Other nodes metrics should remain unchanged.
require.Equal(t, int64(0), requestDiff,
fmt.Sprintf("expected cross-region bytes sent to be 0 but got %v", requestDiff))
require.Equal(t, int64(0), responseDiff,
fmt.Sprintf("expected cross-region bytes received be 0 but got %v", responseDiff))
}
}
}

func TestGetTenantWeights(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 2bd4d1d

Please sign in to comment.