Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(uid-mux): pre-allocate streams #32

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 82 additions & 8 deletions uid-mux/src/yamux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! This module provides a [`yamux`](https://crates.io/crates/yamux) wrapper which implements [`UidMux`](crate::UidMux).

use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
fmt,
future::IntoFuture,
pin::Pin,
Expand Down Expand Up @@ -58,6 +58,7 @@ pub struct Yamux<Io> {
struct Queue {
waiting: HashMap<InternalId, oneshot::Sender<Stream>>,
ready: HashMap<InternalId, Stream>,
alloc: usize,
waker: Option<Waker>,
}

Expand All @@ -66,6 +67,7 @@ impl Default for Queue {
Self {
waiting: Default::default(),
ready: Default::default(),
alloc: 0,
waker: None,
}
}
Expand Down Expand Up @@ -116,6 +118,7 @@ where
role: self.role,
conn: self.conn,
incoming: Default::default(),
allocated: Default::default(),
outgoing: Default::default(),
queue: self.queue,
closed: false,
Expand All @@ -134,6 +137,8 @@ pub struct YamuxFuture<Io> {
conn: Connection<Io>,
/// Pending incoming streams, waiting for ids to be received.
incoming: FuturesUnordered<ReadId<Stream>>,
/// Streams which have been allocated but not assigned an id.
allocated: VecDeque<Stream>,
/// Pending outgoing streams, waiting to send ids and return streams
/// to callers.
outgoing: FuturesUnordered<ReturnStream<Stream>>,
Expand Down Expand Up @@ -172,19 +177,35 @@ where
// Putting this in a block so the lock is released as soon as possible.
{
let mut queue = self.queue.lock().unwrap();
while !queue.waiting.is_empty() {
if let Poll::Ready(stream) = self.conn.poll_new_outbound(cx)? {
let id = *queue.waiting.keys().next().unwrap();
let sender = queue.waiting.remove(&id).unwrap();

debug!("opened new stream: {}", id);

self.outgoing.push(ReturnStream::new(id, stream, sender));
// Allocate new streams.
while queue.alloc > 0 {
if let Poll::Ready(stream) = self.conn.poll_new_outbound(cx)? {
self.allocated.push_back(stream);
queue.alloc -= 1;
debug!("allocated new stream");
} else {
break;
}
}

while !queue.waiting.is_empty() {
let stream = if let Some(stream) = self.allocated.pop_front() {
stream
} else if let Poll::Ready(stream) = self.conn.poll_new_outbound(cx)? {
stream
} else {
break;
};

let id = *queue.waiting.keys().next().unwrap();
let sender = queue.waiting.remove(&id).unwrap();

debug!("opened new stream: {}", id);

self.outgoing.push(ReturnStream::new(id, stream, sender));
}

// Set the waker so `YamuxCtrl` can wake up the connection.
queue.waker = Some(cx.waker().clone());
}
Expand Down Expand Up @@ -332,6 +353,24 @@ pub struct YamuxCtrl {
}

impl YamuxCtrl {
/// Allocates `count` streams.
///
/// This can be used to efficiently pre-allocate streams prior to assigning ids to them.
///
/// # Note
///
/// This method only has an effect for the client side of the connection.
pub fn alloc(&self, count: usize) {
if let Role::Server = self.role {
warn!("alloc has no effect for server side of connection");
return;
}

let mut queue = self.queue.lock().unwrap();
queue.alloc += count;
queue.waker.as_ref().map(|waker| waker.wake_by_ref());
}

/// Closes the yamux connection.
pub fn close(&self) {
self.shutdown_notify.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -540,4 +579,39 @@ mod tests {
// But caller gets an error.
assert!(fut_open.await.is_err());
}

#[tokio::test]
async fn test_yamux_alloc() {
let (client_io, server_io) = duplex(1024);
let client = Yamux::new(client_io.compat(), Config::default(), Mode::Client);
let server = Yamux::new(server_io.compat(), Config::default(), Mode::Server);

let client_ctrl = client.control();
let server_ctrl = server.control();

let mut fut_client = client.into_future();
let mut fut_server = server.into_future();

client_ctrl.alloc(1);
assert_eq!(fut_client.queue.lock().unwrap().alloc, 1);

let mut fut_conn = futures::future::try_join(&mut fut_client, &mut fut_server);
_ = futures::poll!(&mut fut_conn);
drop(fut_conn);

assert_eq!(fut_client.queue.lock().unwrap().alloc, 0);
assert_eq!(fut_client.allocated.len(), 1);

let fut_open = futures::future::try_join(client_ctrl.open(b"0"), server_ctrl.open(b"0"));

let fut_conn = futures::future::try_join(&mut fut_client, &mut fut_server);

futures::select! {
_ = fut_open.fuse() => {},
_ = fut_conn.fuse() => panic!("connection closed before stream opened"),
}

// Assert that the pre-allocated stream was consumed.
assert!(fut_client.allocated.is_empty());
}
}
Loading