diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 36bb4755b8ab..91dfc402e9cd 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -462,6 +462,7 @@ go_test(
         "//pkg/jobs/jobspb",
         "//pkg/keys",
         "//pkg/kv",
+        "//pkg/kv/kvclient/kvcoord",
         "//pkg/kv/kvclient/kvtenant",
         "//pkg/kv/kvpb",
         "//pkg/kv/kvserver",
diff --git a/pkg/server/node.go b/pkg/server/node.go
index 269533ab8f7d..73e88d311ffe 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -142,6 +142,19 @@ This metric is thus not an indicator of KV health.`,
 		Measurement: "Batches",
 		Unit:        metric.Unit_COUNT,
 	}
+
+	metaCrossRegionBatchRequest = metric.Metadata{
+		Name:        "batch_requests.cross_region",
+		Help:        `Total byte count of cross-region batch requests sent`,
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
+	metaCrossRegionBatchResponse = metric.Metadata{
+		Name:        "batch_responses.cross_region",
+		Help:        `Total byte count of cross-region batch responses received`,
+		Measurement: "Bytes",
+		Unit:        metric.Unit_BYTES,
+	}
 )
 
 // Cluster settings.
@@ -182,8 +195,10 @@ type nodeMetrics struct {
 	Err        *metric.Counter
 	DiskStalls *metric.Counter
 
-	BatchCount   *metric.Counter
-	MethodCounts [kvpb.NumMethods]*metric.Counter
+	BatchCount                    *metric.Counter
+	MethodCounts                  [kvpb.NumMethods]*metric.Counter
+	CrossRegionBatchRequestBytes  *metric.Counter
+	CrossRegionBatchResponseBytes *metric.Counter
 }
 
 func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMetrics {
@@ -194,10 +209,12 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMe
 			Duration: histogramWindow,
 			Buckets:  metric.IOLatencyBuckets,
 		}),
-		Success:    metric.NewCounter(metaExecSuccess),
-		Err:        metric.NewCounter(metaExecError),
-		DiskStalls: metric.NewCounter(metaDiskStalls),
-		BatchCount: metric.NewCounter(metaInternalBatchRPCCount),
+		Success:                       metric.NewCounter(metaExecSuccess),
+		Err:                           metric.NewCounter(metaExecError),
+		DiskStalls:                    metric.NewCounter(metaDiskStalls),
+		BatchCount:                    metric.NewCounter(metaInternalBatchRPCCount),
+		CrossRegionBatchRequestBytes:  metric.NewCounter(metaCrossRegionBatchRequest),
+		CrossRegionBatchResponseBytes: metric.NewCounter(metaCrossRegionBatchResponse),
 	}
 
 	for i := range nm.MethodCounts {
@@ -1235,19 +1252,60 @@ func (n *Node) batchInternal(
 	return br, nil
 }
 
-// incrementBatchCounters increments counters to track the batch and composite
-// request methods.
-func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) {
+// incrementBatchCounters updates node metrics for batch counts, composite
+// request methods, and cross-region batches.
+//
+// It takes a BatchRequest to extract the gatewayNodeID and the byte size of
+// the batch request. It returns (bool, error), indicating whether the batch
+// request is cross-region and whether any error occurred along the way.
+//
+// Note that we are already at the destination range node here. The node first
+// tries to obtain the gateway node descriptor through gossip. If gossip or the
+// gateway node descriptor is unavailable, or if the locality of either node
+// does not have a "region" key, the function returns (false, error). If no
+// error occurred, it checks whether the gateway and destination nodes are in
+// different regions. If they are, it updates the cross-region metrics and
+// returns (true, nil); otherwise, it returns (false, nil).
+//
+// isCrossRegion is returned here to avoid a redundant cross-region check after
+// receiving the batch response.
+func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) (bool, error) {
 	n.metrics.BatchCount.Inc(1)
 	for _, ru := range ba.Requests {
 		m := ru.GetInner().Method()
 		n.metrics.MethodCounts[m].Inc(1)
 	}
+
+	gossip := n.storeCfg.Gossip
+	if gossip == nil {
+		return false, errors.New("gossip is not configured")
+	}
+
+	gatewayNodeDesc, err := gossip.GetNodeDescriptor(ba.GatewayNodeID)
+	if err != nil {
+		return false, err
+	}
+
+	isCrossRegion, err := n.Descriptor.Locality.IsCrossRegion(gatewayNodeDesc.Locality)
+	if err != nil {
+		return false, err
+	}
+
+	if isCrossRegion {
+		n.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size()))
+		return true, nil
+	}
+
+	return false, nil
 }
 
 // Batch implements the kvpb.InternalServer interface.
 func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
-	n.incrementBatchCounters(args)
+	// Failing to detect a cross-region batch is not fatal to the request;
+	// log the error and continue.
+	isCrossRegionBatchRequest, crossRegionErr := n.incrementBatchCounters(args)
+	if crossRegionErr != nil {
+		log.Eventf(ctx, "%v", crossRegionErr)
+	}
 
 	// NB: Node.Batch is called directly for "local" calls. We don't want to
 	// carry the associated log tags forward as doing so makes adding additional
@@ -1297,6 +1355,9 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR
 		}
 		br.Error = kvpb.NewError(err)
 	}
+	if isCrossRegionBatchRequest {
+		n.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size()))
+	}
 	if buildutil.CrdbTestBuild && br.Error != nil && n.testingErrorEvent != nil {
 		n.testingErrorEvent(ctx, args, errors.DecodeError(ctx, br.Error.EncodedError))
 	}
diff --git a/pkg/server/node_test.go b/pkg/server/node_test.go
index 781e6ca7e578..81c99cf91d1f 100644
--- a/pkg/server/node_test.go
+++ b/pkg/server/node_test.go
@@ -25,6 +25,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/config/zonepb"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage"
@@ -41,6 +43,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
+	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/require"
 )
 
@@ -758,6 +761,124 @@ func TestNodeBatchRequestMetricsInc(t *testing.T) {
 	require.GreaterOrEqual(t, n.metrics.MethodCounts[kvpb.Put].Count(), putCurr)
 }
 
+// TestNodeCrossRegionBatchMetrics verifies that node.Batch() correctly updates
+// the cross-region byte count metrics for sent batch requests and received
+// batch responses.
+func TestNodeCrossRegionBatchMetrics(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// The setup below configures a three-node cluster in which each node has a
+	// distinct region locality and the scratch range is replicated onto all
+	// three nodes. The range lease is then transferred so that the leaseholder
+	// resides on server[1].
+	const numNodes = 3
+	const leaseServerIdx = 1
+	serverArgs := make(map[int]base.TestServerArgs)
+	regions := [numNodes]int{1, 2, 3}
+	zcfg := zonepb.DefaultZoneConfig()
+	zcfg.NumReplicas = proto.Int32(1)
+
+	for i := 0; i < numNodes; i++ {
+		serverArgs[i] = base.TestServerArgs{
+			Locality: roachpb.Locality{
+				Tiers: []roachpb.Tier{
+					{
+						Key: "region", Value: fmt.Sprintf("us-east-%v", regions[i]),
+					},
+				},
+			},
+			Knobs: base.TestingKnobs{
+				Server: &TestingKnobs{
+					DefaultZoneConfigOverride: &zcfg,
+				},
+			},
+		}
+	}
+
+	ctx := context.Background()
+	var clusterArgs = base.TestClusterArgs{
+		ReplicationMode:   base.ReplicationManual,
+		ServerArgsPerNode: serverArgs,
+	}
+
+	tc := serverutils.StartNewTestCluster(t, numNodes, clusterArgs)
+	defer tc.Stopper().Stop(ctx)
+
+	testKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, testKey, tc.Target(1))
+	tc.AddVotersOrFatal(t, testKey, tc.Target(2))
+	desc := tc.LookupRangeOrFatal(t, testKey)
+	tc.TransferRangeLeaseOrFatal(t, desc, tc.Target(leaseServerIdx))
+
+	testutils.SucceedsSoon(t, func() (err error) {
+		for i := 0; i < tc.NumServers(); i++ {
+			err = tc.Server(i).GetStores().(*kvserver.Stores).VisitStores(func(store *kvserver.Store) error {
+				return store.ForceReplicationScanAndProcess()
+			})
+		}
+		return err
+	})
+
+	leaseHolder, _ := tc.FindRangeLeaseHolder(desc, nil)
+	actualLeaseHolderNodeID := leaseHolder.NodeID
+	expectedLeaseHolderNodeID := tc.Server(leaseServerIdx).NodeID()
+
+	// Make sure the leaseholder of the range is now on server[1].
+	require.Equal(t, expectedLeaseHolderNodeID, actualLeaseHolderNodeID,
+		fmt.Sprintf("expected leaseholder to be at %v but is at %v", expectedLeaseHolderNodeID, actualLeaseHolderNodeID),
+	)
+
+	type metricsPair struct {
+		request  int64
+		response int64
+	}
+
+	// Record the node metrics before sending the request for comparison.
+	serversMetricsBeforeRequest := make(map[int]metricsPair)
+	for i := 0; i < tc.NumServers(); i++ {
+		n := tc.Server(i).(*TestServer).GetNode()
+		serversMetricsBeforeRequest[i] = metricsPair{
+			request:  n.metrics.CrossRegionBatchRequestBytes.Count(),
+			response: n.metrics.CrossRegionBatchResponseBytes.Count(),
+		}
+	}
+
+	// The request is sent from server[0] to the leaseholder (server[1]) to force
+	// a cross-region batch request / response. The receiving node's metrics
+	// (server[1]) are expected to be updated, while the metrics at server[0] and
+	// server[2] should remain unchanged.
+	get := &kvpb.GetRequest{
+		RequestHeader: kvpb.RequestHeader{Key: testKey},
+	}
+
+	if _, err := kv.SendWrappedWith(
+		ctx, tc.Server(0).DistSenderI().(*kvcoord.DistSender),
+		kvpb.Header{RoutingPolicy: kvpb.RoutingPolicy_LEASEHOLDER}, get,
+	); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < tc.NumServers(); i++ {
+		n := tc.Server(i).(*TestServer).GetNode()
+		requestDiff := n.metrics.CrossRegionBatchRequestBytes.Count() - serversMetricsBeforeRequest[i].request
+		responseDiff := n.metrics.CrossRegionBatchResponseBytes.Count() - serversMetricsBeforeRequest[i].response
+
+		if i == leaseServerIdx {
+			// The leaseholder node receives cross-region batches, so its metrics
+			// should be updated.
+			require.Greater(t, requestDiff, int64(0),
+				fmt.Sprintf("expected cross-region bytes sent to be > 0 but got %v", requestDiff))
+			require.Greater(t, responseDiff, int64(0),
+				fmt.Sprintf("expected cross-region bytes received to be > 0 but got %v", responseDiff))
+		} else {
+			// Other nodes' metrics should remain unchanged.
+			require.Equal(t, int64(0), requestDiff,
+				fmt.Sprintf("expected cross-region bytes sent to be 0 but got %v", requestDiff))
+			require.Equal(t, int64(0), responseDiff,
+				fmt.Sprintf("expected cross-region bytes received to be 0 but got %v", responseDiff))
+		}
+	}
+}
+
 func TestGetTenantWeights(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
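For context on the patch above: the cross-region check in incrementBatchCounters hinges on Locality.IsCrossRegion. The self-contained sketch below illustrates the assumed semantics only: look up the "region" tier in each locality, report an error if either side lacks one, and otherwise compare the two values. The Tier and Locality types and the isCrossRegion helper here are simplified stand-ins for illustration, not the CockroachDB implementations.

package main

import (
	"errors"
	"fmt"
)

// Tier and Locality are simplified stand-ins for roachpb.Tier and
// roachpb.Locality, used only to illustrate the assumed behavior of
// Locality.IsCrossRegion.
type Tier struct {
	Key, Value string
}

type Locality struct {
	Tiers []Tier
}

// region returns the value of the "region" tier, or an error if the locality
// does not define one.
func (l Locality) region() (string, error) {
	for _, t := range l.Tiers {
		if t.Key == "region" {
			return t.Value, nil
		}
	}
	return "", errors.New(`locality has no "region" tier`)
}

// isCrossRegion reports whether two localities name different regions. It
// mirrors the (bool, error) shape used by incrementBatchCounters: an error
// means the node cannot tell, so no cross-region bytes are counted.
func isCrossRegion(a, b Locality) (bool, error) {
	regionA, err := a.region()
	if err != nil {
		return false, err
	}
	regionB, err := b.region()
	if err != nil {
		return false, err
	}
	return regionA != regionB, nil
}

func main() {
	gateway := Locality{Tiers: []Tier{{Key: "region", Value: "us-east-1"}}}
	destination := Locality{Tiers: []Tier{{Key: "region", Value: "us-east-2"}}}

	cross, err := isCrossRegion(gateway, destination)
	fmt.Println(cross, err) // true <nil>: request and response bytes would be counted.
}

Returning the boolean from the request-side check lets the response-side increment in Batch reuse the decision, which is why the patch stores isCrossRegionBatchRequest instead of repeating the gossip lookup after the response is built.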