Skip to content

Commit

Permalink
Add connection error metrics (#31049)
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge authored Apr 5, 2023
1 parent 6849018 commit e575650
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 77 deletions.
198 changes: 121 additions & 77 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,90 +461,96 @@ async fn setup_connection(
wait_for_chunk_timeout: Duration,
) {
const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2;
let from = connecting.remote_address();
if let Ok(connecting_result) = timeout(QUIC_CONNECTION_HANDSHAKE_TIMEOUT, connecting).await {
if let Ok(new_connection) = connecting_result {
stats.total_new_connections.fetch_add(1, Ordering::Relaxed);

let params = get_connection_stake(&new_connection, &staked_nodes).map_or(
NewConnectionHandlerParams::new_unstaked(
packet_sender.clone(),
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake, max_stake, min_stake)| NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
},
);

if params.stake > 0 {
let mut connection_table_l = staked_connection_table.lock().unwrap();
if connection_table_l.total_size >= max_staked_connections {
let num_pruned =
connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
}
match connecting_result {
Ok(new_connection) => {
stats.total_new_connections.fetch_add(1, Ordering::Relaxed);

let params = get_connection_stake(&new_connection, &staked_nodes).map_or(
NewConnectionHandlerParams::new_unstaked(
packet_sender.clone(),
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake, max_stake, min_stake)| {
NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
}
},
);

if connection_table_l.total_size < max_staked_connections {
if let Ok(()) = handle_and_cache_new_connection(
new_connection,
connection_table_l,
staked_connection_table.clone(),
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
if params.stake > 0 {
let mut connection_table_l = staked_connection_table.lock().unwrap();
if connection_table_l.total_size >= max_staked_connections {
let num_pruned =
connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
}
} else {
// If we couldn't prune a connection in the staked connection table, let's
// put this connection in the unstaked connection table. If needed, prune a
// connection from the unstaked connection table.
if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);

if connection_table_l.total_size < max_staked_connections {
if let Ok(()) = handle_and_cache_new_connection(
new_connection,
connection_table_l,
staked_connection_table.clone(),
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
}
} else {
stats
.connection_add_failed_on_pruning
.fetch_add(1, Ordering::Relaxed);
stats
.connection_add_failed_staked_node
.fetch_add(1, Ordering::Relaxed);
// If we couldn't prune a connection in the staked connection table, let's
// put this connection in the unstaked connection table. If needed, prune a
// connection from the unstaked connection table.
if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_staked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
stats
.connection_add_failed_on_pruning
.fetch_add(1, Ordering::Relaxed);
stats
.connection_add_failed_staked_node
.fetch_add(1, Ordering::Relaxed);
}
}
} else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
stats
.connection_add_failed_unstaked_node
.fetch_add(1, Ordering::Relaxed);
}
} else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
new_connection,
unstaked_connection_table.lock().unwrap(),
unstaked_connection_table.clone(),
max_unstaked_connections,
&params,
wait_for_chunk_timeout,
) {
stats
.connection_added_from_unstaked_peer
.fetch_add(1, Ordering::Relaxed);
} else {
stats
.connection_add_failed_unstaked_node
.fetch_add(1, Ordering::Relaxed);
}
} else {
stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
Err(e) => {
handle_connection_error(e, &stats, from);
}
}
} else {
stats
Expand All @@ -553,6 +559,44 @@ async fn setup_connection(
}
}

fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamStats, from: SocketAddr) {
debug!("error: {:?} from: {:?}", e, from);
stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
match e {
quinn::ConnectionError::TimedOut => {
stats
.connection_setup_error_timed_out
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::ConnectionClosed(_) => {
stats
.connection_setup_error_closed
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::TransportError(_) => {
stats
.connection_setup_error_transport
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::ApplicationClosed(_) => {
stats
.connection_setup_error_app_closed
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::Reset => {
stats
.connection_setup_error_reset
.fetch_add(1, Ordering::Relaxed);
}
quinn::ConnectionError::LocallyClosed => {
stats
.connection_setup_error_locally_closed
.fetch_add(1, Ordering::Relaxed);
}
_ => {}
}
}

async fn packet_batch_sender(
packet_sender: Sender<PacketBatch>,
packet_receiver: AsyncReceiver<PacketAccumulator>,
Expand Down
41 changes: 41 additions & 0 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ pub struct StreamStats {
pub(crate) connection_add_failed_on_pruning: AtomicUsize,
pub(crate) connection_setup_timeout: AtomicUsize,
pub(crate) connection_setup_error: AtomicUsize,
pub(crate) connection_setup_error_closed: AtomicUsize,
pub(crate) connection_setup_error_timed_out: AtomicUsize,
pub(crate) connection_setup_error_transport: AtomicUsize,
pub(crate) connection_setup_error_app_closed: AtomicUsize,
pub(crate) connection_setup_error_reset: AtomicUsize,
pub(crate) connection_setup_error_locally_closed: AtomicUsize,
pub(crate) connection_removed: AtomicUsize,
pub(crate) connection_remove_failed: AtomicUsize,
}
Expand Down Expand Up @@ -242,6 +248,41 @@ impl StreamStats {
self.connection_setup_error.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_timed_out",
self.connection_setup_error_timed_out
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_closed",
self.connection_setup_error_closed
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_transport",
self.connection_setup_error_transport
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_app_closed",
self.connection_setup_error_app_closed
.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_reset",
self.connection_setup_error_reset.swap(0, Ordering::Relaxed),
i64
),
(
"connection_setup_error_locally_closed",
self.connection_setup_error_locally_closed
.swap(0, Ordering::Relaxed),
i64
),
(
"invalid_chunk",
self.total_invalid_chunks.swap(0, Ordering::Relaxed),
Expand Down

0 comments on commit e575650

Please sign in to comment.