Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start hooking up data receiving #14

Merged
merged 3 commits into from
Aug 8, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use proto::{self, Connection};
use error::Reason::*;

use http::{self, Request, Response};
use futures::{Future, Poll, Sink, AsyncSink};
use futures::{self, Future, Poll, Sink, AsyncSink};
use tokio_io::{AsyncRead, AsyncWrite};
use bytes::{Bytes, IntoBuf};

Expand All @@ -23,12 +23,21 @@ pub struct Client<T, B: IntoBuf> {
connection: Connection<T, Peer, B>,
}

/// Client half of an active HTTP/2.0 stream.
#[derive(Debug)]
pub struct Stream<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>,
}

#[derive(Debug)]
pub struct Body<B: IntoBuf> {
inner: proto::StreamRef<Peer, B::Buf>,
}

#[derive(Debug)]
pub struct Chunk<B: IntoBuf> {
inner: proto::Chunk<Peer, B::Buf>,
}

impl<T> Client<T, Bytes>
where T: AsyncRead + AsyncWrite + 'static,
{
Expand Down Expand Up @@ -140,8 +149,11 @@ impl<T, B> fmt::Debug for Handshake<T, B>

impl<B: IntoBuf> Stream<B> {
/// Receive the HTTP/2.0 response, if it is ready.
pub fn poll_response(&mut self) -> Poll<Response<()>, ConnectionError> {
self.inner.poll_response()
pub fn poll_response(&mut self) -> Poll<Response<Body<B>>, ConnectionError> {
let (parts, _) = try_ready!(self.inner.poll_response()).into_parts();
let body = Body { inner: self.inner.clone() };

Ok(Response::from_parts(parts, body).into())
}

/// Send data
Expand All @@ -160,14 +172,36 @@ impl<B: IntoBuf> Stream<B> {
}

impl<B: IntoBuf> Future for Stream<B> {
type Item = Response<()>;
type Item = Response<Body<B>>;
type Error = ConnectionError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.poll_response()
}
}

// ===== impl Body =====

impl<B: IntoBuf> futures::Stream for Body<B> {
type Item = Chunk<B>;
type Error = ConnectionError;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let chunk = try_ready!(self.inner.poll_data())
.map(|inner| Chunk { inner });

Ok(chunk.into())
}
}

// ===== impl Chunk =====

impl<B: IntoBuf> Chunk<B> {
pub fn pop_bytes(&mut self) -> Option<Bytes> {
self.inner.pop_bytes()
}
}

// ===== impl Peer =====

impl proto::Peer for Peer {
Expand Down
9 changes: 9 additions & 0 deletions src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ pub enum Frame<T = Bytes> {
}

impl<T> Frame<T> {
/// Returns true if the frame is a DATA frame.
pub fn is_data(&self) -> bool {
use self::Frame::*;

match *self {
Data(..) => true,
_ => false,
}
}
}

impl<T> fmt::Debug for Frame<T> {
Expand Down
13 changes: 1 addition & 12 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,8 @@ impl<T, P, B> Connection<T, P, B>
*/
}
Some(Data(frame)) => {
unimplemented!();
/*
trace!("recv DATA; frame={:?}", frame);
try!(self.streams.recv_data(&frame));

let frame = Frame::Data {
id: frame.stream_id(),
end_of_stream: frame.is_end_stream(),
data: frame.into_payload(),
};

return Ok(Some(frame).into());
*/
try!(self.streams.recv_data(frame));
}
Some(Reset(frame)) => {
unimplemented!();
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod settings;
mod streams;

pub use self::connection::Connection;
pub use self::streams::{Streams, StreamRef};
pub use self::streams::{Streams, StreamRef, Chunk};

use self::framed_read::FramedRead;
use self::framed_write::FramedWrite;
Expand Down
49 changes: 49 additions & 0 deletions src/proto/streams/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,53 @@ impl<B> Deque<B> {
None => None,
}
}

pub fn take_while<F>(&mut self, buf: &mut Buffer<B>, mut f: F) -> Self
where F: FnMut(&Frame<B>) -> bool
{
match self.indices {
Some(mut idxs) => {
if !f(&buf.slab[idxs.head].frame) {
return Deque::new();
}

let head = idxs.head;
let mut tail = idxs.head;

loop {
let next = match buf.slab[tail].next {
Some(next) => next,
None => {
self.indices = None;
return Deque {
indices: Some(idxs),
_p: PhantomData,
};
}
};

if !f(&buf.slab[next].frame) {
// Split the linked list
buf.slab[tail].next = None;

self.indices = Some(Indices {
head: next,
tail: idxs.tail,
});

return Deque {
indices: Some(Indices {
head: head,
tail: tail,
}),
_p: PhantomData,
}
}

tail = next;
}
}
None => Deque::new(),
}
}
}
2 changes: 1 addition & 1 deletion src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod store;
mod stream;
mod streams;

pub use self::streams::{Streams, StreamRef};
pub use self::streams::{Streams, StreamRef, Chunk};

use self::buffer::Buffer;
use self::flow_control::FlowControl;
Expand Down
43 changes: 42 additions & 1 deletion src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ pub(super) struct Recv<P, B> {
_p: PhantomData<(P, B)>,
}

#[derive(Debug)]
pub(super) struct Chunk {
/// Data frames pending receival
pub pending_recv: buffer::Deque<Bytes>,
}

impl<P, B> Recv<P, B>
where P: Peer,
B: Buf,
Expand Down Expand Up @@ -98,7 +104,7 @@ impl<P, B> Recv<P, B>
}

