Skip to content

Commit

Permalink
Try using RFC 2930 for AsyncRead impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Nemo157 committed May 27, 2020
1 parent f157007 commit 88b7308
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 19 deletions.
14 changes: 13 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ futures = "0.3.0"
futures-test = "0.3.0"
ntest = "0.3.3"
timebomb = "0.1.2"
bytes = "0.5.0"

[[test]]
name = "brotli"
required-features = ["all-implementations", "brotli"]
required-features = ["brotli"]

[[test]]
name = "bzip2"
Expand All @@ -81,6 +82,17 @@ required-features = ["all-implementations", "zlib"]
name = "zstd"
required-features = ["all-implementations", "zstd"]

[[test]]
name = "lzma"
required-features = ["all-implementations", "lzma"]

[[test]]
name = "xz"
required-features = ["all-implementations", "xz"]

[[test]]
name = "proptest"
required-features = ["all"]

[patch.crates-io]
futures-io = { git = "https://github.com/Nemo157/futures-rs", branch = "read-buf-rfc-2930" }
15 changes: 9 additions & 6 deletions src/codec/brotli/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use crate::{codec::Encode, util::PartialBuffer};
use std::{
fmt,
io::{Error, ErrorKind, Result},
mem::MaybeUninit,
};
use futures_io::ReadBuf;

