From 76198f63d73019311cc09d870b8397e46d8c0f2d Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 4 Jan 2019 11:42:33 -0800 Subject: [PATCH] Provide optional features on tokio crate (#808) Disabling all features means the only dependency is `futures`. Relevant pieces of the API can then be enabled with the following features: - `codec` - `fs` - `io` - `reactor` - `tcp` - `timer` - `udp` - `uds` This also introduces the beginnings of enabling only certain pieces of the `Runtime`. As a start, the entire default runtime API is enabled via the `rt-full` feature. --- .travis.yml | 16 + Cargo.toml | 91 ++-- src/io.rs | 1 + src/lib.rs | 46 +- src/net.rs | 12 +- src/prelude.rs | 1 + src/runtime/current_thread/mod.rs | 15 + src/runtime/mod.rs | 401 +----------------- src/runtime/{ => threadpool}/builder.rs | 2 +- src/runtime/threadpool/mod.rs | 395 +++++++++++++++++ src/runtime/{ => threadpool}/shutdown.rs | 2 +- src/runtime/{ => threadpool}/task_executor.rs | 0 src/util/future.rs | 6 + src/util/stream.rs | 4 + 14 files changed, 557 insertions(+), 435 deletions(-) rename src/runtime/{ => threadpool}/builder.rs (99%) create mode 100644 src/runtime/threadpool/mod.rs rename src/runtime/{ => threadpool}/shutdown.rs (97%) rename src/runtime/{ => threadpool}/task_executor.rs (100%) diff --git a/.travis.yml b/.travis.yml index 55689e2ae82..47aede415c6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,6 +22,22 @@ matrix: - env: TARGET=i686-unknown-freebsd - env: TARGET=i686-unknown-linux-gnu + # Test combinations of enabled features. + - rust: stable + script: | + shopt -s expand_aliases + alias check="cargo check --no-default-features" + check + check --features codec + check --features fs + check --features io + check --features reactor + check --features rt-full + check --features tcp + check --features timer + check --features udp + check --features uds + # Test the async / await preview. We don't want to block PRs on this failing # though. - rust: nightly diff --git a/Cargo.toml b/Cargo.toml index 819b99bb091..15343975a1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,35 @@ members = [ ] [features] +default = [ + "codec", + "fs", + "io", + "reactor", + "rt-full", + "tcp", + "timer", + "udp", + "uds", +] + +codec = ["tokio-codec"] +fs = ["tokio-fs"] +io = ["bytes", "tokio-io"] +reactor = ["io", "mio", "tokio-reactor"] +rt-full = [ + "num_cpus", + "reactor", + "timer", + "tokio-current-thread", + "tokio-executor", + "tokio-threadpool", +] +tcp = ["tokio-tcp"] +timer = ["tokio-timer"] +udp = ["tokio-udp"] +uds = ["tokio-uds"] + # This feature comes with no promise of stability. Things will # break with each patch release. Use at your own risk. async-await-preview = [ @@ -54,29 +83,31 @@ travis-ci = { repository = "tokio-rs/tokio" } appveyor = { repository = "carllerche/tokio", id = "s83yxhy9qeb58va7" } [dependencies] -bytes = "0.4" -num_cpus = "1.8.0" -tokio-codec = { version = "0.1.0", path = "tokio-codec" } -tokio-current-thread = { version = "0.1.3", path = "tokio-current-thread" } -tokio-io = { version = "0.1.6", path = "tokio-io" } -tokio-executor = { version = "0.1.5", path = "tokio-executor" } -tokio-reactor = { version = "0.1.1", path = "tokio-reactor" } -tokio-threadpool = { version = "0.1.4", path = "tokio-threadpool" } -tokio-tcp = { version = "0.1.0", path = "tokio-tcp" } -tokio-udp = { version = "0.1.0", path = "tokio-udp" } -tokio-timer = { version = "0.2.8", path = "tokio-timer" } -tokio-fs = { version = "0.1.3", path = "tokio-fs" } - +# Only non-optional dependency... futures = "0.1.20" +# Everything else is optional... +bytes = { version = "0.4", optional = true } +num_cpus = { version = "1.8.0", optional = true } +tokio-codec = { version = "0.1.0", path = "tokio-codec", optional = true } +tokio-current-thread = { version = "0.1.3", path = "tokio-current-thread", optional = true } +tokio-fs = { version = "0.1.3", path = "tokio-fs", optional = true } +tokio-io = { version = "0.1.6", path = "tokio-io", optional = true } +tokio-executor = { version = "0.1.5", path = "tokio-executor", optional = true } +tokio-reactor = { version = "0.1.1", path = "tokio-reactor", optional = true } +tokio-threadpool = { version = "0.1.4", path = "tokio-threadpool", optional = true } +tokio-tcp = { version = "0.1.0", path = "tokio-tcp", optional = true } +tokio-udp = { version = "0.1.0", path = "tokio-udp", optional = true } +tokio-timer = { version = "0.2.8", path = "tokio-timer", optional = true } + # Needed until `reactor` is removed from `tokio`. -mio = "0.6.14" +mio = { version = "0.6.14", optional = true } # Needed for async/await preview support tokio-async-await = { version = "0.1.0", path = "tokio-async-await", optional = true } [target.'cfg(unix)'.dependencies] -tokio-uds = { version = "0.2.1", path = "tokio-uds" } +tokio-uds = { version = "0.2.1", path = "tokio-uds", optional = true } [dev-dependencies] env_logger = { version = "0.5", default-features = false } @@ -92,18 +123,18 @@ serde_json = "1.0" time = "0.1" [patch.crates-io] -tokio = { path = "." } -tokio-async-await = { path = "./tokio-async-await" } -tokio-codec = { path = "./tokio-codec" } -tokio-current-thread = { path = "./tokio-current-thread" } -tokio-executor = { path = "./tokio-executor" } -tokio-fs = { path = "./tokio-fs" } -tokio-io = { path = "./tokio-io" } -tokio-reactor = { path = "./tokio-reactor" } -tokio-signal = { path = "./tokio-signal" } -tokio-tcp = { path = "./tokio-tcp" } -tokio-threadpool = { path = "./tokio-threadpool" } -tokio-timer = { path = "./tokio-timer" } -tokio-tls = { path = "./tokio-tls" } -tokio-udp = { path = "./tokio-udp" } -tokio-uds = { path = "./tokio-uds" } +#tokio = { path = "." } +#tokio-async-await = { path = "./tokio-async-await" } +#tokio-codec = { path = "./tokio-codec" } +#tokio-current-thread = { path = "./tokio-current-thread" } +#tokio-executor = { path = "./tokio-executor" } +#tokio-fs = { path = "./tokio-fs" } +#tokio-io = { path = "./tokio-io" } +#tokio-reactor = { path = "./tokio-reactor" } +#tokio-signal = { path = "./tokio-signal" } +#tokio-tcp = { path = "./tokio-tcp" } +#tokio-threadpool = { path = "./tokio-threadpool" } +#tokio-timer = { path = "./tokio-timer" } +#tokio-tls = { path = "./tokio-tls" } +#tokio-udp = { path = "./tokio-udp" } +#tokio-uds = { path = "./tokio-uds" } diff --git a/src/io.rs b/src/io.rs index 0f1fcbca65a..1f7ecc28cc0 100644 --- a/src/io.rs +++ b/src/io.rs @@ -51,6 +51,7 @@ pub use tokio_io::{ }; // standard input, output, and error +#[cfg(feature = "fs")] pub use tokio_fs::{ stdin, Stdin, diff --git a/src/lib.rs b/src/lib.rs index 401d90f8988..23c89ecec60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -72,42 +72,72 @@ //! } //! ``` -extern crate bytes; -#[macro_use] +macro_rules! if_runtime { + ($($i:item)*) => ($( + #[cfg(any(feature = "rt-full"))] + $i + )*) +} + +#[cfg_attr(feature = "rt-full", macro_use)] extern crate futures; + +#[cfg(feature = "io")] +extern crate bytes; +#[cfg(feature = "reactor")] extern crate mio; +#[cfg(feature = "rt-full")] extern crate num_cpus; +#[cfg(feature = "rt-full")] extern crate tokio_current_thread; +#[cfg(feature = "io")] extern crate tokio_io; -extern crate tokio_executor; +#[cfg(feature = "codec")] extern crate tokio_codec; +#[cfg(feature = "fs")] extern crate tokio_fs; +#[cfg(feature = "reactor")] extern crate tokio_reactor; +#[cfg(feature = "rt-full")] extern crate tokio_threadpool; +#[cfg(feature = "timer")] extern crate tokio_timer; +#[cfg(feature = "tcp")] extern crate tokio_tcp; +#[cfg(feature = "udp")] extern crate tokio_udp; #[cfg(feature = "async-await-preview")] extern crate tokio_async_await; -#[cfg(unix)] +#[cfg(all(unix, feature = "uds"))] extern crate tokio_uds; +#[cfg(feature = "timer")] pub mod clock; +#[cfg(feature = "codec")] pub mod codec; -pub mod executor; +#[cfg(feature = "fs")] pub mod fs; +#[cfg(feature = "io")] pub mod io; +#[cfg(any(feature = "tcp", feature = "udp", feature = "uds"))] pub mod net; pub mod prelude; +#[cfg(feature = "reactor")] pub mod reactor; -pub mod runtime; +#[cfg(feature = "timer")] pub mod timer; pub mod util; -pub use executor::spawn; -pub use runtime::run; +if_runtime! { + extern crate tokio_executor; + pub mod executor; + pub mod runtime; + + pub use executor::spawn; + pub use runtime::run; +} // ===== Experimental async/await support ===== diff --git a/src/net.rs b/src/net.rs index be8612ae791..a6b425da691 100644 --- a/src/net.rs +++ b/src/net.rs @@ -22,6 +22,7 @@ //! [`UnixDatagram`]: struct.UnixDatagram.html //! [`UnixDatagramFramed`]: struct.UnixDatagramFramed.html +#[cfg(feature = "tcp")] pub mod tcp { //! TCP bindings for `tokio`. //! @@ -42,15 +43,19 @@ pub mod tcp { //! [`Incoming`]: struct.Incoming.html pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream}; } +#[cfg(feature = "tcp")] pub use self::tcp::{TcpListener, TcpStream}; +#[cfg(feature = "tcp")] #[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")] #[doc(hidden)] pub type ConnectFuture = self::tcp::ConnectFuture; +#[cfg(feature = "tcp")] #[deprecated(note = "use `tokio::net::tcp::Incoming` instead")] #[doc(hidden)] pub type Incoming = self::tcp::Incoming; +#[cfg(feature = "udp")] pub mod udp { //! UDP bindings for `tokio`. //! @@ -68,16 +73,19 @@ pub mod udp { //! [`framed`]: struct.UdpSocket.html#method.framed pub use tokio_udp::{RecvDgram, SendDgram, UdpFramed, UdpSocket}; } +#[cfg(feature = "udp")] pub use self::udp::{UdpFramed, UdpSocket}; +#[cfg(feature = "udp")] #[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")] #[doc(hidden)] pub type RecvDgram = self::udp::RecvDgram; +#[cfg(feature = "udp")] #[deprecated(note = "use `tokio::net::udp::SendDgram` instead")] #[doc(hidden)] pub type SendDgram = self::udp::SendDgram; -#[cfg(unix)] +#[cfg(all(unix, feature = "uds"))] pub mod unix { //! Unix domain socket bindings for `tokio` (only available on unix systems). @@ -86,5 +94,5 @@ pub mod unix { UnixListener, UnixStream, }; } -#[cfg(unix)] +#[cfg(all(unix, feature = "uds"))] pub use self::unix::{UnixDatagram, UnixDatagramFramed, UnixListener, UnixStream}; diff --git a/src/prelude.rs b/src/prelude.rs index ecd82fe40cb..5ca20399e84 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -10,6 +10,7 @@ //! //! The prelude may grow over time as additional items see ubiquitous use. +#[cfg(feature = "io")] pub use tokio_io::{ AsyncRead, AsyncWrite, diff --git a/src/runtime/current_thread/mod.rs b/src/runtime/current_thread/mod.rs index dca41711e80..45dc4efeff6 100644 --- a/src/runtime/current_thread/mod.rs +++ b/src/runtime/current_thread/mod.rs @@ -90,3 +90,18 @@ where r.run().expect("failed to resolve remaining futures"); Ok(v) } + +/// Start a current-thread runtime using the supplied future to bootstrap execution. +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +pub fn run(future: F) +where + F: Future + 'static, +{ + + let mut r = Runtime::new().expect("failed to start runtime on current thread"); + r.spawn(future); + r.run().expect("failed to resolve remaining futures"); +} diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs index 2ea0d548309..9e7b700c939 100644 --- a/src/runtime/mod.rs +++ b/src/runtime/mod.rs @@ -112,399 +112,14 @@ //! [`tokio::spawn`]: ../executor/fn.spawn.html //! [`Timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/timer/struct.Timer.html -mod builder; pub mod current_thread; -mod shutdown; -mod task_executor; +mod threadpool; -pub use self::builder::Builder; -pub use self::shutdown::Shutdown; -pub use self::task_executor::TaskExecutor; +pub use self::threadpool::{ + Builder, + Runtime, + Shutdown, + TaskExecutor, + run, +}; -use reactor::{Handle, Reactor}; - -use std::io; -use std::sync::Mutex; - -use tokio_executor::enter; -use tokio_threadpool as threadpool; - -use futures; -use futures::future::Future; - -/// Handle to the Tokio runtime. -/// -/// The Tokio runtime includes a reactor as well as an executor for running -/// tasks. -/// -/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, -/// most users will use [`tokio::run`], which uses a `Runtime` internally. -/// -/// See [module level][mod] documentation for more details. -/// -/// [mod]: index.html -/// [`new`]: #method.new -/// [`Builder`]: struct.Builder.html -/// [`tokio::run`]: fn.run.html -#[derive(Debug)] -pub struct Runtime { - inner: Option, -} - -#[derive(Debug)] -struct Inner { - /// A handle to the reactor in the background thread. - reactor_handle: Handle, - - // TODO: This should go away in 0.2 - reactor: Mutex>, - - /// Task execution pool. - pool: threadpool::ThreadPool, -} - -// ===== impl Runtime ===== - -/// Start the Tokio runtime using the supplied future to bootstrap execution. -/// -/// This function is used to bootstrap the execution of a Tokio application. It -/// does the following: -/// -/// * Start the Tokio runtime using a default configuration. -/// * Spawn the given future onto the thread pool. -/// * Block the current thread until the runtime shuts down. -/// -/// Note that the function will not return immediately once `future` has -/// completed. Instead it waits for the entire runtime to become idle. -/// -/// See the [module level][mod] documentation for more details. -/// -/// # Examples -/// -/// ```rust -/// # extern crate tokio; -/// # extern crate futures; -/// # use futures::{Future, Stream}; -/// use tokio::net::TcpListener; -/// -/// # fn process(_: T) -> Box + Send> { -/// # unimplemented!(); -/// # } -/// # fn dox() { -/// # let addr = "127.0.0.1:8080".parse().unwrap(); -/// let listener = TcpListener::bind(&addr).unwrap(); -/// -/// let server = listener.incoming() -/// .map_err(|e| println!("error = {:?}", e)) -/// .for_each(|socket| { -/// tokio::spawn(process(socket)) -/// }); -/// -/// tokio::run(server); -/// # } -/// # pub fn main() {} -/// ``` -/// -/// # Panics -/// -/// This function panics if called from the context of an executor. -/// -/// [mod]: ../index.html -pub fn run(future: F) -where F: Future + Send + 'static, -{ - // Check enter before creating a new Runtime... - let mut entered = enter().expect("nested tokio::run"); - let mut runtime = Runtime::new().expect("failed to start new Runtime"); - runtime.spawn(future); - entered - .block_on(runtime.shutdown_on_idle()) - .expect("shutdown cannot error") -} - -impl Runtime { - /// Create a new runtime instance with default configuration values. - /// - /// This results in a reactor, thread pool, and timer being initialized. The - /// thread pool will not spawn any worker threads until it needs to, i.e. - /// tasks are scheduled to run. - /// - /// Most users will not need to call this function directly, instead they - /// will use [`tokio::run`](fn.run.html). - /// - /// See [module level][mod] documentation for more details. - /// - /// # Examples - /// - /// Creating a new `Runtime` with default configuration values. - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::prelude::*; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// // Use the runtime... - /// - /// // Shutdown the runtime - /// rt.shutdown_now() - /// .wait().unwrap(); - /// ``` - /// - /// [mod]: index.html - pub fn new() -> io::Result { - Builder::new().build() - } - - #[deprecated(since = "0.1.5", note = "use `reactor` instead")] - #[doc(hidden)] - pub fn handle(&self) -> &Handle { - #[allow(deprecated)] - self.reactor() - } - - /// Return a reference to the reactor handle for this runtime instance. - /// - /// The returned handle reference can be cloned in order to get an owned - /// value of the handle. This handle can be used to initialize I/O resources - /// (like TCP or UDP sockets) that will not be used on the runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// let reactor_handle = rt.reactor().clone(); - /// - /// // use `reactor_handle` - /// ``` - #[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")] - pub fn reactor(&self) -> &Handle { - let mut reactor = self.inner().reactor.lock().unwrap(); - if let Some(reactor) = reactor.take() { - if let Ok(background) = reactor.background() { - background.forget(); - } - } - - &self.inner().reactor_handle - } - - /// Return a handle to the runtime's executor. - /// - /// The returned handle can be used to spawn tasks that run on this runtime. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// let executor_handle = rt.executor(); - /// - /// // use `executor_handle` - /// ``` - pub fn executor(&self) -> TaskExecutor { - let inner = self.inner().pool.sender().clone(); - TaskExecutor { inner } - } - - /// Spawn a future onto the Tokio runtime. - /// - /// This spawns the given future onto the runtime's executor, usually a - /// thread pool. The thread pool is then responsible for polling the future - /// until it completes. - /// - /// See [module level][mod] documentation for more details. - /// - /// [mod]: index.html - /// - /// # Examples - /// - /// ```rust - /// # extern crate tokio; - /// # extern crate futures; - /// # use futures::{future, Future, Stream}; - /// use tokio::runtime::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|| { - /// println!("now running on a worker thread"); - /// Ok(()) - /// })); - /// # } - /// # pub fn main() {} - /// ``` - /// - /// # Panics - /// - /// This function panics if the spawn fails. Failure occurs if the executor - /// is currently at capacity and is unable to spawn a new future. - pub fn spawn(&mut self, future: F) -> &mut Self - where F: Future + Send + 'static, - { - self.inner_mut().pool.sender().spawn(future).unwrap(); - self - } - - /// Run a future to completion on the Tokio runtime. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, and yielding its resolved result. Any tasks or timers which - /// the future spawns internally will be executed on the runtime. - /// - /// This method should not be called from an asynchronous context. - /// - /// # Panics - /// - /// This function panics if the executor is at capacity, if the provided - /// future panics, or if called within an asynchronous execution context. - pub fn block_on(&mut self, future: F) -> Result - where - F: Send + 'static + Future, - R: Send + 'static, - E: Send + 'static, - { - let mut entered = enter().expect("nested block_on"); - let (tx, rx) = futures::sync::oneshot::channel(); - self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); - entered.block_on(rx).unwrap() - } - - /// Run a future to completion on the Tokio runtime, then wait for all - /// background futures to complete too. - /// - /// This runs the given future on the runtime, blocking until it is - /// complete, waiting for background futures to complete, and yielding - /// its resolved result. Any tasks or timers which the future spawns - /// internally will be executed on the runtime and waited for completion. - /// - /// This method should not be called from an asynchronous context. - /// - /// # Panics - /// - /// This function panics if the executor is at capacity, if the provided - /// future panics, or if called within an asynchronous execution context. - pub fn block_on_all(mut self, future: F) -> Result - where - F: Send + 'static + Future, - R: Send + 'static, - E: Send + 'static, - { - let mut entered = enter().expect("nested block_on_all"); - let (tx, rx) = futures::sync::oneshot::channel(); - self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); - let block = rx - .map_err(|_| unreachable!()) - .and_then(move |r| { - self.shutdown_on_idle() - .map(move |()| r) - }); - entered.block_on(block).unwrap() - } - - /// Signals the runtime to shutdown once it becomes idle. - /// - /// Returns a future that completes once the shutdown operation has - /// completed. - /// - /// This function can be used to perform a graceful shutdown of the runtime. - /// - /// The runtime enters an idle state once **all** of the following occur. - /// - /// * The thread pool has no tasks to execute, i.e., all tasks that were - /// spawned have completed. - /// * The reactor is not managing any I/O resources. - /// - /// See [module level][mod] documentation for more details. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::prelude::*; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// // Use the runtime... - /// - /// // Shutdown the runtime - /// rt.shutdown_on_idle() - /// .wait().unwrap(); - /// ``` - /// - /// [mod]: index.html - pub fn shutdown_on_idle(mut self) -> Shutdown { - let inner = self.inner.take().unwrap(); - let inner = inner.pool.shutdown_on_idle(); - Shutdown { inner } - } - - /// Signals the runtime to shutdown immediately. - /// - /// Returns a future that completes once the shutdown operation has - /// completed. - /// - /// This function will forcibly shutdown the runtime, causing any - /// in-progress work to become canceled. The shutdown steps are: - /// - /// * Drain any scheduled work queues. - /// * Drop any futures that have not yet completed. - /// * Drop the reactor. - /// - /// Once the reactor has dropped, any outstanding I/O resources bound to - /// that reactor will no longer function. Calling any method on them will - /// result in an error. - /// - /// See [module level][mod] documentation for more details. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::prelude::*; - /// - /// let rt = Runtime::new() - /// .unwrap(); - /// - /// // Use the runtime... - /// - /// // Shutdown the runtime - /// rt.shutdown_now() - /// .wait().unwrap(); - /// ``` - /// - /// [mod]: index.html - pub fn shutdown_now(mut self) -> Shutdown { - let inner = self.inner.take().unwrap(); - Shutdown::shutdown_now(inner) - } - - fn inner(&self) -> &Inner { - self.inner.as_ref().unwrap() - } - - fn inner_mut(&mut self) -> &mut Inner { - self.inner.as_mut().unwrap() - } -} - -impl Drop for Runtime { - fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - let shutdown = Shutdown::shutdown_now(inner); - let _ = shutdown.wait(); - } - } -} diff --git a/src/runtime/builder.rs b/src/runtime/threadpool/builder.rs similarity index 99% rename from src/runtime/builder.rs rename to src/runtime/threadpool/builder.rs index 8a5dca77259..f7af4493ef3 100644 --- a/src/runtime/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -1,4 +1,4 @@ -use runtime::{Inner, Runtime}; +use super::{Inner, Runtime}; use reactor::Reactor; diff --git a/src/runtime/threadpool/mod.rs b/src/runtime/threadpool/mod.rs new file mode 100644 index 00000000000..77d89498543 --- /dev/null +++ b/src/runtime/threadpool/mod.rs @@ -0,0 +1,395 @@ +mod builder; +mod shutdown; +mod task_executor; + +pub use self::builder::Builder; +pub use self::shutdown::Shutdown; +pub use self::task_executor::TaskExecutor; + +use reactor::{Handle, Reactor}; + +use std::io; +use std::sync::Mutex; + +use tokio_executor::enter; +use tokio_threadpool as threadpool; + +use futures; +use futures::future::Future; + +/// Handle to the Tokio runtime. +/// +/// The Tokio runtime includes a reactor as well as an executor for running +/// tasks. +/// +/// Instances of `Runtime` can be created using [`new`] or [`Builder`]. However, +/// most users will use [`tokio::run`], which uses a `Runtime` internally. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +/// [`new`]: #method.new +/// [`Builder`]: struct.Builder.html +/// [`tokio::run`]: fn.run.html +#[derive(Debug)] +pub struct Runtime { + inner: Option, +} + +#[derive(Debug)] +struct Inner { + /// A handle to the reactor in the background thread. + reactor_handle: Handle, + + // TODO: This should go away in 0.2 + reactor: Mutex>, + + /// Task execution pool. + pool: threadpool::ThreadPool, +} + +// ===== impl Runtime ===== + +/// Start the Tokio runtime using the supplied future to bootstrap execution. +/// +/// This function is used to bootstrap the execution of a Tokio application. It +/// does the following: +/// +/// * Start the Tokio runtime using a default configuration. +/// * Spawn the given future onto the thread pool. +/// * Block the current thread until the runtime shuts down. +/// +/// Note that the function will not return immediately once `future` has +/// completed. Instead it waits for the entire runtime to become idle. +/// +/// See the [module level][mod] documentation for more details. +/// +/// # Examples +/// +/// ```rust +/// # extern crate tokio; +/// # extern crate futures; +/// # use futures::{Future, Stream}; +/// use tokio::net::TcpListener; +/// +/// # fn process(_: T) -> Box + Send> { +/// # unimplemented!(); +/// # } +/// # fn dox() { +/// # let addr = "127.0.0.1:8080".parse().unwrap(); +/// let listener = TcpListener::bind(&addr).unwrap(); +/// +/// let server = listener.incoming() +/// .map_err(|e| println!("error = {:?}", e)) +/// .for_each(|socket| { +/// tokio::spawn(process(socket)) +/// }); +/// +/// tokio::run(server); +/// # } +/// # pub fn main() {} +/// ``` +/// +/// # Panics +/// +/// This function panics if called from the context of an executor. +/// +/// [mod]: ../index.html +pub fn run(future: F) +where F: Future + Send + 'static, +{ + // Check enter before creating a new Runtime... + let mut entered = enter().expect("nested tokio::run"); + let mut runtime = Runtime::new().expect("failed to start new Runtime"); + runtime.spawn(future); + entered + .block_on(runtime.shutdown_on_idle()) + .expect("shutdown cannot error") +} + +impl Runtime { + /// Create a new runtime instance with default configuration values. + /// + /// This results in a reactor, thread pool, and timer being initialized. The + /// thread pool will not spawn any worker threads until it needs to, i.e. + /// tasks are scheduled to run. + /// + /// Most users will not need to call this function directly, instead they + /// will use [`tokio::run`](fn.run.html). + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// Creating a new `Runtime` with default configuration values. + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn new() -> io::Result { + Builder::new().build() + } + + #[deprecated(since = "0.1.5", note = "use `reactor` instead")] + #[doc(hidden)] + pub fn handle(&self) -> &Handle { + #[allow(deprecated)] + self.reactor() + } + + /// Return a reference to the reactor handle for this runtime instance. + /// + /// The returned handle reference can be cloned in order to get an owned + /// value of the handle. This handle can be used to initialize I/O resources + /// (like TCP or UDP sockets) that will not be used on the runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let reactor_handle = rt.reactor().clone(); + /// + /// // use `reactor_handle` + /// ``` + #[deprecated(since = "0.1.11", note = "there is now a reactor per worker thread")] + pub fn reactor(&self) -> &Handle { + let mut reactor = self.inner().reactor.lock().unwrap(); + if let Some(reactor) = reactor.take() { + if let Ok(background) = reactor.background() { + background.forget(); + } + } + + &self.inner().reactor_handle + } + + /// Return a handle to the runtime's executor. + /// + /// The returned handle can be used to spawn tasks that run on this runtime. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// let executor_handle = rt.executor(); + /// + /// // use `executor_handle` + /// ``` + pub fn executor(&self) -> TaskExecutor { + let inner = self.inner().pool.sender().clone(); + TaskExecutor { inner } + } + + /// Spawn a future onto the Tokio runtime. + /// + /// This spawns the given future onto the runtime's executor, usually a + /// thread pool. The thread pool is then responsible for polling the future + /// until it completes. + /// + /// See [module level][mod] documentation for more details. + /// + /// [mod]: index.html + /// + /// # Examples + /// + /// ```rust + /// # extern crate tokio; + /// # extern crate futures; + /// # use futures::{future, Future, Stream}; + /// use tokio::runtime::Runtime; + /// + /// # fn dox() { + /// // Create the runtime + /// let mut rt = Runtime::new().unwrap(); + /// + /// // Spawn a future onto the runtime + /// rt.spawn(future::lazy(|| { + /// println!("now running on a worker thread"); + /// Ok(()) + /// })); + /// # } + /// # pub fn main() {} + /// ``` + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the executor + /// is currently at capacity and is unable to spawn a new future. + pub fn spawn(&mut self, future: F) -> &mut Self + where F: Future + Send + 'static, + { + self.inner_mut().pool.sender().spawn(future).unwrap(); + self + } + + /// Run a future to completion on the Tokio runtime. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, and yielding its resolved result. Any tasks or timers which + /// the future spawns internally will be executed on the runtime. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on(&mut self, future: F) -> Result + where + F: Send + 'static + Future, + R: Send + 'static, + E: Send + 'static, + { + let mut entered = enter().expect("nested block_on"); + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + entered.block_on(rx).unwrap() + } + + /// Run a future to completion on the Tokio runtime, then wait for all + /// background futures to complete too. + /// + /// This runs the given future on the runtime, blocking until it is + /// complete, waiting for background futures to complete, and yielding + /// its resolved result. Any tasks or timers which the future spawns + /// internally will be executed on the runtime and waited for completion. + /// + /// This method should not be called from an asynchronous context. + /// + /// # Panics + /// + /// This function panics if the executor is at capacity, if the provided + /// future panics, or if called within an asynchronous execution context. + pub fn block_on_all(mut self, future: F) -> Result + where + F: Send + 'static + Future, + R: Send + 'static, + E: Send + 'static, + { + let mut entered = enter().expect("nested block_on_all"); + let (tx, rx) = futures::sync::oneshot::channel(); + self.spawn(future.then(move |r| tx.send(r).map_err(|_| unreachable!()))); + let block = rx + .map_err(|_| unreachable!()) + .and_then(move |r| { + self.shutdown_on_idle() + .map(move |()| r) + }); + entered.block_on(block).unwrap() + } + + /// Signals the runtime to shutdown once it becomes idle. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function can be used to perform a graceful shutdown of the runtime. + /// + /// The runtime enters an idle state once **all** of the following occur. + /// + /// * The thread pool has no tasks to execute, i.e., all tasks that were + /// spawned have completed. + /// * The reactor is not managing any I/O resources. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_on_idle() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_on_idle(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + let inner = inner.pool.shutdown_on_idle(); + Shutdown { inner } + } + + /// Signals the runtime to shutdown immediately. + /// + /// Returns a future that completes once the shutdown operation has + /// completed. + /// + /// This function will forcibly shutdown the runtime, causing any + /// in-progress work to become canceled. The shutdown steps are: + /// + /// * Drain any scheduled work queues. + /// * Drop any futures that have not yet completed. + /// * Drop the reactor. + /// + /// Once the reactor has dropped, any outstanding I/O resources bound to + /// that reactor will no longer function. Calling any method on them will + /// result in an error. + /// + /// See [module level][mod] documentation for more details. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::prelude::*; + /// + /// let rt = Runtime::new() + /// .unwrap(); + /// + /// // Use the runtime... + /// + /// // Shutdown the runtime + /// rt.shutdown_now() + /// .wait().unwrap(); + /// ``` + /// + /// [mod]: index.html + pub fn shutdown_now(mut self) -> Shutdown { + let inner = self.inner.take().unwrap(); + Shutdown::shutdown_now(inner) + } + + fn inner(&self) -> &Inner { + self.inner.as_ref().unwrap() + } + + fn inner_mut(&mut self) -> &mut Inner { + self.inner.as_mut().unwrap() + } +} + +impl Drop for Runtime { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let shutdown = Shutdown::shutdown_now(inner); + let _ = shutdown.wait(); + } + } +} diff --git a/src/runtime/shutdown.rs b/src/runtime/threadpool/shutdown.rs similarity index 97% rename from src/runtime/shutdown.rs rename to src/runtime/threadpool/shutdown.rs index 1b6ba465652..66f81460800 100644 --- a/src/runtime/shutdown.rs +++ b/src/runtime/threadpool/shutdown.rs @@ -1,4 +1,4 @@ -use runtime::Inner; +use super::Inner; use tokio_threadpool as threadpool; use std::fmt; diff --git a/src/runtime/task_executor.rs b/src/runtime/threadpool/task_executor.rs similarity index 100% rename from src/runtime/task_executor.rs rename to src/runtime/threadpool/task_executor.rs diff --git a/src/util/future.rs b/src/util/future.rs index 92097ad9eaf..cae0fc9bb5c 100644 --- a/src/util/future.rs +++ b/src/util/future.rs @@ -1,9 +1,12 @@ +#[cfg(feature = "timer")] #[allow(deprecated)] use tokio_timer::Deadline; +#[cfg(feature = "timer")] use tokio_timer::Timeout; use futures::Future; +#[cfg(feature = "timer")] use std::time::{Instant, Duration}; @@ -55,12 +58,14 @@ pub trait FutureExt: Future { /// tokio::run(future); /// # } /// ``` + #[cfg(feature = "timer")] fn timeout(self, timeout: Duration) -> Timeout where Self: Sized, { Timeout::new(self, timeout) } + #[cfg(feature = "timer")] #[deprecated(since = "0.1.8", note = "use `timeout` instead")] #[allow(deprecated)] #[doc(hidden)] @@ -78,6 +83,7 @@ mod test { use super::*; use prelude::future; + #[cfg(feature = "timer")] #[test] fn timeout_polls_at_least_once() { let base_future = future::result::<(), ()>(Ok(())); diff --git a/src/util/stream.rs b/src/util/stream.rs index 8204ae0d55b..76e6c27e05e 100644 --- a/src/util/stream.rs +++ b/src/util/stream.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "timer")] use tokio_timer::{ throttle::Throttle, Timeout, @@ -5,6 +6,7 @@ use tokio_timer::{ use futures::Stream; +#[cfg(feature = "timer")] use std::time::Duration; @@ -25,6 +27,7 @@ pub trait StreamExt: Stream { /// Throttle down the stream by enforcing a fixed delay between items. /// /// Errors are also delayed. + #[cfg(feature = "timer")] fn throttle(self, duration: Duration) -> Throttle where Self: Sized { @@ -63,6 +66,7 @@ pub trait StreamExt: Stream { /// tokio::run(stream); /// # } /// ``` + #[cfg(feature = "timer")] fn timeout(self, timeout: Duration) -> Timeout where Self: Sized, {