Skip to content

Commit

Permalink
2. Route peer requests based on missing inventory (#3465)
Browse files Browse the repository at this point in the history
* feat(network): send notfound messages to the inventory registry

* refactor(network): move the inventory filter into an async function

* feat(network): avoid routing requests to peers that are missing inventory

* test(network): advertised routing is independent of numeric address value

* test(network): peer set routes requests to peers not missing that inventory

* test(network): peer set fails requests if all ready peers are missing that inventory

* fix(clippy): needless-borrow in the peer set

* fix(lint): remove redundant trailing commas in macro calls

There is no clippy lint for this, maybe because some macros
are sensitive to trailing commas.
(But not the ones changed in this commit.)

* test(network): check the exact number of inventory peers

* doc(network): explain why we ignore inventory send failures

* docs(network): explain why a channel error is ignored
  • Loading branch information
teor2345 authored Feb 8, 2022
1 parent 1a14baf commit 9be13a4
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 80 deletions.
2 changes: 1 addition & 1 deletion zebra-consensus/src/transaction/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ fn v5_coinbase_transaction_without_enable_spends_flag_passes_validation() {

insert_fake_orchard_shielded_data(&mut transaction);

assert!(check::coinbase_tx_no_prevout_joinsplit_spend(&transaction).is_ok(),);
assert!(check::coinbase_tx_no_prevout_joinsplit_spend(&transaction).is_ok());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/isolated/tor/tests/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn connect_isolated_run_tor_once_with(network: Network, hostname: String)
// We make the test pass if there are network errors, if we get a valid running service,
// or if we are still waiting for Tor or the handshake.
let outbound_result = outbound_join_handle_timeout.await;
assert!(matches!(outbound_result, Ok(Ok(_)) | Err(_),));
assert!(matches!(outbound_result, Ok(Ok(_)) | Err(_)));

outbound_join_handle.abort();
}
123 changes: 72 additions & 51 deletions zebra-network/src/peer/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use zebra_chain::{
block,
chain_tip::{ChainTip, NoChainTip},
parameters::Network,
serialization::SerializationError,
};

use crate::{
Expand Down Expand Up @@ -917,57 +918,7 @@ where
.then(move |msg| {
let inv_collector = inv_collector.clone();
let span = debug_span!(parent: inv_inner_conn_span.clone(), "inventory_filter");
async move {
if let (Ok(Message::Inv(hashes)), Some(transient_addr)) =
(&msg, connected_addr.get_transient_addr())
{
// We ignore inventory messages with more than one
// block, because they are most likely replies to a
// query, rather than a newly gossiped block.
//
// (We process inventory messages with any number of
// transactions.)
//
// https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
//
// Note: zcashd has a bug where it merges queued inv messages of
// the same or different types. Zebra compensates by sending `notfound`
// responses to the inv collector. (#2156, #1768)
//
// (We can't split `inv`s, because that fills the inventory registry
// with useless entries that the whole network has, making it large and slow.)
match hashes.as_slice() {
[hash @ InventoryHash::Block(_)] => {
debug!(?hash, "registering gossiped block inventory for peer");

// The peer set and inv collector use the peer's remote
// address as an identifier
let _ = inv_collector.send(InventoryChange::new_advertised(
*hash,
transient_addr,
));
}
[hashes @ ..] => {
let hashes =
hashes.iter().filter(|hash| hash.unmined_tx_id().is_some());

debug!(
?hashes,
"registering unmined transaction inventory for peer"
);

if let Some(change) = InventoryChange::new_advertised_multi(
hashes,
transient_addr,
) {
let _ = inv_collector.send(change);
}
}
}
}
msg
}
.instrument(span)
register_inventory_status(msg, connected_addr, inv_collector).instrument(span)
})
.boxed();

Expand Down Expand Up @@ -1017,6 +968,76 @@ where
}
}

/// Register any advertised or missing inventory in `msg` for `connected_addr`.
async fn register_inventory_status(
msg: Result<Message, SerializationError>,
connected_addr: ConnectedAddr,
inv_collector: broadcast::Sender<InventoryChange>,
) -> Result<Message, SerializationError> {
match (&msg, connected_addr.get_transient_addr()) {
(Ok(Message::Inv(advertised)), Some(transient_addr)) => {
// We ignore inventory messages with more than one
// block, because they are most likely replies to a
// query, rather than a newly gossiped block.
//
// (We process inventory messages with any number of
// transactions.)
//
// https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html#inventory-monitoring
//
// Note: zcashd has a bug where it merges queued inv messages of
// the same or different types. Zebra compensates by sending `notfound`
// responses to the inv collector. (#2156, #1768)
//
// (We can't split `inv`s, because that fills the inventory registry
// with useless entries that the whole network has, making it large and slow.)
match advertised.as_slice() {
[advertised @ InventoryHash::Block(_)] => {
debug!(
?advertised,
"registering gossiped advertised block inventory for peer"
);

// The peer set and inv collector use the peer's remote
// address as an identifier
// If all receivers have been dropped, `send` returns an error.
// When that happens, Zebra is shutting down, so we want to ignore this error.
let _ = inv_collector
.send(InventoryChange::new_advertised(*advertised, transient_addr));
}
[advertised @ ..] => {
let advertised = advertised
.iter()
.filter(|advertised| advertised.unmined_tx_id().is_some());

debug!(
?advertised,
"registering advertised unmined transaction inventory for peer",
);

if let Some(change) =
InventoryChange::new_advertised_multi(advertised, transient_addr)
{
// Ignore channel errors that should only happen during shutdown.
let _ = inv_collector.send(change);
}
}
}
}

(Ok(Message::NotFound(missing)), Some(transient_addr)) => {
debug!(?missing, "registering missing inventory for peer");

if let Some(change) = InventoryChange::new_missing_multi(missing, transient_addr) {
let _ = inv_collector.send(change);
}
}
_ => {}
}

msg
}

