Skip to content
This repository has been archived by the owner on Sep 13, 2018. It is now read-only.

Enable configurating multiplex behavior #112

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions src/streaming/multiplex/advanced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::collections::hash_map::Entry;
use std::collections::{HashMap, VecDeque};
use std::io;
use super::frame_buf::{FrameBuf, FrameDeque};
use super::{Frame, RequestId, Transport};
use super::{Frame, RequestId, Transport, MultiplexConfig};
use buffer_one::BufferOne;

/*
Expand All @@ -29,12 +29,6 @@ use buffer_one::BufferOne;
*
*/

/// The max number of buffered frames that the connection can support. Once
/// this number is reached.
///
/// See module docs for more detail
const MAX_BUFFERED_FRAMES: usize = 128;

/// Task that drives multiplexed protocols
///
/// Provides protocol multiplexing functionality in a generic way over clients
Expand Down Expand Up @@ -186,16 +180,21 @@ pub trait Dispatch {
*/

impl<T> Multiplex<T> where T: Dispatch {
/// Create a new pipeline `Multiplex` dispatcher with the given service and
/// transport
/// Create a new `Multiplex`
pub fn new(dispatch: T) -> Multiplex<T> {
let config = MultiplexConfig::default();
Multiplex::new_configured(dispatch, &config)
}

/// Create a new `Multiplex` with the given configuration
pub fn new_configured(dispatch: T, config: &MultiplexConfig) -> Multiplex<T> {
// Add `Sink` impl for `Dispatch`
let dispatch = DispatchSink { inner: dispatch };

// Add a single slot buffer for the sink
let dispatch = BufferOne::new(dispatch);

let frame_buf = FrameBuf::with_capacity(MAX_BUFFERED_FRAMES);
let frame_buf = FrameBuf::with_capacity(config.max_buffered_frames);

Multiplex {
run: true,
Expand Down
13 changes: 10 additions & 3 deletions src/streaming/multiplex/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Frame, RequestId, StreamingMultiplex, Transport};
use super::{Frame, RequestId, StreamingMultiplex, Transport, MultiplexConfig};
use super::advanced::{Multiplex, MultiplexMessage};

use BindClient;
Expand Down Expand Up @@ -48,6 +48,11 @@ pub trait ClientProto<T: 'static>: 'static {
/// Build a transport from the given I/O object, using `self` for any
/// configuration.
fn bind_transport(&self, io: T) -> Self::BindTransport;

/// Returns the multiplex configuration values
fn multiplex_config(&self) -> MultiplexConfig {
MultiplexConfig::default()
}
}

impl<P, T, B> BindClient<StreamingMultiplex<B>, T> for P where
Expand All @@ -63,15 +68,17 @@ impl<P, T, B> BindClient<StreamingMultiplex<B>, T> for P where

fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
let (client, rx) = client_proxy::pair();
let multiplex_config = self.multiplex_config();

let task = self.bind_transport(io).into_future().and_then(|transport| {
let task = self.bind_transport(io).into_future().and_then(move |transport| {
let dispatch: Dispatch<P, T, B> = Dispatch {
transport: transport,
requests: rx,
in_flight: HashMap::new(),
next_request_id: 0,
};
Multiplex::new(dispatch)

Multiplex::new_configured(dispatch, &multiplex_config)
}).map_err(|e| {
// TODO: where to punt this error to?
debug!("multiplex task failed with error; err={:?}", e);
Expand Down
50 changes: 49 additions & 1 deletion src/streaming/multiplex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! See the crate-level docs for an overview.

use std::io;
use std::{io, u64};
use futures::{Stream, Sink, Async};
use tokio_core::io::{Io, Framed, Codec};

Expand All @@ -29,6 +29,54 @@ pub type RequestId = u64;
/// implement the `ClientProto` or `ServerProto` traits in this module.
pub struct StreamingMultiplex<B>(B);

/// Multiplex configuration options
#[derive(Debug, Copy, Clone)]
pub struct MultiplexConfig {
/// Maximum number of in-flight requests
max_in_flight: usize,
max_response_displacement: u64,
max_buffered_frames: usize,
}

impl MultiplexConfig {
/// Set the maximum number of requests to process concurrently
///
/// Default: 100
pub fn max_in_flight(&mut self, val: usize) -> &mut Self {
self.max_in_flight = val;
self
}

/// Maximum number of frames to buffer before stopping to read from the
/// transport
pub fn max_buffered_frames(&mut self, val: usize) -> &mut Self {
self.max_buffered_frames = val;
self
}

/// Set the maximum response displacement
///
/// For each request, the response displacement is the number of requests
/// that arrived after and were completed before. A value of 0 is equivalent
/// to pipelining for protocols without streaming bodies.
///
/// Default: `usize::MAX`
pub fn max_response_displacement(&mut self, val: u64) -> &mut Self {
self.max_response_displacement = val;
self
}
}

impl Default for MultiplexConfig {
fn default() -> Self {
MultiplexConfig {
max_in_flight: 100,
max_buffered_frames: 128,
max_response_displacement: u64::MAX,
}
}
}

/// Additional transport details relevant to streaming, multiplexed protocols.
///
/// All methods added in this trait have default implementations.
Expand Down
44 changes: 36 additions & 8 deletions src/streaming/multiplex/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Frame, RequestId, Transport};
use super::{Frame, RequestId, Transport, MultiplexConfig};
use super::advanced::{Multiplex, MultiplexMessage};

use BindServer;
Expand Down Expand Up @@ -65,6 +65,11 @@ pub trait ServerProto<T: 'static>: 'static {
/// Build a transport from the given I/O object, using `self` for any
/// configuration.
fn bind_transport(&self, io: T) -> Self::BindTransport;

/// Returns the multiplex configuration values
fn multiplex_config(&self) -> MultiplexConfig {
MultiplexConfig::default()
}
}

impl<P, T, B> BindServer<super::StreamingMultiplex<B>, T> for P where
Expand All @@ -81,13 +86,18 @@ impl<P, T, B> BindServer<super::StreamingMultiplex<B>, T> for P where
Response = Self::ServiceResponse,
Error = Self::ServiceError> + 'static
{
let task = self.bind_transport(io).into_future().and_then(|transport| {
let multiplex_config = self.multiplex_config();

let task = self.bind_transport(io).into_future().and_then(move |transport| {
let dispatch: Dispatch<S, T, P> = Dispatch {
service: service,
transport: transport,
in_flight: vec![],
num_received: 0,
config: multiplex_config,
};
Multiplex::new(dispatch)

Multiplex::new_configured(dispatch, &multiplex_config)
}).map_err(|_| ());

// Spawn the multiplex dispatcher
Expand All @@ -101,7 +111,11 @@ struct Dispatch<S, T, P> where
// The service handling the connection
service: S,
transport: P::Transport,
in_flight: Vec<(RequestId, InFlight<S::Future>)>,
in_flight: Vec<(RequestId, u64, InFlight<S::Future>)>,
// Counts the number of received requests
num_received: u64,
// Multiplex configuration settings
config: MultiplexConfig,
}

enum InFlight<F: Future> {
Expand Down Expand Up @@ -137,15 +151,26 @@ impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where

let mut idx = None;

for (i, &mut (request_id, ref mut slot)) in self.in_flight.iter_mut().enumerate() {
// Get the number for the earliest in-flight
let earliest = match self.in_flight.first() {
Some(&(_, num, _)) => {
num
}
None => return Ok(Async::NotReady),
};

for (i, &mut (request_id, num, ref mut slot)) in self.in_flight.iter_mut().enumerate() {
trace!(" --> poll; request_id={:?}", request_id);

if slot.poll() && idx.is_none() {
idx = Some(i);
if num - earliest <= self.config.max_response_displacement {
idx = Some(i);
}
}
}

if let Some(idx) = idx {
let (request_id, message) = self.in_flight.remove(idx);
let (request_id, _, message) = self.in_flight.remove(idx);
let message = MultiplexMessage {
id: request_id,
message: message.unwrap_done(),
Expand All @@ -166,8 +191,11 @@ impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where
assert!(!solo);

if let Ok(request) = message {
let num = self.num_received;
self.num_received += 1;

let response = self.service.call(request);
self.in_flight.push((id, InFlight::Active(response)));
self.in_flight.push((id, num, InFlight::Active(response)));
}

// TODO: Should the error be handled differently?
Expand Down
49 changes: 41 additions & 8 deletions tests/support/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ use self::futures::sync::oneshot;
use self::futures::{Future, Stream, Sink, Poll, StartSend, Async};
use self::tokio_core::io::Io;
use self::tokio_core::reactor::Core;
use self::tokio_proto::streaming::multiplex;
use self::tokio_proto::streaming::multiplex::{self, MultiplexConfig};
use self::tokio_proto::streaming::pipeline;
use self::tokio_proto::streaming::{Message, Body};
use self::tokio_proto::util::client_proxy::Response;
use self::tokio_proto::{BindClient, BindServer};
use self::tokio_service::Service;

struct MockProtocol<T>(RefCell<Option<MockTransport<T>>>);
struct MockProtocol<T> {
transport: RefCell<Option<MockTransport<T>>>,
multiplex_config: Option<MultiplexConfig>,
}

impl<T, U, I> pipeline::ClientProto<I> for MockProtocol<pipeline::Frame<T, U, io::Error>>
where T: 'static,
Expand All @@ -39,7 +42,7 @@ impl<T, U, I> pipeline::ClientProto<I> for MockProtocol<pipeline::Frame<T, U, io

fn bind_transport(&self, _io: I)
-> Result<MockTransport<pipeline::Frame<T, U, io::Error>>, io::Error> {
Ok(self.0.borrow_mut().take().unwrap())
Ok(self.transport.borrow_mut().take().unwrap())
}
}

Expand All @@ -58,7 +61,11 @@ impl<T, U, I> multiplex::ClientProto<I> for MockProtocol<multiplex::Frame<T, U,

fn bind_transport(&self, _io: I)
-> Result<MockTransport<multiplex::Frame<T, U, io::Error>>, io::Error> {
Ok(self.0.borrow_mut().take().unwrap())
Ok(self.transport.borrow_mut().take().unwrap())
}

fn multiplex_config(&self) -> MultiplexConfig {
self.multiplex_config.unwrap_or(MultiplexConfig::default())
}
}

Expand All @@ -77,7 +84,7 @@ impl<T, U, I> pipeline::ServerProto<I> for MockProtocol<pipeline::Frame<T, U, io

fn bind_transport(&self, _io: I)
-> Result<MockTransport<pipeline::Frame<T, U, io::Error>>, io::Error> {
Ok(self.0.borrow_mut().take().unwrap())
Ok(self.transport.borrow_mut().take().unwrap())
}
}

Expand All @@ -96,7 +103,11 @@ impl<T, U, I> multiplex::ServerProto<I> for MockProtocol<multiplex::Frame<T, U,

fn bind_transport(&self, _io: I)
-> Result<MockTransport<multiplex::Frame<T, U, io::Error>>, io::Error> {
Ok(self.0.borrow_mut().take().unwrap())
Ok(self.transport.borrow_mut().take().unwrap())
}

fn multiplex_config(&self) -> MultiplexConfig {
self.multiplex_config.unwrap_or(MultiplexConfig::default())
}
}

Expand Down Expand Up @@ -186,15 +197,23 @@ impl<T> MockTransportCtl<T> {
fn transport<T>() -> (MockTransportCtl<T>, MockProtocol<T>) {
let (tx1, rx1) = mpsc::channel(1);
let (tx2, rx2) = mpsc::unbounded();

let ctl = MockTransportCtl {
tx: Some(tx2),
rx: rx1.wait(),
};

let transport = MockTransport {
tx: tx1,
rx: rx2,
};
(ctl, MockProtocol(RefCell::new(Some(transport))))

let proto = MockProtocol {
transport: RefCell::new(Some(transport)),
multiplex_config: None,
};

(ctl, proto)
}

struct CompleteOnDrop {
Expand Down Expand Up @@ -301,15 +320,29 @@ pub fn multiplex_client()
return (ctl, Box::new(service), Box::new(srv));
}

/// Spawns a new multiplex server with default settings, returning a handle to
/// the mock transport as well as a handle to the running server
pub fn multiplex_server<S>(s: S)
-> (MockTransportCtl<multiplex::Frame<&'static str, u32, io::Error>>, Box<Any>)
where S: Service<Request = Message<&'static str, Body<u32, io::Error>>,
Response = Message<&'static str, MockBodyStream>,
Error = io::Error> + Send + 'static,
{
configured_multiplex_server(s, MultiplexConfig::default())
}

/// Spawns a new multiplex server, returning a handle to the mock transport as
/// well as a handle to the running server
pub fn configured_multiplex_server<S>(s: S, config: MultiplexConfig)
-> (MockTransportCtl<multiplex::Frame<&'static str, u32, io::Error>>, Box<Any>)
where S: Service<Request = Message<&'static str, Body<u32, io::Error>>,
Response = Message<&'static str, MockBodyStream>,
Error = io::Error> + Send + 'static,
{
drop(env_logger::init());

let (ctl, proto) = transport();
let (ctl, mut proto) = transport();
proto.multiplex_config = Some(config);

let (finished_tx, finished_rx) = oneshot::channel();
let t = thread::spawn(move || {
Expand Down
Loading