Skip to content

Commit

Permalink
net: add node_id label to client metrics
Browse files Browse the repository at this point in the history
RPC `client_probe` metrics currently leverage a labeling scheme defined
by Seastar in which, for a given metric, a set of metric labels can only
be registered once per metric. Currently, the label used is solely based
on the server address associated with a given `rpc::transport`. As such,
we currently cannot start multiple `rpc::transport`s pointed at the same
server.

This functionality could be useful though: consider when a node is
restarted empty with a new node ID. Redpanda currently has a check that
nodes being added to the controller Raft group don't overlap with
existing group members' addresses. But if we were to remove this check,
when the new node _is_ added to the Raft group, each node will try to
create a new `rpc::transport` pointing at the new node, and register
metrics with identical labels to those registered by the old node, and
be met with a Seastar `double_registration` exception.

To enable the above scenario, this commit adds the `node_id` as a label
for client metrics, and aggregates them by this label.
  • Loading branch information
andrwng committed Oct 6, 2022
1 parent 0d753b8 commit 59d5a7f
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 23 deletions.
2 changes: 2 additions & 0 deletions src/v/net/client_probe.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/

#pragma once
#include "model/metadata.h"
#include "net/unresolved_address.h"
#include "rpc/logger.h"
#include "rpc/types.h"
Expand Down Expand Up @@ -70,6 +71,7 @@ class client_probe {
void setup_metrics(
ss::metrics::metric_groups& mgs,
const std::optional<rpc::connection_cache_label>& label,
const std::optional<model::node_id>& node_id,
const net::unresolved_address& target_addr);

private:
Expand Down
53 changes: 39 additions & 14 deletions src/v/net/probes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ std::ostream& operator<<(std::ostream& o, const server_probe& p) {
void client_probe::setup_metrics(
ss::metrics::metric_groups& mgs,
const std::optional<rpc::connection_cache_label>& label,
const std::optional<model::node_id>& node_id,
const net::unresolved_address& target_addr) {
namespace sm = ss::metrics;
auto target = sm::label("target");
Expand All @@ -156,80 +157,104 @@ void client_probe::setup_metrics(
if (label) {
labels.push_back(sm::label("connection_cache_label")((*label)()));
}
std::vector<sm::label> aggregate_labels;
// Label the metrics for a given server with the node ID so Seastar can
// differentiate between them, in case multiple node IDs start at the same
// address (e.g. in an ungraceful decommission). Aggregate on node ID so
// the user is presented metrics for each server regardless of node ID.
if (node_id) {
auto node_id_label = sm::label("node_id");
labels.push_back(node_id_label(*node_id));
aggregate_labels.push_back(node_id_label);
}
mgs.add_group(
prometheus_sanitize::metrics_name("rpc_client"),
{
sm::make_gauge(
"active_connections",
[this] { return _connections; },
sm::description("Currently active connections"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"connects",
[this] { return _connects; },
sm::description("Connection attempts"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"requests",
[this] { return _requests; },
sm::description("Number of requests"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_gauge(
"requests_pending",
[this] { return _requests_pending; },
sm::description("Number of requests pending"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"request_errors",
[this] { return _request_errors; },
sm::description("Number or requests errors"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"request_timeouts",
[this] { return _request_timeouts; },
sm::description("Number or requests timeouts"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"in_bytes",
[this] { return _out_bytes; },
sm::description("Total number of bytes sent (including headers)"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_total_bytes(
"out_bytes",
[this] { return _in_bytes; },
sm::description("Total number of bytes received"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"connection_errors",
[this] { return _connection_errors; },
sm::description("Number of connection errors"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"read_dispatch_errors",
[this] { return _read_dispatch_errors; },
sm::description("Number of errors while dispatching responses"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"corrupted_headers",
[this] { return _corrupted_headers; },
sm::description("Number of responses with corrupted headers"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"server_correlation_errors",
[this] { return _server_correlation_errors; },
sm::description("Number of responses with wrong correlation id"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"client_correlation_errors",
[this] { return _client_correlation_errors; },
sm::description("Number of errors in client correlation id"),
labels),
labels)
.aggregate(aggregate_labels),
sm::make_counter(
"requests_blocked_memory",
[this] { return _requests_blocked_memory; },
sm::description("Number of requests that are blocked because"
" of insufficient memory"),
labels),
labels)
.aggregate(aggregate_labels),
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/v/rpc/connection_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ss::future<> connection_cache::emplace(
_cache.emplace(
n,
ss::make_lw_shared<rpc::reconnect_transport>(
std::move(c), std::move(backoff_policy), _label));
std::move(c), std::move(backoff_policy), _label, n));
});
}
ss::future<> connection_cache::remove(model::node_id n) {
Expand Down
9 changes: 7 additions & 2 deletions src/v/rpc/reconnect_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "model/metadata.h"
#include "outcome.h"
#include "rpc/backoff_policy.h"
#include "rpc/transport.h"
Expand All @@ -31,11 +32,15 @@ namespace rpc {
*/
class reconnect_transport {
public:
// Instantiates an underlying rpc::transport, using the given node ID (if
// provided) to distinguish client metrics that target the same server and
// to indicate that the client metrics should be aggregated by node ID.
explicit reconnect_transport(
rpc::transport_configuration c,
backoff_policy backoff_policy,
std::optional<connection_cache_label> label = std::nullopt)
: _transport(std::move(c), std::move(label))
const std::optional<connection_cache_label>& label = std::nullopt,
const std::optional<model::node_id>& node_id = std::nullopt)
: _transport(std::move(c), std::move(label), std::move(node_id))
, _backoff_policy(std::move(backoff_policy)) {}

bool is_valid() const { return _transport.is_valid(); }
Expand Down
11 changes: 7 additions & 4 deletions src/v/rpc/transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,16 @@ struct client_context_impl final : streaming_context {
};

transport::transport(
transport_configuration c, std::optional<connection_cache_label> label)
transport_configuration c,
std::optional<connection_cache_label> label,
std::optional<model::node_id> node_id)
: base_transport(base_transport::configuration{
.server_addr = std::move(c.server_addr),
.credentials = std::move(c.credentials),
})
, _memory(c.max_queued_bytes, "rpc/transport-mem") {
if (!c.disable_metrics) {
setup_metrics(label);
setup_metrics(label, node_id);
}
}

Expand Down Expand Up @@ -287,8 +289,9 @@ ss::future<> transport::dispatch(header h) {
}

void transport::setup_metrics(
const std::optional<connection_cache_label>& label) {
_probe.setup_metrics(_metrics, label, server_address());
const std::optional<connection_cache_label>& label,
const std::optional<model::node_id>& node_id) {
_probe.setup_metrics(_metrics, label, node_id, server_address());
}

transport::~transport() {
Expand Down
8 changes: 6 additions & 2 deletions src/v/rpc/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "model/metadata.h"
#include "net/transport.h"
#include "outcome.h"
#include "reflection/async_adl.h"
Expand Down Expand Up @@ -56,7 +57,8 @@ class transport final : public net::base_transport {
public:
explicit transport(
transport_configuration c,
std::optional<connection_cache_label> label = std::nullopt);
std::optional<connection_cache_label> label = std::nullopt,
std::optional<model::node_id> node_id = std::nullopt);
~transport() override;
transport(transport&&) noexcept = default;
// semaphore is not move assignable
Expand Down Expand Up @@ -93,7 +95,9 @@ class transport final : public net::base_transport {
ss::future<> do_reads();
ss::future<> dispatch(header);
void fail_outstanding_futures() noexcept final;
void setup_metrics(const std::optional<connection_cache_label>&);
void setup_metrics(
const std::optional<connection_cache_label>&,
const std::optional<model::node_id>&);

ss::future<result<std::unique_ptr<streaming_context>>>
do_send(sequence_t, netbuf, rpc::client_opts);
Expand Down

0 comments on commit 59d5a7f

Please sign in to comment.