/// Send periodical heartbeats to `server_tx`, and update the peer status through
/// `heartbeat_ts_collector`.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ async fn inv_registry_one_advertised_ok() {
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}

Expand Down Expand Up @@ -87,6 +88,7 @@ async fn inv_registry_one_missing_ok() {
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
}

/// Check inventory registration for one hash/peer prefers missing over advertised.
Expand Down Expand Up @@ -131,6 +133,7 @@ async fn inv_registry_prefer_missing_order(missing_first: bool) {
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
}

/// Check inventory registration for one hash/peer prefers current over previous.
Expand Down Expand Up @@ -179,11 +182,13 @@ async fn inv_registry_prefer_current_order(missing_current: bool) {
inv_registry.missing_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 1);
} else {
assert_eq!(
inv_registry.advertising_peers(test_hash).next(),
Some(&test_peer),
);
assert_eq!(inv_registry.advertising_peers(test_hash).count(), 1);
assert_eq!(inv_registry.missing_peers(test_hash).count(), 0);
}
}
Expand Down
79 changes: 60 additions & 19 deletions zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ use crate::{
external::InventoryHash,
internal::{Request, Response},
},
BoxError, Config,
BoxError, Config, PeerError, SharedPeerError,
};

#[cfg(test)]
Expand Down Expand Up @@ -650,17 +650,22 @@ where
fut.map_err(Into::into).boxed()
}

/// Tries to route a request to a peer that advertised that inventory,
/// falling back to P2C if there is no ready peer.
/// Tries to route a request to a ready peer that advertised that inventory,
/// falling back to a ready peer that isn't missing the inventory.
///
/// If all ready peers are missing the inventory,
/// returns a [`NotFound`](PeerError::NotFound) error.
///
/// Uses P2C to route requests to the least loaded peer in each list.
fn route_inv(
&mut self,
req: Request,
hash: InventoryHash,
) -> <Self as tower::Service<Request>>::Future {
let inventory_peer_list = self
let advertising_peer_list = self
.inventory_registry
.advertising_peers(hash)
.filter(|&key| self.ready_services.contains_key(key))
.filter(|&addr| self.ready_services.contains_key(addr))
.copied()
.collect();

Expand All @@ -672,21 +677,57 @@ where
// peers would be able to influence our choice by switching addresses.
// But we need the choice to be random,
// so that a peer can't provide all our inventory responses.
let peer = self.select_p2c_peer_from_list(&inventory_peer_list);

match peer.and_then(|key| self.take_ready_service(&key)) {
Some(mut svc) => {
let peer = peer.expect("just checked peer is Some");
tracing::trace!(?hash, ?peer, "routing based on inventory");
let fut = svc.call(req);
self.push_unready(peer, svc);
fut.map_err(Into::into).boxed()
}
None => {
tracing::trace!(?hash, "no ready peer for inventory, falling back to p2c");
self.route_p2c(req)
}
let peer = self.select_p2c_peer_from_list(&advertising_peer_list);

if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
let peer = peer.expect("just checked peer is Some");
tracing::trace!(?hash, ?peer, "routing to a peer which advertised inventory");
let fut = svc.call(req);
self.push_unready(peer, svc);
return fut.map_err(Into::into).boxed();
}

let missing_peer_list: HashSet<SocketAddr> = self
.inventory_registry
.missing_peers(hash)
.copied()
.collect();
let maybe_peer_list = self
.ready_services
.keys()
.filter(|addr| !missing_peer_list.contains(addr))
.copied()
.collect();

// Security: choose a random, less-loaded peer that might have the inventory.
let peer = self.select_p2c_peer_from_list(&maybe_peer_list);

if let Some(mut svc) = peer.and_then(|key| self.take_ready_service(&key)) {
let peer = peer.expect("just checked peer is Some");
tracing::trace!(?hash, ?peer, "routing to a peer that might have inventory");
let fut = svc.call(req);
self.push_unready(peer, svc);
return fut.map_err(Into::into).boxed();
}

// TODO: reduce this log level after testing #2156 and #2726
tracing::info!(
?hash,
"all ready peers are missing inventory, failing request"
);
async move {
// Let other tasks run, so a retry request might get different ready peers.
tokio::task::yield_now().await;

// # Security
//
// Avoid routing requests to peers that are missing inventory.
// If we kept trying doomed requests, peers that are missing our requested inventory
// could take up a large amount of our bandwidth and retry limits.
Err(SharedPeerError::from(PeerError::NotFound(vec![hash])))
}
.map_err(Into::into)
.boxed()
}

/// Routes the same request to up to `max_peers` ready peers, ignoring return values.
Expand Down
Loading

0 comments on commit 9be13a4

Please sign in to comment.