From 012db860c202d6e25062149a93cbac8364f81d3f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 9 Jan 2017 12:18:56 -0800 Subject: [PATCH] Enable configurating multiplex behavior Adds a `MultiplexConfig` struct that is used to configure how the multiplexer behaves. --- src/streaming/multiplex/advanced.rs | 19 ++++--- src/streaming/multiplex/client.rs | 13 +++-- src/streaming/multiplex/mod.rs | 50 +++++++++++++++++- src/streaming/multiplex/server.rs | 44 +++++++++++++--- tests/support/mock.rs | 49 +++++++++++++++--- tests/test_multiplex_server.rs | 79 ++++++++++++++++++++++++++--- 6 files changed, 217 insertions(+), 37 deletions(-) diff --git a/src/streaming/multiplex/advanced.rs b/src/streaming/multiplex/advanced.rs index 92e18d3e..5a53cf20 100644 --- a/src/streaming/multiplex/advanced.rs +++ b/src/streaming/multiplex/advanced.rs @@ -12,7 +12,7 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; use std::io; use super::frame_buf::{FrameBuf, FrameDeque}; -use super::{Frame, RequestId, Transport}; +use super::{Frame, RequestId, Transport, MultiplexConfig}; use buffer_one::BufferOne; /* @@ -29,12 +29,6 @@ use buffer_one::BufferOne; * */ -/// The max number of buffered frames that the connection can support. Once -/// this number is reached. -/// -/// See module docs for more detail -const MAX_BUFFERED_FRAMES: usize = 128; - /// Task that drives multiplexed protocols /// /// Provides protocol multiplexing functionality in a generic way over clients @@ -186,16 +180,21 @@ pub trait Dispatch { */ impl Multiplex where T: Dispatch { - /// Create a new pipeline `Multiplex` dispatcher with the given service and - /// transport + /// Create a new `Multiplex` pub fn new(dispatch: T) -> Multiplex { + let config = MultiplexConfig::default(); + Multiplex::new_configured(dispatch, &config) + } + + /// Create a new `Multiplex` with the given configuration + pub fn new_configured(dispatch: T, config: &MultiplexConfig) -> Multiplex { // Add `Sink` impl for `Dispatch` let dispatch = DispatchSink { inner: dispatch }; // Add a single slot buffer for the sink let dispatch = BufferOne::new(dispatch); - let frame_buf = FrameBuf::with_capacity(MAX_BUFFERED_FRAMES); + let frame_buf = FrameBuf::with_capacity(config.max_buffered_frames); Multiplex { run: true, diff --git a/src/streaming/multiplex/client.rs b/src/streaming/multiplex/client.rs index ad5cea5f..cf5071be 100644 --- a/src/streaming/multiplex/client.rs +++ b/src/streaming/multiplex/client.rs @@ -1,4 +1,4 @@ -use super::{Frame, RequestId, StreamingMultiplex, Transport}; +use super::{Frame, RequestId, StreamingMultiplex, Transport, MultiplexConfig}; use super::advanced::{Multiplex, MultiplexMessage}; use BindClient; @@ -48,6 +48,11 @@ pub trait ClientProto: 'static { /// Build a transport from the given I/O object, using `self` for any /// configuration. fn bind_transport(&self, io: T) -> Self::BindTransport; + + /// Returns the multiplex configuration values + fn multiplex_config(&self) -> MultiplexConfig { + MultiplexConfig::default() + } } impl BindClient, T> for P where @@ -63,15 +68,17 @@ impl BindClient, T> for P where fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient { let (client, rx) = client_proxy::pair(); + let multiplex_config = self.multiplex_config(); - let task = self.bind_transport(io).into_future().and_then(|transport| { + let task = self.bind_transport(io).into_future().and_then(move |transport| { let dispatch: Dispatch = Dispatch { transport: transport, requests: rx, in_flight: HashMap::new(), next_request_id: 0, }; - Multiplex::new(dispatch) + + Multiplex::new_configured(dispatch, &multiplex_config) }).map_err(|e| { // TODO: where to punt this error to? debug!("multiplex task failed with error; err={:?}", e); diff --git a/src/streaming/multiplex/mod.rs b/src/streaming/multiplex/mod.rs index 726caa91..ba421842 100644 --- a/src/streaming/multiplex/mod.rs +++ b/src/streaming/multiplex/mod.rs @@ -2,7 +2,7 @@ //! //! See the crate-level docs for an overview. -use std::io; +use std::{io, u64}; use futures::{Stream, Sink, Async}; use tokio_core::io::{Io, Framed, Codec}; @@ -29,6 +29,54 @@ pub type RequestId = u64; /// implement the `ClientProto` or `ServerProto` traits in this module. pub struct StreamingMultiplex(B); +/// Multiplex configuration options +#[derive(Debug, Copy, Clone)] +pub struct MultiplexConfig { + /// Maximum number of in-flight requests + max_in_flight: usize, + max_response_displacement: u64, + max_buffered_frames: usize, +} + +impl MultiplexConfig { + /// Set the maximum number of requests to process concurrently + /// + /// Default: 100 + pub fn max_in_flight(&mut self, val: usize) -> &mut Self { + self.max_in_flight = val; + self + } + + /// Maximum number of frames to buffer before stopping to read from the + /// transport + pub fn max_buffered_frames(&mut self, val: usize) -> &mut Self { + self.max_buffered_frames = val; + self + } + + /// Set the maximum response displacement + /// + /// For each request, the response displacement is the number of requests + /// that arrived after and were completed before. A value of 0 is equivalent + /// to pipelining for protocols without streaming bodies. + /// + /// Default: `usize::MAX` + pub fn max_response_displacement(&mut self, val: u64) -> &mut Self { + self.max_response_displacement = val; + self + } +} + +impl Default for MultiplexConfig { + fn default() -> Self { + MultiplexConfig { + max_in_flight: 100, + max_buffered_frames: 128, + max_response_displacement: u64::MAX, + } + } +} + /// Additional transport details relevant to streaming, multiplexed protocols. /// /// All methods added in this trait have default implementations. diff --git a/src/streaming/multiplex/server.rs b/src/streaming/multiplex/server.rs index 9e7d0762..4786b5ce 100644 --- a/src/streaming/multiplex/server.rs +++ b/src/streaming/multiplex/server.rs @@ -1,4 +1,4 @@ -use super::{Frame, RequestId, Transport}; +use super::{Frame, RequestId, Transport, MultiplexConfig}; use super::advanced::{Multiplex, MultiplexMessage}; use BindServer; @@ -65,6 +65,11 @@ pub trait ServerProto: 'static { /// Build a transport from the given I/O object, using `self` for any /// configuration. fn bind_transport(&self, io: T) -> Self::BindTransport; + + /// Returns the multiplex configuration values + fn multiplex_config(&self) -> MultiplexConfig { + MultiplexConfig::default() + } } impl BindServer, T> for P where @@ -81,13 +86,18 @@ impl BindServer, T> for P where Response = Self::ServiceResponse, Error = Self::ServiceError> + 'static { - let task = self.bind_transport(io).into_future().and_then(|transport| { + let multiplex_config = self.multiplex_config(); + + let task = self.bind_transport(io).into_future().and_then(move |transport| { let dispatch: Dispatch = Dispatch { service: service, transport: transport, in_flight: vec![], + num_received: 0, + config: multiplex_config, }; - Multiplex::new(dispatch) + + Multiplex::new_configured(dispatch, &multiplex_config) }).map_err(|_| ()); // Spawn the multiplex dispatcher @@ -101,7 +111,11 @@ struct Dispatch where // The service handling the connection service: S, transport: P::Transport, - in_flight: Vec<(RequestId, InFlight)>, + in_flight: Vec<(RequestId, u64, InFlight)>, + // Counts the number of received requests + num_received: u64, + // Multiplex configuration settings + config: MultiplexConfig, } enum InFlight { @@ -137,15 +151,26 @@ impl super::advanced::Dispatch for Dispatch where let mut idx = None; - for (i, &mut (request_id, ref mut slot)) in self.in_flight.iter_mut().enumerate() { + // Get the number for the earliest in-flight + let earliest = match self.in_flight.first() { + Some(&(_, num, _)) => { + num + } + None => return Ok(Async::NotReady), + }; + + for (i, &mut (request_id, num, ref mut slot)) in self.in_flight.iter_mut().enumerate() { trace!(" --> poll; request_id={:?}", request_id); + if slot.poll() && idx.is_none() { - idx = Some(i); + if num - earliest <= self.config.max_response_displacement { + idx = Some(i); + } } } if let Some(idx) = idx { - let (request_id, message) = self.in_flight.remove(idx); + let (request_id, _, message) = self.in_flight.remove(idx); let message = MultiplexMessage { id: request_id, message: message.unwrap_done(), @@ -166,8 +191,11 @@ impl super::advanced::Dispatch for Dispatch where assert!(!solo); if let Ok(request) = message { + let num = self.num_received; + self.num_received += 1; + let response = self.service.call(request); - self.in_flight.push((id, InFlight::Active(response))); + self.in_flight.push((id, num, InFlight::Active(response))); } // TODO: Should the error be handled differently? diff --git a/tests/support/mock.rs b/tests/support/mock.rs index 64cdc41d..dae2dbf6 100644 --- a/tests/support/mock.rs +++ b/tests/support/mock.rs @@ -15,14 +15,17 @@ use self::futures::sync::oneshot; use self::futures::{Future, Stream, Sink, Poll, StartSend, Async}; use self::tokio_core::io::Io; use self::tokio_core::reactor::Core; -use self::tokio_proto::streaming::multiplex; +use self::tokio_proto::streaming::multiplex::{self, MultiplexConfig}; use self::tokio_proto::streaming::pipeline; use self::tokio_proto::streaming::{Message, Body}; use self::tokio_proto::util::client_proxy::Response; use self::tokio_proto::{BindClient, BindServer}; use self::tokio_service::Service; -struct MockProtocol(RefCell>>); +struct MockProtocol { + transport: RefCell>>, + multiplex_config: Option, +} impl pipeline::ClientProto for MockProtocol> where T: 'static, @@ -39,7 +42,7 @@ impl pipeline::ClientProto for MockProtocol Result>, io::Error> { - Ok(self.0.borrow_mut().take().unwrap()) + Ok(self.transport.borrow_mut().take().unwrap()) } } @@ -58,7 +61,11 @@ impl multiplex::ClientProto for MockProtocol Result>, io::Error> { - Ok(self.0.borrow_mut().take().unwrap()) + Ok(self.transport.borrow_mut().take().unwrap()) + } + + fn multiplex_config(&self) -> MultiplexConfig { + self.multiplex_config.unwrap_or(MultiplexConfig::default()) } } @@ -77,7 +84,7 @@ impl pipeline::ServerProto for MockProtocol Result>, io::Error> { - Ok(self.0.borrow_mut().take().unwrap()) + Ok(self.transport.borrow_mut().take().unwrap()) } } @@ -96,7 +103,11 @@ impl multiplex::ServerProto for MockProtocol Result>, io::Error> { - Ok(self.0.borrow_mut().take().unwrap()) + Ok(self.transport.borrow_mut().take().unwrap()) + } + + fn multiplex_config(&self) -> MultiplexConfig { + self.multiplex_config.unwrap_or(MultiplexConfig::default()) } } @@ -186,15 +197,23 @@ impl MockTransportCtl { fn transport() -> (MockTransportCtl, MockProtocol) { let (tx1, rx1) = mpsc::channel(1); let (tx2, rx2) = mpsc::unbounded(); + let ctl = MockTransportCtl { tx: Some(tx2), rx: rx1.wait(), }; + let transport = MockTransport { tx: tx1, rx: rx2, }; - (ctl, MockProtocol(RefCell::new(Some(transport)))) + + let proto = MockProtocol { + transport: RefCell::new(Some(transport)), + multiplex_config: None, + }; + + (ctl, proto) } struct CompleteOnDrop { @@ -301,15 +320,29 @@ pub fn multiplex_client() return (ctl, Box::new(service), Box::new(srv)); } +/// Spawns a new multiplex server with default settings, returning a handle to +/// the mock transport as well as a handle to the running server pub fn multiplex_server(s: S) -> (MockTransportCtl>, Box) where S: Service>, Response = Message<&'static str, MockBodyStream>, Error = io::Error> + Send + 'static, +{ + configured_multiplex_server(s, MultiplexConfig::default()) +} + +/// Spawns a new multiplex server, returning a handle to the mock transport as +/// well as a handle to the running server +pub fn configured_multiplex_server(s: S, config: MultiplexConfig) + -> (MockTransportCtl>, Box) + where S: Service>, + Response = Message<&'static str, MockBodyStream>, + Error = io::Error> + Send + 'static, { drop(env_logger::init()); - let (ctl, proto) = transport(); + let (ctl, mut proto) = transport(); + proto.multiplex_config = Some(config); let (finished_tx, finished_rx) = oneshot::channel(); let t = thread::spawn(move || { diff --git a/tests/test_multiplex_server.rs b/tests/test_multiplex_server.rs index 893ba3e0..8d46895e 100644 --- a/tests/test_multiplex_server.rs +++ b/tests/test_multiplex_server.rs @@ -10,7 +10,7 @@ extern crate rand; extern crate log; use std::io; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::cell::RefCell; use std::thread; @@ -21,7 +21,7 @@ use futures::future; use futures::sync::oneshot; use futures::sync::mpsc; use tokio_proto::streaming::{Message, Body}; -use tokio_proto::streaming::multiplex::{Frame, RequestId}; +use tokio_proto::streaming::multiplex::{Frame, RequestId, MultiplexConfig}; use rand::Rng; mod support; @@ -426,14 +426,82 @@ fn test_interleaving_request_body_chunks() { } #[test] +fn test_response_displacement() { + let mut config = MultiplexConfig::default(); + + // No response displacement. This pipelines response heads, but allows the + // response body to be multiplexed. + config.max_response_displacement(0); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let expect = Mutex::new(vec![ + ("hello", rx1), + ("world", rx2), + ]); + + let service = simple_service(move |req: Message<&'static str, Body>| { + let (expect, resp) = expect.lock().unwrap().remove(0); + assert_eq!(req, expect); + + Box::new( + resp.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .and_then(|resp| Ok(resp))) + }); + + let (mut mock, _other) = mock::configured_multiplex_server(service, config); + + // Send two requests + mock.send(msg(0, "hello")); + mock.send(msg(1, "world")); + + // First complete the second response + let (body2, rx2) = Body::pair(); + tx2.complete(Message::WithBody("response 2", Box::new(rx2))); + + // Wait a bit + thread::sleep(Duration::from_millis(250)); + + // Complete the first response + let (body1, rx1) = Body::pair(); + tx1.complete(Message::WithBody("response 1", Box::new(rx1))); + + // Assert the responses were pipelined + let wr = mock.next_write(); + assert_eq!(0, wr.request_id()); + assert_eq!("response 1", wr.unwrap_msg()); + + let wr = mock.next_write(); + assert_eq!(1, wr.request_id()); + assert_eq!("response 2", wr.unwrap_msg()); + + // Assert the response bodies are multiplexed + let _body2 = body2.send(Ok(123)).wait(); + + let wr = mock.next_write(); + assert_eq!(1, wr.request_id()); + assert_eq!(123, wr.unwrap_body().unwrap()); + + let _body1 = body1.send(Ok(456)).wait(); + + let wr = mock.next_write(); + assert_eq!(0, wr.request_id()); + assert_eq!(456, wr.unwrap_body().unwrap()); +} + +#[test] +#[ignore] fn test_interleaving_response_body_chunks() { } #[test] +#[ignore] fn test_transport_provides_invalid_request_ids() { } #[test] +#[ignore] fn test_reaching_max_buffered_frames() { } @@ -457,16 +525,13 @@ fn test_read_error_as_first_frame() { } #[test] +#[ignore] fn test_read_error_during_stream() { } #[test] +#[ignore] fn test_error_handling_before_message_dispatched() { - /* - let service = simple_service(|_| { - unimplemented!(); - }); - */ } fn msg(id: RequestId, msg: &'static str) -> Frame<&'static str, u32, io::Error> {