Skip to content

Commit

Permalink
feat(server): remove addrstream struct
Browse files Browse the repository at this point in the history
remove addrstream type, it provides no benefit over tokio::net::tcpstream

closes issue hyperium#2850
  • Loading branch information
oddgrd committed May 24, 2022
1 parent 775fac1 commit 5824538
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 133 deletions.
3 changes: 0 additions & 3 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,6 @@ cfg_feature! {
pub(super) use self::upgrades::UpgradeableConnection;
}

#[cfg(feature = "tcp")]
pub use super::tcp::{AddrIncoming, AddrStream};

/// A lower-level configuration of the HTTP protocol.
///
/// This structure is used to configure options for an HTTP server connection.
Expand Down
135 changes: 5 additions & 130 deletions src/server/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ use std::io;
use std::net::{SocketAddr, TcpListener as StdTcpListener};
use std::time::Duration;

use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
use tokio::time::Sleep;
use tracing::{debug, error, trace};

use crate::common::{task, Future, Pin, Poll};

#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::addr_stream::AddrStream;
use super::accept::Accept;

/// A stream of connections from binding to an address.
Expand Down Expand Up @@ -98,7 +96,7 @@ impl AddrIncoming {
self.sleep_on_errors = val;
}

fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<AddrStream>> {
fn poll_next_(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<TcpStream>> {
// Check if a previous timeout is active that was set by IO errors.
if let Some(ref mut to) = self.timeout {
ready!(Pin::new(to).poll(cx));
Expand All @@ -107,7 +105,7 @@ impl AddrIncoming {

loop {
match ready!(self.listener.poll_accept(cx)) {
Ok((socket, remote_addr)) => {
Ok((socket, _)) => {
if let Some(dur) = self.tcp_keepalive_timeout {
let socket = socket2::SockRef::from(&socket);
let conf = socket2::TcpKeepalive::new().with_time(dur);
Expand All @@ -119,7 +117,7 @@ impl AddrIncoming {
trace!("error trying to set TCP nodelay: {}", e);
}
let local_addr = socket.local_addr()?;
return Poll::Ready(Ok(AddrStream::new(socket, remote_addr, local_addr)));
return Poll::Ready(Ok(socket));
}
Err(e) => {
// Connection errors can be ignored directly, continue by
Expand Down Expand Up @@ -155,7 +153,7 @@ impl AddrIncoming {
}

impl Accept for AddrIncoming {
type Conn = AddrStream;
type Conn = TcpStream;
type Error = io::Error;

fn poll_accept(
Expand Down Expand Up @@ -193,126 +191,3 @@ impl fmt::Debug for AddrIncoming {
.finish()
}
}

mod addr_stream {
use std::io;
use std::net::SocketAddr;
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;

use crate::common::{task, Pin, Poll};

pin_project_lite::pin_project! {
/// A transport returned yieled by `AddrIncoming`.
#[derive(Debug)]
pub struct AddrStream {
#[pin]
inner: TcpStream,
pub(super) remote_addr: SocketAddr,
pub(super) local_addr: SocketAddr
}
}

impl AddrStream {
pub(super) fn new(
tcp: TcpStream,
remote_addr: SocketAddr,
local_addr: SocketAddr,
) -> AddrStream {
AddrStream {
inner: tcp,
remote_addr,
local_addr,
}
}

/// Returns the remote (peer) address of this connection.
#[inline]
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}

/// Returns the local address of this connection.
#[inline]
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}

/// Consumes the AddrStream and returns the underlying IO object
#[inline]
pub fn into_inner(self) -> TcpStream {
self.inner
}

/// Attempt to receive data on the socket, without removing that data
/// from the queue, registering the current task for wakeup if data is
/// not yet available.
pub fn poll_peek(
&mut self,
cx: &mut task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<usize>> {
self.inner.poll_peek(cx, buf)
}
}

impl AsyncRead for AddrStream {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.project().inner.poll_read(cx, buf)
}
}

impl AsyncWrite for AddrStream {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write(cx, buf)
}

#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write_vectored(cx, bufs)
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
// TCP flush is a noop
Poll::Ready(Ok(()))
}

#[inline]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_shutdown(cx)
}

#[inline]
fn is_write_vectored(&self) -> bool {
// Note that since `self.inner` is a `TcpStream`, this could
// *probably* be hard-coded to return `true`...but it seems more
// correct to ask it anyway (maybe we're on some platform without
// scatter-gather IO?)
self.inner.is_write_vectored()
}
}

#[cfg(unix)]
impl AsRawFd for AddrStream {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
}

0 comments on commit 5824538

Please sign in to comment.