Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Async Gzip Decoding #165

Merged
merged 2 commits into from
Aug 21, 2017

Conversation

KodrAus
Copy link
Contributor

@KodrAus KodrAus commented Jul 14, 2017

For #161

I figured I'd open this up for visibility, and because it's easier to talk about code in PRs than issues or gitter. It's an attempt to support decoding in the async implementation, but there may be better approaches. This does introduce a bit of indirection and complexity.

let work = client.get("https://hyper.rs").unwrap().send().map(|res| {
println!("{}", res.status());
});
let work = client.get("http://localhost:8080/range/4096?duration=5&chunk_size=300")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just been using the samples to test. I'll go revert them at some point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idea that might make testing this as you go easier.

Not sure how easy it is to set up the test stub server (usage here) to chunk and stutter the packets back to behave like more like a scenario you'd want to test handling using async, but if its not too difficult I think it would make debugging and testing as you go easier than using thee example. At least that was my experience when I initially added gzip decoding. Also means you won't have to do that work at the end 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @echochamber! That's a great idea. Debugging has been tricky, since there are a few permutations of this code that behave differently. I need to get some proper tests in there.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see adding something like chunk_size: 300 as part of the server! macro, to change from using write_all on the whole response to looping and trying to write that number of bytes at a time.

Usage of the macro could look like this:

let server = server! {
    request: expected_req,
    response: bytes_to_respond_with,
    chunk_size: 300
};

What does duration do? Is that a timeout between chunks?

