diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index 581d34b731..58f14a2b0e 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -13,7 +13,8 @@ use tokio::runtime::Runtime; use tokio::net::TcpListener; use hyper::{Body, Method, Request, Response}; -use hyper::server::Http; +use hyper::client::HttpConnector; +use hyper::server::conn::Http; #[bench] @@ -21,8 +22,10 @@ fn get_one_at_a_time(b: &mut test::Bencher) { let mut rt = Runtime::new().unwrap(); let addr = spawn_hello(&mut rt); - let client = hyper::Client::configure() - .build_with_executor(&rt.reactor(), rt.executor()); + let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); + let client = hyper::Client::builder() + .executor(rt.executor()) + .build::<_, Body>(connector); let url: hyper::Uri = format!("http://{}/get", addr).parse().unwrap(); @@ -43,8 +46,10 @@ fn post_one_at_a_time(b: &mut test::Bencher) { let mut rt = Runtime::new().unwrap(); let addr = spawn_hello(&mut rt); - let client = hyper::Client::configure() - .build_with_executor(&rt.reactor(), rt.executor()); + let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); + let client = hyper::Client::builder() + .executor(rt.executor()) + .build::<_, Body>(connector); let url: hyper::Uri = format!("http://{}/post", addr).parse().unwrap(); @@ -71,7 +76,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr { let listener = TcpListener::bind(&addr).unwrap(); let addr = listener.local_addr().unwrap(); - let http = Http::::new(); + let http = Http::new(); let service = const_service(service_fn(|req: Request| { req.into_body() @@ -81,6 +86,7 @@ fn spawn_hello(rt: &mut Runtime) -> SocketAddr { }) })); + // Specifically only accept 1 connection. let srv = listener.incoming() .into_future() .map_err(|(e, _inc)| panic!("accept error: {}", e)) diff --git a/benches/server.rs b/benches/server.rs index 0a8b13cb31..858171099f 100644 --- a/benches/server.rs +++ b/benches/server.rs @@ -14,24 +14,28 @@ use std::sync::mpsc; use futures::{future, stream, Future, Stream}; use futures::sync::oneshot; -use hyper::{Body, Request, Response}; +use hyper::{Body, Request, Response, Server}; use hyper::server::Service; macro_rules! bench_server { ($b:ident, $header:expr, $body:expr) => ({ let _ = pretty_env_logger::try_init(); - let (_until_tx, until_rx) = oneshot::channel(); + let (_until_tx, until_rx) = oneshot::channel::<()>(); let addr = { let (addr_tx, addr_rx) = mpsc::channel(); ::std::thread::spawn(move || { let addr = "127.0.0.1:0".parse().unwrap(); - let srv = hyper::server::Http::new().bind(&addr, || Ok(BenchPayload { - header: $header, - body: $body, - })).unwrap(); - let addr = srv.local_addr().unwrap(); - addr_tx.send(addr).unwrap(); - tokio::run(srv.run_until(until_rx.map_err(|_| ())).map_err(|e| panic!("server error: {}", e))); + let srv = Server::bind(&addr) + .serve(|| Ok(BenchPayload { + header: $header, + body: $body, + })); + addr_tx.send(srv.local_addr()).unwrap(); + let fut = srv + .map_err(|e| panic!("server error: {}", e)) + .select(until_rx.then(|_| Ok(()))) + .then(|_| Ok(())); + tokio::run(fut); }); addr_rx.recv().unwrap() diff --git a/examples/hello.rs b/examples/hello.rs index a9a0c7e533..f3c476cd91 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -5,15 +5,15 @@ extern crate pretty_env_logger; extern crate tokio; use futures::Future; -use futures::future::lazy; use hyper::{Body, Response}; -use hyper::server::{Http, const_service, service_fn}; +use hyper::server::{Server, const_service, service_fn}; static PHRASE: &'static [u8] = b"Hello World!"; fn main() { pretty_env_logger::init(); + let addr = ([127, 0, 0, 1], 3000).into(); let new_service = const_service(service_fn(|_| { @@ -21,13 +21,11 @@ fn main() { Ok::<_, hyper::Error>(Response::new(Body::from(PHRASE))) })); - tokio::run(lazy(move || { - let server = Http::new() - .sleep_on_errors(true) - .bind(&addr, new_service) - .unwrap(); + let server = Server::bind(&addr) + .serve(new_service) + .map_err(|e| eprintln!("server error: {}", e)); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().map_err(|err| eprintln!("Server error {}", err)) - })); + println!("Listening on http://{}", addr); + + tokio::run(server); } diff --git a/examples/multi_server.rs b/examples/multi_server.rs index e25bfcec19..f8989fa334 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -4,11 +4,11 @@ extern crate futures; extern crate pretty_env_logger; extern crate tokio; -use futures::{Future, Stream}; +use futures::{Future}; use futures::future::{FutureResult, lazy}; use hyper::{Body, Method, Request, Response, StatusCode}; -use hyper::server::{Http, Service}; +use hyper::server::{Server, Service}; static INDEX1: &'static [u8] = b"The 1st service!"; static INDEX2: &'static [u8] = b"The 2nd service!"; @@ -40,25 +40,23 @@ impl Service for Srv { fn main() { pretty_env_logger::init(); - let addr1 = "127.0.0.1:1337".parse().unwrap(); - let addr2 = "127.0.0.1:1338".parse().unwrap(); + + let addr1 = ([127, 0, 0, 1], 1337).into(); + let addr2 = ([127, 0, 0, 1], 1338).into(); tokio::run(lazy(move || { - let srv1 = Http::new().serve_addr(&addr1, || Ok(Srv(INDEX1))).unwrap(); - let srv2 = Http::new().serve_addr(&addr2, || Ok(Srv(INDEX2))).unwrap(); + let srv1 = Server::bind(&addr1) + .serve(|| Ok(Srv(INDEX1))) + .map_err(|e| eprintln!("server 1 error: {}", e)); - println!("Listening on http://{}", srv1.incoming_ref().local_addr()); - println!("Listening on http://{}", srv2.incoming_ref().local_addr()); + let srv2 = Server::bind(&addr2) + .serve(|| Ok(Srv(INDEX2))) + .map_err(|e| eprintln!("server 2 error: {}", e)); - tokio::spawn(srv1.for_each(move |conn| { - tokio::spawn(conn.map_err(|err| println!("srv1 error: {:?}", err))); - Ok(()) - }).map_err(|_| ())); + println!("Listening on http://{} and http://{}", addr1, addr2); - tokio::spawn(srv2.for_each(move |conn| { - tokio::spawn(conn.map_err(|err| println!("srv2 error: {:?}", err))); - Ok(()) - }).map_err(|_| ())); + tokio::spawn(srv1); + tokio::spawn(srv2); Ok(()) })); diff --git a/examples/params.rs b/examples/params.rs index 0ba6f43805..05987a3d57 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -6,10 +6,9 @@ extern crate tokio; extern crate url; use futures::{Future, Stream}; -use futures::future::lazy; use hyper::{Body, Method, Request, Response, StatusCode}; -use hyper::server::{Http, Service}; +use hyper::server::{Server, Service}; use std::collections::HashMap; use url::form_urlencoded; @@ -96,11 +95,12 @@ impl Service for ParamExample { fn main() { pretty_env_logger::init(); - let addr = "127.0.0.1:1337".parse().unwrap(); - tokio::run(lazy(move || { - let server = Http::new().bind(&addr, || Ok(ParamExample)).unwrap(); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().map_err(|err| eprintln!("Server error {}", err)) - })); + let addr = ([127, 0, 0, 1], 1337).into(); + + let server = Server::bind(&addr) + .serve(|| Ok(ParamExample)) + .map_err(|e| eprintln!("server error: {}", e)); + + tokio::run(server); } diff --git a/examples/send_file.rs b/examples/send_file.rs index 05a65d7bbf..222578425d 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -5,11 +5,10 @@ extern crate pretty_env_logger; extern crate tokio; use futures::{Future/*, Sink*/}; -use futures::future::lazy; use futures::sync::oneshot; use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode}; -use hyper::server::{Http, Service}; +use hyper::server::{Server, Service}; use std::fs::File; use std::io::{self, copy/*, Read*/}; @@ -138,11 +137,14 @@ impl Service for ResponseExamples { fn main() { pretty_env_logger::init(); + let addr = "127.0.0.1:1337".parse().unwrap(); - tokio::run(lazy(move || { - let server = Http::new().bind(&addr, || Ok(ResponseExamples)).unwrap(); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().map_err(|err| eprintln!("Server error {}", err)) - })); + let server = Server::bind(&addr) + .serve(|| Ok(ResponseExamples)) + .map_err(|e| eprintln!("server error: {}", e)); + + println!("Listening on http://{}", addr); + + tokio::run(server); } diff --git a/examples/server.rs b/examples/server.rs index b5d2153958..e96d77e705 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -5,10 +5,10 @@ extern crate pretty_env_logger; extern crate tokio; use futures::Future; -use futures::future::{FutureResult, lazy}; +use futures::future::{FutureResult}; use hyper::{Body, Method, Request, Response, StatusCode}; -use hyper::server::{Http, Service}; +use hyper::server::{Server, Service}; static INDEX: &'static [u8] = b"Try POST /echo"; @@ -41,11 +41,14 @@ impl Service for Echo { fn main() { pretty_env_logger::init(); - let addr = "127.0.0.1:1337".parse().unwrap(); - tokio::run(lazy(move || { - let server = Http::new().bind(&addr, || Ok(Echo)).unwrap(); - println!("Listening on http://{} with 1 thread.", server.local_addr().unwrap()); - server.run().map_err(|err| eprintln!("Server error {}", err)) - })); + let addr = ([127, 0, 0, 1], 1337).into(); + + let server = Server::bind(&addr) + .serve(|| Ok(Echo)) + .map_err(|e| eprintln!("server error: {}", e)); + + println!("Listening on http://{}", addr); + + tokio::run(server); } diff --git a/examples/web_api.rs b/examples/web_api.rs index e20bbe4f6f..597449552c 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -9,7 +9,7 @@ use futures::future::lazy; use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode}; use hyper::client::HttpConnector; -use hyper::server::{Http, Service}; +use hyper::server::{Server, Service}; #[allow(unused, deprecated)] use std::ascii::AsciiExt; @@ -75,15 +75,17 @@ impl Service for ResponseExamples { fn main() { pretty_env_logger::init(); + let addr = "127.0.0.1:1337".parse().unwrap(); tokio::run(lazy(move || { let client = Client::new(); - let serve = Http::new().serve_addr(&addr, move || Ok(ResponseExamples(client.clone()))).unwrap(); - println!("Listening on http://{} with 1 thread.", serve.incoming_ref().local_addr()); + let server = Server::bind(&addr) + .serve(move || Ok(ResponseExamples(client.clone()))) + .map_err(|e| eprintln!("server error: {}", e)); + + println!("Listening on http://{}", addr); - serve.map_err(|_| ()).for_each(move |conn| { - tokio::spawn(conn.map_err(|err| println!("serve error: {:?}", err))) - }) + server })); } diff --git a/src/error.rs b/src/error.rs index f1491ad0fb..df8f02c847 100644 --- a/src/error.rs +++ b/src/error.rs @@ -171,12 +171,12 @@ impl Error { Error::new(Kind::Io, Some(cause.into())) } - pub(crate) fn new_listen(err: io::Error) -> Error { - Error::new(Kind::Listen, Some(err.into())) + pub(crate) fn new_listen>(cause: E) -> Error { + Error::new(Kind::Listen, Some(cause.into())) } - pub(crate) fn new_accept(err: io::Error) -> Error { - Error::new(Kind::Accept, Some(Box::new(err))) + pub(crate) fn new_accept>(cause: E) -> Error { + Error::new(Kind::Accept, Some(cause.into())) } pub(crate) fn new_connect>(cause: E) -> Error { diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index e00ffca8aa..a66e54f309 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -10,8 +10,19 @@ use tokio_io::{AsyncRead, AsyncWrite}; use proto::{Http1Transaction, MessageHead}; -const INIT_BUFFER_SIZE: usize = 8192; -pub const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; +/// The initial buffer size allocated before trying to read from IO. +pub(crate) const INIT_BUFFER_SIZE: usize = 8192; + +/// The default maximum read buffer size. If the buffer gets this big and +/// a message is still not complete, a `TooLarge` error is triggered. +// Note: if this changes, update server::conn::Http::max_buf_size docs. +pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100; + +/// The maximum number of distinct `Buf`s to hold in a list before requiring +/// a flush. Only affects when the buffer strategy is to queue buffers. +/// +/// Note that a flush can happen before reaching the maximum. This simply +/// forces a flush if the queue gets this big. const MAX_BUF_LIST_BUFFERS: usize = 16; pub struct Buffered { diff --git a/src/server/conn.rs b/src/server/conn.rs index 6e55da3eca..4d7ac346c0 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -5,19 +5,59 @@ //! are not handled at this level. This module provides the building blocks to //! customize those things externally. //! -//! If don't have need to manage connections yourself, consider using the +//! If you don't have need to manage connections yourself, consider using the //! higher-level [Server](super) API. use std::fmt; +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; use bytes::Bytes; -use futures::{Future, Poll}; -use futures::future::{Either}; +use futures::{Async, Future, Poll, Stream}; +use futures::future::{Either, Executor}; use tokio_io::{AsyncRead, AsyncWrite}; +//TODO: change these tokio:: to sub-crates +use tokio::reactor::Handle; +use common::Exec; use proto; use body::{Body, Payload}; -use super::{HyperService, Request, Response, Service}; +use super::{HyperService, NewService, Request, Response, Service}; + +pub use super::tcp::AddrIncoming; + +/// A lower-level configuration of the HTTP protocol. +/// +/// This structure is used to configure options for an HTTP server connection. +/// +/// If don't have need to manage connections yourself, consider using the +/// higher-level [Server](super) API. +#[derive(Clone, Debug)] +pub struct Http { + exec: Exec, + http2: bool, + keep_alive: bool, + max_buf_size: Option, + pipeline_flush: bool, +} + +/// A stream mapping incoming IOs to new services. +/// +/// Yields `Connection`s that are futures that should be put on a reactor. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct Serve { + incoming: I, + new_service: S, + protocol: Http, +} + +#[must_use = "futures do nothing unless polled"] +#[derive(Debug)] +pub(super) struct SpawnAll { + serve: Serve, +} /// A future binding a connection with a Service. /// @@ -63,6 +103,186 @@ pub struct Parts { _inner: (), } +// ===== impl Http ===== + +impl Http { + /// Creates a new instance of the HTTP protocol, ready to spawn a server or + /// start accepting connections. + pub fn new() -> Http { + Http { + exec: Exec::Default, + http2: false, + keep_alive: true, + max_buf_size: None, + pipeline_flush: false, + } + } + + /// Sets whether HTTP2 is required. + /// + /// Default is false + pub fn http2_only(&mut self, val: bool) -> &mut Self { + self.http2 = val; + self + } + + /// Enables or disables HTTP keep-alive. + /// + /// Default is true. + pub fn keep_alive(&mut self, val: bool) -> &mut Self { + self.keep_alive = val; + self + } + + /// Set the maximum buffer size for the connection. + /// + /// Default is ~400kb. + pub fn max_buf_size(&mut self, max: usize) -> &mut Self { + self.max_buf_size = Some(max); + self + } + + /// Aggregates flushes to better support pipelined responses. + /// + /// Experimental, may be have bugs. + /// + /// Default is false. + pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { + self.pipeline_flush = enabled; + self + } + + /// Set the executor used to spawn background tasks. + /// + /// Default uses implicit default (like `tokio::spawn`). + pub fn executor(&mut self, exec: E) -> &mut Self + where + E: Executor + Send>> + Send + Sync + 'static + { + self.exec = Exec::Executor(Arc::new(exec)); + self + } + + /// Bind a connection together with a `Service`. + /// + /// This returns a Future that must be polled in order for HTTP to be + /// driven on the connection. + /// + /// # Example + /// + /// ``` + /// # extern crate futures; + /// # extern crate hyper; + /// # extern crate tokio; + /// # extern crate tokio_io; + /// # use futures::Future; + /// # use hyper::{Body, Request, Response}; + /// # use hyper::server::{Http, Service}; + /// # use tokio_io::{AsyncRead, AsyncWrite}; + /// # use tokio::reactor::Handle; + /// # fn run(some_io: I, some_service: S) + /// # where + /// # I: AsyncRead + AsyncWrite + Send + 'static, + /// # S: Service, Response=Response, Error=hyper::Error> + Send + 'static, + /// # S::Future: Send + /// # { + /// let http = Http::new(); + /// let conn = http.serve_connection(some_io, some_service); + /// + /// let fut = conn.map_err(|e| { + /// eprintln!("server connection error: {}", e); + /// }); + /// + /// tokio::spawn(fut); + /// # } + /// # fn main() {} + /// ``` + pub fn serve_connection(&self, io: I, service: S) -> Connection + where + S: Service, Response = Response>, + S::Error: Into>, + S::Future: Send + 'static, + Bd: Payload, + I: AsyncRead + AsyncWrite, + { + let either = if !self.http2 { + let mut conn = proto::Conn::new(io); + if !self.keep_alive { + conn.disable_keep_alive(); + } + conn.set_flush_pipeline(self.pipeline_flush); + if let Some(max) = self.max_buf_size { + conn.set_max_buf_size(max); + } + let sd = proto::h1::dispatch::Server::new(service); + Either::A(proto::h1::Dispatcher::new(sd, conn)) + } else { + let h2 = proto::h2::Server::new(io, service, self.exec.clone()); + Either::B(h2) + }; + + Connection { + conn: either, + } + } + + /// Bind the provided `addr` with the default `Handle` and return [`Serve`](Serve). + /// + /// This method will bind the `addr` provided with a new TCP listener ready + /// to accept connections. Each connection will be processed with the + /// `new_service` object provided, creating a new service per + /// connection. + pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> + where + S: NewService, Response=Response>, + S::Error: Into>, + Bd: Payload, + { + let mut incoming = AddrIncoming::new(addr, None)?; + if self.keep_alive { + incoming.set_keepalive(Some(Duration::from_secs(90))); + } + Ok(self.serve_incoming(incoming, new_service)) + } + + /// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve) + /// + /// This method will bind the `addr` provided with a new TCP listener ready + /// to accept connections. Each connection will be processed with the + /// `new_service` object provided, creating a new service per + /// connection. + pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> + where + S: NewService, Response = Response>, + S::Error: Into>, + Bd: Payload, + { + let mut incoming = AddrIncoming::new(addr, Some(handle))?; + if self.keep_alive { + incoming.set_keepalive(Some(Duration::from_secs(90))); + } + Ok(self.serve_incoming(incoming, new_service)) + } + + /// Bind the provided stream of incoming IO objects with a `NewService`. + pub fn serve_incoming(&self, incoming: I, new_service: S) -> Serve + where + I: Stream, + I::Error: Into>, + I::Item: AsyncRead + AsyncWrite, + S: NewService, Response = Response>, + S::Error: Into>, + Bd: Payload, + { + Serve { + incoming: incoming, + new_service: new_service, + protocol: self.clone(), + } + } +} + + // ===== impl Connection ===== impl Connection @@ -154,3 +374,89 @@ where } } +// ===== impl Serve ===== + +impl Serve { + /// Spawn all incoming connections onto the executor in `Http`. + pub(super) fn spawn_all(self) -> SpawnAll { + SpawnAll { + serve: self, + } + } + + /// Get a reference to the incoming stream. + #[inline] + pub fn incoming_ref(&self) -> &I { + &self.incoming + } + + /// Get a mutable reference to the incoming stream. + #[inline] + pub fn incoming_mut(&mut self) -> &mut I { + &mut self.incoming + } +} + +impl Stream for Serve +where + I: Stream, + I::Item: AsyncRead + AsyncWrite, + I::Error: Into>, + S: NewService, Response=Response>, + S::Error: Into>, + ::Future: Send + 'static, + B: Payload, +{ + type Item = Connection; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) { + let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?; + Ok(Async::Ready(Some(self.protocol.serve_connection(io, service)))) + } else { + Ok(Async::Ready(None)) + } + } +} + +// ===== impl SpawnAll ===== + +impl SpawnAll { + pub(super) fn local_addr(&self) -> SocketAddr { + self.serve.incoming.local_addr() + } +} + +impl SpawnAll { + pub(super) fn incoming_ref(&self) -> &I { + self.serve.incoming_ref() + } +} + +impl Future for SpawnAll +where + I: Stream, + I::Error: Into>, + I::Item: AsyncRead + AsyncWrite + Send + 'static, + S: NewService, Response = Response> + Send + 'static, + S::Error: Into>, + ::Instance: Send, + <::Instance as Service>::Future: Send + 'static, + B: Payload, +{ + type Item = (); + type Error = ::Error; + + fn poll(&mut self) -> Poll { + loop { + if let Some(conn) = try_ready!(self.serve.poll()) { + let fut = conn + .map_err(|err| debug!("conn error: {}", err)); + self.serve.protocol.exec.execute(fut); + } else { + return Ok(Async::Ready(())) + } + } + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index d6ba0909f7..942d87847b 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -2,797 +2,175 @@ //! //! A `Server` is created to listen on a port, parse HTTP requests, and hand //! them off to a `Service`. +//! +//! There are two levels of APIs provide for constructing HTTP servers: +//! +//! - The higher-level [`Server`](Server). +//! - The lower-level [conn](conn) module. pub mod conn; mod service; +mod tcp; use std::fmt; -use std::io; -use std::net::{SocketAddr, TcpListener as StdTcpListener}; -use std::sync::{Arc, Mutex, Weak}; +use std::net::SocketAddr; use std::time::Duration; -use futures::task::{self, Task}; -use futures::future::{self, Either, Executor}; -use futures::{Future, Stream, Poll, Async}; -use futures_timer::Delay; +use futures::{Future, Stream, Poll}; use http::{Request, Response}; use tokio_io::{AsyncRead, AsyncWrite}; -use tokio::spawn; -use tokio::reactor::Handle; -use tokio::net::TcpListener; pub use tokio_service::{NewService, Service}; use body::{Body, Payload}; -use common::Exec; -use proto; -use self::addr_stream::AddrStream; +// Renamed `Http` as `Http_` for now so that people upgrading don't see an +// error that `hyper::server::Http` is private... +use self::conn::{Http as Http_, SpawnAll}; use self::hyper_service::HyperService; +use self::tcp::{AddrIncoming}; -pub use self::conn::Connection; pub use self::service::{const_service, service_fn}; -/// A configuration of the HTTP protocol. +/// A listening HTTP server. /// -/// This structure is used to create instances of `Server` or to spawn off tasks -/// which handle a connection to an HTTP server. Each instance of `Http` can be -/// configured with various protocol-level options such as keepalive. -#[derive(Clone, Debug)] -pub struct Http { - exec: Exec, - http2: bool, - keep_alive: bool, - max_buf_size: Option, - pipeline: bool, - sleep_on_errors: bool, +/// `Server` is a `Future` mapping a bound listener with a set of service +/// handlers. It is built using the [`Builder`](Builder), and the future +/// completes when the server has been shutdown. It should be run by an +/// `Executor`. +pub struct Server { + spawn_all: SpawnAll, } -/// An instance of a server created through `Http::bind`. -/// -/// This server is intended as a convenience for creating a TCP listener on an -/// address and then serving TCP connections accepted with the service provided. -pub struct Server { - protocol: Http, - new_service: S, - handle: Handle, - listener: TcpListener, - shutdown_timeout: Duration, -} - -/// A stream mapping incoming IOs to new services. -/// -/// Yields `Connection`s that are futures that should be put on a reactor. -#[must_use = "streams do nothing unless polled"] +/// A builder for a [`Server`](Server). #[derive(Debug)] -pub struct Serve { +pub struct Builder { incoming: I, - new_service: S, - protocol: Http, -} - -/* -#[must_use = "futures do nothing unless polled"] -#[derive(Debug)] -pub struct SpawnAll { - executor: E, - serve: Serve, -} -*/ - -/// A stream of connections from binding to an address. -#[must_use = "streams do nothing unless polled"] -pub struct AddrIncoming { - addr: SocketAddr, - keep_alive_timeout: Option, - listener: TcpListener, - handle: Handle, - sleep_on_errors: bool, - timeout: Option, + protocol: Http_, } -impl fmt::Debug for AddrIncoming { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("AddrIncoming") - .field("addr", &self.addr) - .field("keep_alive_timeout", &self.keep_alive_timeout) - .field("listener", &self.listener) - .field("handle", &self.handle) - .field("sleep_on_errors", &self.sleep_on_errors) - .finish() - } -} - -// ===== impl Http ===== - -impl Http { - /// Creates a new instance of the HTTP protocol, ready to spawn a server or - /// start accepting connections. - pub fn new() -> Http { - Http { - exec: Exec::Default, - http2: false, - keep_alive: true, - max_buf_size: None, - pipeline: false, - sleep_on_errors: false, - } - } - - /// Sets whether HTTP2 is required. - /// - /// Default is false - pub fn http2_only(&mut self, val: bool) -> &mut Self { - self.http2 = val; - self - } - - /// Enables or disables HTTP keep-alive. - /// - /// Default is true. - pub fn keep_alive(&mut self, val: bool) -> &mut Self { - self.keep_alive = val; - self - } - - /// Set the maximum buffer size for the connection. - pub fn max_buf_size(&mut self, max: usize) -> &mut Self { - self.max_buf_size = Some(max); - self - } - - /// Aggregates flushes to better support pipelined responses. - /// - /// Experimental, may be have bugs. - /// - /// Default is false. - pub fn pipeline(&mut self, enabled: bool) -> &mut Self { - self.pipeline = enabled; - self - } - - /// Set the executor used to spawn background tasks. - /// - /// Default uses implicit default (like `tokio::spawn`). - pub fn executor(&mut self, exec: E) -> &mut Self - where - E: Executor + Send>> + Send + Sync + 'static - { - self.exec = Exec::Executor(Arc::new(exec)); - self - } - - /// Swallow connection accept errors. Instead of passing up IO errors when - /// the server is under heavy load the errors will be ignored. Some - /// connection accept errors (like "connection reset") can be ignored, some - /// (like "too many files open") may consume 100% CPU and a timout of 10ms - /// is used in that case. - /// - /// Default is false. - pub fn sleep_on_errors(&mut self, enabled: bool) -> &mut Self { - self.sleep_on_errors = enabled; - self - } - - /// Bind the provided `addr` and return a server ready to handle - /// connections. - /// - /// This method will bind the `addr` provided with a new TCP listener ready - /// to accept connections. Each connection will be processed with the - /// `new_service` object provided as well, creating a new service per - /// connection. - /// - /// The returned `Server` contains one method, `run`, which is used to - /// actually run the server. - pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> - where - S: NewService, Response=Response> + 'static, - S::Error: Into>, - Bd: Payload, - { - let handle = Handle::current(); - let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; - let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; - - Ok(Server { - new_service: new_service, - handle: handle, - listener: listener, - protocol: self.clone(), - shutdown_timeout: Duration::new(1, 0), - }) - } - - /// Bind the provided `addr` and return a server with the default `Handle`. - /// - /// This is method will bind the `addr` provided with a new TCP listener ready - /// to accept connections. Each connection will be processed with the - /// `new_service` object provided as well, creating a new service per - /// connection. - pub fn serve_addr(&self, addr: &SocketAddr, new_service: S) -> ::Result> - where - S: NewService, Response=Response>, - S::Error: Into>, - Bd: Payload, - { - let handle = Handle::current(); - let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; - let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; - let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?; - if self.keep_alive { - incoming.set_keepalive(Some(Duration::from_secs(90))); - } - Ok(self.serve_incoming(incoming, new_service)) - } - - /// Bind the provided `addr` and return a server with a shared `Core`. - /// - /// This method allows the ability to share a `Core` with multiple servers. - /// - /// This is method will bind the `addr` provided with a new TCP listener ready - /// to accept connections. Each connection will be processed with the - /// `new_service` object provided as well, creating a new service per - /// connection. - pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> - where - S: NewService, Response = Response>, - S::Error: Into>, - Bd: Payload, - { - let std_listener = StdTcpListener::bind(addr).map_err(::Error::new_listen)?; - let listener = TcpListener::from_std(std_listener, &handle).map_err(::Error::new_listen)?; - let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors).map_err(::Error::new_listen)?; - - if self.keep_alive { - incoming.set_keepalive(Some(Duration::from_secs(90))); - } - Ok(self.serve_incoming(incoming, new_service)) - } +// ===== impl Server ===== - /// Bind the provided stream of incoming IO objects with a `NewService`. - /// - /// This method allows the ability to share a `Core` with multiple servers. - pub fn serve_incoming(&self, incoming: I, new_service: S) -> Serve - where - I: Stream, - I::Item: AsyncRead + AsyncWrite, - S: NewService, Response = Response>, - S::Error: Into>, - Bd: Payload, - { - Serve { - incoming: incoming, - new_service: new_service, - protocol: self.clone(), +impl Server { + /// Starts a [`Builder`](Builder) with the provided incoming stream. + pub fn builder(incoming: I) -> Builder { + Builder { + incoming, + protocol: Http_::new(), } } +} - /// Bind a connection together with a Service. - /// - /// This returns a Future that must be polled in order for HTTP to be - /// driven on the connection. - /// - /// # Example +impl Server { + /// Binds to the provided address, and returns a [`Builder`](Builder). /// - /// ``` - /// # extern crate futures; - /// # extern crate hyper; - /// # extern crate tokio; - /// # extern crate tokio_io; - /// # use futures::Future; - /// # use hyper::{Body, Request, Response}; - /// # use hyper::server::{Http, Service}; - /// # use tokio_io::{AsyncRead, AsyncWrite}; - /// # use tokio::reactor::Handle; - /// # fn run(some_io: I, some_service: S) - /// # where - /// # I: AsyncRead + AsyncWrite + Send + 'static, - /// # S: Service, Response=Response, Error=hyper::Error> + Send + 'static, - /// # S::Future: Send - /// # { - /// let http = Http::new(); - /// let conn = http.serve_connection(some_io, some_service); + /// # Panics /// - /// let fut = conn - /// .map_err(|e| eprintln!("server connection error: {}", e)); - /// - /// tokio::spawn(fut); - /// # } - /// # fn main() {} - /// ``` - pub fn serve_connection(&self, io: I, service: S) -> Connection - where - S: Service, Response = Response>, - S::Error: Into>, - S::Future: Send + 'static, - Bd: Payload, - I: AsyncRead + AsyncWrite, - { - let either = if !self.http2 { - let mut conn = proto::Conn::new(io); - if !self.keep_alive { - conn.disable_keep_alive(); - } - conn.set_flush_pipeline(self.pipeline); - if let Some(max) = self.max_buf_size { - conn.set_max_buf_size(max); - } - let sd = proto::h1::dispatch::Server::new(service); - Either::A(proto::h1::Dispatcher::new(sd, conn)) - } else { - let h2 = proto::h2::Server::new(io, service, self.exec.clone()); - Either::B(h2) - }; - - Connection { - conn: either, - } + /// This method will panic if binding to the address fails. For a method + /// to bind to an address and return a `Result`, see `Server::try_bind`. + pub fn bind(addr: &SocketAddr) -> Builder { + let incoming = AddrIncoming::new(addr, None) + .unwrap_or_else(|e| { + panic!("error binding to {}: {}", addr, e); + }); + Server::builder(incoming) } -} - - -// ===== impl Server ===== - -/// TODO: add docs -pub struct Run(Box + Send + 'static>); - -impl fmt::Debug for Run { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Run").finish() + /// Tries to bind to the provided address, and returns a [`Builder`](Builder). + pub fn try_bind(addr: &SocketAddr) -> ::Result> { + AddrIncoming::new(addr, None) + .map(Server::builder) } } -impl Future for Run { - type Item = (); - type Error = ::Error; - - fn poll(&mut self) -> Poll<(), ::Error> { - self.0.poll() +impl Server { + /// Returns the local address that this server is bound to. + pub fn local_addr(&self) -> SocketAddr { + self.spawn_all.local_addr() } } - -impl Server +impl Future for Server where + I: Stream, + I::Error: Into>, + I::Item: AsyncRead + AsyncWrite + Send + 'static, S: NewService, Response = Response> + Send + 'static, S::Error: Into>, ::Instance: Send, <::Instance as Service>::Future: Send + 'static, - B: Payload + Send + 'static, - B::Data: Send, -{ - /// Returns the local address that this server is bound to. - pub fn local_addr(&self) -> ::Result { - //TODO: this shouldn't return an error at all, but should get the - //local_addr at construction - self.listener.local_addr().map_err(::Error::new_io) - } - - /// Configure the amount of time this server will wait for a "graceful - /// shutdown". - /// - /// This is the amount of time after the shutdown signal is received the - /// server will wait for all pending connections to finish. If the timeout - /// elapses then the server will be forcibly shut down. - /// - /// This defaults to 1s. - pub fn shutdown_timeout(&mut self, timeout: Duration) -> &mut Self { - self.shutdown_timeout = timeout; - self - } - - /// Execute this server infinitely. - /// - /// This method does not currently return, but it will return an error if - /// one occurs. - pub fn run(self) -> Run { - self.run_until(future::empty()) - } - - /// Execute this server until the given future, `shutdown_signal`, resolves. - /// - /// This method, like `run` above, is used to execute this HTTP server. The - /// difference with `run`, however, is that this method allows for shutdown - /// in a graceful fashion. The future provided is interpreted as a signal to - /// shut down the server when it resolves. - /// - /// This method will block the current thread executing the HTTP server. - /// When the `shutdown_signal` has resolved then the TCP listener will be - /// unbound (dropped). The thread will continue to block for a maximum of - /// `shutdown_timeout` time waiting for active connections to shut down. - /// Once the `shutdown_timeout` elapses or all active connections are - /// cleaned out then this method will return. - pub fn run_until(self, shutdown_signal: F) -> Run - where F: Future + Send + 'static, - { - let Server { protocol, new_service, handle, listener, shutdown_timeout } = self; - - let mut incoming = match AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors) { - Ok(incoming) => incoming, - Err(err) => return Run(Box::new(future::err(::Error::new_listen(err)))), - }; - - if protocol.keep_alive { - incoming.set_keepalive(Some(Duration::from_secs(90))); - } - - // Mini future to track the number of active services - let info = Arc::new(Mutex::new(Info { - active: 0, - blocker: None, - })); - - // Future for our server's execution - let info_cloned = info.clone(); - let srv = incoming.for_each(move |socket| { - let addr = socket.remote_addr; - debug!("accepted new connection ({})", addr); - - let service = new_service.new_service()?; - let s = NotifyService { - inner: service, - info: Arc::downgrade(&info_cloned), - }; - info_cloned.lock().unwrap().active += 1; - let fut = protocol.serve_connection(socket, s) - .map(|_| ()) - .map_err(move |err| error!("server connection error: ({}) {}", addr, err)); - spawn(fut); - Ok(()) - }); - - // for now, we don't care if the shutdown signal succeeds or errors - // as long as it resolves, we will shutdown. - let shutdown_signal = shutdown_signal.then(|_| Ok(())); - - // Main execution of the server. Here we use `select` to wait for either - // `incoming` or `f` to resolve. We know that `incoming` will never - // resolve with a success (it's infinite) so we're actually just waiting - // for an error or for `f`, our shutdown signal. - // - // When we get a shutdown signal (`Ok`) then we drop the TCP listener to - // stop accepting incoming connections. - let main_execution = shutdown_signal.select(srv).then(move |result| { - match result { - Ok(((), _incoming)) => {}, - Err((e, _other)) => return future::Either::A(future::err(::Error::new_accept(e))), - } - - // Ok we've stopped accepting new connections at this point, but we want - // to give existing connections a chance to clear themselves out. Wait - // at most `shutdown_timeout` time before we just return clearing - // everything out. - // - // Our custom `WaitUntilZero` will resolve once all services constructed - // here have been destroyed. - let timeout = Delay::new(shutdown_timeout); - let wait = WaitUntilZero { info: info.clone() }; - future::Either::B(wait.select(timeout).then(|result| { - match result { - Ok(_) => Ok(()), - //TODO: error variant should be "timed out waiting for graceful shutdown" - Err((e, _)) => Err(::Error::new_io(e)) - } - })) - }); - - Run(Box::new(main_execution)) - } -} - -impl fmt::Debug for Server -{ - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Server") - .field("listener", &self.listener) - .field("new_service", &self.new_service) - .field("protocol", &self.protocol) - .finish() - } -} - -// ===== impl Serve ===== - -impl Serve { - /* - /// Spawn all incoming connections onto the provide executor. - pub fn spawn_all(self, executor: E) -> SpawnAll { - SpawnAll { - executor: executor, - serve: self, - } - } - */ - - /// Get a reference to the incoming stream. - #[inline] - pub fn incoming_ref(&self) -> &I { - &self.incoming - } -} - -impl Stream for Serve -where - I: Stream, - I::Item: AsyncRead + AsyncWrite, - S: NewService, Response=Response>, - S::Error: Into>, - ::Future: Send + 'static, B: Payload, -{ - type Item = Connection; - type Error = ::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) { - let service = self.new_service.new_service().map_err(::Error::new_user_new_service)?; - Ok(Async::Ready(Some(self.protocol.serve_connection(io, service)))) - } else { - Ok(Async::Ready(None)) - } - } -} - -// ===== impl SpawnAll ===== - -/* -impl Future for SpawnAll -where - I: Stream, - I::Item: AsyncRead + AsyncWrite, - S: NewService, Response=Response, Error=::Error>, - B: Stream, - B::Item: AsRef<[u8]>, - //E: Executor>, { type Item = (); type Error = ::Error; fn poll(&mut self) -> Poll { - loop { - if let Some(conn) = try_ready!(self.serve.poll()) { - let fut = conn - .map(|_| ()) - .map_err(|err| debug!("conn error: {}", err)); - match self.executor.execute(fut) { - Ok(()) => (), - Err(err) => match err.kind() { - ExecuteErrorKind::NoCapacity => { - debug!("SpawnAll::poll; executor no capacity"); - // continue loop - }, - ExecuteErrorKind::Shutdown | _ => { - debug!("SpawnAll::poll; executor shutdown"); - return Ok(Async::Ready(())) - } - } - } - } else { - return Ok(Async::Ready(())) - } - } + self.spawn_all.poll() } } -*/ - -// ===== impl AddrIncoming ===== -impl AddrIncoming { - fn new(listener: TcpListener, handle: Handle, sleep_on_errors: bool) -> io::Result { - Ok(AddrIncoming { - addr: listener.local_addr()?, - keep_alive_timeout: None, - listener: listener, - handle: handle, - sleep_on_errors: sleep_on_errors, - timeout: None, - }) - } - - /// Get the local address bound to this listener. - pub fn local_addr(&self) -> SocketAddr { - self.addr - } - - fn set_keepalive(&mut self, dur: Option) { - self.keep_alive_timeout = dur; - } - - /* - fn set_sleep_on_errors(&mut self, val: bool) { - self.sleep_on_errors = val; - } - */ -} - -impl Stream for AddrIncoming { - // currently unnameable... - type Item = AddrStream; - type Error = ::std::io::Error; - - fn poll(&mut self) -> Poll, Self::Error> { - // Check if a previous timeout is active that was set by IO errors. - if let Some(ref mut to) = self.timeout { - match to.poll().expect("timeout never fails") { - Async::Ready(_) => {} - Async::NotReady => return Ok(Async::NotReady), - } - } - self.timeout = None; - loop { - match self.listener.poll_accept() { - Ok(Async::Ready((socket, addr))) => { - if let Some(dur) = self.keep_alive_timeout { - if let Err(e) = socket.set_keepalive(Some(dur)) { - trace!("error trying to set TCP keepalive: {}", e); - } - } - return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); - }, - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(ref e) if self.sleep_on_errors => { - // Connection errors can be ignored directly, continue by - // accepting the next request. - if connection_error(e) { - continue; - } - // Sleep 10ms. - let delay = ::std::time::Duration::from_millis(10); - debug!("accept error: {}; sleeping {:?}", - e, delay); - let mut timeout = Delay::new(delay); - let result = timeout.poll() - .expect("timeout never fails"); - match result { - Async::Ready(()) => continue, - Async::NotReady => { - self.timeout = Some(timeout); - return Ok(Async::NotReady); - } - } - }, - Err(e) => return Err(e), - } - } +impl fmt::Debug for Server { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Server") + .field("listener", &self.spawn_all.incoming_ref()) + .finish() } } -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused || - e.kind() == io::ErrorKind::ConnectionAborted || - e.kind() == io::ErrorKind::ConnectionReset -} - -mod addr_stream { - use std::io::{self, Read, Write}; - use std::net::SocketAddr; - use bytes::{Buf, BufMut}; - use futures::Poll; - use tokio::net::TcpStream; - use tokio_io::{AsyncRead, AsyncWrite}; - - - #[derive(Debug)] - pub struct AddrStream { - inner: TcpStream, - pub(super) remote_addr: SocketAddr, - } - - impl AddrStream { - pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream { - AddrStream { - inner: tcp, - remote_addr: addr, - } - } - } +// ===== impl Builder ===== - impl Read for AddrStream { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.inner.read(buf) - } - } - - impl Write for AddrStream { - #[inline] - fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner.write(buf) - } - - #[inline] - fn flush(&mut self ) -> io::Result<()> { - self.inner.flush() +impl Builder { + /// Start a new builder, wrapping an incoming stream and low-level options. + /// + /// For a more convenient constructor, see [`Server::bind`](Server::bind). + pub fn new(incoming: I, protocol: Http_) -> Self { + Builder { + incoming, + protocol, } } - impl AsyncRead for AddrStream { - #[inline] - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } - - #[inline] - fn read_buf(&mut self, buf: &mut B) -> Poll { - self.inner.read_buf(buf) - } + /// Sets whether HTTP/2 is required. + /// + /// Default is `false`. + pub fn http2_only(mut self, val: bool) -> Self { + self.protocol.http2_only(val); + self } - impl AsyncWrite for AddrStream { - #[inline] - fn shutdown(&mut self) -> Poll<(), io::Error> { - AsyncWrite::shutdown(&mut self.inner) - } - - #[inline] - fn write_buf(&mut self, buf: &mut B) -> Poll { - self.inner.write_buf(buf) + /// Consume this `Builder`, creating a [`Server`](Server). + pub fn serve(self, new_service: S) -> Server + where + I: Stream, + I::Error: Into>, + I::Item: AsyncRead + AsyncWrite + Send + 'static, + S: NewService, Response = Response>, + S::Error: Into>, + ::Instance: Send, + <::Instance as Service>::Future: Send + 'static, + B: Payload, + { + let serve = self.protocol.serve_incoming(self.incoming, new_service); + let spawn_all = serve.spawn_all(); + Server { + spawn_all, } } } -// ===== NotifyService ===== - -struct NotifyService { - inner: S, - info: Weak>, -} - -struct WaitUntilZero { - info: Arc>, -} - -struct Info { - active: usize, - blocker: Option, -} - -impl Service for NotifyService { - type Request = S::Request; - type Response = S::Response; - type Error = S::Error; - type Future = S::Future; - - fn call(&self, message: Self::Request) -> Self::Future { - self.inner.call(message) - } -} - -impl Drop for NotifyService { - fn drop(&mut self) { - let info = match self.info.upgrade() { - Some(info) => info, - None => return, - }; - let mut info = info.lock().unwrap(); - info.active -= 1; - if info.active == 0 { - if let Some(task) = info.blocker.take() { - task.notify(); - } - } +impl Builder { + /// Set whether TCP keepalive messages are enabled on accepted connections. + /// + /// If `None` is specified, keepalive is disabled, otherwise the duration + /// specified will be the time to remain idle before sending TCP keepalive + /// probes. + pub fn tcp_keepalive(mut self, keepalive: Option) -> Self { + self.incoming.set_keepalive(keepalive); + self } -} - -impl Future for WaitUntilZero { - type Item = (); - type Error = io::Error; - fn poll(&mut self) -> Poll<(), io::Error> { - let mut info = self.info.lock().unwrap(); - if info.active == 0 { - Ok(().into()) - } else { - info.blocker = Some(task::current()); - Ok(Async::NotReady) - } + /// Set the value of `TCP_NODELAY` option for accepted connections. + pub fn tcp_nodelay(mut self, enabled: bool) -> Self { + self.incoming.set_nodelay(enabled); + self } } diff --git a/src/server/tcp.rs b/src/server/tcp.rs new file mode 100644 index 0000000000..c573f55f35 --- /dev/null +++ b/src/server/tcp.rs @@ -0,0 +1,234 @@ +use std::fmt; +use std::io; +use std::net::{SocketAddr, TcpListener as StdTcpListener}; +use std::time::Duration; + +use futures::{Async, Future, Poll, Stream}; +use futures_timer::Delay; +//TODO: change to tokio_tcp::net::TcpListener +use tokio::net::TcpListener; +use tokio::reactor::Handle; + +use self::addr_stream::AddrStream; + +/// A stream of connections from binding to an address. +#[must_use = "streams do nothing unless polled"] +pub struct AddrIncoming { + addr: SocketAddr, + listener: TcpListener, + sleep_on_errors: bool, + tcp_keepalive_timeout: Option, + tcp_nodelay: bool, + timeout: Option, +} + +impl AddrIncoming { + pub(super) fn new(addr: &SocketAddr, handle: Option<&Handle>) -> ::Result { + let listener = if let Some(handle) = handle { + let std_listener = StdTcpListener::bind(addr) + .map_err(::Error::new_listen)?; + TcpListener::from_std(std_listener, handle) + .map_err(::Error::new_listen)? + } else { + TcpListener::bind(addr).map_err(::Error::new_listen)? + }; + + let addr = listener.local_addr().map_err(::Error::new_listen)?; + + Ok(AddrIncoming { + addr: addr, + listener: listener, + sleep_on_errors: true, + tcp_keepalive_timeout: None, + tcp_nodelay: false, + timeout: None, + }) + } + + /// Get the local address bound to this listener. + pub fn local_addr(&self) -> SocketAddr { + self.addr + } + + /// Set whether TCP keepalive messages are enabled on accepted connections. + /// + /// If `None` is specified, keepalive is disabled, otherwise the duration + /// specified will be the time to remain idle before sending TCP keepalive + /// probes. + pub fn set_keepalive(&mut self, keepalive: Option) -> &mut Self { + self.tcp_keepalive_timeout = keepalive; + self + } + + /// Set the value of `TCP_NODELAY` option for accepted connections. + pub fn set_nodelay(&mut self, enabled: bool) -> &mut Self { + self.tcp_nodelay = enabled; + self + } + + /// Set whether to sleep on accept errors. + /// + /// A possible scenario is that the process has hit the max open files + /// allowed, and so trying to accept a new connection will fail with + /// `EMFILE`. In some cases, it's preferable to just wait for some time, if + /// the application will likely close some files (or connections), and try + /// to accept the connection again. If this option is `true`, the error + /// will be logged at the `error` level, since it is still a big deal, + /// and then the listener will sleep for 1 second. + /// + /// In other cases, hitting the max open files should be treat similarly + /// to being out-of-memory, and simply error (and shutdown). Setting + /// this option to `false` will allow that. + /// + /// Default is `true`. + pub fn set_sleep_on_errors(&mut self, val: bool) { + self.sleep_on_errors = val; + } +} + +impl Stream for AddrIncoming { + // currently unnameable... + type Item = AddrStream; + type Error = ::std::io::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + // Check if a previous timeout is active that was set by IO errors. + if let Some(ref mut to) = self.timeout { + match to.poll().expect("timeout never fails") { + Async::Ready(_) => {} + Async::NotReady => return Ok(Async::NotReady), + } + } + self.timeout = None; + loop { + match self.listener.poll_accept() { + Ok(Async::Ready((socket, addr))) => { + if let Some(dur) = self.tcp_keepalive_timeout { + if let Err(e) = socket.set_keepalive(Some(dur)) { + trace!("error trying to set TCP keepalive: {}", e); + } + } + if let Err(e) = socket.set_nodelay(self.tcp_nodelay) { + trace!("error trying to set TCP nodelay: {}", e); + } + return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); + }, + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(ref e) if self.sleep_on_errors => { + // Connection errors can be ignored directly, continue by + // accepting the next request. + if is_connection_error(e) { + debug!("accepted connection already errored: {}", e); + continue; + } + // Sleep 1s. + let delay = Duration::from_secs(1); + error!("accept error: {}", e); + let mut timeout = Delay::new(delay); + let result = timeout.poll() + .expect("timeout never fails"); + match result { + Async::Ready(()) => continue, + Async::NotReady => { + self.timeout = Some(timeout); + return Ok(Async::NotReady); + } + } + }, + Err(e) => return Err(e), + } + } + } +} + +/// This function defines errors that are per-connection. Which basically +/// means that if we get this error from `accept()` system call it means +/// next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` is performed. +/// The timeout is useful to handle resource exhaustion errors like ENFILE +/// and EMFILE. Otherwise, could enter into tight loop. +fn is_connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused || + e.kind() == io::ErrorKind::ConnectionAborted || + e.kind() == io::ErrorKind::ConnectionReset +} + +impl fmt::Debug for AddrIncoming { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("AddrIncoming") + .field("addr", &self.addr) + .field("sleep_on_errors", &self.sleep_on_errors) + .field("tcp_keepalive_timeout", &self.tcp_keepalive_timeout) + .field("tcp_nodelay", &self.tcp_nodelay) + .finish() + } +} + +mod addr_stream { + use std::io::{self, Read, Write}; + use std::net::SocketAddr; + use bytes::{Buf, BufMut}; + use futures::Poll; + use tokio::net::TcpStream; + use tokio_io::{AsyncRead, AsyncWrite}; + + + #[derive(Debug)] + pub struct AddrStream { + inner: TcpStream, + pub(super) remote_addr: SocketAddr, + } + + impl AddrStream { + pub(super) fn new(tcp: TcpStream, addr: SocketAddr) -> AddrStream { + AddrStream { + inner: tcp, + remote_addr: addr, + } + } + } + + impl Read for AddrStream { + #[inline] + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.read(buf) + } + } + + impl Write for AddrStream { + #[inline] + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + + #[inline] + fn flush(&mut self ) -> io::Result<()> { + self.inner.flush() + } + } + + impl AsyncRead for AddrStream { + #[inline] + unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { + self.inner.prepare_uninitialized_buffer(buf) + } + + #[inline] + fn read_buf(&mut self, buf: &mut B) -> Poll { + self.inner.read_buf(buf) + } + } + + impl AsyncWrite for AddrStream { + #[inline] + fn shutdown(&mut self) -> Poll<(), io::Error> { + AsyncWrite::shutdown(&mut self.inner) + } + + #[inline] + fn write_buf(&mut self, buf: &mut B) -> Poll { + self.inner.write_buf(buf) + } + } +} diff --git a/tests/integration.rs b/tests/integration.rs index ea65712cd9..8b9553f052 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -1,3 +1,4 @@ +#![deny(warnings)] #[macro_use] mod support; use self::support::*; diff --git a/tests/server.rs b/tests/server.rs index 0cc22aa2d4..a1ac357459 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -24,7 +24,6 @@ use futures::future::{self, FutureResult, Either}; use futures::sync::oneshot; use futures_timer::Delay; use http::header::{HeaderName, HeaderValue}; -//use net2::TcpBuilder; use tokio::net::TcpListener; use tokio::runtime::Runtime; use tokio::reactor::Handle; @@ -32,7 +31,8 @@ use tokio_io::{AsyncRead, AsyncWrite}; use hyper::{Body, Request, Response, StatusCode}; -use hyper::server::{Http, Service, NewService, service_fn}; +use hyper::server::{Service, NewService, service_fn}; +use hyper::server::conn::Http; fn tcp_bind(addr: &SocketAddr, handle: &Handle) -> ::tokio::io::Result { let std_listener = StdTcpListener::bind(addr).unwrap(); @@ -1363,8 +1363,9 @@ fn serve_with_options(options: ServeOptions) -> Serve { let (msg_tx, msg_rx) = mpsc::channel(); let (reply_tx, reply_rx) = spmc::channel(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let shutdown_rx = shutdown_rx.then(|_| Ok(())); - let addr = "127.0.0.1:0".parse().unwrap(); + let addr = ([127, 0, 0, 1], 0).into(); let keep_alive = !options.keep_alive_disabled; let pipeline = options.pipeline; @@ -1372,22 +1373,40 @@ fn serve_with_options(options: ServeOptions) -> Serve { let thread_name = format!("test-server-{:?}", dur); let thread = thread::Builder::new().name(thread_name).spawn(move || { - tokio::run(::futures::future::lazy(move || { - let srv = Http::new() - .keep_alive(keep_alive) - .pipeline(pipeline) - .bind(&addr, TestService { - tx: Arc::new(Mutex::new(msg_tx.clone())), - _timeout: dur, - reply: reply_rx, - }).unwrap(); - addr_tx.send(srv.local_addr().unwrap()).unwrap(); - srv.run_until(shutdown_rx.then(|_| Ok(()))) - .map_err(|err| println!("error {}", err)) - })) - }).unwrap(); + let serve = Http::new() + .keep_alive(keep_alive) + .pipeline_flush(pipeline) + .serve_addr(&addr, TestService { + tx: Arc::new(Mutex::new(msg_tx.clone())), + _timeout: dur, + reply: reply_rx, + }) + .expect("bind to address"); + + addr_tx.send( + serve + .incoming_ref() + .local_addr() + ).expect("server addr tx"); + + // spawn_all() is private for now, so just duplicate it here + let spawn_all = serve.for_each(|conn| { + tokio::spawn(conn.map_err(|e| { + println!("server error: {}", e); + })); + Ok(()) + }).map_err(|e| { + println!("accept error: {}", e) + }); + + let fut = spawn_all + .select(shutdown_rx) + .then(|_| Ok(())); + + tokio::run(fut); + }).expect("thread spawn"); - let addr = addr_rx.recv().unwrap(); + let addr = addr_rx.recv().expect("server addr rx"); Serve { msg_rx: msg_rx, diff --git a/tests/support/mod.rs b/tests/support/mod.rs index decfe43df2..507aa9f184 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -123,9 +123,11 @@ macro_rules! __internal_req_res_prop { ); (headers: $map:tt) => ({ #[allow(unused_mut)] + { let mut headers = HeaderMap::new(); __internal_headers!(headers, $map); headers + } }); ($prop_name:ident: $prop_val:expr) => ( From::from($prop_val) @@ -228,7 +230,7 @@ pub fn __run_test(cfg: __TestConfig) { }); let new_service = hyper::server::const_service(service); - let serve = hyper::server::Http::new() + let serve = hyper::server::conn::Http::new() .http2_only(cfg.server_version == 2) .executor(rt.executor()) .serve_addr_handle( @@ -244,7 +246,7 @@ pub fn __run_test(cfg: __TestConfig) { let (success_tx, success_rx) = oneshot::channel(); let expected_connections = cfg.connections; let server = serve - .fold(0, move |cnt, conn: hyper::server::Connection<_, _>| { + .fold(0, move |cnt, conn| { exe.spawn(conn.map_err(|e| panic!("server connection error: {}", e))); Ok::<_, hyper::Error>(cnt + 1) })