Merge #126150
126150: rpc: add locality to dialing r=jeffswenson a=andrewbaptist

Previously, RPC metrics were broken down by individual remote node and class if `server.child_metrics.enabled` was set to true. This set of commits changes the behavior to publish the RPC network metrics by locality instead.
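
To illustrate the idea of keying metrics by locality rather than by remote node, here is a minimal, self-contained sketch. It is not the code from this PR: `Tier`, `Locality`, and `LocalityCounters` are hypothetical, simplified stand-ins (the real `roachpb.Locality` and the metric registry are more involved), and the sketch ignores concurrency.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Tier and Locality are simplified, hypothetical stand-ins for the
// roachpb types of the same name.
type Tier struct{ Key, Value string }

type Locality struct{ Tiers []Tier }

// String renders a locality as "region=us-east,zone=a".
func (l Locality) String() string {
	parts := make([]string, len(l.Tiers))
	for i, t := range l.Tiers {
		parts[i] = t.Key + "=" + t.Value
	}
	return strings.Join(parts, ",")
}

// LocalityCounters keeps one counter per locality instead of one per
// remote node. It is not safe for concurrent use.
type LocalityCounters struct {
	byLocality map[string]int64
}

func NewLocalityCounters() *LocalityCounters {
	return &LocalityCounters{byLocality: map[string]int64{}}
}

// Add attributes n units (bytes, connections, ...) to the peer's locality.
func (c *LocalityCounters) Add(l Locality, n int64) {
	c.byLocality[l.String()] += n
}

// Get returns the running total for a locality.
func (c *LocalityCounters) Get(l Locality) int64 {
	return c.byLocality[l.String()]
}

// Localities returns the known locality keys in sorted order.
func (c *LocalityCounters) Localities() []string {
	keys := make([]string, 0, len(c.byLocality))
	for k := range c.byLocality {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

func main() {
	east := Locality{Tiers: []Tier{{Key: "region", Value: "us-east"}}}
	west := Locality{Tiers: []Tier{{Key: "region", Value: "us-west"}}}

	c := NewLocalityCounters()
	c.Add(east, 100) // bytes sent to node 1 (us-east)
	c.Add(east, 50)  // bytes sent to node 2 (us-east)
	c.Add(west, 7)   // bytes sent to node 3 (us-west)

	for _, k := range c.Localities() {
		fmt.Printf("%s: %d\n", k, c.byLocality[k])
	}
}
```

With this shape, the number of metric series grows with the number of distinct localities rather than with the number of nodes, which is presumably part of the motivation for the change.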

Epic: CRDB-41138

Release note (ops change): Adds three new network tracking metrics. `rpc.connection.connected` is the number of gRPC TCP-level connections established to remote nodes. `rpc.client.bytes.egress` is the number of TCP bytes sent via gRPC on connections we initiated. `rpc.client.bytes.ingress` is the number of TCP bytes received via gRPC on connections we initiated.

Co-authored-by: Andrew Baptist <[email protected]>
craig[bot] and andrewbaptist committed Aug 14, 2024
2 parents e02199c + 23f9b47 commit 667785c
Showing 40 changed files with 559 additions and 273 deletions.
3 changes: 3 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -1372,7 +1372,10 @@
<tr><td>APPLICATION</td><td>physical_replication.sst_bytes</td><td>SST bytes (compressed) sent to KV by all replication jobs</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>requests.slow.distsender</td><td>Number of range-bound RPCs currently stuck or retrying for a long time.<br/><br/>Note that this is not a good signal for KV health. The remote side of the<br/>RPCs tracked here may experience contention, so an end user can easily<br/>cause values for this metric to be emitted by leaving a transaction open<br/>for a long time and contending with it using a second transaction.</td><td>Requests</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>round-trip-latency</td><td>Distribution of round-trip latencies with other nodes.<br/><br/>This only reflects successful heartbeats and measures gRPC overhead as well as<br/>possible head-of-line blocking. Elevated values in this metric may hint at<br/>network issues and/or saturation, but they are no proof of them. CPU overload<br/>can similarly elevate this metric. The operator should look towards OS-level<br/>metrics such as packet loss, retransmits, etc, to conclusively diagnose network<br/>issues. Heartbeats are not very frequent (~seconds), so they may not capture<br/>rare or short-lived degradations.<br/></td><td>Round-trip time</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>rpc.client.bytes.egress</td><td>Counter of TCP bytes sent via gRPC on connections we initiated.</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
+<tr><td>APPLICATION</td><td>rpc.client.bytes.ingress</td><td>Counter of TCP bytes received via gRPC on connections we initiated.</td><td>Bytes</td><td>COUNTER</td><td>BYTES</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.avg_round_trip_latency</td><td>Sum of exponentially weighted moving average of round-trip latencies, as measured through a gRPC RPC.<br/><br/>Dividing this Gauge by rpc.connection.healthy gives an approximation of average<br/>latency, but the top-level round-trip-latency histogram is more useful. Instead,<br/>users should consult the label families of this metric if they are available<br/>(which requires prometheus and the cluster setting &#39;server.child_metrics.enabled&#39;);<br/>these provide per-peer moving averages.<br/><br/>This metric does not track failed connection. A failed connection&#39;s contribution<br/>is reset to zero.<br/></td><td>Latency</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
+<tr><td>APPLICATION</td><td>rpc.connection.connected</td><td>Counter of TCP level connected connections.<br/><br/>This metric is the number of gRPC connections from the TCP level. Unlike rpc.connection.healthy<br/>this metric does not take into account whether the application has been able to heartbeat<br/>over this connection.<br/></td><td>Connections</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.failures</td><td>Counter of failed connections.<br/><br/>This includes both the event in which a healthy connection terminates as well as<br/>unsuccessful reconnection attempts.<br/><br/>Connections that are terminated as part of local node shutdown are excluded.<br/>Decommissioned peers are excluded.<br/></td><td>Connections</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.healthy</td><td>Gauge of current connections in a healthy state (i.e. bidirectionally connected and heartbeating)</td><td>Connections</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>rpc.connection.healthy_nanos</td><td>Gauge of nanoseconds of healthy connection time<br/><br/>On the prometheus endpoint scraped with the cluster setting &#39;server.child_metrics.enabled&#39; set,<br/>the constituent parts of this metric are available on a per-peer basis and one can read off<br/>for how long a given peer has been connected</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
2 changes: 1 addition & 1 deletion pkg/acceptance/localcluster/cluster.go
@@ -482,7 +482,7 @@ func (n *Node) StatusClient(ctx context.Context) serverpb.StatusClient {
return existingClient
}

-conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr()).Connect(ctx)
+conn, err := n.rpcCtx.GRPCUnvalidatedDial(n.RPCAddr(), roachpb.Locality{}).Connect(ctx)
if err != nil {
log.Fatalf(context.Background(), "failed to initialize status client: %s", err)
}
8 changes: 4 additions & 4 deletions pkg/blobs/client_test.go
@@ -76,13 +76,13 @@ func setUpService(
require.NoError(t, err)

localDialer := nodedialer.New(rpcContext,
-func(nodeID roachpb.NodeID) (net.Addr, error) {
+func(nodeID roachpb.NodeID) (net.Addr, roachpb.Locality, error) {
if nodeID == remoteNodeID {
-return ln.Addr(), nil
+return ln.Addr(), roachpb.Locality{}, nil
} else if nodeID == localNodeID {
-return ln2.Addr(), nil
+return ln2.Addr(), roachpb.Locality{}, nil
}
-return nil, errors.Errorf("node %d not found", nodeID)
+return nil, roachpb.Locality{}, errors.Errorf("node %d not found", nodeID)
},
)
localNodeIDContainer := &base.NodeIDContainer{}
7 changes: 5 additions & 2 deletions pkg/ccl/serverccl/statusccl/tenant_grpc_test.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -33,6 +34,8 @@ func TestTenantGRPCServices(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

+skip.UnderStress(t, "test can time out under stress")
+
ctx := context.Background()

testCluster := serverutils.StartCluster(t, 3, base.TestClusterArgs{
@@ -121,7 +124,7 @@ func TestTenantGRPCServices(t *testing.T) {
rpcCtx := tenant2.RPCContext()

nodeID := roachpb.NodeID(tenant.SQLInstanceID())
-conn, err := rpcCtx.GRPCDialNode(grpcAddr, nodeID, rpc.DefaultClass).Connect(ctx)
+conn, err := rpcCtx.GRPCDialNode(grpcAddr, nodeID, roachpb.Locality{}, rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)

client := serverpb.NewStatusClient(conn)
@@ -135,7 +138,7 @@ func TestTenantGRPCServices(t *testing.T) {
grpcAddr := server.RPCAddr()
rpcCtx := tenant.RPCContext()

-conn, err := rpcCtx.GRPCDialNode(grpcAddr, server.NodeID(), rpc.DefaultClass).Connect(ctx)
+conn, err := rpcCtx.GRPCDialNode(grpcAddr, server.NodeID(), roachpb.Locality{}, rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)

client := serverpb.NewStatusClient(conn)
17 changes: 15 additions & 2 deletions pkg/gossip/client.go
@@ -38,6 +38,7 @@ type client struct {
peerID roachpb.NodeID
resolvedPlaceholder bool // Whether we've resolved the nodeSet's placeholder for this client
addr net.Addr // Peer node network address
+locality roachpb.Locality // Peer node locality (if known)
forwardAddr *util.UnresolvedAddr // Set if disconnected with an alternate addr
prevHighWaterStamps map[roachpb.NodeID]int64 // Last high water timestamps sent to remote server
remoteHighWaterStamps map[roachpb.NodeID]int64 // Remote server's high water timestamps
@@ -56,11 +57,14 @@ func extractKeys(delta map[string]*Info) string {
}

// newClient creates and returns a client struct.
-func newClient(ambient log.AmbientContext, addr net.Addr, nodeMetrics Metrics) *client {
+func newClient(
+ambient log.AmbientContext, addr net.Addr, locality roachpb.Locality, nodeMetrics Metrics,
+) *client {
return &client{
AmbientContext: ambient,
createdAt: timeutil.Now(),
addr: addr,
+locality: locality,
remoteHighWaterStamps: map[roachpb.NodeID]int64{},
closer: make(chan struct{}),
clientMetrics: makeMetrics(),
@@ -104,7 +108,16 @@ func (c *client) startLocked(
// asynchronous from the caller's perspective, so the only effect of
// `WithBlock` here is blocking shutdown - at the time of this writing,
// that ends ups up making `kv` tests take twice as long.
-conn, err := rpcCtx.GRPCUnvalidatedDial(c.addr.String()).Connect(ctx)
+var connection *rpc.Connection
+if c.peerID != 0 {
+connection = rpcCtx.GRPCDialNode(c.addr.String(), c.peerID, c.locality, rpc.SystemClass)
+} else {
+// TODO(baptist): Use this as a temporary connection for getting
+// onto gossip and then replace with a validated connection.
+log.Infof(ctx, "unvalidated bootstrap gossip dial to %s", c.addr)
+connection = rpcCtx.GRPCUnvalidatedDial(c.addr.String(), c.locality)
+}
+conn, err := connection.Connect(ctx)
if err != nil {
return nil, err
}
22 changes: 11 additions & 11 deletions pkg/gossip/client_test.go
@@ -185,7 +185,7 @@ func TestClientGossip(t *testing.T) {
local, _ := startGossip(clusterID, 1, stopper, t, metric.NewRegistry())
remote, _ := startGossip(clusterID, 2, stopper, t, metric.NewRegistry())
disconnected := make(chan *client, 1)
-c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), remote.GetNodeAddr(), makeMetrics())
+c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), remote.GetNodeAddr(), roachpb.Locality{}, makeMetrics())

defer func() {
stopper.Stop(ctx)
@@ -237,7 +237,7 @@ func TestClientGossipMetrics(t *testing.T) {
gossipSucceedsSoon(
t, stopper, clusterID, make(chan *client, 2),
map[*client]*Gossip{
-newClient(log.MakeTestingAmbientCtxWithNewTracer(), local.GetNodeAddr(), remote.nodeMetrics): remote,
+newClient(log.MakeTestingAmbientCtxWithNewTracer(), local.GetNodeAddr(), roachpb.Locality{}, remote.nodeMetrics): remote,
},
func() error {
// Infos/Bytes Sent/Received should not be zero.
@@ -295,7 +295,7 @@ func TestClientNodeID(t *testing.T) {
// Use an insecure context. We're talking to tcp socket which are not in the certs.
rpcContext := rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)

-c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), &remote.nodeAddr, makeMetrics())
+c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), &remote.nodeAddr, roachpb.Locality{}, makeMetrics())
disconnected <- c

defer func() {
@@ -342,7 +342,7 @@ func TestClientDisconnectLoopback(t *testing.T) {
local, localCtx := startGossip(uuid.Nil, 1, stopper, t, metric.NewRegistry())
local.mu.Lock()
lAddr := local.mu.is.NodeAddr
-local.startClientLocked(lAddr, localCtx)
+local.startClientLocked(lAddr, roachpb.Locality{}, localCtx)
local.mu.Unlock()
local.manage(localCtx)
testutils.SucceedsSoon(t, func() error {
@@ -388,7 +388,7 @@ func TestClientDisconnectRedundant(t *testing.T) {
// Restart the client connection in the loop. It might have failed due to
// a heartbeat time.
local.mu.Lock()
-local.startClientLocked(rAddr, localCtx)
+local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
local.mu.Unlock()
return fmt.Errorf("unable to find local to remote client")
}
@@ -399,7 +399,7 @@ func TestClientDisconnectRedundant(t *testing.T) {
// Start a remote to local client. This client will get removed as being
// redundant as there is already a connection between the two nodes.
remote.mu.Lock()
-remote.startClientLocked(lAddr, remoteCtx)
+remote.startClientLocked(lAddr, roachpb.Locality{}, remoteCtx)
remote.mu.Unlock()

testutils.SucceedsSoon(t, func() error {
@@ -434,8 +434,8 @@ func TestClientDisallowMultipleConns(t *testing.T) {
// Start two clients from local to remote. RPC client cache is
// disabled via the context, so we'll start two different outgoing
// connections.
-local.startClientLocked(rAddr, localCtx)
-local.startClientLocked(rAddr, localCtx)
+local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
+local.startClientLocked(rAddr, roachpb.Locality{}, localCtx)
local.mu.Unlock()
remote.mu.Unlock()
local.manage(localCtx)
@@ -517,7 +517,7 @@ func TestClientForwardUnresolved(t *testing.T) {
local, _ := startGossip(uuid.Nil, nodeID, stopper, t, metric.NewRegistry())
addr := local.GetNodeAddr()

-client := newClient(log.MakeTestingAmbientCtxWithNewTracer(), addr, makeMetrics()) // never started
+client := newClient(log.MakeTestingAmbientCtxWithNewTracer(), addr, roachpb.Locality{}, makeMetrics()) // never started

newAddr := util.UnresolvedAddr{
NetworkField: "tcp",
@@ -564,15 +564,15 @@ func TestClientSendsHighStampsDiff(t *testing.T) {
rpc.NewInsecureTestingContextWithClusterID(ctx, clock, stopper, clusterID)

// Create a client and let it connect to the remote address.
-c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), &remote.nodeAddr, makeMetrics())
+c := newClient(log.MakeTestingAmbientCtxWithNewTracer(), &remote.nodeAddr, roachpb.Locality{}, makeMetrics())
disconnected <- c

ctxNew, cancel := context.WithCancel(c.AnnotateCtx(context.Background()))
defer func() {
cancel()
}()

-conn, err := rCtx.GRPCUnvalidatedDial(c.addr.String()).Connect(ctxNew)
+conn, err := rCtx.GRPCUnvalidatedDial(c.addr.String(), roachpb.Locality{}).Connect(ctxNew)
require.NoError(t, err)

stream, err := NewGossipClient(conn).Gossip(ctx)