diff --git a/zebra-network/src/address_book/tests/vectors.rs b/zebra-network/src/address_book/tests/vectors.rs
index e401e6a5de3..16a9544429c 100644
--- a/zebra-network/src/address_book/tests/vectors.rs
+++ b/zebra-network/src/address_book/tests/vectors.rs
@@ -128,9 +128,7 @@ fn address_book_peer_order() {
fn reconnection_peers_skips_recently_updated_ip() {
// tests that reconnection_peers() skips addresses where there's a connection at that IP with a recent:
// - `last_response`
- test_reconnection_peers_skips_recently_updated_ip(true, |addr| {
- MetaAddr::new_responded(addr, &PeerServices::NODE_NETWORK)
- });
+ test_reconnection_peers_skips_recently_updated_ip(true, MetaAddr::new_responded);
// tests that reconnection_peers() *does not* skip addresses where there's a connection at that IP with a recent:
// - `last_attempt`
diff --git a/zebra-network/src/address_book_updater.rs b/zebra-network/src/address_book_updater.rs
index ef503bc8d82..f0729b3efb0 100644
--- a/zebra-network/src/address_book_updater.rs
+++ b/zebra-network/src/address_book_updater.rs
@@ -1,6 +1,6 @@
//! The timestamp collector collects liveness information from peers.
-use std::{net::SocketAddr, sync::Arc};
+use std::{cmp::max, net::SocketAddr, sync::Arc};
use thiserror::Error;
use tokio::{
@@ -13,6 +13,9 @@ use crate::{
address_book::AddressMetrics, meta_addr::MetaAddrChange, AddressBook, BoxError, Config,
};
+/// The minimum size of the address book updater channel.
+pub const MIN_CHANNEL_SIZE: usize = 10;
+
/// The `AddressBookUpdater` hooks into incoming message streams for each peer
/// and lets the owner of the sender handle update the address book. For
/// example, it can be used to record per-connection last-seen timestamps, or
@@ -46,7 +49,10 @@ impl AddressBookUpdater {
) {
// Create an mpsc channel for peerset address book updates,
// based on the maximum number of inbound and outbound peers.
- let (worker_tx, mut worker_rx) = mpsc::channel(config.peerset_total_connection_limit());
+ let (worker_tx, mut worker_rx) = mpsc::channel(max(
+ config.peerset_total_connection_limit(),
+ MIN_CHANNEL_SIZE,
+ ));
let address_book = AddressBook::new(
local_listener,
diff --git a/zebra-network/src/meta_addr.rs b/zebra-network/src/meta_addr.rs
index 1f8572fd53c..98d3f32224a 100644
--- a/zebra-network/src/meta_addr.rs
+++ b/zebra-network/src/meta_addr.rs
@@ -280,6 +280,16 @@ pub enum MetaAddrChange {
addr: PeerSocketAddr,
},
+ /// Updates an existing `MetaAddr` when we've made a successful connection with a peer.
+ UpdateConnected {
+ #[cfg_attr(
+ any(test, feature = "proptest-impl"),
+ proptest(strategy = "canonical_peer_addr_strategy()")
+ )]
+ addr: PeerSocketAddr,
+ services: PeerServices,
+ },
+
/// Updates an existing `MetaAddr` when a peer responds with a message.
UpdateResponded {
#[cfg_attr(
@@ -287,7 +297,6 @@ pub enum MetaAddrChange {
proptest(strategy = "canonical_peer_addr_strategy()")
)]
addr: PeerSocketAddr,
- services: PeerServices,
},
/// Updates an existing `MetaAddr` when a peer fails.
@@ -345,8 +354,8 @@ impl MetaAddr {
})
}
- /// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just
- /// sent us a message.
+ /// Returns a [`MetaAddrChange::UpdateConnected`] for a peer that has just successfully
+ /// connected.
///
/// # Security
///
@@ -354,16 +363,33 @@ impl MetaAddr {
/// and the services must be the services from that peer's handshake.
///
/// Otherwise:
- /// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook) state,
- /// or
+ /// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
+ /// state, or
/// - Zebra could advertise unreachable addresses to its own peers.
- pub fn new_responded(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange {
- UpdateResponded {
+ pub fn new_connected(addr: PeerSocketAddr, services: &PeerServices) -> MetaAddrChange {
+ UpdateConnected {
addr: canonical_peer_addr(*addr),
services: *services,
}
}
+ /// Returns a [`MetaAddrChange::UpdateResponded`] for a peer that has just
+ /// sent us a message.
+ ///
+ /// # Security
+ ///
+ /// This address must be the remote address from an outbound connection.
+ ///
+ /// Otherwise:
+ /// - malicious peers could interfere with other peers' [`AddressBook`](crate::AddressBook)
+ /// state, or
+ /// - Zebra could advertise unreachable addresses to its own peers.
+ pub fn new_responded(addr: PeerSocketAddr) -> MetaAddrChange {
+ UpdateResponded {
+ addr: canonical_peer_addr(*addr),
+ }
+ }
+
/// Returns a [`MetaAddrChange::UpdateAttempt`] for a peer that we
/// want to make an outbound connection to.
pub fn new_reconnect(addr: PeerSocketAddr) -> MetaAddrChange {
@@ -391,8 +417,7 @@ impl MetaAddr {
}
}
- /// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had
- /// an error.
+ /// Returns a [`MetaAddrChange::UpdateFailed`] for a peer that has just had an error.
pub fn new_errored(
addr: PeerSocketAddr,
services: impl Into>,
@@ -404,13 +429,10 @@ impl MetaAddr {
}
/// Create a new `MetaAddr` for a peer that has just shut down.
- pub fn new_shutdown(
- addr: PeerSocketAddr,
- services: impl Into >,
- ) -> MetaAddrChange {
+ pub fn new_shutdown(addr: PeerSocketAddr) -> MetaAddrChange {
// TODO: if the peer shut down in the Responded state, preserve that
// state. All other states should be treated as (timeout) errors.
- MetaAddr::new_errored(addr, services.into())
+ MetaAddr::new_errored(addr, None)
}
/// Return the address for this `MetaAddr`.
@@ -696,6 +718,7 @@ impl MetaAddrChange {
| NewAlternate { addr, .. }
| NewLocal { addr, .. }
| UpdateAttempt { addr }
+ | UpdateConnected { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr,
}
@@ -712,6 +735,7 @@ impl MetaAddrChange {
| NewAlternate { addr, .. }
| NewLocal { addr, .. }
| UpdateAttempt { addr }
+ | UpdateConnected { addr, .. }
| UpdateResponded { addr, .. }
| UpdateFailed { addr, .. } => *addr = new_addr,
}
@@ -721,17 +745,18 @@ impl MetaAddrChange {
pub fn untrusted_services(&self) -> Option {
match self {
NewInitial { .. } => None,
+ // TODO: split untrusted and direct services (#2324)
NewGossiped {
untrusted_services, ..
- } => Some(*untrusted_services),
- NewAlternate {
+ }
+ | NewAlternate {
untrusted_services, ..
} => Some(*untrusted_services),
// TODO: create a "services implemented by Zebra" constant (#2324)
NewLocal { .. } => Some(PeerServices::NODE_NETWORK),
UpdateAttempt { .. } => None,
- // TODO: split untrusted and direct services (#2324)
- UpdateResponded { services, .. } => Some(*services),
+ UpdateConnected { services, .. } => Some(*services),
+ UpdateResponded { .. } => None,
UpdateFailed { services, .. } => *services,
}
}
@@ -747,9 +772,10 @@ impl MetaAddrChange {
NewAlternate { .. } => None,
// We know that our local listener is available
NewLocal { .. } => Some(now),
- UpdateAttempt { .. } => None,
- UpdateResponded { .. } => None,
- UpdateFailed { .. } => None,
+ UpdateAttempt { .. }
+ | UpdateConnected { .. }
+ | UpdateResponded { .. }
+ | UpdateFailed { .. } => None,
}
}
@@ -775,33 +801,29 @@ impl MetaAddrChange {
/// Return the last attempt for this change, if available.
pub fn last_attempt(&self, now: Instant) -> Option {
match self {
- NewInitial { .. } => None,
- NewGossiped { .. } => None,
- NewAlternate { .. } => None,
- NewLocal { .. } => None,
+ NewInitial { .. } | NewGossiped { .. } | NewAlternate { .. } | NewLocal { .. } => None,
// Attempt changes are applied before we start the handshake to the
// peer address. So the attempt time is a lower bound for the actual
// handshake time.
UpdateAttempt { .. } => Some(now),
- UpdateResponded { .. } => None,
- UpdateFailed { .. } => None,
+ UpdateConnected { .. } | UpdateResponded { .. } | UpdateFailed { .. } => None,
}
}
/// Return the last response for this change, if available.
pub fn last_response(&self, now: DateTime32) -> Option {
match self {
- NewInitial { .. } => None,
- NewGossiped { .. } => None,
- NewAlternate { .. } => None,
- NewLocal { .. } => None,
- UpdateAttempt { .. } => None,
+ NewInitial { .. }
+ | NewGossiped { .. }
+ | NewAlternate { .. }
+ | NewLocal { .. }
+ | UpdateAttempt { .. } => None,
// If there is a large delay applying this change, then:
// - the peer might stay in the `AttemptPending` state for longer,
// - we might send outdated last seen times to our peers, and
// - the peer will appear to be live for longer, delaying future
// reconnection attempts.
- UpdateResponded { .. } => Some(now),
+ UpdateConnected { .. } | UpdateResponded { .. } => Some(now),
UpdateFailed { .. } => None,
}
}
@@ -809,12 +831,13 @@ impl MetaAddrChange {
/// Return the last failure for this change, if available.
pub fn last_failure(&self, now: Instant) -> Option {
match self {
- NewInitial { .. } => None,
- NewGossiped { .. } => None,
- NewAlternate { .. } => None,
- NewLocal { .. } => None,
- UpdateAttempt { .. } => None,
- UpdateResponded { .. } => None,
+ NewInitial { .. }
+ | NewGossiped { .. }
+ | NewAlternate { .. }
+ | NewLocal { .. }
+ | UpdateAttempt { .. }
+ | UpdateConnected { .. }
+ | UpdateResponded { .. } => None,
// If there is a large delay applying this change, then:
// - the peer might stay in the `AttemptPending` or `Responded`
// states for longer, and
@@ -833,7 +856,7 @@ impl MetaAddrChange {
// local listeners get sanitized, so the state doesn't matter here
NewLocal { .. } => NeverAttemptedGossiped,
UpdateAttempt { .. } => AttemptPending,
- UpdateResponded { .. } => Responded,
+ UpdateConnected { .. } | UpdateResponded { .. } => Responded,
UpdateFailed { .. } => Failed,
}
}
diff --git a/zebra-network/src/meta_addr/tests/vectors.rs b/zebra-network/src/meta_addr/tests/vectors.rs
index 5b341901b18..40e00b66513 100644
--- a/zebra-network/src/meta_addr/tests/vectors.rs
+++ b/zebra-network/src/meta_addr/tests/vectors.rs
@@ -170,7 +170,7 @@ fn recently_responded_peer_is_gossipable() {
.into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded
- let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
+ let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");
@@ -191,7 +191,7 @@ fn not_so_recently_responded_peer_is_still_gossipable() {
.into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded
- let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
+ let mut peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");
@@ -222,7 +222,7 @@ fn responded_long_ago_peer_is_not_gossipable() {
.into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded
- let mut peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
+ let mut peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");
@@ -253,7 +253,7 @@ fn long_delayed_change_is_not_applied() {
.into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded
- let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
+ let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");
@@ -297,7 +297,7 @@ fn later_revert_change_is_applied() {
.into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded
- let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
+ let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");
@@ -340,7 +340,7 @@ fn concurrent_state_revert_change_is_not_applied() {
.into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded
- let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
+ let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");
@@ -400,7 +400,7 @@ fn concurrent_state_progress_change_is_applied() {
.into_new_meta_addr(instant_now, local_now);
// Create a peer that has responded
- let peer = MetaAddr::new_responded(address, &PeerServices::NODE_NETWORK)
+ let peer = MetaAddr::new_responded(address)
.apply_to_meta_addr(peer_seed, instant_now, chrono_now)
.expect("Failed to create MetaAddr for responded peer");
diff --git a/zebra-network/src/peer/handshake.rs b/zebra-network/src/peer/handshake.rs
index e0d981951ad..e343e1c206c 100644
--- a/zebra-network/src/peer/handshake.rs
+++ b/zebra-network/src/peer/handshake.rs
@@ -929,12 +929,13 @@ where
let _ = address_book_updater.send(alt_addr).await;
}
- // The handshake succeeded: update the peer status from AttemptPending to Responded
+ // The handshake succeeded: update the peer status from AttemptPending to Responded,
+ // and send initial connection info.
if let Some(book_addr) = connected_addr.get_address_book_addr() {
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = address_book_updater
- .send(MetaAddr::new_responded(book_addr, &remote_services))
+ .send(MetaAddr::new_connected(book_addr, &remote_services))
.await;
}
@@ -1075,7 +1076,6 @@ where
let heartbeat_task = tokio::spawn(
send_periodic_heartbeats_with_shutdown_handle(
connected_addr,
- remote_services,
shutdown_rx,
server_tx.clone(),
address_book_updater.clone(),
@@ -1213,7 +1213,6 @@ pub(crate) async fn register_inventory_status(
/// Returning from this function terminates the connection's heartbeat task.
async fn send_periodic_heartbeats_with_shutdown_handle(
connected_addr: ConnectedAddr,
- remote_services: PeerServices,
shutdown_rx: oneshot::Receiver,
server_tx: futures::channel::mpsc::Sender,
heartbeat_ts_collector: tokio::sync::mpsc::Sender,
@@ -1222,7 +1221,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
let heartbeat_run_loop = send_periodic_heartbeats_run_loop(
connected_addr,
- remote_services,
server_tx,
heartbeat_ts_collector.clone(),
);
@@ -1246,7 +1244,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
PeerError::ClientCancelledHeartbeatTask,
&heartbeat_ts_collector,
&connected_addr,
- &remote_services,
)
.await
}
@@ -1256,7 +1253,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
PeerError::ClientDropped,
&heartbeat_ts_collector,
&connected_addr,
- &remote_services,
)
.await
}
@@ -1275,7 +1271,6 @@ async fn send_periodic_heartbeats_with_shutdown_handle(
/// See `send_periodic_heartbeats_with_shutdown_handle` for details.
async fn send_periodic_heartbeats_run_loop(
connected_addr: ConnectedAddr,
- remote_services: PeerServices,
mut server_tx: futures::channel::mpsc::Sender,
heartbeat_ts_collector: tokio::sync::mpsc::Sender,
) -> Result<(), BoxError> {
@@ -1294,13 +1289,7 @@ async fn send_periodic_heartbeats_run_loop(
// We've reached another heartbeat interval without
// shutting down, so do a heartbeat request.
let heartbeat = send_one_heartbeat(&mut server_tx);
- heartbeat_timeout(
- heartbeat,
- &heartbeat_ts_collector,
- &connected_addr,
- &remote_services,
- )
- .await?;
+ heartbeat_timeout(heartbeat, &heartbeat_ts_collector, &connected_addr).await?;
// # Security
//
@@ -1312,7 +1301,7 @@ async fn send_periodic_heartbeats_run_loop(
// the collector doesn't depend on network activity,
// so this await should not hang
let _ = heartbeat_ts_collector
- .send(MetaAddr::new_responded(book_addr, &remote_services))
+ .send(MetaAddr::new_responded(book_addr))
.await;
}
}
@@ -1375,29 +1364,16 @@ async fn heartbeat_timeout(
fut: F,
address_book_updater: &tokio::sync::mpsc::Sender,
connected_addr: &ConnectedAddr,
- remote_services: &PeerServices,
) -> Result
where
F: Future>,
{
let t = match timeout(constants::HEARTBEAT_INTERVAL, fut).await {
Ok(inner_result) => {
- handle_heartbeat_error(
- inner_result,
- address_book_updater,
- connected_addr,
- remote_services,
- )
- .await?
+ handle_heartbeat_error(inner_result, address_book_updater, connected_addr).await?
}
Err(elapsed) => {
- handle_heartbeat_error(
- Err(elapsed),
- address_book_updater,
- connected_addr,
- remote_services,
- )
- .await?
+ handle_heartbeat_error(Err(elapsed), address_book_updater, connected_addr).await?
}
};
@@ -1409,7 +1385,6 @@ async fn handle_heartbeat_error(
result: Result,
address_book_updater: &tokio::sync::mpsc::Sender,
connected_addr: &ConnectedAddr,
- remote_services: &PeerServices,
) -> Result
where
E: std::fmt::Debug,
@@ -1427,7 +1402,7 @@ where
// - after the first error or shutdown, the peer is disconnected
if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = address_book_updater
- .send(MetaAddr::new_errored(book_addr, *remote_services))
+ .send(MetaAddr::new_errored(book_addr, None))
.await;
}
Err(err)
@@ -1440,13 +1415,12 @@ async fn handle_heartbeat_shutdown(
peer_error: PeerError,
address_book_updater: &tokio::sync::mpsc::Sender,
connected_addr: &ConnectedAddr,
- remote_services: &PeerServices,
) -> Result<(), BoxError> {
tracing::debug!(?peer_error, "client shutdown, shutting down heartbeat");
if let Some(book_addr) = connected_addr.get_address_book_addr() {
let _ = address_book_updater
- .send(MetaAddr::new_shutdown(book_addr, *remote_services))
+ .send(MetaAddr::new_shutdown(book_addr))
.await;
}
diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs
index f951bda5b9b..b60dc74732f 100644
--- a/zebra-network/src/peer_set/candidate_set.rs
+++ b/zebra-network/src/peer_set/candidate_set.rs
@@ -414,6 +414,8 @@ where
}
/// Returns the address book for this `CandidateSet`.
+ #[cfg(any(test, feature = "proptest-impl"))]
+ #[allow(dead_code)]
pub async fn address_book(&self) -> Arc> {
self.address_book.clone()
}
diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs
index 9dc74d99f6c..30404ac6aea 100644
--- a/zebra-network/src/peer_set/initialize.rs
+++ b/zebra-network/src/peer_set/initialize.rs
@@ -29,7 +29,6 @@ use tokio_stream::wrappers::IntervalStream;
use tower::{
buffer::Buffer, discover::Change, layer::Layer, util::BoxService, Service, ServiceExt,
};
-use tracing::Span;
use tracing_futures::Instrument;
use zebra_chain::{chain_tip::ChainTip, diagnostic::task::WaitForPanics};
@@ -197,7 +196,7 @@ where
config.clone(),
outbound_connector.clone(),
peerset_tx.clone(),
- address_book_updater,
+ address_book_updater.clone(),
);
let initial_peers_join = tokio::spawn(initial_peers_fut.in_current_span());
@@ -242,6 +241,7 @@ where
outbound_connector,
peerset_tx,
active_outbound_connections,
+ address_book_updater,
);
let crawl_guard = tokio::spawn(crawl_fut.in_current_span());
@@ -740,6 +740,7 @@ enum CrawlerAction {
///
/// Uses `active_outbound_connections` to limit the number of active outbound connections
/// across both the initial peers and crawler. The limit is based on `config`.
+#[allow(clippy::too_many_arguments)]
#[instrument(
skip(
config,
@@ -749,6 +750,7 @@ enum CrawlerAction {
outbound_connector,
peerset_tx,
active_outbound_connections,
+ address_book_updater,
),
fields(
new_peer_interval = ?config.crawl_new_peer_interval,
@@ -762,6 +764,7 @@ async fn crawl_and_dial(
outbound_connector: C,
peerset_tx: futures::channel::mpsc::Sender,
mut active_outbound_connections: ActiveConnectionCounter,
+ address_book_updater: tokio::sync::mpsc::Sender,
) -> Result<(), BoxError>
where
C: Service<
@@ -783,8 +786,6 @@ where
"starting the peer address crawler",
);
- let address_book = candidates.address_book().await;
-
// # Concurrency
//
// Allow tasks using the candidate set to be spawned, so they can run concurrently.
@@ -857,7 +858,7 @@ where
let candidates = candidates.clone();
let outbound_connector = outbound_connector.clone();
let peerset_tx = peerset_tx.clone();
- let address_book = address_book.clone();
+ let address_book_updater = address_book_updater.clone();
let demand_tx = demand_tx.clone();
// Increment the connection count before we spawn the connection.
@@ -886,7 +887,7 @@ where
outbound_connector,
outbound_connection_tracker,
peerset_tx,
- address_book,
+ address_book_updater,
demand_tx,
)
.await?;
@@ -1020,7 +1021,7 @@ where
outbound_connector,
outbound_connection_tracker,
peerset_tx,
- address_book,
+ address_book_updater,
demand_tx
))]
async fn dial(
@@ -1028,7 +1029,7 @@ async fn dial(
mut outbound_connector: C,
outbound_connection_tracker: ConnectionTracker,
mut peerset_tx: futures::channel::mpsc::Sender,
- address_book: Arc>,
+ address_book_updater: tokio::sync::mpsc::Sender,
mut demand_tx: futures::channel::mpsc::Sender,
) -> Result<(), BoxError>
where
@@ -1069,8 +1070,8 @@ where
}
// The connection was never opened, or it failed the handshake and was dropped.
Err(error) => {
- debug!(?error, ?candidate.addr, "failed to make outbound connection to peer");
- report_failed(address_book.clone(), candidate).await;
+ info!(?error, ?candidate.addr, "failed to make outbound connection to peer");
+ report_failed(address_book_updater.clone(), candidate).await;
// The demand signal that was taken out of the queue to attempt to connect to the
// failed candidate never turned into a connection, so add it back.
@@ -1090,25 +1091,15 @@ where
Ok(())
}
-/// Mark `addr` as a failed peer in `address_book`.
-#[instrument(skip(address_book))]
-async fn report_failed(address_book: Arc>, addr: MetaAddr) {
- let addr = MetaAddr::new_errored(addr.addr, addr.services);
+/// Mark `addr` as a failed peer to `address_book_updater`.
+#[instrument(skip(address_book_updater))]
+async fn report_failed(
+ address_book_updater: tokio::sync::mpsc::Sender,
+ addr: MetaAddr,
+) {
+ // The connection info is the same as what's already in the address book.
+ let addr = MetaAddr::new_errored(addr.addr, None);
- // # Correctness
- //
- // Spawn address book accesses on a blocking thread, to avoid deadlocks (see #1976).
- let span = Span::current();
- let updated_addr = tokio::task::spawn_blocking(move || {
- span.in_scope(|| address_book.lock().unwrap().update(addr))
- })
- .wait_for_panics()
- .await;
-
- assert_eq!(
- updated_addr.map(|addr| addr.addr()),
- Some(addr.addr()),
- "incorrect address updated by address book: \
- original: {addr:?}, updated: {updated_addr:?}"
- );
+ // Ignore send errors on Zebra shutdown.
+ let _ = address_book_updater.send(addr).await;
}
diff --git a/zebra-network/src/peer_set/initialize/tests/vectors.rs b/zebra-network/src/peer_set/initialize/tests/vectors.rs
index c871ab43227..59bafdc771e 100644
--- a/zebra-network/src/peer_set/initialize/tests/vectors.rs
+++ b/zebra-network/src/peer_set/initialize/tests/vectors.rs
@@ -24,7 +24,6 @@ use futures::{channel::mpsc, FutureExt, StreamExt};
use indexmap::IndexSet;
use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinHandle};
use tower::{service_fn, Layer, Service, ServiceExt};
-use tracing::Span;
use zebra_chain::{chain_tip::NoChainTip, parameters::Network, serialization::DateTime32};
use zebra_test::net::random_known_port;
@@ -1517,13 +1516,8 @@ where
config.peerset_initial_target_size = peerset_initial_target_size;
}
- // Manually initialize an address book without a timestamp tracker.
- let mut address_book = AddressBook::new(
- config.listen_addr,
- config.network,
- config.max_connections_per_ip,
- Span::current(),
- );
+ let (address_book, address_book_updater, _address_metrics, _address_book_updater_guard) =
+ AddressBookUpdater::spawn(&config, config.listen_addr);
// Add enough fake peers to go over the limit, even if the limit is zero.
let over_limit_peers = config.peerset_outbound_connection_limit() * 2 + 1;
@@ -1540,7 +1534,10 @@ where
.new_gossiped_change()
.expect("created MetaAddr contains enough information to represent a gossiped address");
- address_book.update(addr);
+ address_book
+ .lock()
+ .expect("panic in previous thread while accessing the address book")
+ .update(addr);
}
// Create a fake peer set.
@@ -1555,8 +1552,6 @@ where
Ok(rsp)
});
- let address_book = Arc::new(std::sync::Mutex::new(address_book));
-
// Make the channels large enough to hold all the peers.
let (peerset_tx, peerset_rx) = mpsc::channel::(over_limit_peers);
let (mut demand_tx, demand_rx) = mpsc::channel::(over_limit_peers);
@@ -1581,6 +1576,7 @@ where
outbound_connector,
peerset_tx,
active_outbound_connections,
+ address_book_updater,
);
let crawl_task_handle = tokio::spawn(crawl_fut);