Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
teor2345 authored Nov 21, 2023
2 parents bcbf42a + 5e4c0f9 commit 473be3a
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 76 deletions.
17 changes: 15 additions & 2 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing::Span;
use zebra_chain::{parameters::Network, serialization::DateTime32};

use crate::{
constants,
constants::{self, ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
meta_addr::MetaAddrChange,
protocol::external::{canonical_peer_addr, canonical_socket_addr},
types::MetaAddr,
Expand Down Expand Up @@ -268,7 +268,20 @@ impl AddressBook {

/// Get the active addresses in `self` in random order with sanitized timestamps,
/// including our local listener address.
pub fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
///
/// Limited to a the number of peer addresses Zebra should give out per `GetAddr` request.
pub fn fresh_get_addr_response(&self) -> Vec<MetaAddr> {
let now = Utc::now();
let mut peers = self.sanitized(now);
let address_limit = peers.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR);
peers.truncate(MAX_ADDRS_IN_MESSAGE.min(address_limit));

peers
}

/// Get the active addresses in `self` in random order with sanitized timestamps,
/// including our local listener address.
pub(crate) fn sanitized(&self, now: chrono::DateTime<Utc>) -> Vec<MetaAddr> {
use rand::seq::SliceRandom;
let _guard = self.span.enter();

Expand Down
18 changes: 15 additions & 3 deletions zebra-network/src/address_book/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use tracing::Span;
use zebra_chain::{parameters::Network::*, serialization::Duration32};

use crate::{
constants::{DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK, MAX_PEER_ACTIVE_FOR_GOSSIP},
constants::{
ADDR_RESPONSE_LIMIT_DENOMINATOR, DEFAULT_MAX_CONNS_PER_IP, MAX_ADDRS_IN_ADDRESS_BOOK,
MAX_ADDRS_IN_MESSAGE, MAX_PEER_ACTIVE_FOR_GOSSIP,
},
meta_addr::{arbitrary::MAX_META_ADDR, MetaAddr, MetaAddrChange},
AddressBook,
};
Expand All @@ -36,8 +39,17 @@ proptest! {
addresses
);

for gossiped_address in address_book.sanitized(chrono_now) {
let duration_since_last_seen = gossiped_address
// Only recently reachable are sanitized
let sanitized = address_book.sanitized(chrono_now);
let gossiped = address_book.fresh_get_addr_response();

let expected_num_gossiped = sanitized.len().div_ceil(ADDR_RESPONSE_LIMIT_DENOMINATOR).min(MAX_ADDRS_IN_MESSAGE);
let num_gossiped = gossiped.len();

prop_assert_eq!(expected_num_gossiped, num_gossiped);

for sanitized_address in sanitized {
let duration_since_last_seen = sanitized_address
.last_seen()
.expect("Peer that was never seen before is being gossiped")
.saturating_elapsed(chrono_now)
Expand Down
9 changes: 5 additions & 4 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub const MAX_ADDRS_IN_MESSAGE: usize = 1000;
///
/// This limit makes sure that Zebra does not reveal its entire address book
/// in a single `Peers` response.
pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 3;
pub const ADDR_RESPONSE_LIMIT_DENOMINATOR: usize = 4;

/// The maximum number of addresses Zebra will keep in its address book.
///
Expand Down Expand Up @@ -499,8 +499,9 @@ mod tests {
#[test]
#[allow(clippy::assertions_on_constants)]
fn ensure_address_limits_consistent() {
// Zebra 1.0.0-beta.2 address book metrics in December 2021.
const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 4_500;
// Estimated network address book size in November 2023, after the address book limit was increased.
// Zebra 1.0.0-beta.2 address book metrics in December 2021 showed 4500 peers.
const TYPICAL_MAINNET_ADDRESS_BOOK_SIZE: usize = 5_500;

let _init_guard = zebra_test::init();

Expand All @@ -515,7 +516,7 @@ mod tests {
);

assert!(
MAX_ADDRS_IN_ADDRESS_BOOK < TYPICAL_MAINNET_ADDRESS_BOOK_SIZE,
MAX_ADDRS_IN_ADDRESS_BOOK <= TYPICAL_MAINNET_ADDRESS_BOOK_SIZE,
"the address book limit should actually be used"
);
}
Expand Down
6 changes: 6 additions & 0 deletions zebra-network/src/protocol/internal/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,10 @@ impl Response {
pub fn is_inventory_download(&self) -> bool {
matches!(self, Response::Blocks(_) | Response::Transactions(_))
}

/// Returns true if self is the [`Response::Nil`] variant.
#[allow(dead_code)]
pub fn is_nil(&self) -> bool {
matches!(self, Self::Nil)
}
}
87 changes: 22 additions & 65 deletions zebrad/src/components/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,15 @@ use std::{
collections::HashSet,
future::Future,
pin::Pin,
sync::{Arc, TryLockError},
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use chrono::Utc;
use futures::{
future::{FutureExt, TryFutureExt},
stream::Stream,
};
use num_integer::div_ceil;
use tokio::sync::oneshot::{self, error::TryRecvError};
use tower::{buffer::Buffer, timeout::Timeout, util::BoxService, Service, ServiceExt};

Expand All @@ -32,10 +30,7 @@ use zebra_chain::{
transaction::UnminedTxId,
};
use zebra_consensus::router::RouterError;
use zebra_network::{
constants::{ADDR_RESPONSE_LIMIT_DENOMINATOR, MAX_ADDRS_IN_MESSAGE},
AddressBook, InventoryResponse,
};
use zebra_network::{AddressBook, InventoryResponse};
use zebra_node_services::mempool;

use crate::BoxError;
Expand All @@ -45,8 +40,11 @@ use super::sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT};

use InventoryResponse::*;

mod cached_peer_addr_response;
pub(crate) mod downloads;

use cached_peer_addr_response::CachedPeerAddrResponse;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -135,8 +133,12 @@ pub enum Setup {
Initialized {
// Services
//
/// A shared list of peer addresses.
address_book: Arc<std::sync::Mutex<zn::AddressBook>>,
/// An owned partial list of peer addresses used as a `GetAddr` response, and
/// a shared list of peer addresses used to periodically refresh the partial list.
///
/// Refreshed from the address book in `poll_ready` method
/// after [`CACHED_ADDRS_REFRESH_INTERVAL`](cached_peer_addr_response::CACHED_ADDRS_REFRESH_INTERVAL).
cached_peer_addr_response: CachedPeerAddrResponse,

/// A `futures::Stream` that downloads and verifies gossiped blocks.
block_downloads: Pin<Box<GossipedBlockDownloads>>,
Expand Down Expand Up @@ -261,6 +263,8 @@ impl Service<zn::Request> for Inbound {
latest_chain_tip,
} = setup_data;

let cached_peer_addr_response = CachedPeerAddrResponse::new(address_book);

let block_downloads = Box::pin(BlockDownloads::new(
full_verify_concurrency_limit,
Timeout::new(block_download_peer_set, BLOCK_DOWNLOAD_TIMEOUT),
Expand All @@ -271,7 +275,7 @@ impl Service<zn::Request> for Inbound {

result = Ok(());
Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
Expand Down Expand Up @@ -306,7 +310,7 @@ impl Service<zn::Request> for Inbound {
}
// Clean up completed download tasks, ignoring their results
Setup::Initialized {
address_book,
cached_peer_addr_response,
mut block_downloads,
mempool,
state,
Expand All @@ -321,7 +325,7 @@ impl Service<zn::Request> for Inbound {
result = Ok(());

Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
Expand Down Expand Up @@ -352,13 +356,13 @@ impl Service<zn::Request> for Inbound {
/// and will cause callers to disconnect from the remote peer.
#[instrument(name = "inbound", skip(self, req))]
fn call(&mut self, req: zn::Request) -> Self::Future {
let (address_book, block_downloads, mempool, state) = match &mut self.setup {
let (cached_peer_addr_response, block_downloads, mempool, state) = match &mut self.setup {
Setup::Initialized {
address_book,
cached_peer_addr_response,
block_downloads,
mempool,
state,
} => (address_book, block_downloads, mempool, state),
} => (cached_peer_addr_response, block_downloads, mempool, state),
_ => {
debug!("ignoring request from remote peer during setup");
return async { Ok(zn::Response::Nil) }.boxed();
Expand All @@ -377,58 +381,11 @@ impl Service<zn::Request> for Inbound {
//
// If the address book is busy, try again inside the future. If it can't be locked
// twice, ignore the request.
let address_book = address_book.clone();

let get_peers = move || match address_book.try_lock() {
Ok(address_book) => Some(address_book.clone()),
Err(TryLockError::WouldBlock) => None,
Err(TryLockError::Poisoned(_)) => panic!("previous thread panicked while holding the address book lock"),
};

let peers = get_peers();
cached_peer_addr_response.try_refresh();
let response = cached_peer_addr_response.value();

async move {
// Correctness: get the current time inside the future.
//
// This time is used to filter outdated peers, so it doesn't matter much
// if we get it when the future is created, or when it starts running.
let now = Utc::now();

// If we didn't get the peers when the future was created, wait for other tasks
// to run, then try again when the future first runs.
if peers.is_none() {
tokio::task::yield_now().await;
}
let peers = peers.or_else(get_peers);
let is_busy = peers.is_none();

// Send a sanitized response
let mut peers = peers.map_or_else(Vec::new, |peers| peers.sanitized(now));

// Truncate the list
let address_limit = div_ceil(peers.len(), ADDR_RESPONSE_LIMIT_DENOMINATOR);
let address_limit = MAX_ADDRS_IN_MESSAGE.min(address_limit);
peers.truncate(address_limit);

if peers.is_empty() {
// Sometimes we don't know if the peer response will be empty until we've
// sanitized them.
if is_busy {
info!(
"ignoring `Peers` request from remote peer because our address \
book is busy"
);
} else {
debug!(
"ignoring `Peers` request from remote peer because our address \
book has no available peers"
);
}

Ok(zn::Response::Nil)
} else {
Ok(zn::Response::Peers(peers))
}
Ok(response)
}.boxed()
}
zn::Request::BlocksByHash(hashes) => {
Expand Down
98 changes: 98 additions & 0 deletions zebrad/src/components/inbound/cached_peer_addr_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//! Periodically-refreshed GetAddr response for the inbound service.
//!
//! Used to avoid giving out Zebra's entire address book over a short duration.
use std::{
sync::{Mutex, TryLockError},
time::Instant,
};

use super::*;

/// The minimum duration that a `CachedPeerAddrResponse` is considered fresh before the inbound service
/// should get new peer addresses from the address book to send as a `GetAddr` response.
///
/// Cached responses are considered stale and should be cleared after twice this duration.
pub const CACHED_ADDRS_REFRESH_INTERVAL: Duration = Duration::from_secs(10 * 60);

/// Caches and refreshes a partial list of peer addresses to be returned as a `GetAddr` response.
pub struct CachedPeerAddrResponse {
/// A shared list of peer addresses.
address_book: Arc<Mutex<zn::AddressBook>>,

/// An owned list of peer addresses used as a `GetAddr` response.
value: zn::Response,

/// Instant after which `cached_addrs` should be refreshed.
refresh_time: Instant,
}

impl CachedPeerAddrResponse {
/// Creates a new empty [`CachedPeerAddrResponse`].
pub(super) fn new(address_book: Arc<Mutex<AddressBook>>) -> Self {
Self {
address_book,
value: zn::Response::Nil,
refresh_time: Instant::now(),
}
}

pub(super) fn value(&self) -> zn::Response {
self.value.clone()
}

/// Refreshes the `cached_addrs` if the time has past `refresh_time` or the cache is empty
pub(super) fn try_refresh(&mut self) {
let now = Instant::now();

// return early if there are some cached addresses, and they are still fresh
if now < self.refresh_time {
return;
}

let cache_expiry = self.refresh_time + CACHED_ADDRS_REFRESH_INTERVAL;

// try getting a lock on the address book if it's time to refresh the cached addresses
match self
.address_book
.try_lock()
.map(|book| book.fresh_get_addr_response())
{
// Update cached value and refresh_time if there are some gossipable peers in the address book.
//
// Security: this avoids outdated gossiped peers. Outdated Zebra binaries will gradually lose all their peers,
// because those peers refuse to connect to outdated versions. So we don't want those outdated Zebra
// versions to keep gossiping old peer information either.
Ok(peers) if !peers.is_empty() => {
self.refresh_time = now + CACHED_ADDRS_REFRESH_INTERVAL;
self.value = zn::Response::Peers(peers);
}

// Clear the cached response if the time has past the cache expiry time.
Ok(_) if now > cache_expiry => {
self.value = zn::Response::Nil;
}

Err(TryLockError::WouldBlock) if now > cache_expiry => {
warn!("getaddrs response hasn't been refreshed in some time");
self.value = zn::Response::Nil;
}

// Don't update the cached response or refresh time if unable to get new peer addresses
// from the address book and `now` is before the cache expiry.
Ok(_) => {
debug!(
"could not refresh cached response because our address \
book has no available peers"
);
}

Err(TryLockError::WouldBlock) => {}

// Panic if the address book lock is poisoned
Err(TryLockError::Poisoned(_)) => {
panic!("previous thread panicked while holding the address book lock")
}
};
}
}
Loading

0 comments on commit 473be3a

Please sign in to comment.