Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a timeout to the PeerServer event loop. #65

Merged
merged 4 commits into from
Oct 15, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion zebra-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pin-project = "0.4"
indexmap = { version = "1.2", default-features = false }

tokio = "=0.2.0-alpha.6"
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
futures-preview = "=0.3.0-alpha.19"

tracing = "0.1"
tracing-futures = { version = "0.1", features = ["tokio-alpha"], default-features = false }
Expand Down
3 changes: 3 additions & 0 deletions zebra-network/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use std::time::Duration;
// XXX should these constants be split into protocol also?
use crate::protocol::types::*;

/// The timeout for requests made to a remote peer.
pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);

/// We expect to receive a message from a live peer at least once in this time duration.
/// XXX this needs to be synchronized with the ping transmission times.
pub const LIVE_PEER_DURATION: Duration = Duration::from_secs(12);
Expand Down
1 change: 1 addition & 0 deletions zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ where
client_rx: rx,
error_slot: slot,
peer_tx,
request_timer: None,
};

let hooked_peer_rx = peer_rx
Expand Down
114 changes: 60 additions & 54 deletions zebra-network/src/peer/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ use std::sync::{Arc, Mutex};
use failure::Error;
use futures::{
channel::{mpsc, oneshot},
future::{self, Either},
stream::Stream,
};
use tokio::prelude::*;
use tokio::{
prelude::*,
timer::{delay_for, Delay},
};
use tower::Service;

use crate::{
constants,
protocol::{
internal::{Request, Response},
message::Message,
Expand Down Expand Up @@ -43,6 +48,10 @@ pub(super) enum ServerState {
/// The "server" duplex half of a peer connection.
pub struct PeerServer<S, Tx> {
pub(super) state: ServerState,
/// A timeout for a client request. This is stored separately from
/// ServerState so that we can move the future out of it independently of
/// other state handling.
pub(super) request_timer: Option<Delay>,
pub(super) svc: S,
pub(super) client_rx: mpsc::Receiver<ClientRequest>,
/// A slot shared between the PeerServer and PeerClient for storing an error.
Expand Down Expand Up @@ -82,66 +91,60 @@ where
//
// If there is a pending request, we wait only on an incoming peer message, and
// check whether it can be interpreted as a response to the pending request.

use futures::future::FutureExt;
use futures::select;

// This future represents the next message received from the peer.
// It needs to be stored outside of the event loop, so that we can overwrite
// it with the new "next message future" every time we get a new message.
let mut peer_rx_fut = peer_rx.next().fuse();
loop {
match self.state {
// We're awaiting a client request, so listen for both
// client requests and peer messages simultaneously.
ServerState::AwaitingRequest => select! {
req = self.client_rx.next() => {
match req {
Some(req) => self.handle_client_request(req).await,
None => {
trace!("client_rx closed, shutting down");
return;
}
ServerState::AwaitingRequest => {
trace!("awaiting client request or peer message");
match future::select(peer_rx.next(), self.client_rx.next()).await {
Either::Left((None, _)) => {
info!("peer stream closed, shutting down");
return;
}
}
msg = peer_rx_fut => {
peer_rx_fut = peer_rx.next().fuse();
match msg {
None => {
trace!("peer stream closed, shutting down");
return;
}
// We got a peer message but it was malformed.
//Some(Err(e)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types
Some(Err(e)) => {
error!(%e);
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => self.handle_message_as_request(msg).await,
// XXX switch back to hard failure when we parse all message types
//Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()),
Either::Left((Some(Err(e)), _)) => error!(%e),
Either::Left((Some(Ok(msg)), _)) => {
self.handle_message_as_request(msg).await
}
Either::Right((None, _)) => {
info!("client stream closed, shutting down");
return;
}
Either::Right((Some(req), _)) => self.handle_client_request(req).await,
}
},
}
// We're awaiting a response to a client request,
// so only listen to peer messages, not further requests.
// so wait on either a peer message, or on a request timeout.
ServerState::AwaitingResponse { .. } => {
let msg = peer_rx_fut.await;
peer_rx_fut = peer_rx.next().fuse();
match msg {
// The peer channel has closed -- no more messages.
// However, we still need to flush pending client requests.
None => self.fail_with(format_err!("peer closed connection").into()),
// We got a peer message but it was malformed.
//Some(Err(e)) => self.fail_with(e.into()),
// XXX remove this when we parse all message types
Some(Err(e)) => {
error!(%e);
trace!("awaiting response to client request");
let timer_ref = self
.request_timer
.as_mut()
.expect("timeout must be set while awaiting response");
match future::select(peer_rx.next(), timer_ref).await {
dconnolly marked this conversation as resolved.
Show resolved Hide resolved
Either::Left((None, _)) => {
self.fail_with(format_err!("peer closed connection").into())
}
// XXX switch back to hard failure when we parse all message types
//Either::Left((Some(Err(e)), _)) => self.fail_with(e.into()),
Either::Left((Some(Err(e)), _)) => error!(%e),
dconnolly marked this conversation as resolved.
Show resolved Hide resolved
Either::Left((Some(Ok(msg)), _)) => {
dconnolly marked this conversation as resolved.
Show resolved Hide resolved
match self.handle_message_as_response(msg) {
None => continue,
Some(msg) => self.handle_message_as_request(msg).await,
}
}
Either::Right(((), _)) => {
trace!("client request timed out");
// Re-matching lets us take ownership of tx
self.state = match self.state {
ServerState::AwaitingResponse(_, tx) => {
let _ = tx.send(Err(format_err!("request timed out").into()));
ServerState::AwaitingRequest
}
_ => panic!("unreachable"),
};
}
// We got a peer message and it was well-formed.
Some(Ok(msg)) => match self.handle_message_as_response(msg) {
None => continue,
Some(msg) => self.handle_message_as_request(msg).await,
},
}
}
// We've failed, but we need to flush all pending client
Expand Down Expand Up @@ -229,7 +232,10 @@ where
AwaitingRequest
}),
} {
Ok(new_state) => self.state = new_state,
Ok(new_state) => {
self.state = new_state;
self.request_timer = Some(delay_for(constants::REQUEST_TIMEOUT));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
Err(e) => self.fail_with(e),
}
}
Expand Down
22 changes: 8 additions & 14 deletions zebra-network/src/timestamp_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,15 @@ impl TimestampCollector {

// Construct and then spawn a worker.
let worker = async move {
use futures::select;
use futures::future::{self, Either};
loop {
select! {
_ = shutdown_rx.next() => return,
msg = worker_rx.next() => {
match msg {
Some(event) => {
data2
.lock()
.expect("mutex should be unpoisoned")
.update(event)
}
None => return,
}
}
match future::select(shutdown_rx.next(), worker_rx.next()).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Either::Left((_, _)) => return, // shutdown signal
Either::Right((None, _)) => return, // all workers are gone
Either::Right((Some(event), _)) => data2
.lock()
.expect("mutex should be unpoisoned")
.update(event),
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion zebrad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ serde = { version = "1", features = ["serde_derive"] }
toml = "0.5"

tokio = "=0.2.0-alpha.6"
futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
futures-preview = "=0.3.0-alpha.19"

tracing = "0.1"
tracing-futures = { version = "0.1", features = ["tokio-alpha"], default-features = false }
Expand Down