pub fn recv_data(&mut self,
frame: &frame::Data,
frame: frame::Data,
stream: &mut Stream<B>)
-> Result<(), ConnectionError>
{
Expand Down Expand Up @@ -130,6 +136,10 @@ impl<P, B> Recv<P, B>
try!(stream.state.recv_close());
}

// Push the frame onto the recv buffer
stream.pending_recv.push_back(&mut self.buffer, frame.into());
stream.notify_recv();

Ok(())
}

Expand Down Expand Up @@ -218,6 +228,37 @@ impl<P, B> Recv<P, B>
Ok(().into())
}


pub fn poll_chunk(&mut self, stream: &mut Stream<B>)
-> Poll<Option<Chunk>, ConnectionError>
{
let frames = stream.pending_recv
.take_while(&mut self.buffer, |frame| frame.is_data());

if frames.is_empty() {
if stream.state.is_recv_closed() {
Ok(None.into())
} else {
stream.recv_task = Some(task::current());
Ok(Async::NotReady)
}
} else {
Ok(Some(Chunk {
pending_recv: frames,
}).into())
}
}

pub fn pop_bytes(&mut self, chunk: &mut Chunk) -> Option<Bytes> {
match chunk.pending_recv.pop_front(&mut self.buffer) {
Some(Frame::Data(frame)) => {
Some(frame.into_payload())
}
None => None,
_ => panic!("unexpected frame type"),
}
}

/// Send stream level window update
pub fn send_stream_window_update<T>(&mut self,
streams: &mut Store<B>,
Expand Down
7 changes: 7 additions & 0 deletions src/proto/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,13 @@ impl State {
}
}

pub fn is_recv_closed(&self) -> bool {
match self.inner {
Closed(..) | HalfClosedRemote(..) => true,
_ => false,
}
}

pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> {
match self.inner {
Open { ref mut remote, .. } |
Expand Down
80 changes: 79 additions & 1 deletion src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ pub struct StreamRef<P, B> {
key: store::Key,
}

#[derive(Debug)]
pub struct Chunk<P, B>
where P: Peer,
B: Buf,
{
inner: Arc<Mutex<Inner<P, B>>>,
recv: recv::Chunk,
}

/// Fields needed to manage state related to managing the set of streams. This
/// is mostly split out to make ownership happy.
///
Expand Down Expand Up @@ -103,7 +112,7 @@ impl<P, B> Streams<P, B>
Ok(ret)
}

pub fn recv_data(&mut self, frame: &frame::Data)
pub fn recv_data(&mut self, frame: frame::Data)
-> Result<(), ConnectionError>
{
let id = frame.stream_id();
Expand Down Expand Up @@ -305,6 +314,34 @@ impl<B> Streams<client::Peer, B>
}
}

// ===== impl StreamRef =====

impl<P, B> StreamRef<P, B>
where P: Peer,
B: Buf,
{
pub fn poll_data(&mut self) -> Poll<Option<Chunk<P, B>>, ConnectionError> {
let recv = {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let mut stream = me.store.resolve(self.key);

try_ready!(me.actions.recv.poll_chunk(&mut stream))
};

// Convert to a chunk
let chunk = recv.map(|recv| {
Chunk {
inner: self.inner.clone(),
recv: recv,
}
});

Ok(chunk.into())
}
}

impl<B> StreamRef<client::Peer, B>
where B: Buf,
{
Expand All @@ -318,6 +355,47 @@ impl<B> StreamRef<client::Peer, B>
}
}



impl<P, B> Clone for StreamRef<P, B> {
fn clone(&self) -> Self {
StreamRef {
inner: self.inner.clone(),
key: self.key.clone(),
}
}
}

// ===== impl Chunk =====

impl<P, B> Chunk<P, B>
where P: Peer,
B: Buf,
{
// TODO: Come up w/ a better API
pub fn pop_bytes(&mut self) -> Option<Bytes> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

me.actions.recv.pop_bytes(&mut self.recv)
}
}

impl<P, B> Drop for Chunk<P, B>
where P: Peer,
B: Buf,
{
fn drop(&mut self) {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

while let Some(_) = me.actions.recv.pop_bytes(&mut self.recv) {
}
}
}

// ===== impl Actions =====

impl<P, B> Actions<P, B>
where P: Peer,
B: Buf,
Expand Down
Loading