benches: Benchmark with constrained connections #102
Conversation
Previously the benchmarks would use an unbounded channel to simulate a network connection. While this is helpful to measure the CPU impact of Yamux-specific changes, it is difficult to estimate how changes might affect network throughput in different environments.

The crate `constrained-connection` can help simulate different environments (DSL, HSDPA, GBit LAN, ...) by providing a channel enforcing a specified bandwidth and delay. That said, it does not simulate the OS network stack, routers, buffers, packet loss, ... Thus, as one would expect, this can be used to detect trends, but not to e.g. determine bandwidth expectations.

This commit makes the benchmarks run on top of a subset of the connection types provided by `constrained-connection`. In addition it retains the simulation on top of an unconstrained connection.

When simulating with low bandwidth-delay-product connections, Yamux can deadlock in the roundtrip scenario. Imagine a client sending a large payload, exceeding the bandwidth-delay product of the connection, to a server. The server will try to echo the payload back to the client. Once both client and server have exhausted the buffer provided through the bandwidth-delay product of the connection, neither will be able to make progress as the other won't read from the connection. This commit rewrites the benchmarks, only having the client send to the server, without the server echoing back to the client.
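To make the deadlock argument above concrete, here is a small back-of-the-envelope sketch in plain Rust. The bandwidth, delay, and payload figures are made up for illustration and are not the values used by `constrained-connection`.

```rust
/// Bytes a link can hold "in flight": bandwidth (bytes/s) times delay (s).
fn bandwidth_delay_product(bandwidth_bytes_per_sec: u64, delay_ms: u64) -> u64 {
    bandwidth_bytes_per_sec * delay_ms / 1000
}

fn main() {
    // Illustrative ADSL-like link: ~2 Mbit/s with 50 ms delay.
    let bdp = bandwidth_delay_product(2 * 1024 * 1024 / 8, 50);
    // Payload echoed back by the server in the roundtrip scenario.
    let payload: u64 = 1024 * 1024;

    // Once each direction of the link holds `bdp` unread bytes, both peers are
    // stuck writing and neither reads, hence the switch to a one-way transfer.
    assert!(payload > 2 * bdp);
    println!("payload of {payload} B exceeds the {bdp} B the link buffers per direction");
}
```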
By "payload" do you mean a single message? This only happens with |
Yes.
As far as I can tell that is not the case. I have attached a unit test below which reproduces a variant of the deadlock. It uses the default
Debugging the issue, I am afraid this is inherent to the design of

Off the top of my head I would deem the probability of this edge case (deadlock due to a low-capacity link) to be low, especially since it would require the mutual dependency between server and client, one blocking the other. I would expect the OS networking stack to add many more bytes to the capacity of the link due to its various buffers. That said, I am happy to look into ways to continue reading when blocked on writing a single frame, given that the issue might cause deadlocks in other scenarios as well.

Please take the above with a grain of salt. After all, I am relatively new to this Yamux implementation. I might as well be missing something.

@romanb First off, thank you for the elaborate comment. Does the above make sense to you? I am happy to extend my (already lengthy) reply.

Unit test to reproduce deadlock:

diff --git a/src/tests.rs b/src/tests.rs
index fc661a6..cbccbb1 100644
--- a/src/tests.rs
+++ b/src/tests.rs
@@ -16,6 +16,14 @@ use rand::Rng;
use std::{fmt::Debug, io, net::{Ipv4Addr, SocketAddr, SocketAddrV4}};
use tokio::{net::{TcpStream, TcpListener}, runtime::Runtime, task};
use tokio_util::compat::{Compat, Tokio02AsyncReadCompatExt};
+use futures::channel::mpsc::{unbounded, UnboundedSender, UnboundedReceiver};
+use std::sync::{Arc, Mutex};
+use std::task::{Context, Poll, Waker};
+use std::pin::Pin;
+use futures::io::AsyncReadExt;
+use futures::future::join;
+use futures::task::Spawn;
+use futures::executor::block_on;
#[test]
fn prop_config_send_recv_single() {
@@ -298,3 +306,190 @@ where
Ok(result)
}
+#[test]
+fn deadlock_low_capacity_link() {
+ let mut pool = futures::executor::LocalPool::new();
+ // Message is 10x smaller than link capacity.
+ let msg = vec![1u8; 1024];
+ // On my machine stalls with > 12.
+ let num_streams = 13;
+ let (tx, rx) = unbounded();
+
+ let (server_endpoint, client_endpoint) = bounded_capacity::Endpoint::new(10 * 1024);
+
+ // Create and spawn a server that echoes every message back to the client.
+ let server = Connection::new(server_endpoint, Config::default(), Mode::Server);
+ pool.spawner().spawn_obj( async move {
+ crate::into_stream(server)
+ .try_for_each_concurrent(None, |mut stream| async move {
+ {
+ let (mut r, mut w) = futures::io::AsyncReadExt::split(&mut stream);
+ futures::io::copy(&mut r, &mut w).await?;
+ }
+ stream.close().await?;
+ Ok(())
+ })
+ .await
+ .expect("server works")
+ }.boxed().into()).unwrap();
+
+ // Create and spawn a client.
+ let client = Connection::new(client_endpoint, Config::default(), Mode::Client);
+ let ctrl = client.control();
+ pool.spawner()
+ .spawn_obj(crate::into_stream(client).for_each(|r| async { r.unwrap(); }).boxed().into())
+ .unwrap();
+
+ // Create `num_streams` streams, sending and receiving `msg` on each.
+ for _ in 0..num_streams {
+ let msg = msg.clone();
+ let mut ctrl = ctrl.clone();
+ let tx = tx.clone();
+ pool.spawner().spawn_obj(async move {
+ let stream = ctrl.open_stream().await.unwrap();
+ let (mut reader, mut writer) = AsyncReadExt::split(stream);
+ let mut b = vec![0; msg.len()];
+ join(
+ writer.write_all(msg.as_ref()).map_err(|e| panic!(e)),
+ reader.read_exact(&mut b[..]).map_err(|e| panic!(e)),
+ ).await;
+ let mut stream = reader.reunite(writer).unwrap();
+ stream.close().await.unwrap();
+ tx.unbounded_send(b.len()).unwrap();
+ }.boxed().into()).unwrap();
+ };
+
+ // Run all tasks until none make any more progress.
+ pool.run_until_stalled();
+ drop(pool);
+ drop(tx);
+
+    // Expect each of the `num_streams` tasks to finish, reporting the number of bytes they sent and
+ // received.
+ let n = block_on(rx.fold(0, |acc, n| future::ready(acc + n)));
+ assert_eq!(n, num_streams * msg.len());
+}
+
+mod bounded_capacity {
+ use super::*;
+ use futures::ready;
+ use std::io::{Error, ErrorKind, Result};
+
+ pub struct Endpoint {
+ sender: UnboundedSender<Vec<u8>>,
+ receiver: UnboundedReceiver<Vec<u8>>,
+ next_item: Option<Vec<u8>>,
+
+ shared_send: Arc<Mutex<Shared>>,
+ shared_receive: Arc<Mutex<Shared>>,
+
+ capacity: usize,
+ }
+
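+    // Bytes currently in flight in one direction of the link, plus the waker of a
+    // writer currently blocked on a full link.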
+ #[derive(Default)]
+ struct Shared {
+ size: usize,
+ waker_write: Option<Waker>,
+ }
+
+ impl Endpoint {
+ pub fn new(capacity: usize) -> (Endpoint, Endpoint) {
+ let (a_to_b_sender, a_to_b_receiver) = unbounded();
+ let (b_to_a_sender, b_to_a_receiver) = unbounded();
+
+ let a_to_b_shared = Arc::new(Mutex::new(Default::default()));
+ let b_to_a_shared = Arc::new(Mutex::new(Default::default()));
+
+ let a = Endpoint {
+ sender: a_to_b_sender,
+ receiver: b_to_a_receiver,
+ next_item: None,
+ shared_send: a_to_b_shared.clone(),
+ shared_receive: b_to_a_shared.clone(),
+ capacity,
+ };
+
+ let b = Endpoint {
+ sender: b_to_a_sender,
+ receiver: a_to_b_receiver,
+ next_item: None,
+ shared_send: b_to_a_shared,
+ shared_receive: a_to_b_shared,
+ capacity,
+ };
+
+ (a, b)
+
+ }
+ }
+
+ impl AsyncRead for Endpoint {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<Result<usize>> {
+ let item = match self.next_item.as_mut() {
+ Some(item) => item,
+ None => match ready!(self.receiver.poll_next_unpin(cx)) {
+ Some(item) => {
+ self.next_item = Some(item);
+ self.next_item.as_mut().unwrap()
+ }
+ None => {
+ return Poll::Ready(Ok(0));
+ }
+ },
+ };
+
+ let n = std::cmp::min(buf.len(), item.len());
+
+ buf[0..n].copy_from_slice(&item[0..n]);
+
+ if n < item.len() {
+ *item = item.split_off(n);
+ } else {
+ self.next_item.take().unwrap();
+ }
+
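+            // Reading freed capacity on the link: wake a writer that may be parked
+            // on a full link before shrinking the in-flight byte count.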
+ let mut shared = self.shared_receive.lock().unwrap();
+ if let Some(waker) = shared.waker_write.take() {
+ waker.wake();
+ }
+
+ debug_assert!(shared.size >= n);
+ shared.size -= n;
+
+ Poll::Ready(Ok(n))
+ }
+ }
+
+ impl AsyncWrite for Endpoint {
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
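+            // Accept at most as many bytes as the link capacity still allows; if the
+            // link is full, park the writer until the reader frees capacity.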
+ let mut shared = self.shared_send.lock().unwrap();
+ let n = std::cmp::min(self.capacity - shared.size, buf.len());
+ if n == 0 {
+ shared.waker_write = Some(cx.waker().clone());
+ return Poll::Pending;
+ }
+
+ self.sender
+ .unbounded_send(buf[0..n].to_vec())
+ .map_err(|e| Error::new(ErrorKind::ConnectionAborted, e))?;
+
+ shared.size += n;
+
+ Poll::Ready(Ok(n))
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ ready!(self.sender.poll_flush_unpin(cx)).unwrap();
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ ready!(self.sender.poll_close_unpin(cx)).unwrap();
+ Poll::Ready(Ok(()))
+ }
+ }
+}
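To illustrate how the attached `bounded_capacity::Endpoint` enforces the link capacity, here is a small usage sketch (not part of the attached test): a write larger than the remaining capacity only completes once the peer has read, and thereby freed, some of the in-flight bytes.

```rust
use futures::executor::LocalPool;
use futures::io::{AsyncReadExt, AsyncWriteExt};
use futures::task::SpawnExt;

fn capacity_is_enforced() {
    let mut pool = LocalPool::new();
    // A link that can hold at most 8 bytes per direction.
    let (mut a, mut b) = bounded_capacity::Endpoint::new(8);

    pool.spawner()
        .spawn(async move {
            // 16 bytes do not fit: the second half is only written once the
            // reader below has drained the first chunk.
            a.write_all(&[0u8; 16]).await.unwrap();
        })
        .unwrap();

    pool.run_until(async move {
        let mut buf = [0u8; 16];
        b.read_exact(&mut buf).await.unwrap();
    });
}
```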
I think you're right and I think it can be problematic. I will take a closer look, thanks for the test case. With that being clarified, I don't think it needs to block your changes to the benchmarks here, which generally look good to me.
Sounds good to me. In case we do end up resolving the deadlock, I will partially revert these changes, benchmarking a round-trip instead of a one-way data transfer.
A small follow-up to that test: I could not see a deadlock as the reason for it failing with larger numbers of substreams, but rather simply because it was using

Nevertheless, regarding deadlocks due to reads waiting for writes to finish on both ends, I think I did manage to trigger a stall with

So long story short, this still seems to need further investigation and better reproduction.
See #104 for a follow-up. |
I don't follow. Isn't the fact that all futures in the pool are pending a proof of a deadlock? I am not aware of any external entities (e.g. timers) being used which could wake either of them up again. |
As far as I can tell, no. You can observe the behaviour by running the modified test I linked to earlier, replacing
I hope for these benchmarks to be helpful when measuring the impact of libp2p/rust-libp2p#1849 and #100.

As an example, below you can find the `critcmp` output of a run with `WindowUpdateMode::OnReceive` and `WindowUpdateMode::OnRead`. While the comparison of the two modes does not show a lot of difference, the throughput measurements on single / multi streams do.

You can find the bandwidth and the delay for each connection type (mobile, adsl2+, ...) here.
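For reference, a minimal sketch of how the two compared configurations might be constructed with this crate's `Config` and `WindowUpdateMode`; the rest of the benchmark setup is omitted.

```rust
use yamux::{Config, WindowUpdateMode};

// Build the two configurations compared in the critcmp run above.
fn window_update_configs() -> (Config, Config) {
    let mut on_receive = Config::default();
    on_receive.set_window_update_mode(WindowUpdateMode::OnReceive);

    let mut on_read = Config::default();
    on_read.set_window_update_mode(WindowUpdateMode::OnRead);

    (on_receive, on_read)
}
```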