diff --git a/Cargo.toml b/Cargo.toml index cbdd95b0..f7012da6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ license = "MIT" [dependencies] bytes = "0.4" futures = "0.1" +nohash-hasher = "0.1" log = "0.4" parking_lot = "0.6" quick-error = "0.1" diff --git a/src/connection.rs b/src/connection.rs index cf7b555a..cf5bcee6 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -29,7 +29,8 @@ use frame::{ RawFrame, WindowUpdate }; -use futures::{prelude::*, stream::{Fuse, Stream}, task::{self, Task}}; +use futures::{executor, prelude::*, stream::{Fuse, Stream}}; +use notify::Notifier; use parking_lot::{Mutex, MutexGuard}; use std::{ cmp::min, @@ -158,9 +159,7 @@ impl<'a, T> Drop for Use<'a, T> { debug!("{:?}: destroying connection", self.inner.mode); self.inner.is_dead = true; self.inner.streams.clear(); - for task in self.inner.tasks.drain(..) { - task.notify() - } + self.inner.tasks.notify_all() } } } @@ -171,10 +170,10 @@ struct Inner { is_dead: bool, config: Config, streams: BTreeMap, - resource: Fuse>, + resource: executor::Spawn>>, incoming: VecDeque, pending: VecDeque, - tasks: Vec, + tasks: Arc, next_id: u32 } @@ -209,10 +208,10 @@ where is_dead: false, config, streams: BTreeMap::new(), - resource: framed, + resource: executor::spawn(framed), incoming: VecDeque::new(), pending: VecDeque::new(), - tasks: Vec::new(), + tasks: Arc::new(Notifier::new()), next_id: match mode { Mode::Client => 1, Mode::Server => 2 @@ -242,15 +241,15 @@ where } fn flush_pending(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.resource.poll_complete()); + try_ready!(self.resource.poll_flush_notify(&self.tasks, 0)); while let Some(frame) = self.pending.pop_front() { trace!("{:?}: send: {:?}", self.mode, frame.header); - if let AsyncSink::NotReady(frame) = self.resource.start_send(frame)? { + if let AsyncSink::NotReady(frame) = self.resource.start_send_notify(frame, &self.tasks, 0)? { self.pending.push_front(frame); return Ok(Async::NotReady) } } - try_ready!(self.resource.poll_complete()); + try_ready!(self.resource.poll_flush_notify(&self.tasks, 0)); Ok(Async::Ready(())) } @@ -260,10 +259,10 @@ where } loop { if !self.pending.is_empty() && self.flush_pending()?.is_not_ready() { - self.tasks.push(task::current()); + self.tasks.insert_current(); return Ok(Async::NotReady) } - match self.resource.poll()? { + match self.resource.poll_stream_notify(&self.tasks, 0)? { Async::Ready(Some(frame)) => { trace!("{:?}: recv: {:?}", self.mode, frame.header); let response = match frame.dyn_type() { @@ -275,29 +274,20 @@ where self.on_ping(&Frame::assert(frame)).map(Frame::into_raw), Type::GoAway => { self.is_dead = true; - for task in self.tasks.drain(..) { - task.notify() - } return Ok(Async::Ready(())) } }; if let Some(frame) = response { self.pending.push_back(frame) } - for task in self.tasks.drain(..) { - task.notify() - } } Async::Ready(None) => { trace!("{:?}: eof: {:?}", self.mode, self); self.is_dead = true; - for task in self.tasks.drain(..) { - task.notify() - } return Ok(Async::Ready(())) } Async::NotReady => { - self.tasks.push(task::current()); + self.tasks.insert_current(); return Ok(Async::NotReady) } } @@ -547,7 +537,7 @@ where } let frame = match inner.streams.get(&self.id).map(|s| s.credit) { Some(0) => { - inner.tasks.push(task::current()); + inner.tasks.insert_current(); inner.on_drop(Action::None); return Err(io::ErrorKind::WouldBlock.into()) } diff --git a/src/lib.rs b/src/lib.rs index 73d836a3..2daf7c0e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ extern crate bytes; #[macro_use] extern crate futures; +extern crate nohash_hasher; #[macro_use] extern crate log; extern crate parking_lot; @@ -34,6 +35,7 @@ mod connection; mod error; #[allow(dead_code)] mod frame; +mod notify; mod stream; pub use connection::{Connection, Mode, StreamHandle}; diff --git a/src/notify.rs b/src/notify.rs new file mode 100644 index 00000000..b9a5bd34 --- /dev/null +++ b/src/notify.rs @@ -0,0 +1,73 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS +// OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +use futures::{executor, task}; +use parking_lot::Mutex; +use nohash_hasher::IntMap; +use std::sync::atomic::{AtomicUsize, Ordering}; + + +static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); + +task_local!{ + static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) +} + + +/// A notifier maintains a collection of tasks which should be +/// notified at some point. Useful in conjuction with `futures::executor::Spawn`. +pub struct Notifier { + tasks: Mutex> +} + +impl Notifier { + pub fn new() -> Self { + Notifier { tasks: Mutex::new(IntMap::default()) } + } + + /// Insert the current task to the set of tasks to be notified. + /// + /// # Panics + /// + /// If called outside of a futures task. + pub fn insert_current(&self) { + self.tasks.lock().insert(TASK_ID.with(|&t| t), task::current()); + } + + /// Notify all registered tasks. + pub fn notify_all(&self) { + let mut tasks = self.tasks.lock(); + for (_, t) in tasks.drain() { + t.notify(); + } + } + + /// Return the number of currently registered tasks. + pub fn len(&self) -> usize { + self.tasks.lock().len() + } +} + +impl executor::Notify for Notifier { + fn notify(&self, _: usize) { + self.notify_all() + } +} +