Skip to content

Commit

Permalink
consolidate shared quic trait methods and distinguish open errors fro…
Browse files Browse the repository at this point in the history
…m accept errors (#173)

Co-authored-by: ruben <[email protected]>
  • Loading branch information
FlorianUekermann and Ruben2424 authored Jun 1, 2024
1 parent 34bf403 commit 793bd17
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 59 deletions.
48 changes: 27 additions & 21 deletions h3-quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,14 @@ impl<B> quic::Connection<B> for Connection
where
B: Buf,
{
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type BidiStream = BidiStream<B>;
type OpenStreams = OpenStreams;
type Error = ConnectionError;
type AcceptError = ConnectionError;

fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> {
) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>> {
let (send, recv) = match ready!(self.incoming_bi.poll_next_unpin(cx)) {
Some(x) => x?,
None => return Poll::Ready(Ok(None)),
Expand All @@ -174,18 +172,35 @@ where
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> {
) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>> {
let recv = match ready!(self.incoming_uni.poll_next_unpin(cx)) {
Some(x) => x?,
None => return Poll::Ready(Ok(None)),
};
Poll::Ready(Ok(Some(Self::RecvStream::new(recv))))
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
opening_bi: None,
opening_uni: None,
}
}
}

impl<B> quic::OpenStreams<B> for Connection
where
B: Buf,
{
type SendStream = SendStream<B>;
type BidiStream = BidiStream<B>;
type OpenError = ConnectionError;

fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
if self.opening_bi.is_none() {
self.opening_bi = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
Some((conn.clone().open_bi().await, conn))
Expand All @@ -196,14 +211,14 @@ where
ready!(self.opening_bi.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?;
Poll::Ready(Ok(Self::BidiStream {
send: Self::SendStream::new(send),
recv: Self::RecvStream::new(recv),
recv: RecvStream::new(recv),
}))
}

fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
if self.opening_uni.is_none() {
self.opening_uni = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
Some((conn.open_uni().await, conn))
Expand All @@ -214,14 +229,6 @@ where
Poll::Ready(Ok(Self::SendStream::new(send)))
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
opening_bi: None,
opening_uni: None,
}
}

fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
self.conn.close(
VarInt::from_u64(code.value()).expect("error code VarInt"),
Expand Down Expand Up @@ -278,15 +285,14 @@ impl<B> quic::OpenStreams<B> for OpenStreams
where
B: Buf,
{
type RecvStream = RecvStream;
type SendStream = SendStream<B>;
type BidiStream = BidiStream<B>;
type Error = ConnectionError;
type OpenError = ConnectionError;

fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
if self.opening_bi.is_none() {
self.opening_bi = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
Some((conn.open_bi().await, conn))
Expand All @@ -297,14 +303,14 @@ where
ready!(self.opening_bi.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?;
Poll::Ready(Ok(Self::BidiStream {
send: Self::SendStream::new(send),
recv: Self::RecvStream::new(recv),
recv: RecvStream::new(recv),
}))
}

fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
if self.opening_uni.is_none() {
self.opening_uni = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
Some((conn.open_uni().await, conn))
Expand Down
4 changes: 2 additions & 2 deletions h3-webtransport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,13 @@ where

/// Streams are opened, but the initial webtransport header has not been sent
type PendingStreams<C, B> = (
BidiStream<<C as quic::Connection<B>>::BidiStream, B>,
BidiStream<<C as quic::OpenStreams<B>>::BidiStream, B>,
WriteBuf<&'static [u8]>,
);

/// Streams are opened, but the initial webtransport header has not been sent
type PendingUniStreams<C, B> = (
SendStream<<C as quic::Connection<B>>::SendStream, B>,
SendStream<<C as quic::OpenStreams<B>>::SendStream, B>,
WriteBuf<&'static [u8]>,
);

Expand Down
44 changes: 9 additions & 35 deletions h3/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,56 +31,32 @@ impl<'a, E: Error + 'a> From<E> for Box<dyn Error + 'a> {
}

/// Trait representing a QUIC connection.
pub trait Connection<B: Buf> {
/// The type produced by `poll_accept_bidi()`
type BidiStream: SendStream<B> + RecvStream;
/// The type of the sending part of `BidiStream`
type SendStream: SendStream<B>;
pub trait Connection<B: Buf>: OpenStreams<B> {
/// The type produced by `poll_accept_recv()`
type RecvStream: RecvStream;
/// A producer of outgoing Unidirectional and Bidirectional streams.
type OpenStreams: OpenStreams<
B,
SendStream = Self::SendStream,
RecvStream = Self::RecvStream,
BidiStream = Self::BidiStream,
>;
/// Error type yielded by this trait methods
type Error: Into<Box<dyn Error>>;
type OpenStreams: OpenStreams<B, SendStream = Self::SendStream, BidiStream = Self::BidiStream>;
/// Error type yielded by these trait methods
type AcceptError: Into<Box<dyn Error>>;

/// Accept an incoming unidirectional stream
///
/// Returning `None` implies the connection is closing or closed.
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>>;
) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>>;

/// Accept an incoming bidirectional stream
///
/// Returning `None` implies the connection is closing or closed.
fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>>;

/// Poll the connection to create a new bidirectional stream.
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>>;

/// Poll the connection to create a new unidirectional stream.
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>>;
) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>>;

/// Get an object to open outgoing streams.
fn opener(&self) -> Self::OpenStreams;

/// Close the connection immediately
fn close(&mut self, code: crate::error::Code, reason: &[u8]);
}

/// Extends the `Connection` trait for sending datagrams
Expand Down Expand Up @@ -116,22 +92,20 @@ pub trait OpenStreams<B: Buf> {
type BidiStream: SendStream<B> + RecvStream;
/// The type produced by `poll_open_send()`
type SendStream: SendStream<B>;
/// The type of the receiving part of `BidiStream`
type RecvStream: RecvStream;
/// Error type yielded by these trait methods
type Error: Into<Box<dyn Error>>;
type OpenError: Into<Box<dyn Error>>;

/// Poll the connection to create a new bidirectional stream.
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>>;
) -> Poll<Result<Self::BidiStream, Self::OpenError>>;

/// Poll the connection to create a new unidirectional stream.
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>>;
) -> Poll<Result<Self::SendStream, Self::OpenError>>;

/// Close the connection immediately
fn close(&mut self, code: crate::error::Code, reason: &[u8]);
Expand Down
2 changes: 1 addition & 1 deletion h3/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
//! async fn doc<C>(conn: C)
//! where
//! C: h3::quic::Connection<bytes::Bytes>,
//! <C as h3::quic::Connection<bytes::Bytes>>::BidiStream: Send + 'static
//! <C as h3::quic::OpenStreams<bytes::Bytes>>::BidiStream: Send + 'static
//! {
//! let mut server_builder = h3::server::builder();
//! // Build the Connection
Expand Down

0 comments on commit 793bd17

Please sign in to comment.