use brotli::enc::{
backward_references::BrotliEncoderParams,
Expand All @@ -27,11 +29,12 @@ impl BrotliEncoder {
fn encode(
&mut self,
input: &mut PartialBuffer<impl AsRef<[u8]>>,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
output: &mut ReadBuf<'_>,
op: BrotliEncoderOperation,
) -> Result<()> {
let in_buf = input.unwritten();
let mut out_buf = output.unwritten_mut();
// Safety: Presumably brotli does not read from this and it's all good
let mut out_buf = unsafe { MaybeUninit::slice_get_mut(output.unfilled_mut()) };

let mut input_len = 0;
let mut output_len = 0;
Expand All @@ -53,7 +56,7 @@ impl BrotliEncoder {
}

input.advance(input_len);
output.advance(output_len);
output.add_filled(output_len);

Ok(())
}
Expand All @@ -63,7 +66,7 @@ impl Encode for BrotliEncoder {
fn encode(
&mut self,
input: &mut PartialBuffer<impl AsRef<[u8]>>,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
output: &mut ReadBuf<'_>,
) -> Result<()> {
self.encode(
input,
Expand All @@ -74,7 +77,7 @@ impl Encode for BrotliEncoder {

fn flush(
&mut self,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
output: &mut ReadBuf<'_>,
) -> Result<bool> {
self.encode(
&mut PartialBuffer::new(&[][..]),
Expand All @@ -87,7 +90,7 @@ impl Encode for BrotliEncoder {

fn finish(
&mut self,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
output: &mut ReadBuf<'_>,
) -> Result<bool> {
self.encode(
&mut PartialBuffer::new(&[][..]),
Expand Down
7 changes: 4 additions & 3 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::util::PartialBuffer;
use std::io::Result;
use futures_io::ReadBuf;

#[cfg(feature = "brotli")]
mod brotli;
Expand Down Expand Up @@ -47,17 +48,17 @@ pub trait Encode {
fn encode(
&mut self,
input: &mut PartialBuffer<impl AsRef<[u8]>>,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
output: &mut ReadBuf<'_>,
) -> Result<()>;

/// Returns whether the internal buffers are flushed
fn flush(&mut self, output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>)
fn flush(&mut self, output: &mut ReadBuf<'_>)
-> Result<bool>;

/// Returns whether the internal buffers are flushed and the end of the stream is written
fn finish(
&mut self,
output: &mut PartialBuffer<impl AsRef<[u8]> + AsMut<[u8]>>,
output: &mut ReadBuf<'_>,
) -> Result<bool>;
}

Expand Down
28 changes: 19 additions & 9 deletions src/futures/bufread/generic/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io::Result;

use crate::{codec::Encode, util::PartialBuffer};
use futures_core::ready;
use futures_io::{AsyncBufRead, AsyncRead};
use futures_io::{AsyncBufRead, AsyncRead, ReadBuf};
use pin_project_lite::pin_project;

#[derive(Debug)]
Expand Down Expand Up @@ -54,7 +54,7 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
fn do_poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
output: &mut PartialBuffer<&mut [u8]>,
output: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
let mut this = self.project();

Expand Down Expand Up @@ -87,7 +87,7 @@ impl<R: AsyncBufRead, E: Encode> Encoder<R, E> {
if let State::Done = *this.state {
return Poll::Ready(Ok(()));
}
if output.unwritten().is_empty() {
if output.remaining() == 0 {
return Poll::Ready(Ok(()));
}
}
Expand All @@ -100,14 +100,24 @@ impl<R: AsyncBufRead, E: Encode> AsyncRead for Encoder<R, E> {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
if buf.is_empty() {
return Poll::Ready(Ok(0));
let mut buf = ReadBuf::new(buf);
ready!(self.poll_read_buf(cx, &mut buf))?;
Poll::Ready(Ok(buf.filled().len()))
}

fn poll_read_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<()>> {
if buf.remaining() == 0 {
return Poll::Ready(Ok(()));
}

let mut output = PartialBuffer::new(buf);
match self.do_poll_read(cx, &mut output)? {
Poll::Pending if output.written().is_empty() => Poll::Pending,
_ => Poll::Ready(Ok(output.written().len())),
let prior = buf.filled().len();
match self.do_poll_read(cx, buf)? {
Poll::Pending if buf.filled().len() == prior => Poll::Pending,
_ => Poll::Ready(Ok(())),
}
}
}
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
missing_debug_implementations
)]

#![feature(maybe_uninit_slice_assume_init)]

#[macro_use]
mod macros;
mod codec;
Expand Down
17 changes: 17 additions & 0 deletions tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub mod prelude {
output
}

#[cfg(feature = "futures-bufread")]
pub fn async_read_to_vec(read: impl AsyncRead) -> Vec<u8> {
// TODO: https://github.com/rust-lang-nursery/futures-rs/issues/1510
// All current test cases are < 100kB
Expand All @@ -101,6 +102,7 @@ pub mod prelude {
output
}

#[cfg(feature = "futures-write")]
pub fn async_write_to_vec(
input: &[Vec<u8>],
create_writer: impl for<'a> FnOnce(
Expand All @@ -127,6 +129,7 @@ pub mod prelude {
output
}

#[cfg(feature = "stream")]
pub fn stream_to_vec(stream: impl Stream<Item = io::Result<Bytes>>) -> Vec<u8> {
pin_mut!(stream);
block_on_stream(stream)
Expand All @@ -136,6 +139,7 @@ pub mod prelude {
}
}

#[cfg(feature = "brotli")]
pub mod brotli {
pub mod sync {
use crate::utils::prelude::*;
Expand All @@ -153,6 +157,7 @@ pub mod brotli {
}
}

#[cfg(feature = "stream")]
pub mod stream {
use crate::utils::prelude::*;
pub use async_compression::stream::{BrotliDecoder as Decoder, BrotliEncoder as Encoder};
Expand All @@ -169,6 +174,7 @@ pub mod brotli {
}

pub mod futures {
#[cfg(feature = "futures-bufread")]
pub mod bufread {
use crate::utils::prelude::*;
pub use async_compression::futures::bufread::{
Expand All @@ -186,6 +192,7 @@ pub mod brotli {
}
}

#[cfg(feature = "futures-write")]
pub mod write {
use crate::utils::prelude::*;
pub use async_compression::futures::write::{
Expand All @@ -207,6 +214,7 @@ pub mod brotli {
}
}

#[cfg(feature = "bzip2")]
pub mod bzip2 {
pub mod sync {
use crate::utils::prelude::*;
Expand Down Expand Up @@ -276,6 +284,7 @@ pub mod bzip2 {
}
}

#[cfg(feature = "deflate")]
pub mod deflate {
pub mod sync {
use crate::utils::prelude::*;
Expand Down Expand Up @@ -345,6 +354,7 @@ pub mod deflate {
}
}

#[cfg(feature = "zlib")]
pub mod zlib {
pub mod sync {
use crate::utils::prelude::*;
Expand Down Expand Up @@ -414,6 +424,7 @@ pub mod zlib {
}
}

#[cfg(feature = "gzip")]
pub mod gzip {
pub mod sync {
use crate::utils::prelude::*;
Expand Down Expand Up @@ -483,6 +494,7 @@ pub mod gzip {
}
}

#[cfg(feature = "zstd")]
pub mod zstd {
pub mod sync {
use crate::utils::prelude::*;
Expand Down Expand Up @@ -553,6 +565,7 @@ pub mod zstd {
}
}

#[cfg(feature = "xz")]
pub mod xz {
pub mod sync {
use crate::utils::prelude::*;
Expand Down Expand Up @@ -624,6 +637,7 @@ pub mod xz {
}
}

#[cfg(feature = "lzma")]
pub mod lzma {
pub mod sync {
use crate::utils::prelude::*;
Expand Down Expand Up @@ -706,6 +720,7 @@ pub mod lzma {
macro_rules! test_cases {
($variant:ident) => {
mod $variant {
#[cfg(feature = "stream")]
mod stream {
mod compress {
use crate::utils::{self, prelude::*};
Expand Down Expand Up @@ -940,6 +955,7 @@ macro_rules! test_cases {
}

mod futures {
#[cfg(feature = "futures-bufread")]
mod bufread {
mod compress {
use crate::utils::{self, prelude::*};
Expand Down Expand Up @@ -1145,6 +1161,7 @@ macro_rules! test_cases {
}
}

#[cfg(feature = "futures-write")]
mod write {
mod compress {
use crate::utils::{self, prelude::*};
Expand Down

0 comments on commit 88b7308

Please sign in to comment.