Skip to content

Commit

Permalink
Merge pull request #15 from twittner/notifier
Browse files Browse the repository at this point in the history
Use `futures::executor::Spawn` with a `Notify` impl.
  • Loading branch information
twittner authored Sep 18, 2018
2 parents db62858 + 8230b5c commit 5acf79e
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "MIT"
[dependencies]
bytes = "0.4"
futures = "0.1"
nohash-hasher = "0.1"
log = "0.4"
parking_lot = "0.6"
quick-error = "0.1"
Expand Down
38 changes: 14 additions & 24 deletions src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use frame::{
RawFrame,
WindowUpdate
};
use futures::{prelude::*, stream::{Fuse, Stream}, task::{self, Task}};
use futures::{executor, prelude::*, stream::{Fuse, Stream}};
use notify::Notifier;
use parking_lot::{Mutex, MutexGuard};
use std::{
cmp::min,
Expand Down Expand Up @@ -158,9 +159,7 @@ impl<'a, T> Drop for Use<'a, T> {
debug!("{:?}: destroying connection", self.inner.mode);
self.inner.is_dead = true;
self.inner.streams.clear();
for task in self.inner.tasks.drain(..) {
task.notify()
}
self.inner.tasks.notify_all()
}
}
}
Expand All @@ -171,10 +170,10 @@ struct Inner<T> {
is_dead: bool,
config: Config,
streams: BTreeMap<stream::Id, StreamEntry>,
resource: Fuse<Framed<T, FrameCodec>>,
resource: executor::Spawn<Fuse<Framed<T, FrameCodec>>>,
incoming: VecDeque<stream::Id>,
pending: VecDeque<RawFrame>,
tasks: Vec<Task>,
tasks: Arc<Notifier>,
next_id: u32
}

Expand Down Expand Up @@ -209,10 +208,10 @@ where
is_dead: false,
config,
streams: BTreeMap::new(),
resource: framed,
resource: executor::spawn(framed),
incoming: VecDeque::new(),
pending: VecDeque::new(),
tasks: Vec::new(),
tasks: Arc::new(Notifier::new()),
next_id: match mode {
Mode::Client => 1,
Mode::Server => 2
Expand Down Expand Up @@ -242,15 +241,15 @@ where
}

fn flush_pending(&mut self) -> Poll<(), ConnectionError> {
try_ready!(self.resource.poll_complete());
try_ready!(self.resource.poll_flush_notify(&self.tasks, 0));
while let Some(frame) = self.pending.pop_front() {
trace!("{:?}: send: {:?}", self.mode, frame.header);
if let AsyncSink::NotReady(frame) = self.resource.start_send(frame)? {
if let AsyncSink::NotReady(frame) = self.resource.start_send_notify(frame, &self.tasks, 0)? {
self.pending.push_front(frame);
return Ok(Async::NotReady)
}
}
try_ready!(self.resource.poll_complete());
try_ready!(self.resource.poll_flush_notify(&self.tasks, 0));
Ok(Async::Ready(()))
}

Expand All @@ -260,10 +259,10 @@ where
}
loop {
if !self.pending.is_empty() && self.flush_pending()?.is_not_ready() {
self.tasks.push(task::current());
self.tasks.insert_current();
return Ok(Async::NotReady)
}
match self.resource.poll()? {
match self.resource.poll_stream_notify(&self.tasks, 0)? {
Async::Ready(Some(frame)) => {
trace!("{:?}: recv: {:?}", self.mode, frame.header);
let response = match frame.dyn_type() {
Expand All @@ -275,29 +274,20 @@ where
self.on_ping(&Frame::assert(frame)).map(Frame::into_raw),
Type::GoAway => {
self.is_dead = true;
for task in self.tasks.drain(..) {
task.notify()
}
return Ok(Async::Ready(()))
}
};
if let Some(frame) = response {
self.pending.push_back(frame)
}
for task in self.tasks.drain(..) {
task.notify()
}
}
Async::Ready(None) => {
trace!("{:?}: eof: {:?}", self.mode, self);
self.is_dead = true;
for task in self.tasks.drain(..) {
task.notify()
}
return Ok(Async::Ready(()))
}
Async::NotReady => {
self.tasks.push(task::current());
self.tasks.insert_current();
return Ok(Async::NotReady)
}
}
Expand Down Expand Up @@ -547,7 +537,7 @@ where
}
let frame = match inner.streams.get(&self.id).map(|s| s.credit) {
Some(0) => {
inner.tasks.push(task::current());
inner.tasks.insert_current();
inner.on_drop(Action::None);
return Err(io::ErrorKind::WouldBlock.into())
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
extern crate bytes;
#[macro_use]
extern crate futures;
extern crate nohash_hasher;
#[macro_use]
extern crate log;
extern crate parking_lot;
Expand All @@ -34,6 +35,7 @@ mod connection;
mod error;
#[allow(dead_code)]
mod frame;
mod notify;
mod stream;

pub use connection::{Connection, Mode, StreamHandle};
Expand Down
73 changes: 73 additions & 0 deletions src/notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
// OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.


use futures::{executor, task};
use parking_lot::Mutex;
use nohash_hasher::IntMap;
use std::sync::atomic::{AtomicUsize, Ordering};


static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0);

task_local!{
static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed)
}


/// A notifier maintains a collection of tasks which should be
/// notified at some point. Useful in conjuction with `futures::executor::Spawn`.
pub struct Notifier {
tasks: Mutex<IntMap<usize, task::Task>>
}

impl Notifier {
pub fn new() -> Self {
Notifier { tasks: Mutex::new(IntMap::default()) }
}

/// Insert the current task to the set of tasks to be notified.
///
/// # Panics
///
/// If called outside of a futures task.
pub fn insert_current(&self) {
self.tasks.lock().insert(TASK_ID.with(|&t| t), task::current());
}

/// Notify all registered tasks.
pub fn notify_all(&self) {
let mut tasks = self.tasks.lock();
for (_, t) in tasks.drain() {
t.notify();
}
}

/// Return the number of currently registered tasks.
pub fn len(&self) -> usize {
self.tasks.lock().len()
}
}

impl executor::Notify for Notifier {
fn notify(&self, _: usize) {
self.notify_all()
}
}

0 comments on commit 5acf79e

Please sign in to comment.