Skip to content

Commit

Permalink
fix(net): Fix a potential hang caused by accessing the address book d…
Browse files Browse the repository at this point in the history
…irectly (#7902)

* Fix a potential hang accessing the address book directly

* Remove unused connection shutdown MetaAddr arguments

* Add an UpdateConnected MetaAddrChange, that sends initial connection info

* Fix some tests

* Fix a panic with a zero channel size
  • Loading branch information
teor2345 authored Nov 5, 2023
1 parent f836f7f commit 43e54d1
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 128 deletions.
4 changes: 1 addition & 3 deletions zebra-network/src/address_book/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ fn address_book_peer_order() {
fn reconnection_peers_skips_recently_updated_ip() {
// tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent:
// - `last_response`
test_reconnection_peers_skips_recently_updated_ip(true, |addr| {
MetaAddr::new_responded(addr, &PeerServices::NODE_NETWORK)
});
test_reconnection_peers_skips_recently_updated_ip(true, MetaAddr::new_responded);

// tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent:
// - `last_attempt`
Expand Down
10 changes: 8 additions & 2 deletions zebra-network/src/address_book_updater.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! The timestamp collector collects liveness information from peers.
use std::{net::SocketAddr, sync::Arc};
use std::{cmp::max, net::SocketAddr, sync::Arc};

use thiserror::Error;
use tokio::{
Expand All @@ -13,6 +13,9 @@ use crate::{
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
};

/// The minimum size of the address book updater channel.
pub const MIN_CHANNEL_SIZE: usize = 10;

/// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For
/// example, it can be used to record per-connection last-seen timestamps, or
Expand Down Expand Up @@ -46,7 +49,10 @@ impl AddressBookUpdater {
) {
// Create an mpsc channel for peerset address book updates,
// based on the maximum number of inbound and outbound peers.
let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());
let (worker_tx, mut worker_rx) = mpsc::channel(max(
config.peerset_total_connection_limit(),
MIN_CHANNEL_SIZE,
));

let address_book = AddressBook::new(
local_listener,
Expand Down
103 changes: 63 additions & 40 deletions zebra-network/src/meta_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,23 @@ pub enum MetaAddrChange {
addr: PeerSocketAddr,
},

/// Updates an existing `MetaAddr` when we've made a successful connection with a peer.
UpdateConnected {
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "canonical_peer_addr_strategy()")
)]
addr: PeerSocketAddr,
services: PeerServices,
},

/// Updates an existing `MetaAddr` when a peer responds with a message.
UpdateResponded {
#[cfg_attr(
any(test, feature = "proptest-impl"),
proptest(strategy = "canonical_peer_addr_strategy()")
)]
addr: PeerSocketAddr,
services: PeerServices,
},

/// Updates an existing `MetaAddr` when a peer fails.
Expand Down Expand Up @@ -345,25 +354,42 @@ impl MetaAddr {
})
}

/// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just
/// sent us a message.
/// Returns a [`MetaAddrChange::UpdateConnected`] for a peer that has just successfully
/// connected.
///
/// # Security
///
/// This address must be the remote address from an outbound connection,
/// and the services must be the services from that peer's handshake.
///
/// Otherwise:
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook) state,
/// or
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
/// state, or
/// - Zebra could advertise unreachable addresses to its own peers.
pub fn new_responded(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange {
UpdateResponded {
pub fn new_connected(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange {
UpdateConnected {
addr: canonical_peer_addr(*addr),
services: *services,
}
}

/// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just
/// sent us a message.
///
/// # Security
///
/// This address must be the remote address from an outbound connection.
///
/// Otherwise:
/// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
/// state, or
/// - Zebra could advertise unreachable addresses to its own peers.
pub fn new_responded(addr: PeerSocketAddr) -> MetaAddrChange {
UpdateResponded {
addr: canonical_peer_addr(*addr),
}
}

/// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we
/// want to make an outbound connection to.
pub fn new_reconnect(addr: PeerSocketAddr) -> MetaAddrChange {
Expand Down Expand Up @@ -391,8 +417,7 @@ impl MetaAddr {
}
}

/// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had
/// an error.
/// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had an error.
pub fn new_errored(
addr: PeerSocketAddr,
services: impl Into<Option<PeerServices>>,
Expand All @@ -404,13 +429,10 @@ impl MetaAddr {
}

/// Create a new `MetaAddr` for a peer that has just shut down.
pub fn new_shutdown(
addr: PeerSocketAddr,
services: impl Into<Option<PeerServices>>,
) -> MetaAddrChange {
pub fn new_shutdown(addr: PeerSocketAddr) -> MetaAddrChange {
// TODO: if the peer shut down in the Responded state, preserve that
// state. All other states should be treated as (timeout) errors.
MetaAddr::new_errored(addr, services.into())
MetaAddr::new_errored(addr, None)
}

/// Return the address for this `MetaAddr`.
Expand Down Expand Up @@ -696,6 +718,7 @@ impl MetaAddrChange {
| NewAlternate { addr, .. }
| NewLocal { addr, .. }
| UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr,
}
Expand All @@ -712,6 +735,7 @@ impl MetaAddrChange {
| NewAlternate { addr, .. }
| NewLocal { addr, .. }
| UpdateAttempt { addr }
| UpdateConnected { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr = new_addr,
}
Expand All @@ -721,17 +745,18 @@ impl MetaAddrChange {
pub fn untrusted_services(&self) -> Option<PeerServices> {
match self {
NewInitial { .. } => None,
// TODO: split untrusted and direct services (#2324)
NewGossiped {
untrusted_services, ..
} => Some(*untrusted_services),
NewAlternate {
}
| NewAlternate {
untrusted_services, ..
} => Some(*untrusted_services),
// TODO: create a "services implemented by Zebra" constant (#2324)
NewLocal { .. } => Some(PeerServices::NODE_NETWORK),
UpdateAttempt { .. } => None,
// TODO: split untrusted and direct services (#2324)
UpdateResponded { services, .. } => Some(*services),
UpdateConnected { services, .. } => Some(*services),
UpdateResponded { .. } => None,
UpdateFailed { services, .. } => *services,
}
}
Expand All @@ -747,9 +772,10 @@ impl MetaAddrChange {
NewAlternate { .. } => None,
// We know that our local listener is available
NewLocal { .. } => Some(now),
UpdateAttempt { .. } => None,
UpdateResponded { .. } => None,
UpdateFailed { .. } => None,
UpdateAttempt { .. }
| UpdateConnected { .. }
| UpdateResponded { .. }
| UpdateFailed { .. } => None,
}
}

Expand All @@ -775,46 +801,43 @@ impl MetaAddrChange {
/// Return the last attempt for this change, if available.
pub fn last_attempt(&self, now: Instant) -> Option<Instant> {
match self {
NewInitial { .. } => None,
NewGossiped { .. } => None,
NewAlternate { .. } => None,
NewLocal { .. } => None,
NewInitial { .. } | NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => None,
// Attempt changes are applied before we start the handshake to the
// peer address. So the attempt time is a lower bound for the actual
// handshake time.
UpdateAttempt { .. } => Some(now),
UpdateResponded { .. } => None,
UpdateFailed { .. } => None,
UpdateConnected { .. } | UpdateResponded { .. } | UpdateFailed { .. } => None,
}
}

/// Return the last response for this change, if available.
pub fn last_response(&self, now: DateTime32) -> Option<DateTime32> {
match self {
NewInitial { .. } => None,
NewGossiped { .. } => None,
NewAlternate { .. } => None,
NewLocal { .. } => None,
UpdateAttempt { .. } => None,
NewInitial { .. }
| NewGossiped { .. }
| NewAlternate { .. }
| NewLocal { .. }
| UpdateAttempt { .. } => None,
// If there is a large delay applying this change, then:
// - the peer might stay in the `AttemptPending` state for longer,
// - we might send outdated last seen times to our peers, and
// - the peer will appear to be live for longer, delaying future
// reconnection attempts.
UpdateResponded { .. } => Some(now),
UpdateConnected { .. } | UpdateResponded { .. } => Some(now),
UpdateFailed { .. } => None,
}
}

/// Return the last failure for this change, if available.
pub fn last_failure(&self, now: Instant) -> Option<Instant> {
match self {
NewInitial { .. } => None,
NewGossiped { .. } => None,
NewAlternate { .. } => None,
NewLocal { .. } => None,
UpdateAttempt { .. } => None,
UpdateResponded { .. } => None,
NewInitial { .. }
| NewGossiped { .. }
| NewAlternate { .. }
| NewLocal { .. }
| UpdateAttempt { .. }
| UpdateConnected { .. }
| UpdateResponded { .. } => None,
// If there is a large delay applying this change, then:
// - the peer might stay in the `AttemptPending` or `Responded`
// states for longer, and
Expand All @@ -833,7 +856,7 @@ impl MetaAddrChange {
// local listeners get sanitized, so the state doesn't matter here
NewLocal { .. } => NeverAttemptedGossiped,
UpdateAttempt { .. } => AttemptPending,
UpdateResponded { .. } => Responded,
UpdateConnected { .. } | UpdateResponded { .. } => Responded,
UpdateFailed { .. } => Failed,
}
}
Expand Down
14 changes: 7 additions & 7 deletions zebra-network/src/meta_addr/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ fn recently_responded_peer_is_gossipable() {
.into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand All @@ -191,7 +191,7 @@ fn not_so_recently_responded_peer_is_still_gossipable() {
.into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
let mut peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -222,7 +222,7 @@ fn responded_long_ago_peer_is_not_gossipable() {
.into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
let mut peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -253,7 +253,7 @@ fn long_delayed_change_is_not_applied() {
.into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -297,7 +297,7 @@ fn later_revert_change_is_applied() {
.into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -340,7 +340,7 @@ fn concurrent_state_revert_change_is_not_applied() {
.into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down Expand Up @@ -400,7 +400,7 @@ fn concurrent_state_progress_change_is_applied() {
.into_new_meta_addr(instant_now, local_now);

// Create a peer that has responded
let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");

Expand Down
Loading

0 comments on commit 43e54d1

Please sign in to comment.