Skip to content

Commit

Permalink
fix(net): Clean up licensing, closure move, log typos, tracing spans (
Browse files Browse the repository at this point in the history
#6995)

* Remove a redundant outbound connector timeout

* Fix panics in inbound connection handshaker

* Refactor to simplify FuturesUnordered types

* Make licensing notes consistent

* Delete redundant `move` in closures

* Fix a log typo

* Add some missing tracing spans
  • Loading branch information
teor2345 authored Jun 19, 2023
1 parent 40d697a commit ad7af3e
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 45 deletions.
6 changes: 5 additions & 1 deletion zebra-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ description = "Networking code for Zebra"
#
# This licence is deliberately different to the rest of Zebra.
#
# zebra-network/src/peer_set/set.rs was modified from a 2019 version of:
# Some code in:
# zebra-network/src/peer_set/set.rs
# zebra-network/src/peer_set/unready_service.rs
# zebra-network/src/peer_set/initialize.rs
# was modified from a 2019 version of:
# https://github.com/tower-rs/tower/tree/master/tower/src/balance/p2c/service.rs
license = "MIT"
repository = "https://github.com/ZcashFoundation/zebra"
Expand Down
57 changes: 32 additions & 25 deletions zebra-network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use indexmap::IndexSet;
use serde::{de, Deserialize, Deserializer};
use tempfile::NamedTempFile;
use tokio::{fs, io::AsyncWriteExt};
use tracing::Span;

use zebra_chain::parameters::Network;

Expand Down Expand Up @@ -493,12 +494,15 @@ impl Config {

// Create the temporary file.
// Do blocking filesystem operations on a dedicated thread.
let span = Span::current();
let tmp_peer_cache_file = tokio::task::spawn_blocking(move || {
// Put the temporary file in the same directory as the permanent file,
// so atomic filesystem operations are possible.
tempfile::Builder::new()
.prefix(&tmp_peer_cache_prefix)
.tempfile_in(peer_cache_dir)
span.in_scope(move || {
// Put the temporary file in the same directory as the permanent file,
// so atomic filesystem operations are possible.
tempfile::Builder::new()
.prefix(&tmp_peer_cache_prefix)
.tempfile_in(peer_cache_dir)
})
})
.await
.expect("unexpected panic creating temporary peer cache file")?;
Expand All @@ -514,31 +518,34 @@ impl Config {

// Atomically replace the current cache with the temporary cache.
// Do blocking filesystem operations on a dedicated thread.
let span = Span::current();
tokio::task::spawn_blocking(move || {
let result = tmp_peer_cache_file.persist(&peer_cache_file);
span.in_scope(move || {
let result = tmp_peer_cache_file.persist(&peer_cache_file);

// Drops the temp file if needed
match result {
Ok(_temp_file) => {
info!(
cached_ip_count = ?peer_list.len(),
?peer_cache_file,
"updated cached peer IP addresses"
);

for ip in &peer_list {
metrics::counter!(
"zcash.net.peers.cache",
1,
"cache" => peer_cache_file.display().to_string(),
"remote_ip" => ip.to_string()
// Drops the temp file if needed
match result {
Ok(_temp_file) => {
info!(
cached_ip_count = ?peer_list.len(),
?peer_cache_file,
"updated cached peer IP addresses"
);
}

Ok(())
for ip in &peer_list {
metrics::counter!(
"zcash.net.peers.cache",
1,
"cache" => peer_cache_file.display().to_string(),
"remote_ip" => ip.to_string()
);
}

Ok(())
}
Err(error) => Err(error.error),
}
Err(error) => Err(error.error),
}
})
})
.await
.expect("unexpected panic making temporary peer cache file permanent")
Expand Down
1 change: 1 addition & 0 deletions zebra-network/src/peer_cache_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
};

/// An ongoing task that regularly caches the current `address_book` to disk, based on `config`.
#[instrument(skip(config, address_book))]
pub async fn peer_cache_updater(
config: Config,
address_book: Arc<Mutex<AddressBook>>,
Expand Down
32 changes: 16 additions & 16 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! A peer set whose size is dynamically determined by resource constraints.
// Portions of this submodule were adapted from tower-balance,
// which is (c) 2019 Tower Contributors (MIT licensed).
//!
//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
//!
//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
use std::{
collections::{BTreeMap, HashSet},
Expand Down Expand Up @@ -614,7 +615,7 @@ where
peerset_tx.clone(),
)
.await?
.map(move |res| match res {
.map(|res| match res {
Ok(()) => (),
Err(e @ JoinError { .. }) => {
if e.is_panic() {
Expand All @@ -632,12 +633,12 @@ where
HANDSHAKE_TIMEOUT + Duration::from_millis(500),
handshake_task,
)
.map(move |res| match res {
.map(|res| match res {
Ok(()) => (),
Err(_e @ Elapsed { .. }) => {
info!(
"timeout in spawned accept_inbound_handshake() task: \
inner task should have timeout out already"
inner task should have timed out already"
);
}
});
Expand Down Expand Up @@ -677,6 +678,9 @@ where
///
/// Uses `handshaker` to perform a Zcash network protocol handshake, and sends
/// the [`peer::Client`] result over `peerset_tx`.
//
// TODO: when we support inbound proxies, distinguish between proxied listeners and
// direct listeners in the span generated by this instrument macro
#[instrument(skip(handshaker, tcp_stream, connection_tracker, peerset_tx))]
async fn accept_inbound_handshake<S>(
addr: PeerSocketAddr,
Expand All @@ -701,8 +705,6 @@ where
//
// This await is okay because the handshaker's `poll_ready` method always returns Ready.
handshaker.ready().await?;
// TODO: distinguish between proxied listeners and direct listeners
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);

// Construct a handshake future but do not drive it yet....
let handshake = handshaker.call(HandshakeRequest {
Expand All @@ -724,7 +726,7 @@ where
debug!(?handshake_result, "error handshaking with inbound peer");
}
}
.instrument(handshaker_span),
.in_current_span(),
);

Ok(handshake_task)
Expand Down Expand Up @@ -924,8 +926,8 @@ where

Ok(DemandCrawlFinished)
}
})
.map(move |res| match res {
}.in_current_span())
.map(|res| match res {
Ok(crawler_action) => crawler_action,
Err(e @ JoinError {..}) => {
if e.is_panic() {
Expand All @@ -936,8 +938,7 @@ where
// Just fake it
Ok(HandshakeFinished)
}
})
.in_current_span();
});

handshakes.push(Box::pin(handshake_or_crawl_handle));
}
Expand All @@ -954,7 +955,7 @@ where
crawl(candidates, demand_tx).await?;

Ok(TimerCrawlFinished)
})
}.in_current_span())
.map(move |res| match res {
Ok(crawler_action) => crawler_action,
Err(e @ JoinError {..}) => {
Expand All @@ -966,8 +967,7 @@ where
// Just fake it
Ok(TimerCrawlFinished)
}
})
.in_current_span();
});

handshakes.push(Box::pin(crawl_handle));
}
Expand Down
1 change: 1 addition & 0 deletions zebra-network/src/peer_set/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! # Implementation
//!
//! The [`PeerSet`] implementation is adapted from the one in [tower::Balance][tower-balance].
//!
//! As described in Tower's documentation, it:
//!
//! > Distributes requests across inner services using the [Power of Two Choices][p2c].
Expand Down
9 changes: 6 additions & 3 deletions zebra-network/src/peer_set/unready_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
/// Services that are busy or newly created.
///
/// Adapted from tower-balance.
//! Services that are busy or newly created.
//!
//! The [`UnreadyService`] implementation is adapted from the one in [tower::Balance][tower-balance].
//!
//! [tower-balance]: https://github.com/tower-rs/tower/tree/master/tower/src/balance
use std::{
future::Future,
marker::PhantomData,
Expand Down

0 comments on commit ad7af3e

Please sign in to comment.