diff --git a/src/streaming/pipeline/server.rs b/src/streaming/pipeline/server.rs index 5a18f267..8170dcd0 100644 --- a/src/streaming/pipeline/server.rs +++ b/src/streaming/pipeline/server.rs @@ -1,8 +1,12 @@ use BindServer; use futures::stream::Stream; +use futures::task::{EventSet, UnparkEvent, with_unpark_event}; use futures::{Future, IntoFuture, Poll, Async}; +use std::collections::BTreeSet; use std::collections::VecDeque; use std::io; +use std::num::Wrapping; +use std::sync::{Arc, Mutex}; use streaming::{Message, Body}; use super::advanced::{Pipeline, PipelineMessage}; use super::{Frame, Transport}; @@ -72,6 +76,9 @@ impl BindServer, T> for P where service: service, transport: transport, in_flight: VecDeque::with_capacity(32), + event_offset: Wrapping(0), + polled_to: 0, + event_set: Arc::new(DispatchEventSet{ inner: Mutex::new(BTreeSet::new()) }), }; Pipeline::new(dispatch) }); @@ -88,6 +95,9 @@ struct Dispatch where service: S, transport: P::Transport, in_flight: VecDeque>, + event_offset: Wrapping, + polled_to: usize, + event_set: Arc } enum InFlight { @@ -131,8 +141,28 @@ impl super::advanced::Dispatch for Dispatch where } fn poll(&mut self) -> Poll>, io::Error> { - for slot in self.in_flight.iter_mut() { - slot.poll(); + // new Futures must be polled once to start processing + if self.polled_to < self.in_flight.len() { + for (i, slot) in self.in_flight.iter_mut().enumerate().skip(self.polled_to) { + let event = UnparkEvent::new(self.event_set.clone(), (self.event_offset + Wrapping(i)).0); + with_unpark_event(event, || slot.poll() ); + } + self.polled_to = self.in_flight.len(); + } + + // get list of Futures which received unpark events, and empty that list + let poll_ids = { + let mut events = self.event_set.inner.lock().unwrap(); + let before = events.clone(); + *events = BTreeSet::new(); + before + }; + + // call poll for Futures which received unpark events + for id in poll_ids { + let event = UnparkEvent::new(self.event_set.clone(), id); + let slot = self.in_flight.get_mut((Wrapping(id) - self.event_offset).0).unwrap(); + with_unpark_event(event, || slot.poll() ); } match self.in_flight.front() { @@ -141,7 +171,11 @@ impl super::advanced::Dispatch for Dispatch where } match self.in_flight.pop_front() { - Some(InFlight::Done(res)) => Ok(Async::Ready(Some(res))), + Some(InFlight::Done(res)) => { + self.event_offset += Wrapping(1); + self.polled_to -= 1; + Ok(Async::Ready(Some(res))) + }, _ => panic!(), } } @@ -166,3 +200,13 @@ impl InFlight { *self = InFlight::Done(res); } } + +struct DispatchEventSet { + inner: Mutex> +} + +impl EventSet for DispatchEventSet { + fn insert(&self, id: usize) { + self.inner.lock().unwrap().insert(id); + } +}