Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gossip dynamic local listener ports to peers #2277

Merged
merged 6 commits into from
Jun 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions zebra-network/src/address_book.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::Span;

use zebra_chain::serialization::canonical_socket_addr;

use crate::{meta_addr::MetaAddrChange, types::MetaAddr, Config, PeerAddrState};
use crate::{meta_addr::MetaAddrChange, types::MetaAddr, PeerAddrState};

/// A database of peer listener addresses, their advertised services, and
/// information on when they were last seen.
Expand Down Expand Up @@ -89,14 +89,15 @@ pub struct AddressMetrics {

#[allow(clippy::len_without_is_empty)]
impl AddressBook {
/// Construct an `AddressBook` with the given `config` and [`tracing::Span`].
pub fn new(config: &Config, span: Span) -> AddressBook {
/// Construct an [`AddressBook`] with the given `local_listener` and
/// [`tracing::Span`].
pub fn new(local_listener: SocketAddr, span: Span) -> AddressBook {
let constructor_span = span.clone();
let _guard = constructor_span.enter();

let mut new_book = AddressBook {
by_addr: HashMap::default(),
local_listener: canonical_socket_addr(config.listen_addr),
local_listener: canonical_socket_addr(local_listener),
span,
last_address_log: None,
};
Expand All @@ -105,18 +106,18 @@ impl AddressBook {
new_book
}

/// Construct an [`AddressBook`] with the given [`Config`],
/// Construct an [`AddressBook`] with the given `local_listener`,
/// [`tracing::Span`], and addresses.
///
/// This constructor can be used to break address book invariants,
/// so it should only be used in tests.
#[cfg(any(test, feature = "proptest-impl"))]
pub fn new_with_addrs(
config: &Config,
local_listener: SocketAddr,
span: Span,
addrs: impl IntoIterator<Item = MetaAddr>,
) -> AddressBook {
let mut new_book = AddressBook::new(config, span);
let mut new_book = AddressBook::new(local_listener, span);

let addrs = addrs
.into_iter()
Expand All @@ -135,7 +136,7 @@ impl AddressBook {
/// Get the local listener address.
///
/// This address contains minimal state, but it is not sanitized.
pub fn get_local_listener(&self) -> MetaAddr {
pub fn local_listener_meta_addr(&self) -> MetaAddr {
MetaAddr::new_local_listener_change(&self.local_listener)
.into_new_meta_addr()
.expect("unexpected invalid new local listener addr")
Expand All @@ -151,7 +152,7 @@ impl AddressBook {
// Unconditionally add our local listener address to the advertised peers,
// to replace any self-connection failures. The address book and change
// constructors make sure that the SocketAddr is canonical.
let local_listener = self.get_local_listener();
let local_listener = self.local_listener_meta_addr();
peers.insert(local_listener.addr, local_listener);

// Then sanitize and shuffle
Expand Down
6 changes: 5 additions & 1 deletion zebra-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ impl Config {
use futures::stream::StreamExt;

if peers.is_empty() {
error!("no initial peers in the network config. Hint: you must configure at least one peer or DNS seeder to run Zebra");
warn!(
"no initial peers in the network config. \
Hint: you must configure at least one peer IP or DNS seeder to run Zebra, \
or make sure Zebra's listener port gets inbound connections."
);
return HashSet::new();
}

Expand Down
18 changes: 13 additions & 5 deletions zebra-network/src/meta_addr/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
convert::{TryFrom, TryInto},
env,
net::SocketAddr,
str::FromStr,
sync::Arc,
time::Duration,
};
Expand All @@ -26,7 +27,7 @@ use crate::{
},
peer_set::candidate_set::CandidateSet,
protocol::types::PeerServices,
AddressBook, Config,
AddressBook,
};

/// The number of test cases to use for proptest that have verbose failures.
Expand Down Expand Up @@ -275,11 +276,14 @@ proptest! {
) {
zebra_test::init();

let config = Config { listen_addr: local_listener, ..Config::default() };
let address_book = AddressBook::new_with_addrs(&config, Span::none(), address_book_addrs);
let address_book = AddressBook::new_with_addrs(
local_listener,
Span::none(),
address_book_addrs
);
let sanitized_addrs = address_book.sanitized();

let expected_local_listener = address_book.get_local_listener();
let expected_local_listener = address_book.local_listener_meta_addr();
let canonical_local_listener = canonical_socket_addr(local_listener);
let book_sanitized_local_listener = sanitized_addrs.iter().find(|meta_addr| meta_addr.addr == canonical_local_listener );

Expand Down Expand Up @@ -339,7 +343,11 @@ proptest! {
None
};

let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(&Config::default(), Span::none(), addrs)));
let address_book = Arc::new(std::sync::Mutex::new(AddressBook::new_with_addrs(
SocketAddr::from_str("0.0.0.0:0").unwrap(),
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
Span::none(),
addrs,
)));
let peer_service = service_fn(|_| async { unreachable!("Service should not be called") });
let mut candidate_set = CandidateSet::new(address_book.clone(), peer_service);

Expand Down
11 changes: 7 additions & 4 deletions zebra-network/src/peer_set/candidate_set/tests/prop.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{
env,
net::SocketAddr,
str::FromStr,
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -16,8 +18,9 @@ use zebra_chain::serialization::DateTime32;

use super::super::{validate_addrs, CandidateSet};
use crate::{
constants::MIN_PEER_CONNECTION_INTERVAL, meta_addr::MetaAddrChange, types::MetaAddr,
AddressBook, BoxError, Config, Request, Response,
constants::MIN_PEER_CONNECTION_INTERVAL,
meta_addr::{MetaAddr, MetaAddrChange},
AddressBook, BoxError, Request, Response,
};

/// The maximum number of candidates for a "next peer" test.
Expand Down Expand Up @@ -67,7 +70,7 @@ proptest! {
});

// Since the address book is empty, there won't be any available peers
let address_book = AddressBook::new(&Config::default(), Span::none());
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());

let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);

Expand Down Expand Up @@ -103,7 +106,7 @@ proptest! {
unreachable!("Mock peer service is never used");
});

let mut address_book = AddressBook::new(&Config::default(), Span::none());
let mut address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
address_book.extend(peers);

let mut candidate_set = CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
Expand Down
7 changes: 4 additions & 3 deletions zebra-network/src/peer_set/candidate_set/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
convert::TryInto,
iter,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
time::Duration as StdDuration,
};
Expand All @@ -23,7 +24,7 @@ use super::super::{validate_addrs, CandidateSet};
use crate::{
constants::{GET_ADDR_FANOUT, MIN_PEER_GET_ADDR_INTERVAL},
types::{MetaAddr, PeerServices},
AddressBook, Config, Request, Response,
AddressBook, Request, Response,
};

/// Test that offset is applied when all addresses have `last_seen` times in the future.
Expand Down Expand Up @@ -144,7 +145,7 @@ fn candidate_set_updates_are_rate_limited() {
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
let _guard = runtime.enter();

let address_book = AddressBook::new(&Config::default(), Span::none());
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
Expand Down Expand Up @@ -179,7 +180,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
let runtime = Runtime::new().expect("Failed to create Tokio runtime");
let _guard = runtime.enter();

let address_book = AddressBook::new(&Config::default(), Span::none());
let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
Expand Down
96 changes: 60 additions & 36 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use super::CandidateSet;
use super::PeerSet;
use peer::Client;

#[cfg(test)]
mod tests;

type PeerChange = Result<Change<SocketAddr, peer::Client>, BoxError>;

/// Initialize a peer set.
Expand Down Expand Up @@ -64,7 +67,9 @@ where
S: Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static,
{
let (address_book, timestamp_collector) = TimestampCollector::spawn(&config);
let (tcp_listener, listen_addr) = open_listener(&config.clone()).await;

let (address_book, timestamp_collector) = TimestampCollector::spawn(listen_addr);
let (inv_sender, inv_receiver) = broadcast::channel(100);

// Construct services that handle inbound handshakes and perform outbound
Expand Down Expand Up @@ -115,24 +120,8 @@ where
let peer_set = Buffer::new(BoxService::new(peer_set), constants::PEERSET_BUFFER_SIZE);

// 1. Incoming peer connections, via a listener.

// Warn if we're configured using the wrong network port.
use Network::*;
let wrong_net = match config.network {
Mainnet => Testnet,
Testnet => Mainnet,
};
if config.listen_addr.port() == wrong_net.default_port() {
warn!(
"We are configured with port {} for {:?}, but that port is the default port for {:?}",
config.listen_addr.port(),
config.network,
wrong_net
);
}

let listen_guard = tokio::spawn(
listen(config.listen_addr, listen_handshaker, peerset_tx.clone())
accept_inbound_connections(tcp_listener, listen_handshaker, peerset_tx.clone())
.instrument(Span::current()),
);

Expand Down Expand Up @@ -232,36 +221,71 @@ where
Ok(())
}

/// Listens for peer connections on `addr`, then sets up each connection as a
/// Zcash peer.
/// Open a peer connection listener on `config.listen_addr`,
/// returning the opened [`TcpListener`], and the address it is bound to.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the `Client` result over `tx`.
#[instrument(skip(tx, handshaker))]
async fn listen<S>(
addr: SocketAddr,
mut handshaker: S,
tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static,
{
info!("Trying to open Zcash protocol endpoint at {}...", addr);
let listener_result = TcpListener::bind(addr).await;
/// If the listener is configured to use an automatically chosen port (port `0`),
/// then the returned address will contain the actual port.
///
/// # Panics
///
/// If opening the listener fails.
#[instrument(skip(config), fields(addr = ?config.listen_addr))]
async fn open_listener(config: &Config) -> (TcpListener, SocketAddr) {
// Warn if we're configured using the wrong network port.
use Network::*;
let wrong_net = match config.network {
Mainnet => Testnet,
Testnet => Mainnet,
};
if config.listen_addr.port() == wrong_net.default_port() {
warn!(
"We are configured with port {} for {:?}, but that port is the default port for {:?}",
config.listen_addr.port(),
config.network,
wrong_net
);
}

info!(
"Trying to open Zcash protocol endpoint at {}...",
config.listen_addr
);
let listener_result = TcpListener::bind(config.listen_addr).await;

let listener = match listener_result {
Ok(l) => l,
Err(e) => panic!(
"Opening Zcash network protocol listener {:?} failed: {:?}. \
Hint: Check if another zebrad or zcashd process is running. \
Try changing the network listen_addr in the Zebra config.",
addr, e,
config.listen_addr, e,
),
};

let local_addr = listener.local_addr()?;
let local_addr = listener
.local_addr()
.expect("unexpected missing local addr for open listener");
info!("Opened Zcash protocol endpoint at {}", local_addr);

(listener, local_addr)
}

/// Listens for peer connections on `addr`, then sets up each connection as a
/// Zcash peer.
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`Client`][peer::Client] result over `tx`.
#[instrument(skip(listener, handshaker, tx), fields(listener_addr = ?listener.local_addr()))]
async fn accept_inbound_connections<S>(
listener: TcpListener,
mut handshaker: S,
tx: mpsc::Sender<PeerChange>,
) -> Result<(), BoxError>
where
S: Service<peer::HandshakeRequest, Response = peer::Client, Error = BoxError> + Clone,
S::Future: Send + 'static,
{
loop {
if let Ok((tcp_stream, addr)) = listener.accept().await {
let connected_addr = peer::ConnectedAddr::new_inbound_direct(addr);
Expand Down
3 changes: 3 additions & 0 deletions zebra-network/src/peer_set/initialize/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Tests for zebra-network initialization

mod vectors;
Loading