diff --git a/zebra-network/src/address_book.rs b/zebra-network/src/address_book.rs index 548c96aa0a7..7a2cd556e19 100644 --- a/zebra-network/src/address_book.rs +++ b/zebra-network/src/address_book.rs @@ -18,7 +18,7 @@ use tracing::Span; use zebra_chain::{parameters::Network, serialization::DateTime32}; use crate::{ - constants, + constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE}, meta_addr::MetaAddrChange, protocol::external::{canonical_peer_addr, canonical_socket_addr}, types::MetaAddr, @@ -268,7 +268,20 @@ impl AddressBook { /// Get the active addresses in `self` in random order with sanitized timestamps, /// including our local listener address. - pub fn sanitized(&self, now: chrono::DateTime) -> Vec { + /// + /// Limited to the number of peer addresses Zebra should give out per `GetAddr` request. + pub fn fresh_get_addr_response(&self) -> Vec { + let now = Utc::now(); + let mut peers = self.sanitized(now); + let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR); + peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit)); + + peers + } + + /// Get the active addresses in `self` in random order with sanitized timestamps, + /// including our local listener address. 
+ pub(crate) fn sanitized(&self, now: chrono::DateTime) -> Vec { use rand::seq::SliceRandom; let _guard = self.span.enter(); diff --git a/zebra-network/src/address_book/tests/prop.rs b/zebra-network/src/address_book/tests/prop.rs index 732d477379b..75ad6635a36 100644 --- a/zebra-network/src/address_book/tests/prop.rs +++ b/zebra-network/src/address_book/tests/prop.rs @@ -9,7 +9,10 @@ use tracing::Span; use zebra_chain::{parameters::Network::*, serialization::Duration32}; use crate::{ - constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP}, + constants::{ + ADDR_RESPONSE_LIMIT_DENOMINATOR, DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, + MAX_ADDRS_IN_MESSAGE, MAX_PEER_ACTIVE_FOR_GOSSIP, + }, meta_addr::{arbitrary::MAX_META_ADDR, MetaAddr, MetaAddrChange}, AddressBook, }; @@ -36,8 +39,17 @@ proptest! { addresses ); - for gossiped_address in address_book.sanitized(chrono_now) { - let duration_since_last_seen = gossiped_address + // Only recently reachable are sanitized + let sanitized = address_book.sanitized(chrono_now); + let gossiped = address_book.fresh_get_addr_response(); + + let expected_num_gossiped = sanitized.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR).min(MAX_ADDRS_IN_MESSAGE); + let num_gossiped = gossiped.len(); + + prop_assert_eq!(expected_num_gossiped, num_gossiped); + + for sanitized_address in sanitized { + let duration_since_last_seen = sanitized_address .last_seen() .expect("Peer that was never seen before is being gossiped") .saturating_elapsed(chrono_now) diff --git a/zebra-network/src/constants.rs b/zebra-network/src/constants.rs index c372949b1c9..12fabbff51a 100644 --- a/zebra-network/src/constants.rs +++ b/zebra-network/src/constants.rs @@ -309,7 +309,7 @@ pub const MAX_ADDRS_IN_MESSAGE: usize = 1000; /// /// This limit makes sure that Zebra does not reveal its entire address book /// in a single `Peers` response. 
-pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 3; +pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 4; /// The maximum number of addresses Zebra will keep in its address book. /// @@ -499,8 +499,9 @@ mod tests { #[test] #[allow(clippy::assertions_on_constants)] fn ensure_address_limits_consistent() { - // Zebra 1.0.0-beta.2 address book metrics in December 2021. - const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 4_500; + // Estimated network address book size in November 2023, after the address book limit was increased. + // Zebra 1.0.0-beta.2 address book metrics in December 2021 showed 4500 peers. + const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 5_500; let _init_guard = zebra_test::init(); @@ -515,7 +516,7 @@ mod tests { ); assert!( - MAX_ADDRS_IN_ADDRESS_BOOK < TYPICAL_MAINNET_ADDRESS_BOOK_SIZE, + MAX_ADDRS_IN_ADDRESS_BOOK <= TYPICAL_MAINNET_ADDRESS_BOOK_SIZE, "the address book limit should actually be used" ); } diff --git a/zebra-network/src/protocol/internal/response.rs b/zebra-network/src/protocol/internal/response.rs index 379517bb067..6e1cd3291ff 100644 --- a/zebra-network/src/protocol/internal/response.rs +++ b/zebra-network/src/protocol/internal/response.rs @@ -142,4 +142,10 @@ impl Response { pub fn is_inventory_download(&self) -> bool { matches!(self, Response::Blocks(_) | Response::Transactions(_)) } + + /// Returns true if self is the [`Response::Nil`] variant. 
+ #[allow(dead_code)] + pub fn is_nil(&self) -> bool { + matches!(self, Self::Nil) + } } diff --git a/zebrad/src/components/inbound.rs b/zebrad/src/components/inbound.rs index f25b461f812..cfc7008e0bd 100644 --- a/zebrad/src/components/inbound.rs +++ b/zebrad/src/components/inbound.rs @@ -9,17 +9,15 @@ use std::{ collections::HashSet, future::Future, pin::Pin, - sync::{Arc, TryLockError}, + sync::Arc, task::{Context, Poll}, time::Duration, }; -use chrono::Utc; use futures::{ future::{FutureExt, TryFutureExt}, stream::Stream, }; -use num_integer::div_ceil; use tokio::sync::oneshot::{self, error::TryRecvError}; use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt}; @@ -32,10 +30,7 @@ use zebra_chain::{ transaction::UnminedTxId, }; use zebra_consensus::router::RouterError; -use zebra_network::{ - constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE}, - AddressBook, InventoryResponse, -}; +use zebra_network::{AddressBook, InventoryResponse}; use zebra_node_services::mempool; use crate::BoxError; @@ -45,8 +40,11 @@ use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT}; use InventoryResponse::*; +mod cached_peer_addr_response; pub(crate) mod downloads; +use cached_peer_addr_response::CachedPeerAddrResponse; + #[cfg(test)] mod tests; @@ -135,8 +133,12 @@ pub enum Setup { Initialized { // Services // - /// A shared list of peer addresses. - address_book: Arc>, + /// An owned partial list of peer addresses used as a `GetAddr` response, and + /// a shared list of peer addresses used to periodically refresh the partial list. + /// + /// Refreshed from the address book in the `call` method + /// after [`CACHED_ADDRS_REFRESH_INTERVAL`](cached_peer_addr_response::CACHED_ADDRS_REFRESH_INTERVAL). + cached_peer_addr_response: CachedPeerAddrResponse, /// A `futures::Stream` that downloads and verifies gossiped blocks. 
block_downloads: Pin>, @@ -261,6 +263,8 @@ impl Service for Inbound { latest_chain_tip, } = setup_data; + let cached_peer_addr_response = CachedPeerAddrResponse::new(address_book); + let block_downloads = Box::pin(BlockDownloads::new( full_verify_concurrency_limit, Timeout::new(block_download_peer_set, BLOCK_DOWNLOAD_TIMEOUT), @@ -271,7 +275,7 @@ impl Service for Inbound { result = Ok(()); Setup::Initialized { - address_book, + cached_peer_addr_response, block_downloads, mempool, state, @@ -306,7 +310,7 @@ impl Service for Inbound { } // Clean up completed download tasks, ignoring their results Setup::Initialized { - address_book, + cached_peer_addr_response, mut block_downloads, mempool, state, @@ -321,7 +325,7 @@ impl Service for Inbound { result = Ok(()); Setup::Initialized { - address_book, + cached_peer_addr_response, block_downloads, mempool, state, @@ -352,13 +356,13 @@ impl Service for Inbound { /// and will cause callers to disconnect from the remote peer. #[instrument(name = "inbound", skip(self, req))] fn call(&mut self, req: zn::Request) -> Self::Future { - let (address_book, block_downloads, mempool, state) = match &mut self.setup { + let (cached_peer_addr_response, block_downloads, mempool, state) = match &mut self.setup { Setup::Initialized { - address_book, + cached_peer_addr_response, block_downloads, mempool, state, - } => (address_book, block_downloads, mempool, state), + } => (cached_peer_addr_response, block_downloads, mempool, state), _ => { debug!("ignoring request from remote peer during setup"); return async { Ok(zn::Response::Nil) }.boxed(); @@ -377,58 +381,11 @@ impl Service for Inbound { // // If the address book is busy, try again inside the future. If it can't be locked // twice, ignore the request. 
- let address_book = address_book.clone(); - - let get_peers = move || match address_book.try_lock() { - Ok(address_book) => Some(address_book.clone()), - Err(TryLockError::WouldBlock) => None, - Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"), - }; - - let peers = get_peers(); + cached_peer_addr_response.try_refresh(); + let response = cached_peer_addr_response.value(); async move { - // Correctness: get the current time inside the future. - // - // This time is used to filter outdated peers, so it doesn't matter much - // if we get it when the future is created, or when it starts running. - let now = Utc::now(); - - // If we didn't get the peers when the future was created, wait for other tasks - // to run, then try again when the future first runs. - if peers.is_none() { - tokio::task::yield_now().await; - } - let peers = peers.or_else(get_peers); - let is_busy = peers.is_none(); - - // Send a sanitized response - let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now)); - - // Truncate the list - let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR); - let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit); - peers.truncate(address_limit); - - if peers.is_empty() { - // Sometimes we don't know if the peer response will be empty until we've - // sanitized them. 
- if is_busy { - info!( - "ignoring `Peers` request from remote peer because our address \ - book is busy" - ); - } else { - debug!( - "ignoring `Peers` request from remote peer because our address \ - book has no available peers" - ); - } - - Ok(zn::Response::Nil) - } else { - Ok(zn::Response::Peers(peers)) - } + Ok(response) }.boxed() } zn::Request::BlocksByHash(hashes) => { diff --git a/zebrad/src/components/inbound/cached_peer_addr_response.rs b/zebrad/src/components/inbound/cached_peer_addr_response.rs new file mode 100644 index 00000000000..8e95f16f89e --- /dev/null +++ b/zebrad/src/components/inbound/cached_peer_addr_response.rs @@ -0,0 +1,98 @@ +//! Periodically-refreshed GetAddr response for the inbound service. +//! +//! Used to avoid giving out Zebra's entire address book over a short duration. + +use std::{ + sync::{Mutex, TryLockError}, + time::Instant, +}; + +use super::*; + +/// The minimum duration that a `CachedPeerAddrResponse` is considered fresh before the inbound service +/// should get new peer addresses from the address book to send as a `GetAddr` response. +/// +/// Cached responses are considered stale and should be cleared after twice this duration. +pub const CACHED_ADDRS_REFRESH_INTERVAL: Duration = Duration::from_secs(10 * 60); + +/// Caches and refreshes a partial list of peer addresses to be returned as a `GetAddr` response. +pub struct CachedPeerAddrResponse { + /// A shared list of peer addresses. + address_book: Arc>, + + /// An owned list of peer addresses used as a `GetAddr` response. + value: zn::Response, + + /// Instant after which `value` should be refreshed. + refresh_time: Instant, +} + +impl CachedPeerAddrResponse { + /// Creates a new empty [`CachedPeerAddrResponse`]. 
+ pub(super) fn new(address_book: Arc>) -> Self { + Self { + address_book, + value: zn::Response::Nil, + refresh_time: Instant::now(), + } + } + + pub(super) fn value(&self) -> zn::Response { + self.value.clone() + } + + /// Refreshes the cached `value` if the time has passed `refresh_time` or the cache is empty + pub(super) fn try_refresh(&mut self) { + let now = Instant::now(); + + // return early if the cached response is still fresh + if now < self.refresh_time { + return; + } + + let cache_expiry = self.refresh_time + CACHED_ADDRS_REFRESH_INTERVAL; + + // try getting a lock on the address book if it's time to refresh the cached addresses + match self + .address_book + .try_lock() + .map(|book| book.fresh_get_addr_response()) + { + // Update cached value and refresh_time if there are some gossipable peers in the address book. + // + // Security: this avoids outdated gossiped peers. Outdated Zebra binaries will gradually lose all their peers, + // because those peers refuse to connect to outdated versions. So we don't want those outdated Zebra + // versions to keep gossiping old peer information either. + Ok(peers) if !peers.is_empty() => { + self.refresh_time = now + CACHED_ADDRS_REFRESH_INTERVAL; + self.value = zn::Response::Peers(peers); + } + + // Clear the cached response if the time has passed the cache expiry time. + Ok(_) if now > cache_expiry => { + self.value = zn::Response::Nil; + } + + Err(TryLockError::WouldBlock) if now > cache_expiry => { + warn!("getaddrs response hasn't been refreshed in some time"); + self.value = zn::Response::Nil; + } + + // Don't update the cached response or refresh time if unable to get new peer addresses + // from the address book and `now` is before the cache expiry. 
+ Ok(_) => { + debug!( + "could not refresh cached response because our address \ + book has no available peers" + ); + } + + Err(TryLockError::WouldBlock) => {} + + // Panic if the address book lock is poisoned + Err(TryLockError::Poisoned(_)) => { + panic!("previous thread panicked while holding the address book lock") + } + }; + } +} diff --git a/zebrad/src/components/inbound/tests/fake_peer_set.rs b/zebrad/src/components/inbound/tests/fake_peer_set.rs index 783686afab8..44f27bd80e2 100644 --- a/zebrad/src/components/inbound/tests/fake_peer_set.rs +++ b/zebrad/src/components/inbound/tests/fake_peer_set.rs @@ -19,12 +19,16 @@ use zebra_chain::{ block::{Block, Height}, fmt::humantime_seconds, parameters::Network::{self, *}, - serialization::ZcashDeserializeInto, + serialization::{DateTime32, ZcashDeserializeInto}, transaction::{UnminedTx, UnminedTxId, VerifiedUnminedTx}, }; use zebra_consensus::{error::TransactionError, transaction, Config as ConsensusConfig}; use zebra_network::{ - constants::DEFAULT_MAX_CONNS_PER_IP, AddressBook, InventoryResponse, Request, Response, + constants::{ + ADDR_RESPONSE_LIMIT_DENOMINATOR, DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, + }, + types::{MetaAddr, PeerServices}, + AddressBook, InventoryResponse, Request, Response, }; use zebra_node_services::mempool; use zebra_state::{ChainTipChange, Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT}; @@ -742,6 +746,112 @@ async fn inbound_block_height_lookahead_limit() -> Result<(), crate::BoxError> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +/// Checks that Zebra won't give out its entire address book over a short duration. 
+async fn caches_getaddr_response() { + const NUM_ADDRESSES: usize = 20; + const NUM_REQUESTS: usize = 10; + const EXPECTED_NUM_RESULTS: usize = NUM_ADDRESSES / ADDR_RESPONSE_LIMIT_DENOMINATOR; + + let _init_guard = zebra_test::init(); + + let addrs = (0..NUM_ADDRESSES) + .map(|idx| format!("127.0.0.{idx}:{idx}")) + .map(|addr| { + MetaAddr::new_gossiped_meta_addr( + addr.parse().unwrap(), + PeerServices::NODE_NETWORK, + DateTime32::now(), + ) + }); + + let inbound = { + let network = Mainnet; + let consensus_config = ConsensusConfig::default(); + let state_config = StateConfig::ephemeral(); + let address_book = AddressBook::new_with_addrs( + SocketAddr::from_str("0.0.0.0:0").unwrap(), + Mainnet, + DEFAULT_MAX_CONNS_PER_IP, + MAX_ADDRS_IN_ADDRESS_BOOK, + Span::none(), + addrs, + ); + + let address_book = Arc::new(std::sync::Mutex::new(address_book)); + + // UTXO verification doesn't matter for these tests. + let (state, _read_only_state_service, latest_chain_tip, _chain_tip_change) = + zebra_state::init(state_config.clone(), network, Height::MAX, 0); + + let state_service = ServiceBuilder::new().buffer(1).service(state); + + // Download task panics and timeouts are propagated to the tests that use Groth16 verifiers. 
+ let ( + block_verifier, + _transaction_verifier, + _groth16_download_handle, + _max_checkpoint_height, + ) = zebra_consensus::router::init(consensus_config.clone(), network, state_service.clone()) + .await; + + let peer_set = MockService::build() + .with_max_request_delay(MAX_PEER_SET_REQUEST_DELAY) + .for_unit_tests(); + let buffered_peer_set = Buffer::new(BoxService::new(peer_set.clone()), 10); + + let buffered_mempool_service = + Buffer::new(BoxService::new(MockService::build().for_unit_tests()), 10); + let (setup_tx, setup_rx) = oneshot::channel(); + + let inbound_service = ServiceBuilder::new() + .load_shed() + .service(Inbound::new(MAX_INBOUND_CONCURRENCY, setup_rx)); + let inbound_service = BoxService::new(inbound_service); + let inbound_service = ServiceBuilder::new().buffer(1).service(inbound_service); + + let setup_data = InboundSetupData { + address_book: address_book.clone(), + block_download_peer_set: buffered_peer_set, + block_verifier, + mempool: buffered_mempool_service.clone(), + state: state_service.clone(), + latest_chain_tip, + }; + let r = setup_tx.send(setup_data); + // We can't expect or unwrap because the returned Result does not implement Debug + assert!(r.is_ok(), "unexpected setup channel send failure"); + + inbound_service + }; + + let Ok(zebra_network::Response::Peers(first_result)) = + inbound.clone().oneshot(zebra_network::Request::Peers).await + else { + panic!("result should match Ok(Peers(_))") + }; + + assert_eq!( + first_result.len(), + EXPECTED_NUM_RESULTS, + "inbound service should respond with expected number of peer addresses", + ); + + for _ in 0..NUM_REQUESTS { + let Ok(zebra_network::Response::Peers(peers)) = + inbound.clone().oneshot(zebra_network::Request::Peers).await + else { + panic!("result should match Ok(Peers(_))") + }; + + assert_eq!( + peers, + first_result, + "inbound service should return the same result for every Peers request until the refresh time", + ); + } +} + /// Setup a fake Zebra network stack, with 
fake peer set. /// /// Adds some initial state blocks, and mempool transactions if `add_transactions` is true. diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index ac773145966..0913d60274d 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -59,6 +59,9 @@ async fn inbound_peers_empty_address_book() -> Result<(), crate::BoxError> { listen_addr, ) = setup(None).await; + // yield and sleep until the address book lock is released. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Send a request to inbound directly let request = inbound_service.clone().oneshot(Request::Peers); let response = request.await;