Commit

Merge remote-tracking branch 'libp2p/develop'

mxinden committed Jan 13, 2022
2 parents 22a7052 + 94a93ee commit 1c80ffb
Showing 14 changed files with 1,076 additions and 490 deletions.
11 changes: 11 additions & 0 deletions .github/dependabot.yml
@@ -0,0 +1,11 @@
+version: 2
+updates:
+- package-ecosystem: "cargo"
+  directory: "/"
+  schedule:
+    interval: "daily"
+
+- package-ecosystem: "github-actions"
+  directory: "/"
+  schedule:
+    interval: "daily"
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
@@ -15,7 +15,7 @@ jobs:
     runs-on: ubuntu-latest

     steps:
-    - uses: actions/checkout@v2
+    - uses: actions/checkout@v2.4.0
    - name: Build
      run: cargo build --verbose
    - name: Run tests
30 changes: 30 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,33 @@
+# 0.10.0
+
+- Default to `WindowUpdateMode::OnRead`, thus enabling full Yamux flow-control,
+  exercising back pressure on senders, and preventing stream resets due to
+  reaching the buffer limit.
+
+  See the [`WindowUpdateMode` documentation] for details, especially the section
+  on deadlocking when sending data larger than the receiver's window.
+
+  [`WindowUpdateMode` documentation]: https://docs.rs/yamux/0.9.0/yamux/enum.WindowUpdateMode.html
+
+# 0.9.0
+
+- Force-split larger frames, for better interleaving of
+  reads and writes between different substreams and to avoid
+  single, large writes. By default frames are capped at, and
+  thus split at, `16KiB`, which can be adjusted by a new
+  configuration option if necessary.
+
+- Send window updates earlier, when half of the window has
+  been consumed, to minimise pauses due to transmission delays,
+  particularly if there is just a single dominant substream.
+
+- Avoid possible premature resets of streams that
+  have been properly closed and already dropped but receive
+  window update or other frames while the remaining buffered
+  frames are still sent out. Incoming frames for unknown streams
+  are now ignored, instead of triggering a stream reset for the
+  remote.
+
 # 0.8.1

 - Avoid possible premature stream resets of streams that have been properly
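
Note on configuration: the flow-control and frame-splitting behaviour described in the 0.10.0 and 0.9.0 entries is driven by `yamux::Config`. A minimal sketch of how an application might make these settings explicit (`set_window_update_mode` appears verbatim in this commit's benchmark; `set_split_send_size` is assumed to be the frame-cap option the 0.9.0 entry alludes to, so check the crate docs for the exact name):

use yamux::{Config, WindowUpdateMode};

// Illustrative helper, not part of this commit.
fn app_config() -> Config {
    let mut cfg = Config::default();
    // 0.10.0 default, made explicit: grant the remote more window only
    // once the application has read the buffered data (back pressure).
    cfg.set_window_update_mode(WindowUpdateMode::OnRead);
    // Assumed name of the 0.9.0 option capping, and thus force-splitting,
    // outgoing frames; 16 KiB is the stated default.
    cfg.set_split_send_size(16 * 1024);
    cfg
}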
15 changes: 8 additions & 7 deletions Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "yamux"
-version = "0.8.1"
+version = "0.10.0"
 authors = ["Parity Technologies <[email protected]>"]
 license = "Apache-2.0 OR MIT"
 description = "Multiplexer over reliable, ordered connections"
@@ -11,22 +11,23 @@ readme = "README.md"
 edition = "2018"

 [dependencies]
-futures = { version = "0.3.4", default-features = false, features = ["std"] }
+futures = { version = "0.3.12", default-features = false, features = ["std"] }
 log = "0.4.8"
 nohash-hasher = "0.2"
 parking_lot = "0.11"
-rand = "0.7.2"
+rand = "0.8.3"
 static_assertions = "1"

 [dev-dependencies]
 anyhow = "1"
 criterion = "0.3"
 env_logger = "0.9"
 futures = "0.3.4"
-quickcheck = "0.9"
-tokio = { version = "0.2", features = ["tcp", "rt-threaded", "macros"] }
-tokio-util = { version = "0.3", features = ["compat"] }
+quickcheck = "1.0"
+tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
+tokio-util = { version = "0.6", features = ["compat"] }
+constrained-connection = "0.1"

 [[bench]]
 name = "concurrent"
 harness = false
218 changes: 76 additions & 142 deletions benches/concurrent.rs
@@ -8,24 +8,16 @@
 // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license
 // at https://opensource.org/licenses/MIT.

-use criterion::{criterion_group, criterion_main, Criterion};
-use futures::{channel::mpsc, future, prelude::*, ready};
-use std::{fmt, io, pin::Pin, sync::Arc, task::{Context, Poll}};
+use constrained_connection::{Endpoint, new_unconstrained_connection, samples};
+use criterion::{BenchmarkId, criterion_group, criterion_main, Criterion, Throughput};
+use futures::{channel::mpsc, future, prelude::*, io::AsyncReadExt};
+use std::sync::Arc;
 use tokio::{runtime::Runtime, task};
 use yamux::{Config, Connection, Mode};

 criterion_group!(benches, concurrent);
 criterion_main!(benches);

-#[derive(Copy, Clone)]
-struct Params { streams: usize, messages: usize }
-
-impl fmt::Debug for Params {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "((streams {}) (messages {}))", self.streams, self.messages)
-    }
-}
-
 #[derive(Debug, Clone)]
 struct Bytes(Arc<Vec<u8>>);

@@ -36,157 +28,99 @@ impl AsRef<[u8]> for Bytes {
 }

 fn concurrent(c: &mut Criterion) {
-    let params = &[
-        Params { streams: 1, messages: 1 },
-        Params { streams: 10, messages: 1 },
-        Params { streams: 1, messages: 10 },
-        Params { streams: 100, messages: 1 },
-        Params { streams: 1, messages: 100 },
-        Params { streams: 10, messages: 100 },
-        Params { streams: 100, messages: 10 }
+    let data = Bytes(Arc::new(vec![0x42; 4096]));
+    let networks = vec![
+        ("mobile", (|| samples::mobile_hsdpa().2) as fn() -> (_, _)),
+        ("adsl2+", (|| samples::residential_adsl2().2) as fn() -> (_, _)),
+        ("gbit-lan", (|| samples::gbit_lan().2) as fn() -> (_, _)),
+        ("unconstrained", new_unconstrained_connection as fn() -> (_, _)),
     ];

-    let data0 = Bytes(Arc::new(vec![0x42; 4096]));
-    let data1 = data0.clone();
-    let data2 = data0.clone();
-
-    c.bench_function_over_inputs("one by one", move |b, &&params| {
-        let data = data1.clone();
-        let mut rt = Runtime::new().unwrap();
-        b.iter(move || {
-            rt.block_on(roundtrip(params.streams, params.messages, data.clone(), false))
-        })
-    },
-    params);
-
-    c.bench_function_over_inputs("all at once", move |b, &&params| {
-        let data = data2.clone();
-        let mut rt = Runtime::new().unwrap();
-        b.iter(move || {
-            rt.block_on(roundtrip(params.streams, params.messages, data.clone(), true))
-        })
-    },
-    params);
+    let mut group = c.benchmark_group("concurrent");
+    group.sample_size(10);
+
+    for (network_name, new_connection) in networks.into_iter() {
+        for nstreams in [1, 10, 100].iter() {
+            for nmessages in [1, 10, 100].iter() {
+                let data = data.clone();
+                let rt = Runtime::new().unwrap();
+
+                group.throughput(Throughput::Bytes((nstreams * nmessages * data.0.len()) as u64));
+                group.bench_function(
+                    BenchmarkId::from_parameter(
+                        format!("{}/#streams{}/#messages{}", network_name, nstreams, nmessages),
+                    ),
+                    |b| b.iter(|| {
+                        let (server, client) = new_connection();
+                        rt.block_on(oneway(*nstreams, *nmessages, data.clone(), server, client))
+                    }),
+                );
+            }
+        }
+    }
+
+    group.finish();
 }

+fn config() -> Config {
+    let mut c = Config::default();
+    c.set_window_update_mode(yamux::WindowUpdateMode::OnRead);
+    c
+}
+
-async fn roundtrip(nstreams: usize, nmessages: usize, data: Bytes, send_all: bool) {
+async fn oneway(
+    nstreams: usize,
+    nmessages: usize,
+    data: Bytes,
+    server: Endpoint,
+    client: Endpoint,
+) {
     let msg_len = data.0.len();
-    let (server, client) = Endpoint::new();
     let server = server.into_async_read();
     let client = client.into_async_read();
+    let (tx, rx) = mpsc::unbounded();

     let server = async move {
-        yamux::into_stream(Connection::new(server, Config::default(), Mode::Server))
-            .try_for_each_concurrent(None, |mut stream| async move {
-                {
-                    let (mut r, mut w) = futures::io::AsyncReadExt::split(&mut stream);
-                    futures::io::copy(&mut r, &mut w).await?;
+        let mut connection = Connection::new(server, config(), Mode::Server);
+
+        while let Some(mut stream) = connection.next_stream().await.unwrap() {
+            let tx = tx.clone();
+
+            task::spawn(async move {
+                let mut n = 0;
+                let mut b = vec![0; msg_len];
+
+                // Receive `nmessages` messages.
+                for _ in 0 .. nmessages {
+                    stream.read_exact(&mut b[..]).await.unwrap();
+                    n += b.len();
                 }
-                stream.close().await?;
-                Ok(())
-            })
-            .await
-            .expect("server works")
-    };
+
+                tx.unbounded_send(n).expect("unbounded_send");
+                stream.close().await.unwrap();
+            });
+        }
+    };
     task::spawn(server);

-    let (tx, rx) = mpsc::unbounded();
-    let conn = Connection::new(client, Config::default(), Mode::Client);
+    let conn = Connection::new(client, config(), Mode::Client);
     let mut ctrl = conn.control();
-    task::spawn(yamux::into_stream(conn).for_each(|_| future::ready(())));
+    task::spawn(yamux::into_stream(conn).for_each(|r| {r.unwrap(); future::ready(())} ));

     for _ in 0 .. nstreams {
         let data = data.clone();
-        let tx = tx.clone();
         let mut ctrl = ctrl.clone();
         task::spawn(async move {
-            let mut stream = ctrl.open_stream().await?;
-            if send_all {
-                // Send `nmessages` messages and receive `nmessages` messages.
-                for _ in 0 .. nmessages {
-                    stream.write_all(data.as_ref()).await?
-                }
-                stream.close().await?;
-                let mut n = 0;
-                let mut b = vec![0; data.0.len()];
-                loop {
-                    let k = stream.read(&mut b).await?;
-                    if k == 0 { break }
-                    n += k
-                }
-                tx.unbounded_send(n).expect("unbounded_send")
-            } else {
-                // Send and receive `nmessages` messages.
-                let mut n = 0;
-                let mut b = vec![0; data.0.len()];
-                for _ in 0 .. nmessages {
-                    stream.write_all(data.as_ref()).await?;
-                    stream.read_exact(&mut b[..]).await?;
-                    n += b.len()
-                }
-                stream.close().await?;
-                tx.unbounded_send(n).expect("unbounded_send");
+            let mut stream = ctrl.open_stream().await.unwrap();
+
+            // Send `nmessages` messages.
+            for _ in 0 .. nmessages {
+                stream.write_all(data.as_ref()).await.unwrap();
             }
-            Ok::<(), yamux::ConnectionError>(())
+
+            stream.close().await.unwrap();
         });
     }

     let n = rx.take(nstreams).fold(0, |acc, n| future::ready(acc + n)).await;
     assert_eq!(n, nstreams * nmessages * msg_len);
-    ctrl.close().await.expect("close")
-}
-
-#[derive(Debug)]
-struct Endpoint {
-    incoming: mpsc::UnboundedReceiver<Vec<u8>>,
-    outgoing: mpsc::UnboundedSender<Vec<u8>>
-}
-
-impl Endpoint {
-    fn new() -> (Self, Self) {
-        let (tx_a, rx_a) = mpsc::unbounded();
-        let (tx_b, rx_b) = mpsc::unbounded();
-
-        let a = Endpoint { incoming: rx_a, outgoing: tx_b };
-        let b = Endpoint { incoming: rx_b, outgoing: tx_a };
-
-        (a, b)
-    }
-}
-
-impl Stream for Endpoint {
-    type Item = Result<Vec<u8>, io::Error>;
-
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        if let Some(b) = ready!(Pin::new(&mut self.incoming).poll_next(cx)) {
-            return Poll::Ready(Some(Ok(b)))
-        }
-        Poll::Pending
-    }
-}
-
-impl AsyncWrite for Endpoint {
-    fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
-        if ready!(Pin::new(&mut self.outgoing).poll_ready(cx)).is_err() {
-            return Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into()))
-        }
-        let n = buf.len();
-        if Pin::new(&mut self.outgoing).start_send(Vec::from(buf)).is_err() {
-            return Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into()))
-        }
-        Poll::Ready(Ok(n))
-    }
-
-    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        Pin::new(&mut self.outgoing)
-            .poll_flush(cx)
-            .map_err(|_| io::ErrorKind::ConnectionAborted.into())
-    }
-
-    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
-        Pin::new(&mut self.outgoing)
-            .poll_close(cx)
-            .map_err(|_| io::ErrorKind::ConnectionAborted.into())
-    }
-}
+    ctrl.close().await.expect("close");
+}
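
Note on driving the connection: as the benchmark shows, a yamux `Connection` only makes progress while it is being polled, which is why both sides spawn `yamux::into_stream(conn)` onto a task before opening or accepting streams. Distilled into a helper (a sketch under the benchmark's assumptions: an ambient Tokio runtime, and the `Control` handle returned by `Connection::control`; bounds are illustrative, not the crate's exact signature):

use futures::{future, prelude::*};
use tokio::task;
use yamux::{Connection, Control};

// Hypothetical helper, not part of this commit: spawn the connection
// onto its own task so that `open_stream` futures on the returned
// Control can resolve. Inbound substreams are simply dropped here.
fn drive<T>(conn: Connection<T>) -> Control
where
    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
    let ctrl = conn.control();
    task::spawn(yamux::into_stream(conn).for_each(|result| {
        result.unwrap();
        future::ready(())
    }));
    ctrl
}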
18 changes: 10 additions & 8 deletions src/chunks.rs
@@ -17,32 +17,34 @@ use std::{collections::VecDeque, io};
 /// [`Chunk`] elements.
 #[derive(Debug)]
 pub(crate) struct Chunks {
-    seq: VecDeque<Chunk>
+    seq: VecDeque<Chunk>,
+    len: usize
 }

 impl Chunks {
     /// A new empty chunk list.
     pub(crate) fn new() -> Self {
-        Chunks { seq: VecDeque::new() }
+        Chunks { seq: VecDeque::new(), len: 0 }
     }

-    /// The total length of bytes contained in all `Chunk`s.
-    pub(crate) fn len(&self) -> Option<usize> {
-        self.seq.iter().fold(Some(0), |total, x| {
-            total.and_then(|n| n.checked_add(x.len()))
-        })
+    /// The total length of bytes yet-to-be-read in all `Chunk`s.
+    pub(crate) fn len(&self) -> usize {
+        self.len - self.seq.front().map(|c| c.offset()).unwrap_or(0)
     }

     /// Add another chunk of bytes to the end.
     pub(crate) fn push(&mut self, x: Vec<u8>) {
+        self.len += x.len();
         if !x.is_empty() {
             self.seq.push_back(Chunk { cursor: io::Cursor::new(x) })
         }
     }

     /// Remove and return the first chunk.
     pub(crate) fn pop(&mut self) -> Option<Chunk> {
-        self.seq.pop_front()
+        let chunk = self.seq.pop_front();
+        self.len -= chunk.as_ref().map(|c| c.len() + c.offset()).unwrap_or(0);
+        chunk
     }

     /// Get a mutable reference to the first chunk.
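
Note on the bookkeeping: the new `len` field replaces an O(n) fold (which also had to guard against overflow, hence the old `Option<usize>` return type) with O(1) accounting. The invariant is that `len` holds the full size of every queued chunk and only the front chunk is ever partially consumed, so `len()` subtracts the front chunk's offset, while `pop` subtracts the popped chunk's remainder plus its offset, i.e. its full size. A self-contained sketch of the same pattern (hypothetical standalone type, not the crate's internals):

use std::{collections::VecDeque, io};

// `total` tracks the full size of all queued chunks; only the front
// cursor is ever advanced, so the unread byte count is `total` minus
// the front cursor's position.
struct Queue {
    seq: VecDeque<io::Cursor<Vec<u8>>>,
    total: usize,
}

impl Queue {
    fn new() -> Self {
        Queue { seq: VecDeque::new(), total: 0 }
    }

    fn push(&mut self, x: Vec<u8>) {
        self.total += x.len();
        if !x.is_empty() {
            self.seq.push_back(io::Cursor::new(x));
        }
    }

    fn unread(&self) -> usize {
        self.total - self.seq.front().map(|c| c.position() as usize).unwrap_or(0)
    }

    fn pop(&mut self) -> Option<io::Cursor<Vec<u8>>> {
        let chunk = self.seq.pop_front();
        // Subtract the popped chunk's full size (remainder + offset).
        self.total -= chunk.as_ref().map(|c| c.get_ref().len()).unwrap_or(0);
        chunk
    }
}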