Implementation in the support/server.rs would need these things:

  • add chunk_size: Option<usize> to the Txn struct
  • Update the [write portion[(https://github.com/seanmonstar/reqwest/blob/master/tests/support/server.rs#L57-L64) to loop on writing if let Some(size) = self.chunk_size.

Copy link
Contributor Author

@KodrAus KodrAus Jul 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I'll work on the server and revert these examples 👍

What does duration do? Is that a timeout between chunks?

Yeh this is currently just pointing at a local httpbin (the hosted one is behind nginx so it won't actually chunk responses). From memory it's a delay between the chunks. It'd be nice to be able to tell the server when to send the next chunk so we can force WouldBlock imperatively, but don't want to complicate it too much.

/// A lazily constructed response decoder.
pub struct LazyDecoder<TBodyStream>(LazyDecoderInner<TBodyStream>);

enum LazyDecoderInner<TBodyStream> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually... Looking at this now I think this could all go away by just converting the gzip encoder into inner. So it doesn't matter if the body stream is already in a decoder.

@KodrAus
Copy link
Contributor Author

KodrAus commented Jul 17, 2017

The current tests picked up a nice little bug in my changes where a small chunk was being dropped rather than retained to be errored on later. I've fixed that up.

If you're happy with the general approach @seanmonstar then I'll go add a bunch of test cases. It'd be nice to make a test server that can programmatically delay chunks so we can test WouldBlock paths.

@KodrAus KodrAus changed the title [WIP] Support Async Gzip Decoding Support Async Gzip Decoding Jul 18, 2017
Copy link
Owner

@seanmonstar seanmonstar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome! Thanks for getting this started!

I'd probably go about it this way:

  • Create a Decoder in async_impl/decoder.rs. This would implement Stream, and have an inner enum of whether it is plain text or gzip (or someday brotli or whatever). I haven't read the libflate source, so I don't know the answer to this: is it always possible to decode a chunk, no matter the size? If not, then the gzip variant would need a buffer, and try to decode and yield a Chunk when decoding is successful.
  • I'd leave the ReadableBody thing in src/response.rs, as a way of doing synchronous reading.

let work = client.get("https://hyper.rs").unwrap().send().map(|res| {
println!("{}", res.status());
});
let work = client.get("http://localhost:8080/range/4096?duration=5&chunk_size=300")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could see adding something like chunk_size: 300 as part of the server! macro, to change from using write_all on the whole response to looping and trying to write that number of bytes at a time.

Usage of the macro could look like this:

let server = server! {
    request: expected_req,
    response: bytes_to_respond_with,
    chunk_size: 300
};

What does duration do? Is that a timeout between chunks?

Implementation in the support/server.rs would need these things:

  • add chunk_size: Option<usize> to the Txn struct
  • Update the [write portion[(https://github.com/seanmonstar/reqwest/blob/master/tests/support/server.rs#L57-L64) to loop on writing if let Some(size) = self.chunk_size.

@@ -16,7 +16,7 @@ fn run() -> Result<()> {

println!("GET https://www.rust-lang.org");

let mut res = reqwest::get("https://www.rust-lang.org/en-US/")?;
let mut res = reqwest::get("http://localhost:8080/range/4096?duration=5&chunk_size=300")?;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just leaving a reminder so we don't forget to switch this back.



/// A Response to a submitted `Request`.
pub struct Response {
status: StatusCode,
headers: Headers,
url: Url,
body: Body,
body: Decoder<Body>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it probably makes sense that the body is a Decoder instead, like you've done here. I'd probably remove the generic, and just expose Decoder (at least, publicly. If that wraps something that is generic internally, that's fine.)

@@ -45,17 +51,13 @@ impl Response {
&mut self.headers
}

/// Get a mutable reference to the `Body` of this `Response`.
#[inline]
pub fn body_mut(&mut self) -> &mut Body {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd leave this in, but returning &mut Decoder.

@@ -95,6 +97,25 @@ impl Response {
}
}

impl Read for Response {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not impl Read for the async Response, I think. Maybe it does make sense, but probably a separate issue.

@KodrAus
Copy link
Contributor Author

KodrAus commented Jul 25, 2017

Thanks for the feedback @seanmonstar. As we discussed on gitter, libflate requires wrapping a Read, which is why I've wired this up the way that I have. I'm also not a huge fan of Read on the async response (it's needed for AsyncRead), because it'll panic if you call read() outside of an executing task, which is a shame. That's why there's that PollChunk trait, so the synchronous impl can swap the Stream out for a WaitStream and read synchronously until it gets to the end of a chunk.

So I'm not really sure how to go about refactoring the decoder to work on chunks as you described yet @seanmonstar but don't want this to stall so I'll work on the test server and get a decent suite of tests up in the meantime.

@echochamber
Copy link
Contributor

@KodrAus Could we change decoder to implement a future::Future trait instead of read, and in the synchronous implementation just call wait() on it?

@KodrAus
Copy link
Contributor Author

KodrAus commented Jul 27, 2017

@echochamber I did that initially but came to the conclusion that:

  • If the Future just returns the complete potentially decompressed response in some buffer then it's too limiting. What if the client wants to supply their own buffer? What if it's too small?
  • If the Future just performs a read then it's a lot of up-front effort allocating Arcs to handle timeouts for each call. Especially with serde_json::from_reader that'll just read 1 byte at a time.

So I ended up with this implementation that can read synchronously without blocking unless a chunk is unavailable.

Note that for the second point I didn't think about wrapping the asynchronous body in a BufRead on the synchronous side, which would probably solve that problem of allocating for each call to read. It just didn't cross my mind at the time. It's probably worth looking into and could make this a lot simpler.

@ishitatsuyuki
Copy link

I looked at all the compression libraries, and flate2 has BufRead implementation which can follow hyper's zero-copy policy. I have also opened tokio-rs/tokio-io#61 to request adapters.

We can then wrap the Body stream to make it yield buffers. Not sure if such (Async)BufRead wrapper should implemented in hyper or reqwest.

@KodrAus
Copy link
Contributor Author

KodrAus commented Jul 31, 2017

Thanks @ishitatsuyuki 👍 I think the best way forward is to wrap the AsyncRead in a sync BufReader for the synchronous Response. That means we can do large, infrequent reads from the AsyncRead, which means setting up some shared state, and individual reads on the synchronous side will be mostly cheap because the decompressed bytes are already there.

As far as I know, we're using libflate because it has better cross-platform support than flate2.

If we really don't want the async Response to implement Read, then we might have to figure something out about tracking yet another BytesMut, reading into slices of that and emitting those slices as chunks. That seems less ideal to me than just dealing with a read-like API though.

Does anyone have any other thoughts before I go and make those changes? @seanmonstar @echochamber @ishitatsuyuki?

@echochamber
Copy link
Contributor

echochamber commented Aug 1, 2017

No problems, sounds good to me.

libflate because it has better cross-platform support than flate2

You are correct. Kind of anyway. Flate2 works on windows, it just requires the users to install MSVC.

@ishitatsuyuki
Copy link

No, BufReader is an additional layer of nonsense. As Chunk is already buffered, we should wrap it to satisfy the read_buf method.
Using flate2 definitely makes a difference because it can handle BufRead resulting in less copies. This matches hyper's aim.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 1, 2017

I'm not sure I understand you @ishitatsuyuki. I've arrived at BufReader based on the following:

  • flate2 requires msvc on Windows, which makes it a non-starter for reqwest
  • libflate requires a Read. It doesn't have BufRead support
  • You can't decompress any arbitrary chunk of a gzipped response, so we can't simply decompress the chunks we receive independently and emit those in a stream
  • My current implementation already does minimise copies, but is complex to work around the fact that we can't poll a futures::Stream without being in the context of an executing task
  • Without that complexity we need to wrap all synchronous calls to read in a timeout, which makes it particularly slow for readers that only grab 1 byte at a time
  • We can improve that situation by wrapping the slow reader in a BufReader. Maybe it's not appropriate for reqwest to do that though and expect a caller to. Or we just do that in calls to json()
  • The async response doesn't introduce another buffer that data is copied into

I think maybe we should sketch out a plan of what we expect the sync and async responses to look like, so we're all on the same page.

I think (straw-man names):

  • async::Response provides a Decoder that is AsyncRead that is accessible by calling body() and emits decompressed bytes
  • async::Response provides a method like raw_chunks() that is Stream<Item = Chunk>. Those Chunks aren't decompressed
  • sync::Response wraps async::Decoder as AsyncRead in a synchronous reader, sync::Decoder. This means sync::Decoder needs to wrap the whole call to async::Decoder.read() in a wait::timeout so chunks can be polled as necessary, or it needs to call async::Decoder.read() synchronously and wrap the chunk polling in a wait::stream (this is what my PR does currently)
  • If sync::Decoder.read() always calls wait::timeout, then sync::Response will limit calls to sync::Decoder by wrapping it in a BufReader

Do you have other ideas of how this should work @ishitatsuyuki?

One other idea is that we have totally separate async::Decoder and sync::Decoders, so there's some amount of duplication, but the current sync::Response implementation doesn't really change.

@ishitatsuyuki
Copy link

ishitatsuyuki commented Aug 1, 2017

This is what I think:

  • the internal Body implements Read the way it should, and BufRead by returning a slice of chunk without copy.
  • flate2 is definitely a win here; but well, it may be feature gated. It should be transparent as the user only see Read in the interface.
  • for the blocking needs, can we call Core::run with one of the tokio-io wrapper?

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 2, 2017

Ok I've made some changes:

  • The BufRead thing didn't quite pan out because we peek some bytes from the response to check whether it's empty or not, but that might be non-blocking. So I've wrapped that in a future. The blocking side will still only block the decoder if it needs to fetch a chunk
  • Removed the weird wrap_stream stuff and just don't create a Decoder unless the user asks for one
  • Added some quick chunking to the test server
  • Added a simple async gzip test

The diff on the last commit is really noisy so it's probably worth looking at it from the top again.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 2, 2017

Travis says no. Looks like a random Travis fail?

@ishitatsuyuki
Copy link

It just seems that it's hitting the per user build limit and being queued.

DecoderInnerFuture::PendingGzip(ref mut body) => {
match body.read(&mut peek) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
read @ _ => read

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition doesn't look cool to me. Can you explain? Is it same as x => x?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ishitatsuyuki That's right. I refactored it into this shape but should simplify it.

The following types directly support the gzip compression case:

- `DecoderInnerFuture` is a non-blocking constructor for a `Decoder` in case the body needs to be read
- `Peeked` is a wrapper around a peeked byte that prevents an EOF on read

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand. Anything asynchronous should never block and returning WouldBlock instead. What do you mean by "EOF on read" here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because libflate does a read_exact for 2 bytes, which will return an eof error if there aren't 2 bytes available.

So we've got a wrapper that reads a byte and checks whether the response is actually empty.

Copy link

@ishitatsuyuki ishitatsuyuki Aug 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... from the docs of read_exact:

If any other read error is encountered then this function immediately returns. The contents of buf are unspecified in this case.

If this function returns an error, it is unspecified how many bytes it has read, but it will never read more than would be necessary to completely fill the buffer.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is not that libflate returns EOF, it's that even if the body were empty, the read_exact will return an UnexpectedEof error. Users should be able to expect that an empty body just returns Ok(0).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah my mistake, thanks @seanmonstar

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean that special casing is the best way? It's not gzip in that case then.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could imagine a less-stellar server setting a header universally, but then it turns out the file it is serving is empty.

Of course, in this case, since the bytes are coming in a Stream of Chunks, we can detect EOF before ever using the libflate encoder: if poll returns Ok(Async::Ready(None)), that's the EOF. No need to ask libflate to decode then :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes we could do that... I could just simplify the gzip future I've already got 👍

@ishitatsuyuki
Copy link

If you're looking a way to provide Read's all-or-error guarantee, either:

  • return at chunk boundary.
  • check the size first and read beforehand.
  • fake a Ok when it blocks. Defer the error until the internal buffer is completely empty.

See also what BufReader in std do. It may greatly simplify the implementation if you implemented BufRead at first.

}
}

pub struct DecodedBody {
Copy link
Owner

@seanmonstar seanmonstar Aug 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, my opinion may seem odd, and if so, that's OK. If it seems too absurd, let me know.

But, here's an API that I think would make sense to expose:

impl Response {
    pub fn body_mut(&mut self) -> &mut Decoder {
        // ...
    }
}

pub struct Decoder {
    inner: DecoderImplWhateverNameNotSuperImportant,
}

impl Stream for Decoder {
    type Item = Chunk;
    type Error = io::Error;
}

I realize that libflate needs an io::Read, but I'm not certain yet about exposing those traits to the public.

The inner decoder impl can provide a Read for libflate to use, which does like you've done, just polling a chunk out when needed. When the libflate decoder returns some decoded bytes, that can be emitted as a new Chunk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just got a few questions (sorry I don't usually do this kind of programming):

  • Would you expect the synchronous response to consume the Decoder: Stream, or the DecoderImplWhateverNameNotSuperImportant: Read? I guess the inner decoder could be both Read and Stream, so the plain text decoding just means passing on chunks
  • If we're going to emit chunks for the gzip decoder, then we'll need to read them into some buffer. I'm not sure how big that should be or where it should come from

So I'm not sure whether this should change the way I've done the Decoder completely or if it's just a matter of the async Response using DecoderImplWhateverNameNotSuperImportant: Stream and the sync Response using DecoderImplWhateverNameNotSuperImportant: Read.

@ishitatsuyuki
Copy link

ishitatsuyuki commented Aug 3, 2017

Additional one. As both libflate and flate2 expects read_exact() to be all-or-nothing (they don't expect retry on error), we may need to have a specialized implementation to do so.

@@ -55,10 +56,16 @@ pub fn spawn(txns: Vec<Txn>) -> Server {
}

if let Some(dur) = txn.write_timeout {
let headers_end = ::std::str::from_utf8(&reply).unwrap().find("\r\n\r\n").unwrap() + 4;
let headers_end = unsafe { ::std::str::from_utf8_unchecked(&reply) }.find("\r\n\r\n").unwrap() + 4;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you creating a potential root of UB here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because gzipped content is not valid UTF8 and we'll always hit the sequence before the invalid content. Again, this is a test server so it's not so important that this is unsafe. If something unexpected happens then the test will fail.

But the road to ruin is paved with good intentions so I think it's reasonable to call this out and do it in an alternative way without the unsafe.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that it is UB doesn't change, and this code may break any time. Change the type signature instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this to just operate over the [u8] instead of turning it into a str for the sake of convenience.

@@ -23,6 +23,7 @@ pub struct Txn {
pub read_timeout: Option<Duration>,
pub response_timeout: Option<Duration>,
pub write_timeout: Option<Duration>,
pub chunk_size: Option<usize>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is basically simply slowing down tests, as you're just stressing hyper grouping delayed response in that case. To divide it into chunks, apply the real chunked encoding. See https://en.wikipedia.org/wiki/Chunked_transfer_encoding for details.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the point. It's blocking the response to force WouldBlock. It doesn't necessarily need to be chunked encoding because that isn't what it's testing.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh? You don't need to split them into chunks, as it's just the difference of 3 WouldBlock vs 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it blocks is important because the internal state of the decoder will change over time. That is, blocking before any bytes are received is a different code path to blocking after bytes are received.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't get any data before the response is complete. You get a chunk of the whole response after you send everything.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, you will get interesting results if you sent it with chunk encoding of size 1. It should break because read_exact is not transactional (see above).

Copy link
Contributor Author

@KodrAus KodrAus Aug 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked this out and you're right, the response isn't received until all chunks are received. I was hoping I wouldn't have to do actual chunked encoding here but looks like I will. I'll worry about this one later though, I haven't quite finished the decoder side yet.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, you should get a stream of chunks as they are received, not at the end of the response, that's the point of the streaming interface. That is to say, even with a body that has a Content-Length: 15, if a read in hyper receives 5 bytes, it will yield a Chunk of those 5 bytes, and register to read more (still expecting 10 bytes).

However, it is also true that if the body is broken up by the chunked encoding, hyper will yield Chunks each time it reaches the end of an encoding "chunk". So, if it received 5\r\nabcde\r\n1\r\nf\r\n, it would yield a Chunk of 5, and then another poll would yield a Chunk of 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must've been doing something else wrong here before, because changing this back to emit the body in chunks without explicit chunked encoding is giving us individual chunks and hitting the right WouldBlocks...

let chunk_start = *pos;
let len = cmp::min(buf.len(), chunk.len() - chunk_start);
let chunk_end = chunk_start + len;
buf[..len].copy_from_slice(&chunk[chunk_start..chunk_end]);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using [u8]'s Read implementation may be easier.


fn into_inner(self) -> TStream {
match self.state {
ReadState::Ready(_, _) => panic!("attempted to take used reader"),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to rethink these error conditions. They should effectively be unreachable, but the invariants are spread a bit thin.

Maybe it would be better to return Option or Result here and expect it in the synchronous response? Or just silently ignore the first chunk of there is one, but that seems worse to me.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 6, 2017

I've pushed a few changes but this isn't ready for another review yet. I'll send a ping when I think it is.

Copy link

@ishitatsuyuki ishitatsuyuki left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to do so? I think this is completely nonsense.

@seanmonstar
Copy link
Owner

Any reason to do so? I think this is completely nonsense.

What in particular are you referring to? Also, calling people's work "complete nonsense" is kind of hostile, could you dial it back? Pointing out specific flaws will be much more useful for everyone.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 8, 2017

I think the 'complete nonsense' @ishitatsuyuki is referring to is the fact that we're decompressing into an intermediate buffer, not the fact that I haven't implemented it properly yet.

To be honest, I'm also not 100% sure why we don't want the response body to be AsyncRead, but I figure if it becomes something we do want in the future it's not too difficult to add it. I think there are also a bunch of strategies for minimising the cost of that buffer but I haven't done any benchmarking so don't know what the impact is.

@ishitatsuyuki
Copy link

I intended to comment on the latest commit, but for some reason the mobile UI turned it into a review.

I don't think using a stream fits here. HTTP response is just a stream of bytes, and I think there's no better solution than Read/AsyncRead.

Anyway, do you have a particular reason to make it a Stream? There's more overhead, it is less intuitive, and API compatibility is not a problem.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 9, 2017

@ishitatsuyuki I basically agree with you. Maybe @seanmonstar can shed some more light on why this needs to be Stream and not AsyncRead.

But I think it's also worth considering that this is an unstable API. I don't think we need to get it perfect right away. For the stable API there'll be an impact on memory usage for compressed responses but that could be improved through major changes to this async API in non-breaking version bumps (I'm assuming the unstable module isn't semver'd).

So personally I'm happy for this to move forward in any shape that @seanmonstar is happy with.

@ishitatsuyuki
Copy link

I cannot understand why you're moving to use Stream where AsyncRead would be definitely be better. I saw too much changes that it would require substantial amount of work to revert to AsyncRead.

Can you elaborate on why you're switching to Chunks?

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 9, 2017

Yeh I get that it doesn't seem ideal, but I don't really want to push back on the design right now. I'd rather focus on having a robust implementation that isn't going to cause regressions for all the current synchronous reqwest users.

@ishitatsuyuki Your reviews have been great, you've caught a lot of errors in this PR. So I think any more bug-related feedback you have would help drive this forward more than the merits of AsyncRead vs Stream right now because I already agree with your points there.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 12, 2017

There's a debug_assert failing in libflate that I haven't quite figured out yet. Swapping to flate2 works fine.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 13, 2017

@seanmonstar other than the debug_assert failure this should be ready for another look.

@KodrAus
Copy link
Contributor Author

KodrAus commented Aug 13, 2017

Looks like the libflate bug is all fixed now 🎉

Copy link

@ishitatsuyuki ishitatsuyuki left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good work! I didn't inspect the implementation closely because I believe with buffering it's mostly safe. I reviewed the tests carefully instead.

@@ -1,4 +1,7 @@
//! `cargo run --example simple`

#![allow(unknown_lints, unused_doc_comment)]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a leftover. Upgrade error-chain, although it's still in rc.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is only a dev-dependency so I have no objection to using an rc build.

@@ -13,4 +13,4 @@ fn main() {
});

core.run(work).unwrap();
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjust your editor settings.

tests/async.rs Outdated
\r\n\
",
chunk_size: chunk_size,
write_timeout: Duration::from_secs(1),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend a smaller delay here. 5~10 ms is enough for machines today, as the operation happens at microseconds.

Also, I would recommend an extreme case with chunked encoding of 1 byte each line. That stress tests the buffer implementation to provide a correct implementation of read_exact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is interesting. libflate does not like reading 1 byte. I think I'll need to bring back the Peeked type to cater for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a little 2 byte buf reader just to make sure that would keep libflate happy. Looks like it does. I'll simplify it a bit later on.

tests/gzip.rs Outdated
@@ -34,14 +38,16 @@ fn test_gzip_response() {
Accept-Encoding: gzip\r\n\
\r\n\
",
chunk_size: chunk_size,
write_timeout: Duration::from_secs(1),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Smaller delay here too.

// - a `chunk_size` has been supplied
//
// the server won't read headers so if the response doesn't specify `Transfer-Encoding`
// then it'll misbehave and send an invalid response.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend a debug assertion if it's easy to implement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea 👍 I can search the response bytes for a Transfer-Encoding header and assert it's present.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the server needs to do it's own chunked encoding, that can be part of the text passed to the macro, so that the exact text expected is always visible in the test cases.

Copy link
Owner

@seanmonstar seanmonstar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent work!

@@ -23,6 +23,7 @@ pub struct Txn {
pub read_timeout: Option<Duration>,
pub response_timeout: Option<Duration>,
pub write_timeout: Option<Duration>,
pub chunk_size: Option<usize>,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, you should get a stream of chunks as they are received, not at the end of the response, that's the point of the streaming interface. That is to say, even with a body that has a Content-Length: 15, if a read in hyper receives 5 bytes, it will yield a Chunk of those 5 bytes, and register to read more (still expecting 10 bytes).

However, it is also true that if the body is broken up by the chunked encoding, hyper will yield Chunks each time it reaches the end of an encoding "chunk". So, if it received 5\r\nabcde\r\n1\r\nf\r\n, it would yield a Chunk of 5, and then another poll would yield a Chunk of 1.

// - a `chunk_size` has been supplied
//
// the server won't read headers so if the response doesn't specify `Transfer-Encoding`
// then it'll misbehave and send an invalid response.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the server needs to do it's own chunked encoding, that can be part of the text passed to the macro, so that the exact text expected is always visible in the test cases.

- `Pending` is a non-blocking constructor for a `Decoder` in case the body needs to be checked for EOF
- `Peeked` is a buffer that keeps a few bytes available so `libflate`s `read_exact` calls won't fail
*/

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The work in the module is phenomenal! It was quite easy to read and follow what was happening, thanks!

One comment is that the style throughout the rest of the crate is to prefer generics of just a single letter. As for my personal opinion, I find seeing argument types that are a word makes me immediately assume it's some concrete type, like TStream, instead of T, where I know that is a generic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I've updated the generics to match the rest of the crate. I'll rebase this on master now.

@KodrAus KodrAus force-pushed the feat/async-decoder branch from de1e2ec to e2fa972 Compare August 18, 2017 09:46
src/response.rs Outdated
@@ -183,11 +182,11 @@ impl Response {
/// ```
#[inline]
pub fn error_for_status(self) -> ::Result<Self> {
let Response { body, inner, _thread_handle } = self;
let Response { inner, body, _thread_handle } = self;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious reordering :)

///
/// This function will replace the body on the response with an empty one.
#[inline]
pub fn body(&mut self) -> Decoder {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually would have body() -> &Decoder, to match the http crate...

Taking a Decoder can be done with mem::swap and body_mut(), right?

Copy link
Contributor Author

@KodrAus KodrAus Aug 19, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's a bit unintuitive to have to use mem::replace, but I guess at some point if this uses http::Response then we'll have into_parts.

I've updated the async sample to show how you can move the body out.

@KodrAus KodrAus force-pushed the feat/async-decoder branch from 18149b0 to 2cb70c8 Compare August 19, 2017 08:02
@seanmonstar seanmonstar merged commit fe8c7a2 into seanmonstar:master Aug 21, 2017
@seanmonstar
Copy link
Owner

Thanks for this spectacular work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants