Skip to content

Commit

Permalink
Merge branch 'main' into zcash-script-v5-tx
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Feb 1, 2022
2 parents 528a35b + 07f120a commit bc1ac4f
Show file tree
Hide file tree
Showing 9 changed files with 558 additions and 111 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@ jobs:

- uses: actions-rs/[email protected]
with:
# Pinned to workaround issue making cargo-llvm-cov fail, see
# https://github.com/taiki-e/cargo-llvm-cov/issues/128
# TODO: restore to just `nightly` after it's fixed
toolchain: nightly-2022-01-14
toolchain: nightly
override: true
profile: minimal
components: llvm-tools-preview
Expand Down
3 changes: 2 additions & 1 deletion zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ pub const OUTBOUND_PEER_BIAS_DENOMINATOR: usize = 2;
/// buffer adds up to 6 seconds worth of blocks to the queue.
pub const PEERSET_BUFFER_SIZE: usize = 3;

/// The timeout for requests made to a remote peer.
/// The timeout for sending a message to a remote peer,
/// and receiving a response from a remote peer.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(20);

/// The timeout for handshakes when connecting to new peers.
Expand Down
40 changes: 33 additions & 7 deletions zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ use crate::{
constants,
meta_addr::MetaAddr,
peer::{
error::AlreadyErrored, ClientRequestReceiver, ErrorSlot, InProgressClientRequest,
MustUseOneshotSender, PeerError, SharedPeerError,
connection::peer_tx::PeerTx, error::AlreadyErrored, ClientRequest, ClientRequestReceiver,
ConnectedAddr, ErrorSlot, InProgressClientRequest, MustUseOneshotSender, PeerError,
SharedPeerError,
},
peer_set::ConnectionTracker,
protocol::{
Expand All @@ -39,6 +40,8 @@ use crate::{
BoxError,
};

mod peer_tx;

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -437,7 +440,7 @@ impl From<Request> for InboundMessage {
}
}

/// The state associated with a peer connection.
/// The channels, services, and associated state for a peer connection.
pub struct Connection<S, Tx> {
/// The state of this connection's current request or response.
pub(super) state: State,
Expand Down Expand Up @@ -474,9 +477,7 @@ pub struct Connection<S, Tx> {
/// This channel accepts [`Message`]s.
///
/// The corresponding peer message receiver is passed to [`Connection::run`].
///
/// TODO: add a timeout when sending messages to the remote peer (#3234)
pub(super) peer_tx: Tx,
pub(super) peer_tx: PeerTx<Tx>,

/// A connection tracker that reduces the open connection count when dropped.
/// Used to limit the number of open connections in Zebra.
Expand All @@ -498,6 +499,31 @@ pub struct Connection<S, Tx> {
pub(super) last_metrics_state: Option<Cow<'static, str>>,
}

impl<S, Tx> Connection<S, Tx> {
/// Return a new connection from its channels, services, and shared state.
pub(crate) fn new(
inbound_service: S,
client_rx: futures::channel::mpsc::Receiver<ClientRequest>,
error_slot: ErrorSlot,
peer_tx: Tx,
connection_tracker: ConnectionTracker,
connected_addr: ConnectedAddr,
) -> Self {
Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: inbound_service,
client_rx: client_rx.into(),
error_slot,
peer_tx: peer_tx.into(),
connection_tracker,
metrics_label: connected_addr.get_transient_addr_label(),
last_metrics_state: None,
}
}
}

impl<S, Tx> Connection<S, Tx>
where
S: Service<Request, Response = Response, Error = BoxError>,
Expand Down Expand Up @@ -702,7 +728,7 @@ where
}
Either::Left((Either::Right(_), _peer_fut)) => {
trace!(parent: &span, "client request timed out");
let e = PeerError::ClientRequestTimeout;
let e = PeerError::ConnectionReceiveTimeout;

// Replace the state with a temporary value,
// so we can take ownership of the response sender.
Expand Down
37 changes: 37 additions & 0 deletions zebra-network/src/peer/connection/peer_tx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
//! The peer message sender channel.
use futures::{Sink, SinkExt};

use zebra_chain::serialization::SerializationError;

use crate::{constants::REQUEST_TIMEOUT, protocol::external::Message, PeerError};

/// A wrapper type for a peer connection message sender.
///
/// Used to apply a timeout to send messages.
#[derive(Clone, Debug)]
pub struct PeerTx<Tx> {
/// A channel for sending Zcash messages to the connected peer.
///
/// This channel accepts [`Message`]s.
inner: Tx,
}

impl<Tx> PeerTx<Tx>
where
Tx: Sink<Message, Error = SerializationError> + Unpin,
{
/// Sends `msg` on `self.inner`, returning a timeout error if it takes too long.
pub async fn send(&mut self, msg: Message) -> Result<(), PeerError> {
tokio::time::timeout(REQUEST_TIMEOUT, self.inner.send(msg))
.await
.map_err(|_| PeerError::ConnectionSendTimeout)?
.map_err(Into::into)
}
}

impl<Tx> From<Tx> for PeerTx<Tx> {
fn from(tx: Tx) -> Self {
PeerTx { inner: tx }
}
}
37 changes: 17 additions & 20 deletions zebra-network/src/peer/connection/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use zebra_chain::serialization::SerializationError;
use zebra_test::mock_service::MockService;

use crate::{
peer::{
client::ClientRequestReceiver, connection::State, ClientRequest, Connection, ErrorSlot,
},
peer::{ClientRequest, ConnectedAddr, Connection, ErrorSlot},
peer_set::ActiveConnectionCounter,
protocol::external::Message,
Request, Response,
Expand All @@ -23,17 +21,20 @@ mod vectors;
fn new_test_connection<A>() -> (
Connection<
MockService<Request, Response, A>,
SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
SinkMapErr<mpsc::Sender<Message>, fn(mpsc::SendError) -> SerializationError>,
>,
mpsc::Sender<ClientRequest>,
MockService<Request, Response, A>,
mpsc::UnboundedReceiver<Message>,
mpsc::Receiver<Message>,
ErrorSlot,
) {
let mock_inbound_service = MockService::build().finish();
let (client_tx, client_rx) = mpsc::channel(1);
let (client_tx, client_rx) = mpsc::channel(0);
let shared_error_slot = ErrorSlot::default();
let (peer_outbound_tx, peer_outbound_rx) = mpsc::unbounded();

// Normally the network has more capacity than the sender's single implicit slot,
// but the smaller capacity makes some tests easier.
let (peer_tx, peer_rx) = mpsc::channel(0);

let error_converter: fn(mpsc::SendError) -> SerializationError = |_| {
io::Error::new(
Expand All @@ -42,26 +43,22 @@ fn new_test_connection<A>() -> (
)
.into()
};
let peer_tx = peer_outbound_tx.sink_map_err(error_converter);
let peer_tx = peer_tx.sink_map_err(error_converter);

let connection = Connection {
state: State::AwaitingRequest,
request_timer: None,
cached_addrs: Vec::new(),
svc: mock_inbound_service.clone(),
client_rx: ClientRequestReceiver::from(client_rx),
error_slot: shared_error_slot.clone(),
let connection = Connection::new(
mock_inbound_service.clone(),
client_rx,
shared_error_slot.clone(),
peer_tx,
connection_tracker: ActiveConnectionCounter::new_counter().track_connection(),
metrics_label: "test".to_string(),
last_metrics_state: None,
};
ActiveConnectionCounter::new_counter().track_connection(),
ConnectedAddr::Isolated,
);

(
connection,
client_tx,
mock_inbound_service,
peer_outbound_rx,
peer_rx,
shared_error_slot,
)
}
16 changes: 8 additions & 8 deletions zebra-network/src/peer/connection/tests/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ proptest! {
runtime.block_on(async move {
// The real stream and sink are from a split TCP connection,
// but that doesn't change how the state machine behaves.
let (mut peer_inbound_tx, peer_inbound_rx) = mpsc::channel(1);
let (mut peer_tx, peer_rx) = mpsc::channel(1);

let (
connection,
Expand All @@ -51,7 +51,7 @@ proptest! {
shared_error_slot,
) = new_test_connection();

let connection_task = tokio::spawn(connection.run(peer_inbound_rx));
let connection_task = tokio::spawn(connection.run(peer_rx));

let response_to_first_request = send_block_request(
first_block.hash(),
Expand All @@ -71,13 +71,13 @@ proptest! {
.await;

// Reply to first request
peer_inbound_tx
peer_tx
.send(Ok(Message::Block(first_block)))
.await
.expect("Failed to send response to first block request");

// Reply to second request
peer_inbound_tx
peer_tx
.send(Ok(Message::Block(second_block.clone())))
.await
.expect("Failed to send response to second block request");
Expand All @@ -100,7 +100,7 @@ proptest! {
inbound_service.expect_no_requests().await?;

// Stop the connection thread
mem::drop(peer_inbound_tx);
mem::drop(peer_tx);

let connection_task_result = connection_task.await;
prop_assert!(connection_task_result.is_ok());
Expand All @@ -114,11 +114,11 @@ proptest! {
fn new_test_connection() -> (
Connection<
MockService<Request, Response, PropTestAssertion>,
SinkMapErr<mpsc::UnboundedSender<Message>, fn(mpsc::SendError) -> SerializationError>,
SinkMapErr<mpsc::Sender<Message>, fn(mpsc::SendError) -> SerializationError>,
>,
mpsc::Sender<ClientRequest>,
MockService<Request, Response, PropTestAssertion>,
mpsc::UnboundedReceiver<Message>,
mpsc::Receiver<Message>,
ErrorSlot,
) {
super::new_test_connection()
Expand All @@ -127,7 +127,7 @@ fn new_test_connection() -> (
async fn send_block_request(
block: block::Hash,
client_requests: &mut mpsc::Sender<ClientRequest>,
outbound_messages: &mut mpsc::UnboundedReceiver<Message>,
outbound_messages: &mut mpsc::Receiver<Message>,
) -> oneshot::Receiver<Result<Response, SharedPeerError>> {
let (response_sender, response_receiver) = oneshot::channel();

Expand Down
Loading

0 comments on commit bc1ac4f

Please sign in to comment.