diff --git a/CHANGELOG.md b/CHANGELOG.md index 963b6481..cce7da18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0), ## Unreleased +- Remove deprecated `stream`, `futures-bufread` and `futures-write` features. +- Remove Tokio 0.2.x and 0.3.x support (`tokio-02` and `tokio-03` features). + ## 0.3.15 - 2022-10-08 - `Level::Default::into_zstd()` now returns libzstd's default value `3`. diff --git a/Cargo.lock b/Cargo.lock index 85f9c595..60ab6946 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,9 +28,7 @@ name = "async-compression" version = "0.3.15" dependencies = [ "brotli", - "bytes 0.5.6", - "bytes 0.6.0", - "bytes 1.1.0", + "bytes", "bzip2", "flate2", "futures", @@ -39,17 +37,12 @@ dependencies = [ "futures-test", "memchr", "ntest", - "pin-project-lite 0.2.8", + "pin-project-lite", "proptest", "proptest-derive", "rand", - "tokio 0.2.25", - "tokio 0.3.7", - "tokio 1.24.2", - "tokio-util 0.3.1", - "tokio-util 0.4.0", - "tokio-util 0.5.1", - "tokio-util 0.6.9", + "tokio", + "tokio-util", "xz2", "zstd", "zstd-safe", @@ -109,18 +102,6 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" -[[package]] -name = "bytes" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" - -[[package]] -name = "bytes" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" - [[package]] name = "bytes" version = "1.1.0" @@ -321,7 +302,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.8", + "pin-project-lite", "pin-utils", "slab", ] @@ -337,6 +318,15 @@ dependencies = [ "wasi", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + [[package]] name = "hermit-abi" version = "0.3.1" @@ -358,7 +348,7 @@ version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c66c74d2ae7e79a5a8f7ac924adbe38ee42a859c6539ad869eb51f0b52dc220" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.1", "libc", "windows-sys 0.48.0", ] @@ -476,6 +466,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" +dependencies = [ + "hermit-abi 0.2.6", + "libc", +] + [[package]] name = "pin-project" version = "1.0.10" @@ -496,12 +496,6 @@ dependencies = [ "syn 1.0.86", ] -[[package]] -name = "pin-project-lite" -version = "0.1.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" - [[package]] name = "pin-project-lite" version = "0.2.8" @@ -762,33 +756,6 @@ dependencies = [ "syn 1.0.86", ] -[[package]] -name = "tokio" -version = "0.2.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6703a273949a90131b290be1fe7b039d0fc884aa1935860dfcbe056f28cd8092" -dependencies = [ - "bytes 0.5.6", - "futures-core", - "memchr", - "pin-project-lite 0.1.12", - "slab", - "tokio-macros", -] - -[[package]] -name = "tokio" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46409491c9375a693ce7032101970a54f8a2010efb77e13f70788f0d84489e39" -dependencies = [ - "autocfg", - "bytes 0.6.0", - "futures-core", - "memchr", - "pin-project-lite 0.2.8", -] - [[package]] name = "tokio" version = "1.24.2" @@ -796,77 +763,37 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "597a12a59981d9e3c38d216785b0c37399f6e415e8d0712047620f189371b0bb" dependencies = [ "autocfg", - "bytes 1.1.0", + "bytes", "memchr", - "pin-project-lite 0.2.8", + "num_cpus", + "pin-project-lite", + "tokio-macros", "windows-sys 0.42.0", ] [[package]] name = "tokio-macros" -version = "0.2.6" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" +checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2 1.0.36", "quote 1.0.15", "syn 1.0.86", ] -[[package]] -name = "tokio-util" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" -dependencies = [ - "bytes 0.5.6", - "futures-core", - "futures-sink", - "log", - "pin-project-lite 0.1.12", - "tokio 0.2.25", -] - -[[package]] -name = "tokio-util" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24793699f4665ba0416ed287dc794fe6b11a4aa5e4e95b58624f45f6c46b97d4" -dependencies = [ - "bytes 0.5.6", - "futures-core", - "futures-sink", - "log", - "pin-project-lite 0.1.12", - "tokio 0.3.7", -] - -[[package]] -name = "tokio-util" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3137de2b078e95274b696cc522e87f22c9a753fe3ef3344116ffb94f104f10a3" -dependencies = [ - "bytes 0.6.0", - "futures-core", - "futures-sink", - "log", - "pin-project-lite 0.2.8", - "tokio 0.3.7", -] - [[package]] name = "tokio-util" version = "0.6.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" dependencies = [ - "bytes 1.1.0", + "bytes", "futures-core", "futures-sink", "log", - "pin-project-lite 0.2.8", - "tokio 1.24.2", + "pin-project-lite", + "tokio", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8e9b0823..c1cf5496 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ rustdoc-args = ["--cfg", "docsrs"] # groups default = [] all = ["all-implementations", "all-algorithms"] -all-implementations = ["futures-io", "stream", "tokio-02", "tokio-03", "tokio"] +all-implementations = ["futures-io", "tokio"] all-algorithms = ["brotli", "bzip2", "deflate", "gzip", "lzma", "xz", "zlib", "zstd"] # algorithms @@ -30,15 +30,9 @@ xz = ["xz2"] zlib = ["flate2"] zstd = ["libzstd", "zstd-safe"] -# deprecated -stream = ["bytes-05"] -futures-bufread = ["futures-io"] -futures-write = ["futures-io"] - [dependencies] xz2 = { version = "0.1.6", optional = true } brotli = { version = "3.3.0", optional = true, default-features = false, features = ["std"] } -bytes-05 = { package = "bytes", version = "0.5.0", optional = true } bzip2 = { version = "0.4.1" , optional = true } flate2 = { version = "1.0.11", optional = true } futures-core = { version = "0.3.0", default-features = false } @@ -47,8 +41,6 @@ pin-project-lite = "0.2.0" libzstd = { package = "zstd", version = "0.11.1", optional = true, default-features = false } zstd-safe = { version = "5.0.1", optional = true, default-features = false } memchr = "2.2.1" -tokio-02 = { package = "tokio", version = "0.2.21", optional = true, default-features = false } -tokio-03 = { package = "tokio", version = "0.3.0", optional = true, default-features = false } tokio = { version = "1.0.0", optional = true, default-features = false } [dev-dependencies] @@ -58,15 +50,8 @@ rand = "0.8.5" futures = "0.3.5" futures-test = "0.3.5" ntest = "0.8.1" -bytes-05 = { package = "bytes", version = "0.5.0" } -bytes-06 = { package = "bytes", version = "0.6.0" } bytes = "1.0.0" -tokio-02 = { package = "tokio", version = "0.2.21", default-features = false, features = ["io-util", "stream", "macros", "io-std"] } -tokio-03 = { package = "tokio", version = "0.3.0", default-features = false, features = ["io-util", "stream"] } -tokio = { version = "1.0.0", default-features = false, features = ["io-util"] } -tokio-util-03 = { package = "tokio-util", version = "0.3.0", default-features = false, features = ["codec"] } -tokio-util-04 = { package = "tokio-util", version = "0.4.0", default-features = false, features = ["io"] } -tokio-util-05 = { package = "tokio-util", version = "0.5.0", default-features = false, features = ["io"] } +tokio = { version = "1.0.0", default-features = false, features = ["io-util", "macros", "rt-multi-thread", "io-std"] } tokio-util-06 = { package = "tokio-util", version = "0.6.0", default-features = false, features = ["io"] } [[test]] @@ -102,9 +87,9 @@ name = "zstd" required-features = ["zstd"] [[example]] -name = "zlib_tokio_02_write" -required-features = ["zlib", "tokio-02"] +name = "zlib_tokio_write" +required-features = ["zlib", "tokio"] [[example]] name = "zstd_gzip" -required-features = ["zstd", "gzip", "tokio-02"] +required-features = ["zstd", "gzip", "tokio"] diff --git a/examples/zlib_tokio_02_write.rs b/examples/zlib_tokio_write.rs similarity index 74% rename from examples/zlib_tokio_02_write.rs rename to examples/zlib_tokio_write.rs index d5fd9461..a707c3d7 100644 --- a/examples/zlib_tokio_02_write.rs +++ b/examples/zlib_tokio_write.rs @@ -1,11 +1,10 @@ -use async_compression::tokio_02::write::ZlibDecoder; -use async_compression::tokio_02::write::ZlibEncoder; +use async_compression::tokio::write::ZlibDecoder; +use async_compression::tokio::write::ZlibEncoder; use std::io::Result; -use tokio_02::io::AsyncWriteExt as _; // for `write_all` and `shutdown` +use tokio::io::AsyncWriteExt as _; // for `write_all` and `shutdown` -use tokio_02 as tokio; // this enable the tokio main macro -#[tokio_02::main] +#[tokio::main] async fn main() -> Result<()> { let data = b"example"; let compressed_data = compress(data).await?; diff --git a/examples/zstd_gzip.rs b/examples/zstd_gzip.rs index 40f9ac15..07c55518 100644 --- a/examples/zstd_gzip.rs +++ b/examples/zstd_gzip.rs @@ -1,21 +1,20 @@ -use async_compression::tokio_02::bufread::ZstdDecoder; -use async_compression::tokio_02::write::GzipEncoder; +use async_compression::tokio::bufread::ZstdDecoder; +use async_compression::tokio::write::GzipEncoder; use std::io::Result; -use tokio_02::io::stderr; -use tokio_02::io::stdin; -use tokio_02::io::stdout; -use tokio_02::io::AsyncReadExt as _; // for `read_to_end` -use tokio_02::io::AsyncWriteExt as _; // for `write_all` and `shutdown` -use tokio_02::io::BufReader; +use tokio::io::stderr; +use tokio::io::stdin; +use tokio::io::stdout; +use tokio::io::AsyncReadExt as _; // for `read_to_end` +use tokio::io::AsyncWriteExt as _; // for `write_all` and `shutdown` +use tokio::io::BufReader; // Run this example by running the following in the terminal: // ``` // echo 'example' | zstd | cargo run --example zstd_gzip --features="all" | gunzip -c ─╯ // ``` -use tokio_02 as tokio; // this enable the tokio main macro -#[tokio_02::main] +#[tokio::main] async fn main() -> Result<()> { // Read zstd encoded data from stdin and decode let mut reader = ZstdDecoder::new(BufReader::new(stdin())); diff --git a/src/lib.rs b/src/lib.rs index 108cb859..2eee17dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,38 +37,6 @@ not(feature = "futures-io"), doc = "`futures-io` (*inactive*) | `futures::io::AsyncBufRead`, `futures::io::AsyncWrite`" )] -#![cfg_attr( - feature = "futures-bufread", - doc = "`futures-bufread` | (*deprecated*, use `futures-io`)" -)] -#![cfg_attr( - feature = "futures-write", - doc = "`futures-write` | (*deprecated*, use `futures-io`)" -)] -#![cfg_attr( - feature = "stream", - doc = "[`stream`] | (*deprecated*, see [`async-compression:stream`](crate::stream) docs for migration)" -)] -#![cfg_attr( - not(feature = "stream"), - doc = "`stream` (*inactive*) | (*deprecated*, see `async-compression::stream` docs for migration)" -)] -#![cfg_attr( - feature = "tokio-02", - doc = "[`tokio-02`](crate::tokio_02) | [`tokio::io::AsyncBufRead`](::tokio_02::io::AsyncBufRead), [`tokio::io::AsyncWrite`](::tokio_02::io::AsyncWrite)" -)] -#![cfg_attr( - not(feature = "tokio-02"), - doc = "`tokio-02` (*inactive*) | `tokio::io::AsyncBufRead`, `tokio::io::AsyncWrite`" -)] -#![cfg_attr( - feature = "tokio-03", - doc = "[`tokio-03`](crate::tokio_03) | [`tokio::io::AsyncBufRead`](::tokio_03::io::AsyncBufRead), [`tokio::io::AsyncWrite`](::tokio_03::io::AsyncWrite)" -)] -#![cfg_attr( - not(feature = "tokio-03"), - doc = "`tokio-03` (*inactive*) | `tokio::io::AsyncBufRead`, `tokio::io::AsyncWrite`" -)] #![cfg_attr( feature = "tokio", doc = "[`tokio`](crate::tokio) | [`tokio::io::AsyncBufRead`](::tokio::io::AsyncBufRead), [`tokio::io::AsyncWrite`](::tokio::io::AsyncWrite)" @@ -169,18 +137,9 @@ mod codec; #[cfg(feature = "futures-io")] #[cfg_attr(docsrs, doc(cfg(feature = "futures-io")))] pub mod futures; -#[cfg(feature = "stream")] -#[cfg_attr(docsrs, doc(cfg(feature = "stream")))] -pub mod stream; #[cfg(feature = "tokio")] #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod tokio; -#[cfg(feature = "tokio-02")] -#[cfg_attr(docsrs, doc(cfg(feature = "tokio-02")))] -pub mod tokio_02; -#[cfg(feature = "tokio-03")] -#[cfg_attr(docsrs, doc(cfg(feature = "tokio-03")))] -pub mod tokio_03; mod unshared; mod util; diff --git a/src/stream/generic/decoder.rs b/src/stream/generic/decoder.rs deleted file mode 100644 index a56a8b6a..00000000 --- a/src/stream/generic/decoder.rs +++ /dev/null @@ -1,166 +0,0 @@ -use std::{ - io::Result, - pin::Pin, - task::{Context, Poll}, -}; - -use crate::{codec::Decode, util::PartialBuffer}; -use bytes_05::{Buf, Bytes, BytesMut}; -use futures_core::{ready, stream::Stream}; -use pin_project_lite::pin_project; - -const OUTPUT_BUFFER_SIZE: usize = 8_000; - -#[derive(Debug)] -enum State { - Reading, - Writing, - Flushing, - Next, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - stream: S, - decoder: D, - state: State, - input: Bytes, - output: BytesMut, - multiple_members: bool, - } -} - -impl>, D: Decode> Decoder { - pub fn new(stream: S, decoder: D) -> Self { - Self { - stream, - decoder, - state: State::Reading, - input: Bytes::new(), - output: BytesMut::new(), - multiple_members: false, - } - } - - pub fn get_ref(&self) -> &S { - &self.stream - } - - pub fn get_mut(&mut self) -> &mut S { - &mut self.stream - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> { - self.project().stream - } - - pub fn into_inner(self) -> S { - self.stream - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; - } -} - -impl>, D: Decode> Stream for Decoder { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let this = self.project(); - - let (mut stream, input, state, decoder, multiple_members) = ( - this.stream, - this.input, - this.state, - this.decoder, - *this.multiple_members, - ); - - let mut output = PartialBuffer::new(this.output); - - let result = (|| loop { - let output_capacity = output.written().len() + OUTPUT_BUFFER_SIZE; - output.get_mut().resize(output_capacity, 0); - - *state = match state { - State::Reading => { - if let Some(chunk) = ready!(stream.as_mut().poll_next(cx)) { - *input = chunk?; - State::Writing - } else { - State::Flushing - } - } - - State::Writing => { - if input.is_empty() { - State::Reading - } else { - let mut input = PartialBuffer::new(&mut *input); - - let done = decoder.decode(&mut input, &mut output)?; - - let input_len = input.written().len(); - input.into_inner().advance(input_len); - - if done { - State::Flushing - } else { - State::Writing - } - } - } - - State::Flushing => { - if decoder.finish(&mut output)? { - if multiple_members { - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Next => { - if input.is_empty() { - if let Some(chunk) = ready!(stream.as_mut().poll_next(cx)) { - *input = chunk?; - State::Next - } else { - State::Done - } - } else { - decoder.reinit()?; - State::Writing - } - } - - State::Done => { - return Poll::Ready(None); - } - }; - })(); - - match result { - Poll::Ready(Some(Ok(_))) => unreachable!(), - Poll::Ready(Some(Err(_))) => { - *state = State::Done; - result - } - Poll::Ready(None) | Poll::Pending => { - if output.written().is_empty() { - result - } else { - let output_len = output.written().len(); - Poll::Ready(Some(Ok(output.into_inner().split_to(output_len).freeze()))) - } - } - } - } -} diff --git a/src/stream/generic/encoder.rs b/src/stream/generic/encoder.rs deleted file mode 100644 index 10520f10..00000000 --- a/src/stream/generic/encoder.rs +++ /dev/null @@ -1,132 +0,0 @@ -use std::{ - io::Result, - pin::Pin, - task::{Context, Poll}, -}; - -use crate::{codec::Encode, util::PartialBuffer}; -use bytes_05::{Buf, Bytes, BytesMut}; -use futures_core::{ready, stream::Stream}; -use pin_project_lite::pin_project; - -const OUTPUT_BUFFER_SIZE: usize = 8_000; - -#[derive(Debug)] -enum State { - Reading, - Writing, - Flushing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - stream: S, - encoder: E, - state: State, - input: Bytes, - output: BytesMut, - } -} - -impl>, E: Encode> Encoder { - pub(crate) fn new(stream: S, encoder: E) -> Self { - Self { - stream, - encoder, - state: State::Reading, - input: Bytes::new(), - output: BytesMut::new(), - } - } - - pub(crate) fn get_ref(&self) -> &S { - &self.stream - } - - pub(crate) fn get_mut(&mut self) -> &mut S { - &mut self.stream - } - - pub(crate) fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> { - self.project().stream - } - - pub(crate) fn into_inner(self) -> S { - self.stream - } -} - -impl>, E: Encode> Stream for Encoder { - type Item = Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> { - let this = self.project(); - - let (mut stream, input, state, encoder) = - (this.stream, this.input, this.state, this.encoder); - - let mut output = PartialBuffer::new(this.output); - - let result = (|| loop { - let output_capacity = output.written().len() + OUTPUT_BUFFER_SIZE; - output.get_mut().resize(output_capacity, 0); - - *state = match *state { - State::Reading => { - if let Some(chunk) = ready!(stream.as_mut().poll_next(cx)) { - *input = chunk?; - State::Writing - } else { - State::Flushing - } - } - - State::Writing => { - if input.is_empty() { - State::Reading - } else { - let mut input = PartialBuffer::new(&mut *input); - - encoder.encode(&mut input, &mut output)?; - - let input_len = input.written().len(); - input.into_inner().advance(input_len); - - State::Writing - } - } - - State::Flushing => { - if encoder.finish(&mut output)? { - State::Done - } else { - State::Flushing - } - } - - State::Done => { - return Poll::Ready(None); - } - }; - })(); - - match result { - Poll::Ready(Some(Ok(_))) => unreachable!(), - Poll::Ready(Some(Err(_))) => { - *state = State::Done; - result - } - Poll::Ready(None) | Poll::Pending => { - if output.written().is_empty() { - result - } else { - let output_len = output.written().len(); - Poll::Ready(Some(Ok(output.into_inner().split_to(output_len).freeze()))) - } - } - } - } -} diff --git a/src/stream/generic/mod.rs b/src/stream/generic/mod.rs deleted file mode 100644 index 13665af4..00000000 --- a/src/stream/generic/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod decoder; -mod encoder; - -pub(crate) use self::{decoder::Decoder, encoder::Encoder}; diff --git a/src/stream/macros/decoder.rs b/src/stream/macros/decoder.rs deleted file mode 100644 index 6a3c38a1..00000000 --- a/src/stream/macros/decoder.rs +++ /dev/null @@ -1,91 +0,0 @@ -macro_rules! decoder { - ($(#[$attr:meta])* $name:ident) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements a [`Stream`](futures_core::stream::Stream) interface and will read - /// compressed data from an underlying stream and emit a stream of uncompressed data. - pub struct $name { - #[pin] - inner: crate::stream::generic::Decoder, - } - } - - impl>> $name { - /// Creates a new decoder which will read compressed data from the given stream and - /// emit an uncompressed stream. - pub fn new(stream: S) -> Self { - Self { - inner: crate::stream::Decoder::new( - stream, - crate::codec::$name::new(), - ), - } - } - - /// Configure multi-member/frame decoding, if enabled this will reset the decoder state - /// when reaching the end of a compressed member/frame and expect either the end of the - /// wrapped stream or another compressed member/frame to follow. - pub fn multiple_members(&mut self, enabled: bool) { - self.inner.multiple_members(enabled); - } - - /// Acquires a reference to the underlying stream that this decoder is wrapping. - pub fn get_ref(&self) -> &S { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the stream which - /// may otherwise confuse this decoder. - pub fn get_mut(&mut self) -> &mut S { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the stream which - /// may otherwise confuse this decoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut S> { - self.project().inner.get_pin_mut() - } - - /// Consumes this decoder returning the underlying stream. - /// - /// Note that this may discard internal state of this decoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> S { - self.inner.into_inner() - } - } - - impl>> - futures_core::stream::Stream for $name - { - type Item = std::io::Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>> { - self.project().inner.poll_next(cx) - } - } - - const _: () = { - fn _assert() { - use std::{pin::Pin, io::Result}; - use bytes_05::Bytes; - use futures_core::stream::Stream; - use crate::util::{_assert_send, _assert_sync}; - - _assert_send::<$name> + Send>>>>(); - _assert_sync::<$name> + Sync>>>>(); - } - }; - } -} diff --git a/src/stream/macros/encoder.rs b/src/stream/macros/encoder.rs deleted file mode 100644 index 0f1524e7..00000000 --- a/src/stream/macros/encoder.rs +++ /dev/null @@ -1,80 +0,0 @@ -macro_rules! encoder { - ($(#[$attr:meta])* $name:ident<$inner:ident> $({ $($constructor:tt)* })*) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements a [`Stream`](futures_core::stream::Stream) interface and will read - /// uncompressed data from an underlying stream and emit a stream of compressed data. - pub struct $name<$inner> { - #[pin] - inner: crate::stream::Encoder<$inner, crate::codec::$name>, - } - } - - impl<$inner: futures_core::stream::Stream>> $name<$inner> { - $( - /// Creates a new encoder which will read uncompressed data from the given stream - /// and emit a compressed stream. - /// - $($constructor)* - )* - - /// Acquires a reference to the underlying stream that this encoder is wrapping. - pub fn get_ref(&self) -> &$inner { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying stream that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the stream which - /// may otherwise confuse this encoder. - pub fn get_mut(&mut self) -> &mut $inner { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying stream that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the stream which - /// may otherwise confuse this encoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut $inner> { - self.project().inner.get_pin_mut() - } - - /// Consumes this encoder returning the underlying stream. - /// - /// Note that this may discard internal state of this encoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> $inner { - self.inner.into_inner() - } - } - - impl<$inner: futures_core::stream::Stream>> - futures_core::stream::Stream for $name<$inner> - { - type Item = std::io::Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll>> { - self.project().inner.poll_next(cx) - } - } - - const _: () = { - fn _assert() { - use std::{pin::Pin, io::Result}; - use bytes_05::Bytes; - use futures_core::stream::Stream; - use crate::util::{_assert_send, _assert_sync}; - - _assert_send::<$name> + Send>>>>(); - _assert_sync::<$name> + Sync>>>>(); - } - }; - } -} diff --git a/src/stream/macros/mod.rs b/src/stream/macros/mod.rs deleted file mode 100644 index c8aac75e..00000000 --- a/src/stream/macros/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -#[macro_use] -mod encoder; - -#[macro_use] -mod decoder; diff --git a/src/stream/mod.rs b/src/stream/mod.rs deleted file mode 100644 index 7dae314f..00000000 --- a/src/stream/mod.rs +++ /dev/null @@ -1,152 +0,0 @@ -//! Types which operate over [`Stream`](futures_core::stream::Stream)`>` streams, both encoders and -//! decoders for various formats. -//! -//! The `Stream` is treated as a single byte-stream to be compressed/decompressed, each item is a -//! chunk of data from this byte-stream. There is not guaranteed to be a one-to-one relationship -//! between chunks of data from the underlying stream and the resulting compressed/decompressed -//! stream, the encoders and decoders will buffer the incoming data and choose their own boundaries -//! at which to yield a new item. -//! -//! # Deprecation Migration -//! -//! This feature and module was deprecated because it's choosing one point in a large solution -//! space of "stream of byte chunks" to represent an IO data stream, and the conversion between -//! these solutions and standard IO data streams like `futures::io::AsyncBufRead` / -//! `tokio::io::AsyncBufRead` should be zero-cost. -//! -//! ```rust -//! use bytes_05::Bytes; -//! use futures::{stream::Stream, TryStreamExt}; -//! use std::io::Result; -//! -//! /// For code that looks like this, choose one of the options below to replace it -//! fn from( -//! input: impl Stream>, -//! ) -> impl Stream> { -//! #[allow(deprecated)] -//! async_compression::stream::GzipEncoder::new(input) -//! } -//! -//! /// Direct replacement with `tokio` v0.2 and `bytes` v0.5 using `tokio-util` v0.3 -//! fn tokio_02_bytes_05( -//! input: impl Stream>, -//! ) -> impl Stream> { -//! tokio_util_03::codec::FramedRead::new( -//! async_compression::tokio_02::bufread::GzipEncoder::new( -//! tokio_02::io::stream_reader(input), -//! ), -//! tokio_util_03::codec::BytesCodec::new(), -//! ).map_ok(|bytes| bytes.freeze()) -//! } -//! -//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.5 using `tokio-util` v0.4 -//! fn tokio_03_bytes_05( -//! input: impl Stream>, -//! ) -> impl Stream> { -//! tokio_util_04::io::ReaderStream::new( -//! async_compression::tokio_03::bufread::GzipEncoder::new( -//! tokio_util_04::io::StreamReader::new(input), -//! ), -//! ) -//! } -//! -//! /// Upgrade replacement with `tokio` v0.3 and `bytes` v0.6 using `tokio-util` v0.5 -//! fn tokio_03_bytes_06( -//! input: impl Stream>, -//! ) -> impl Stream> { -//! tokio_util_05::io::ReaderStream::new( -//! async_compression::tokio_03::bufread::GzipEncoder::new( -//! tokio_util_05::io::StreamReader::new(input), -//! ), -//! ) -//! } -//! -//! /// Upgrade replacement with `tokio` v1.0 and `bytes` v1.0 using `tokio-util` v0.6 -//! fn tokio_bytes( -//! input: impl Stream>, -//! ) -> impl Stream> { -//! tokio_util_06::io::ReaderStream::new( -//! async_compression::tokio::bufread::GzipEncoder::new( -//! tokio_util_06::io::StreamReader::new(input), -//! ), -//! ) -//! } -//! -//! /// What if you didn't want anything to do with `bytes`, but just a `Vec` instead? -//! fn futures_vec( -//! input: impl Stream>> + Unpin, -//! ) -> impl Stream>> { -//! use futures::io::AsyncReadExt; -//! -//! futures::stream::try_unfold( -//! async_compression::futures::bufread::GzipEncoder::new(input.into_async_read()), -//! |mut encoder| async move { -//! let mut chunk = vec![0; 8 * 1024]; -//! let len = encoder.read(&mut chunk).await?; -//! if len == 0 { -//! Ok(None) -//! } else { -//! chunk.truncate(len); -//! Ok(Some((chunk, encoder))) -//! } -//! }) -//! } -//! # -//! # futures::executor::block_on(async { -//! # let data = || futures::stream::iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]); -//! # let expected: Vec> = from(data().map_ok(bytes_05::Bytes::from)) -//! # .map_ok(|bytes| bytes.as_ref().into()) -//! # .try_collect() -//! # .await?; -//! # -//! # assert_eq!( -//! # expected, -//! # tokio_02_bytes_05(data().map_ok(bytes_05::Bytes::from)) -//! # .map_ok(|bytes| bytes.as_ref().into()) -//! # .try_collect::>>() -//! # .await?, -//! # ); -//! # assert_eq!( -//! # expected, -//! # tokio_03_bytes_05(data().map_ok(bytes_05::Bytes::from)) -//! # .map_ok(|bytes| bytes.as_ref().into()) -//! # .try_collect::>>() -//! # .await?, -//! # ); -//! # assert_eq!( -//! # expected, -//! # tokio_03_bytes_06(data().map_ok(bytes_06::Bytes::from)) -//! # .map_ok(|bytes| bytes.as_ref().into()) -//! # .try_collect::>>() -//! # .await?, -//! # ); -//! # assert_eq!( -//! # expected, -//! # tokio_bytes(data().map_ok(bytes::Bytes::from)) -//! # .map_ok(|bytes| bytes.as_ref().into()) -//! # .try_collect::>>() -//! # .await?, -//! # ); -//! # assert_eq!( -//! # expected, -//! # futures_vec(data()) -//! # .try_collect::>>() -//! # .await? -//! # ); -//! # Ok::<_, std::io::Error>(()) -//! # })?; Ok::<_, std::io::Error>(()) -//! ``` - -#![deprecated( - since = "0.3.8", - note = "See `async-compression::stream` docs for migration" -)] - -#[macro_use] -mod macros; -mod generic; - -pub(crate) use self::generic::{Decoder, Encoder}; - -algos!(stream); diff --git a/src/tokio_02/bufread/generic/decoder.rs b/src/tokio_02/bufread/generic/decoder.rs deleted file mode 100644 index 38dc068d..00000000 --- a/src/tokio_02/bufread/generic/decoder.rs +++ /dev/null @@ -1,141 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{codec::Decode, util::PartialBuffer}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_02::io::{AsyncBufRead, AsyncRead}; - -#[derive(Debug)] -enum State { - Decoding, - Flushing, - Done, - Next, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - reader: R, - decoder: D, - state: State, - multiple_members: bool, - } -} - -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - state: State::Decoding, - multiple_members: false, - } - } - - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; - } - - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - *this.state = match this.state { - State::Decoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - // Avoid attempting to reinitialise the decoder if the reader - // has returned EOF. - *this.multiple_members = false; - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let done = this.decoder.decode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - if done { - State::Flushing - } else { - State::Decoding - } - } - } - - State::Flushing => { - if this.decoder.finish(output)? { - if *this.multiple_members { - this.decoder.reinit()?; - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Done => State::Done, - - State::Next => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Done - } else { - State::Decoding - } - } - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncRead for Decoder { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut output = PartialBuffer::new(buf); - match self.do_poll_read(cx, &mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(output.written().len())), - } - } -} diff --git a/src/tokio_02/bufread/generic/encoder.rs b/src/tokio_02/bufread/generic/encoder.rs deleted file mode 100644 index b6f60489..00000000 --- a/src/tokio_02/bufread/generic/encoder.rs +++ /dev/null @@ -1,113 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{codec::Encode, util::PartialBuffer}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_02::io::{AsyncBufRead, AsyncRead}; - -#[derive(Debug)] -enum State { - Encoding, - Flushing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - reader: R, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(reader: R, encoder: E) -> Self { - Self { - reader, - encoder, - state: State::Encoding, - } - } - - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - *this.state = match this.state { - State::Encoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - this.encoder.encode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - State::Encoding - } - } - - State::Flushing => { - if this.encoder.finish(output)? { - State::Done - } else { - State::Flushing - } - } - - State::Done => State::Done, - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncRead for Encoder { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut output = PartialBuffer::new(buf); - match self.do_poll_read(cx, &mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(output.written().len())), - } - } -} diff --git a/src/tokio_02/bufread/generic/mod.rs b/src/tokio_02/bufread/generic/mod.rs deleted file mode 100644 index dbe1e3e2..00000000 --- a/src/tokio_02/bufread/generic/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod decoder; -mod encoder; - -pub use self::{decoder::Decoder, encoder::Encoder}; diff --git a/src/tokio_02/bufread/macros/decoder.rs b/src/tokio_02/bufread/macros/decoder.rs deleted file mode 100644 index c4285769..00000000 --- a/src/tokio_02/bufread/macros/decoder.rs +++ /dev/null @@ -1,84 +0,0 @@ -macro_rules! decoder { - ($(#[$attr:meta])* $name:ident) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncRead`](tokio_02::io::AsyncRead) interface and will - /// read compressed data from an underlying stream and emit a stream of uncompressed data. - pub struct $name { - #[pin] - inner: crate::tokio_02::bufread::Decoder, - } - } - - impl $name { - /// Creates a new decoder which will read compressed data from the given stream and - /// emit a uncompressed stream. - pub fn new(read: R) -> $name { - $name { - inner: crate::tokio_02::bufread::Decoder::new(read, crate::codec::$name::new()), - } - } - - /// Configure multi-member/frame decoding, if enabled this will reset the decoder state - /// when reaching the end of a compressed member/frame and expect either EOF or another - /// compressed member/frame to follow it in the stream. - pub fn multiple_members(&mut self, enabled: bool) { - self.inner.multiple_members(enabled); - } - - /// Acquires a reference to the underlying reader that this decoder is wrapping. - pub fn get_ref(&self) -> &R { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_mut(&mut self) -> &mut R { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut R> { - self.project().inner.get_pin_mut() - } - - /// Consumes this decoder returning the underlying reader. - /// - /// Note that this may discard internal state of this decoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> R { - self.inner.into_inner() - } - } - - impl tokio_02::io::AsyncRead for $name { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> std::task::Poll> { - self.project().inner.poll_read(cx, buf) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_02::io::AsyncBufRead; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_02/bufread/macros/encoder.rs b/src/tokio_02/bufread/macros/encoder.rs deleted file mode 100644 index 1f358aac..00000000 --- a/src/tokio_02/bufread/macros/encoder.rs +++ /dev/null @@ -1,76 +0,0 @@ -macro_rules! encoder { - ($(#[$attr:meta])* $name:ident<$inner:ident> $({ $($constructor:tt)* })*) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncRead`](tokio_02::io::AsyncRead) interface and will - /// read uncompressed data from an underlying stream and emit a stream of compressed data. - pub struct $name<$inner> { - #[pin] - inner: crate::tokio_02::bufread::Encoder<$inner, crate::codec::$name>, - } - } - - impl<$inner: tokio_02::io::AsyncBufRead> $name<$inner> { - $( - /// Creates a new encoder which will read uncompressed data from the given stream - /// and emit a compressed stream. - /// - $($constructor)* - )* - - /// Acquires a reference to the underlying reader that this encoder is wrapping. - pub fn get_ref(&self) -> &$inner { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this encoder. - pub fn get_mut(&mut self) -> &mut $inner { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this encoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut $inner> { - self.project().inner.get_pin_mut() - } - - /// Consumes this encoder returning the underlying reader. - /// - /// Note that this may discard internal state of this encoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> $inner { - self.inner.into_inner() - } - } - - impl<$inner: tokio_02::io::AsyncBufRead> tokio_02::io::AsyncRead for $name<$inner> { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut [u8], - ) -> std::task::Poll> { - self.project().inner.poll_read(cx, buf) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_02::io::AsyncBufRead; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_02/bufread/macros/mod.rs b/src/tokio_02/bufread/macros/mod.rs deleted file mode 100644 index 31e1010b..00000000 --- a/src/tokio_02/bufread/macros/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[macro_use] -mod decoder; -#[macro_use] -mod encoder; diff --git a/src/tokio_02/bufread/mod.rs b/src/tokio_02/bufread/mod.rs deleted file mode 100644 index 306d216a..00000000 --- a/src/tokio_02/bufread/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Types which operate over [`AsyncBufRead`](::tokio_02::io::AsyncBufRead) streams, both encoders and -//! decoders for various formats. - -#[macro_use] -mod macros; -mod generic; - -pub(crate) use generic::{Decoder, Encoder}; - -algos!(tokio_02::bufread); diff --git a/src/tokio_02/mod.rs b/src/tokio_02/mod.rs deleted file mode 100644 index 9cc2ffa2..00000000 --- a/src/tokio_02/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Implementations for IO traits exported by [`tokio` v0.2](::tokio_02). - -pub mod bufread; -pub mod write; diff --git a/src/tokio_02/write/buf_write.rs b/src/tokio_02/write/buf_write.rs deleted file mode 100644 index 5ca99731..00000000 --- a/src/tokio_02/write/buf_write.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::{ - io, - pin::Pin, - task::{Context, Poll}, -}; - -pub(crate) trait AsyncBufWrite { - /// Attempt to return an internal buffer to write to, flushing data out to the inner reader if - /// it is full. - /// - /// On success, returns `Poll::Ready(Ok(buf))`. - /// - /// If the buffer is full and cannot be flushed, the method returns `Poll::Pending` and - /// arranges for the current task context (`cx`) to receive a notification when the object - /// becomes readable or is closed. - fn poll_partial_flush_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>; - - /// Tells this buffer that `amt` bytes have been written to its buffer, so they should be - /// written out to the underlying IO when possible. - /// - /// This function is a lower-level call. It needs to be paired with the `poll_flush_buf` method to - /// function properly. This function does not perform any I/O, it simply informs this object - /// that some amount of its buffer, returned from `poll_flush_buf`, has been written to and should - /// be sent. As such, this function may do odd things if `poll_flush_buf` isn't - /// called before calling it. - /// - /// The `amt` must be `<=` the number of bytes in the buffer returned by `poll_flush_buf`. - fn produce(self: Pin<&mut Self>, amt: usize); -} diff --git a/src/tokio_02/write/buf_writer.rs b/src/tokio_02/write/buf_writer.rs deleted file mode 100644 index 775a95c6..00000000 --- a/src/tokio_02/write/buf_writer.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Originally sourced from `futures_util::io::buf_writer`, needs to be redefined locally so that -// the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient -// with those methods. - -use super::AsyncBufWrite; -use futures_core::ready; -use pin_project_lite::pin_project; -use std::{ - cmp::min, - fmt, io, - pin::Pin, - task::{Context, Poll}, -}; -use tokio_02::io::AsyncWrite; - -const DEFAULT_BUF_SIZE: usize = 8192; - -pin_project! { - pub struct BufWriter { - #[pin] - inner: W, - buf: Box<[u8]>, - written: usize, - buffered: usize, - } -} - -impl BufWriter { - /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: W) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufWriter` with the specified buffer capacity. - pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: vec![0; cap].into(), - written: 0, - buffered: 0, - } - } - - fn partial_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered]) - { - Poll::Pending => { - break; - } - Poll::Ready(Ok(0)) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Poll::Ready(Ok(n)) => *this.written += n, - Poll::Ready(Err(e)) => { - ret = Err(e); - break; - } - } - } - - if *this.written > 0 { - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - - Poll::Ready(ret) - } else if *this.buffered == 0 { - Poll::Ready(ret) - } else { - ret?; - Poll::Pending - } - } - - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match ready!(this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered])) - { - Ok(0) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Ok(n) => *this.written += n, - Err(e) => { - ret = Err(e); - break; - } - } - } - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - Poll::Ready(ret) - } - - /// Gets a reference to the underlying writer. - pub fn get_ref(&self) -> &W { - &self.inner - } - - /// Gets a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_mut(&mut self) -> &mut W { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying writer. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> W { - self.inner - } -} - -impl AsyncWrite for BufWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let this = self.as_mut().project(); - if *this.buffered + buf.len() > this.buf.len() { - ready!(self.as_mut().partial_flush_buf(cx))?; - } - - let this = self.as_mut().project(); - if buf.len() >= this.buf.len() { - if *this.buffered == 0 { - this.inner.poll_write(cx, buf) - } else { - // The only way that `partial_flush_buf` would have returned with - // `this.buffered != 0` is if it were Pending, so our waker was already queued - Poll::Pending - } - } else { - let len = min(this.buf.len() - *this.buffered, buf.len()); - this.buf[*this.buffered..*this.buffered + len].copy_from_slice(&buf[..len]); - *this.buffered += len; - Poll::Ready(Ok(len)) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_shutdown(cx) - } -} - -impl AsyncBufWrite for BufWriter { - fn poll_partial_flush_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - ready!(self.as_mut().partial_flush_buf(cx))?; - let this = self.project(); - Poll::Ready(Ok(&mut this.buf[*this.buffered..])) - } - - fn produce(self: Pin<&mut Self>, amt: usize) { - *self.project().buffered += amt; - } -} - -impl fmt::Debug for BufWriter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufWriter") - .field("writer", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.buffered, self.buf.len()), - ) - .field("written", &self.written) - .finish() - } -} diff --git a/src/tokio_02/write/generic/decoder.rs b/src/tokio_02/write/generic/decoder.rs deleted file mode 100644 index 23a79cf3..00000000 --- a/src/tokio_02/write/generic/decoder.rs +++ /dev/null @@ -1,175 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::{Error, ErrorKind, Result}; - -use crate::{ - codec::Decode, - tokio_02::write::{AsyncBufWrite, BufWriter}, - util::PartialBuffer, -}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_02::io::AsyncWrite; - -#[derive(Debug)] -enum State { - Decoding, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - writer: BufWriter, - decoder: D, - state: State, - } -} - -impl Decoder { - pub fn new(writer: W, decoder: D) -> Self { - Self { - writer: BufWriter::new(writer), - decoder, - state: State::Decoding, - } - } - - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } - - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Decoding => { - if this.decoder.decode(input, &mut output)? { - State::Finishing - } else { - State::Decoding - } - } - - State::Finishing => { - if this.decoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => panic!("Write after end of stream"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let (state, done) = match this.state { - State::Decoding => { - let done = this.decoder.flush(&mut output)?; - (State::Decoding, done) - } - - State::Finishing => { - if this.decoder.finish(&mut output)? { - (State::Done, false) - } else { - (State::Finishing, false) - } - } - - State::Done => (State::Done, true), - }; - - *this.state = state; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Decoder { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let State::Decoding = self.as_mut().project().state { - *self.as_mut().project().state = State::Finishing; - } - - ready!(self.as_mut().do_poll_flush(cx))?; - - if let State::Done = self.as_mut().project().state { - ready!(self.as_mut().project().writer.as_mut().poll_shutdown(cx))?; - Poll::Ready(Ok(())) - } else { - Poll::Ready(Err(Error::new( - ErrorKind::Other, - "Attempt to shutdown before finishing input", - ))) - } - } -} diff --git a/src/tokio_02/write/generic/encoder.rs b/src/tokio_02/write/generic/encoder.rs deleted file mode 100644 index a2e227f5..00000000 --- a/src/tokio_02/write/generic/encoder.rs +++ /dev/null @@ -1,163 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{ - codec::Encode, - tokio_02::write::{AsyncBufWrite, BufWriter}, - util::PartialBuffer, -}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_02::io::AsyncWrite; - -#[derive(Debug)] -enum State { - Encoding, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - writer: BufWriter, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(writer: W, encoder: E) -> Self { - Self { - writer: BufWriter::new(writer), - encoder, - state: State::Encoding, - } - } - - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } - - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding => { - this.encoder.encode(input, &mut output)?; - State::Encoding - } - - State::Finishing | State::Done => panic!("Write after shutdown"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let done = match this.state { - State::Encoding => this.encoder.flush(&mut output)?, - - State::Finishing | State::Done => panic!("Flush after shutdown"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding | State::Finishing => { - if this.encoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => State::Done, - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Encoder { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_shutdown(cx))?; - ready!(self.project().writer.as_mut().poll_shutdown(cx))?; - Poll::Ready(Ok(())) - } -} diff --git a/src/tokio_02/write/generic/mod.rs b/src/tokio_02/write/generic/mod.rs deleted file mode 100644 index dbe1e3e2..00000000 --- a/src/tokio_02/write/generic/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod decoder; -mod encoder; - -pub use self::{decoder::Decoder, encoder::Encoder}; diff --git a/src/tokio_02/write/macros/decoder.rs b/src/tokio_02/write/macros/decoder.rs deleted file mode 100644 index 6c85ffa7..00000000 --- a/src/tokio_02/write/macros/decoder.rs +++ /dev/null @@ -1,91 +0,0 @@ -macro_rules! decoder { - ($(#[$attr:meta])* $name:ident) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncWrite`](tokio_02::io::AsyncWrite) interface and will - /// take in compressed data and write it uncompressed to an underlying stream. - pub struct $name { - #[pin] - inner: crate::tokio_02::write::Decoder, - } - } - - impl $name { - /// Creates a new decoder which will take in compressed data and write it uncompressedd - /// to the given stream. - pub fn new(read: W) -> $name { - $name { - inner: crate::tokio_02::write::Decoder::new(read, crate::codec::$name::new()), - } - } - - /// Acquires a reference to the underlying reader that this decoder is wrapping. - pub fn get_ref(&self) -> &W { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_mut(&mut self) -> &mut W { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut W> { - self.project().inner.get_pin_mut() - } - - /// Consumes this decoder returning the underlying reader. - /// - /// Note that this may discard internal state of this decoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> W { - self.inner.into_inner() - } - } - - impl tokio_02::io::AsyncWrite for $name { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_shutdown(cx) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_02::io::AsyncWrite; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_02/write/macros/encoder.rs b/src/tokio_02/write/macros/encoder.rs deleted file mode 100644 index 5237331e..00000000 --- a/src/tokio_02/write/macros/encoder.rs +++ /dev/null @@ -1,90 +0,0 @@ -macro_rules! encoder { - ($(#[$attr:meta])* $name:ident<$inner:ident> $({ $($constructor:tt)* })*) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncWrite`](tokio_02::io::AsyncWrite) interface and will - /// take in uncompressed data and write it compressed to an underlying stream. - pub struct $name<$inner> { - #[pin] - inner: crate::tokio_02::write::Encoder<$inner, crate::codec::$name>, - } - } - - impl<$inner: tokio_02::io::AsyncWrite> $name<$inner> { - $( - /// Creates a new encoder which will take in uncompressed data and write it - /// compressed to the given stream. - /// - $($constructor)* - )* - - /// Acquires a reference to the underlying writer that this encoder is wrapping. - pub fn get_ref(&self) -> &$inner { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying writer that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the writer which - /// may otherwise confuse this encoder. - pub fn get_mut(&mut self) -> &mut $inner { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying writer that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the writer which - /// may otherwise confuse this encoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut $inner> { - self.project().inner.get_pin_mut() - } - - /// Consumes this encoder returning the underlying writer. - /// - /// Note that this may discard internal state of this encoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> $inner { - self.inner.into_inner() - } - } - - impl<$inner: tokio_02::io::AsyncWrite> tokio_02::io::AsyncWrite for $name<$inner> { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_shutdown(cx) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_02::io::AsyncWrite; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_02/write/macros/mod.rs b/src/tokio_02/write/macros/mod.rs deleted file mode 100644 index 31e1010b..00000000 --- a/src/tokio_02/write/macros/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[macro_use] -mod decoder; -#[macro_use] -mod encoder; diff --git a/src/tokio_02/write/mod.rs b/src/tokio_02/write/mod.rs deleted file mode 100644 index 541b4ba5..00000000 --- a/src/tokio_02/write/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! Types which operate over [`AsyncWrite`](tokio_02::io::AsyncWrite) streams, both encoders and -//! decoders for various formats. - -#[macro_use] -mod macros; -mod generic; - -mod buf_write; -mod buf_writer; - -use self::{ - buf_write::AsyncBufWrite, - buf_writer::BufWriter, - generic::{Decoder, Encoder}, -}; - -algos!(tokio_02::write); diff --git a/src/tokio_03/bufread/generic/decoder.rs b/src/tokio_03/bufread/generic/decoder.rs deleted file mode 100644 index 796218c9..00000000 --- a/src/tokio_03/bufread/generic/decoder.rs +++ /dev/null @@ -1,145 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{codec::Decode, util::PartialBuffer}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::{AsyncBufRead, AsyncRead, ReadBuf}; - -#[derive(Debug)] -enum State { - Decoding, - Flushing, - Done, - Next, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - reader: R, - decoder: D, - state: State, - multiple_members: bool, - } -} - -impl Decoder { - pub fn new(reader: R, decoder: D) -> Self { - Self { - reader, - decoder, - state: State::Decoding, - multiple_members: false, - } - } - - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - pub fn multiple_members(&mut self, enabled: bool) { - self.multiple_members = enabled; - } - - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - *this.state = match this.state { - State::Decoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - // Avoid attempting to reinitialise the decoder if the reader - // has returned EOF. - *this.multiple_members = false; - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - let done = this.decoder.decode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - if done { - State::Flushing - } else { - State::Decoding - } - } - } - - State::Flushing => { - if this.decoder.finish(output)? { - if *this.multiple_members { - this.decoder.reinit()?; - State::Next - } else { - State::Done - } - } else { - State::Flushing - } - } - - State::Done => State::Done, - - State::Next => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Done - } else { - State::Decoding - } - } - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncRead for Decoder { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - if buf.remaining() == 0 { - return Poll::Ready(Ok(())); - } - - let mut output = PartialBuffer::new(buf.initialize_unfilled()); - match self.do_poll_read(cx, &mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => { - let len = output.written().len(); - buf.advance(len); - Poll::Ready(Ok(())) - } - } - } -} diff --git a/src/tokio_03/bufread/generic/encoder.rs b/src/tokio_03/bufread/generic/encoder.rs deleted file mode 100644 index 0c47ce2b..00000000 --- a/src/tokio_03/bufread/generic/encoder.rs +++ /dev/null @@ -1,117 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{codec::Encode, util::PartialBuffer}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::{AsyncBufRead, AsyncRead, ReadBuf}; - -#[derive(Debug)] -enum State { - Encoding, - Flushing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - reader: R, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(reader: R, encoder: E) -> Self { - Self { - reader, - encoder, - state: State::Encoding, - } - } - - pub fn get_ref(&self) -> &R { - &self.reader - } - - pub fn get_mut(&mut self) -> &mut R { - &mut self.reader - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> { - self.project().reader - } - - pub fn into_inner(self) -> R { - self.reader - } - - fn do_poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - output: &mut PartialBuffer<&mut [u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - *this.state = match this.state { - State::Encoding => { - let input = ready!(this.reader.as_mut().poll_fill_buf(cx))?; - if input.is_empty() { - State::Flushing - } else { - let mut input = PartialBuffer::new(input); - this.encoder.encode(&mut input, output)?; - let len = input.written().len(); - this.reader.as_mut().consume(len); - State::Encoding - } - } - - State::Flushing => { - if this.encoder.finish(output)? { - State::Done - } else { - State::Flushing - } - } - - State::Done => State::Done, - }; - - if let State::Done = *this.state { - return Poll::Ready(Ok(())); - } - if output.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncRead for Encoder { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - if buf.remaining() == 0 { - return Poll::Ready(Ok(())); - } - - let mut output = PartialBuffer::new(buf.initialize_unfilled()); - match self.do_poll_read(cx, &mut output)? { - Poll::Pending if output.written().is_empty() => Poll::Pending, - _ => { - let len = output.written().len(); - buf.advance(len); - Poll::Ready(Ok(())) - } - } - } -} diff --git a/src/tokio_03/bufread/generic/mod.rs b/src/tokio_03/bufread/generic/mod.rs deleted file mode 100644 index dbe1e3e2..00000000 --- a/src/tokio_03/bufread/generic/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod decoder; -mod encoder; - -pub use self::{decoder::Decoder, encoder::Encoder}; diff --git a/src/tokio_03/bufread/macros/decoder.rs b/src/tokio_03/bufread/macros/decoder.rs deleted file mode 100644 index 55affc12..00000000 --- a/src/tokio_03/bufread/macros/decoder.rs +++ /dev/null @@ -1,84 +0,0 @@ -macro_rules! decoder { - ($(#[$attr:meta])* $name:ident) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncRead`](tokio_03::io::AsyncRead) interface and will - /// read compressed data from an underlying stream and emit a stream of uncompressed data. - pub struct $name { - #[pin] - inner: crate::tokio_03::bufread::Decoder, - } - } - - impl $name { - /// Creates a new decoder which will read compressed data from the given stream and - /// emit a uncompressed stream. - pub fn new(read: R) -> $name { - $name { - inner: crate::tokio_03::bufread::Decoder::new(read, crate::codec::$name::new()), - } - } - - /// Configure multi-member/frame decoding, if enabled this will reset the decoder state - /// when reaching the end of a compressed member/frame and expect either EOF or another - /// compressed member/frame to follow it in the stream. - pub fn multiple_members(&mut self, enabled: bool) { - self.inner.multiple_members(enabled); - } - - /// Acquires a reference to the underlying reader that this decoder is wrapping. - pub fn get_ref(&self) -> &R { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_mut(&mut self) -> &mut R { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut R> { - self.project().inner.get_pin_mut() - } - - /// Consumes this decoder returning the underlying reader. - /// - /// Note that this may discard internal state of this decoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> R { - self.inner.into_inner() - } - } - - impl tokio_03::io::AsyncRead for $name { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio_03::io::ReadBuf<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_read(cx, buf) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncBufRead; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/bufread/macros/encoder.rs b/src/tokio_03/bufread/macros/encoder.rs deleted file mode 100644 index 3204d15d..00000000 --- a/src/tokio_03/bufread/macros/encoder.rs +++ /dev/null @@ -1,76 +0,0 @@ -macro_rules! encoder { - ($(#[$attr:meta])* $name:ident<$inner:ident> $({ $($constructor:tt)* })*) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncRead`](tokio_03::io::AsyncRead) interface and will - /// read uncompressed data from an underlying stream and emit a stream of compressed data. - pub struct $name<$inner> { - #[pin] - inner: crate::tokio_03::bufread::Encoder<$inner, crate::codec::$name>, - } - } - - impl<$inner: tokio_03::io::AsyncBufRead> $name<$inner> { - $( - /// Creates a new encoder which will read uncompressed data from the given stream - /// and emit a compressed stream. - /// - $($constructor)* - )* - - /// Acquires a reference to the underlying reader that this encoder is wrapping. - pub fn get_ref(&self) -> &$inner { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this encoder. - pub fn get_mut(&mut self) -> &mut $inner { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this encoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut $inner> { - self.project().inner.get_pin_mut() - } - - /// Consumes this encoder returning the underlying reader. - /// - /// Note that this may discard internal state of this encoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> $inner { - self.inner.into_inner() - } - } - - impl<$inner: tokio_03::io::AsyncBufRead> tokio_03::io::AsyncRead for $name<$inner> { - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio_03::io::ReadBuf<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_read(cx, buf) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncBufRead; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/bufread/macros/mod.rs b/src/tokio_03/bufread/macros/mod.rs deleted file mode 100644 index 31e1010b..00000000 --- a/src/tokio_03/bufread/macros/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[macro_use] -mod decoder; -#[macro_use] -mod encoder; diff --git a/src/tokio_03/bufread/mod.rs b/src/tokio_03/bufread/mod.rs deleted file mode 100644 index 9dc41344..00000000 --- a/src/tokio_03/bufread/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Types which operate over [`AsyncBufRead`](::tokio_03::io::AsyncBufRead) streams, both encoders and -//! decoders for various formats. - -#[macro_use] -mod macros; -mod generic; - -pub(crate) use generic::{Decoder, Encoder}; - -algos!(tokio_03::bufread); diff --git a/src/tokio_03/mod.rs b/src/tokio_03/mod.rs deleted file mode 100644 index 45f3e0d9..00000000 --- a/src/tokio_03/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -//! Implementations for IO traits exported by [`tokio` v0.3](::tokio_03). - -pub mod bufread; -pub mod write; diff --git a/src/tokio_03/write/buf_write.rs b/src/tokio_03/write/buf_write.rs deleted file mode 100644 index 5ca99731..00000000 --- a/src/tokio_03/write/buf_write.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::{ - io, - pin::Pin, - task::{Context, Poll}, -}; - -pub(crate) trait AsyncBufWrite { - /// Attempt to return an internal buffer to write to, flushing data out to the inner reader if - /// it is full. - /// - /// On success, returns `Poll::Ready(Ok(buf))`. - /// - /// If the buffer is full and cannot be flushed, the method returns `Poll::Pending` and - /// arranges for the current task context (`cx`) to receive a notification when the object - /// becomes readable or is closed. - fn poll_partial_flush_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>; - - /// Tells this buffer that `amt` bytes have been written to its buffer, so they should be - /// written out to the underlying IO when possible. - /// - /// This function is a lower-level call. It needs to be paired with the `poll_flush_buf` method to - /// function properly. This function does not perform any I/O, it simply informs this object - /// that some amount of its buffer, returned from `poll_flush_buf`, has been written to and should - /// be sent. As such, this function may do odd things if `poll_flush_buf` isn't - /// called before calling it. - /// - /// The `amt` must be `<=` the number of bytes in the buffer returned by `poll_flush_buf`. - fn produce(self: Pin<&mut Self>, amt: usize); -} diff --git a/src/tokio_03/write/buf_writer.rs b/src/tokio_03/write/buf_writer.rs deleted file mode 100644 index f10f7074..00000000 --- a/src/tokio_03/write/buf_writer.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Originally sourced from `futures_util::io::buf_writer`, needs to be redefined locally so that -// the `AsyncBufWrite` impl can access its internals, and changed a bit to make it more efficient -// with those methods. - -use super::AsyncBufWrite; -use futures_core::ready; -use pin_project_lite::pin_project; -use std::{ - cmp::min, - fmt, io, - pin::Pin, - task::{Context, Poll}, -}; -use tokio_03::io::AsyncWrite; - -const DEFAULT_BUF_SIZE: usize = 8192; - -pin_project! { - pub struct BufWriter { - #[pin] - inner: W, - buf: Box<[u8]>, - written: usize, - buffered: usize, - } -} - -impl BufWriter { - /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: W) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) - } - - /// Creates a new `BufWriter` with the specified buffer capacity. - pub fn with_capacity(cap: usize, inner: W) -> Self { - Self { - inner, - buf: vec![0; cap].into(), - written: 0, - buffered: 0, - } - } - - fn partial_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered]) - { - Poll::Pending => { - break; - } - Poll::Ready(Ok(0)) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Poll::Ready(Ok(n)) => *this.written += n, - Poll::Ready(Err(e)) => { - ret = Err(e); - break; - } - } - } - - if *this.written > 0 { - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - - Poll::Ready(ret) - } else if *this.buffered == 0 { - Poll::Ready(ret) - } else { - ret?; - Poll::Pending - } - } - - fn flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - let mut ret = Ok(()); - while *this.written < *this.buffered { - match ready!(this - .inner - .as_mut() - .poll_write(cx, &this.buf[*this.written..*this.buffered])) - { - Ok(0) => { - ret = Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write the buffered data", - )); - break; - } - Ok(n) => *this.written += n, - Err(e) => { - ret = Err(e); - break; - } - } - } - this.buf.copy_within(*this.written..*this.buffered, 0); - *this.buffered -= *this.written; - *this.written = 0; - Poll::Ready(ret) - } - - /// Gets a reference to the underlying writer. - pub fn get_ref(&self) -> &W { - &self.inner - } - - /// Gets a mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_mut(&mut self) -> &mut W { - &mut self.inner - } - - /// Gets a pinned mutable reference to the underlying writer. - /// - /// It is inadvisable to directly write to the underlying writer. - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().inner - } - - /// Consumes this `BufWriter`, returning the underlying writer. - /// - /// Note that any leftover data in the internal buffer is lost. - pub fn into_inner(self) -> W { - self.inner - } -} - -impl AsyncWrite for BufWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let this = self.as_mut().project(); - if *this.buffered + buf.len() > this.buf.len() { - ready!(self.as_mut().partial_flush_buf(cx))?; - } - - let this = self.as_mut().project(); - if buf.len() >= this.buf.len() { - if *this.buffered == 0 { - this.inner.poll_write(cx, buf) - } else { - // The only way that `partial_flush_buf` would have returned with - // `this.buffered != 0` is if it were Pending, so our waker was already queued - Poll::Pending - } - } else { - let len = min(this.buf.len() - *this.buffered, buf.len()); - this.buf[*this.buffered..*this.buffered + len].copy_from_slice(&buf[..len]); - *this.buffered += len; - Poll::Ready(Ok(len)) - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().flush_buf(cx))?; - self.project().inner.poll_shutdown(cx) - } -} - -impl AsyncBufWrite for BufWriter { - fn poll_partial_flush_buf( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - ready!(self.as_mut().partial_flush_buf(cx))?; - let this = self.project(); - Poll::Ready(Ok(&mut this.buf[*this.buffered..])) - } - - fn produce(self: Pin<&mut Self>, amt: usize) { - *self.project().buffered += amt; - } -} - -impl fmt::Debug for BufWriter { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufWriter") - .field("writer", &self.inner) - .field( - "buffer", - &format_args!("{}/{}", self.buffered, self.buf.len()), - ) - .field("written", &self.written) - .finish() - } -} diff --git a/src/tokio_03/write/generic/decoder.rs b/src/tokio_03/write/generic/decoder.rs deleted file mode 100644 index 944b35b8..00000000 --- a/src/tokio_03/write/generic/decoder.rs +++ /dev/null @@ -1,175 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::{Error, ErrorKind, Result}; - -use crate::{ - codec::Decode, - tokio_03::write::{AsyncBufWrite, BufWriter}, - util::PartialBuffer, -}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::AsyncWrite; - -#[derive(Debug)] -enum State { - Decoding, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Decoder { - #[pin] - writer: BufWriter, - decoder: D, - state: State, - } -} - -impl Decoder { - pub fn new(writer: W, decoder: D) -> Self { - Self { - writer: BufWriter::new(writer), - decoder, - state: State::Decoding, - } - } - - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } - - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Decoding => { - if this.decoder.decode(input, &mut output)? { - State::Finishing - } else { - State::Decoding - } - } - - State::Finishing => { - if this.decoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => panic!("Write after end of stream"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let (state, done) = match this.state { - State::Decoding => { - let done = this.decoder.flush(&mut output)?; - (State::Decoding, done) - } - - State::Finishing => { - if this.decoder.finish(&mut output)? { - (State::Done, false) - } else { - (State::Finishing, false) - } - } - - State::Done => (State::Done, true), - }; - - *this.state = state; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Decoder { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let State::Decoding = self.as_mut().project().state { - *self.as_mut().project().state = State::Finishing; - } - - ready!(self.as_mut().do_poll_flush(cx))?; - - if let State::Done = self.as_mut().project().state { - ready!(self.as_mut().project().writer.as_mut().poll_shutdown(cx))?; - Poll::Ready(Ok(())) - } else { - Poll::Ready(Err(Error::new( - ErrorKind::Other, - "Attempt to shutdown before finishing input", - ))) - } - } -} diff --git a/src/tokio_03/write/generic/encoder.rs b/src/tokio_03/write/generic/encoder.rs deleted file mode 100644 index 2121ce1d..00000000 --- a/src/tokio_03/write/generic/encoder.rs +++ /dev/null @@ -1,163 +0,0 @@ -use core::{ - pin::Pin, - task::{Context, Poll}, -}; -use std::io::Result; - -use crate::{ - codec::Encode, - tokio_03::write::{AsyncBufWrite, BufWriter}, - util::PartialBuffer, -}; -use futures_core::ready; -use pin_project_lite::pin_project; -use tokio_03::io::AsyncWrite; - -#[derive(Debug)] -enum State { - Encoding, - Finishing, - Done, -} - -pin_project! { - #[derive(Debug)] - pub struct Encoder { - #[pin] - writer: BufWriter, - encoder: E, - state: State, - } -} - -impl Encoder { - pub fn new(writer: W, encoder: E) -> Self { - Self { - writer: BufWriter::new(writer), - encoder, - state: State::Encoding, - } - } - - pub fn get_ref(&self) -> &W { - self.writer.get_ref() - } - - pub fn get_mut(&mut self) -> &mut W { - self.writer.get_mut() - } - - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.project().writer.get_pin_mut() - } - - pub fn into_inner(self) -> W { - self.writer.into_inner() - } - - fn do_poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - input: &mut PartialBuffer<&[u8]>, - ) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding => { - this.encoder.encode(input, &mut output)?; - State::Encoding - } - - State::Finishing | State::Done => panic!("Write after shutdown"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if input.unwritten().is_empty() { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - let done = match this.state { - State::Encoding => this.encoder.flush(&mut output)?, - - State::Finishing | State::Done => panic!("Flush after shutdown"), - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if done { - return Poll::Ready(Ok(())); - } - } - } - - fn do_poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - loop { - let output = ready!(this.writer.as_mut().poll_partial_flush_buf(cx))?; - let mut output = PartialBuffer::new(output); - - *this.state = match this.state { - State::Encoding | State::Finishing => { - if this.encoder.finish(&mut output)? { - State::Done - } else { - State::Finishing - } - } - - State::Done => State::Done, - }; - - let produced = output.written().len(); - this.writer.as_mut().produce(produced); - - if let State::Done = this.state { - return Poll::Ready(Ok(())); - } - } - } -} - -impl AsyncWrite for Encoder { - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - if buf.is_empty() { - return Poll::Ready(Ok(0)); - } - - let mut input = PartialBuffer::new(buf); - - match self.do_poll_write(cx, &mut input)? { - Poll::Pending if input.written().is_empty() => Poll::Pending, - _ => Poll::Ready(Ok(input.written().len())), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_flush(cx))?; - ready!(self.project().writer.as_mut().poll_flush(cx))?; - Poll::Ready(Ok(())) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.as_mut().do_poll_shutdown(cx))?; - ready!(self.project().writer.as_mut().poll_shutdown(cx))?; - Poll::Ready(Ok(())) - } -} diff --git a/src/tokio_03/write/generic/mod.rs b/src/tokio_03/write/generic/mod.rs deleted file mode 100644 index dbe1e3e2..00000000 --- a/src/tokio_03/write/generic/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod decoder; -mod encoder; - -pub use self::{decoder::Decoder, encoder::Encoder}; diff --git a/src/tokio_03/write/macros/decoder.rs b/src/tokio_03/write/macros/decoder.rs deleted file mode 100644 index c573fe2e..00000000 --- a/src/tokio_03/write/macros/decoder.rs +++ /dev/null @@ -1,91 +0,0 @@ -macro_rules! decoder { - ($(#[$attr:meta])* $name:ident) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncWrite`](tokio_03::io::AsyncWrite) interface and will - /// take in compressed data and write it uncompressed to an underlying stream. - pub struct $name { - #[pin] - inner: crate::tokio_03::write::Decoder, - } - } - - impl $name { - /// Creates a new decoder which will take in compressed data and write it uncompressedd - /// to the given stream. - pub fn new(read: W) -> $name { - $name { - inner: crate::tokio_03::write::Decoder::new(read, crate::codec::$name::new()), - } - } - - /// Acquires a reference to the underlying reader that this decoder is wrapping. - pub fn get_ref(&self) -> &W { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_mut(&mut self) -> &mut W { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying reader that this decoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the reader which - /// may otherwise confuse this decoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut W> { - self.project().inner.get_pin_mut() - } - - /// Consumes this decoder returning the underlying reader. - /// - /// Note that this may discard internal state of this decoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> W { - self.inner.into_inner() - } - } - - impl tokio_03::io::AsyncWrite for $name { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_shutdown(cx) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncWrite; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/write/macros/encoder.rs b/src/tokio_03/write/macros/encoder.rs deleted file mode 100644 index 449813c6..00000000 --- a/src/tokio_03/write/macros/encoder.rs +++ /dev/null @@ -1,90 +0,0 @@ -macro_rules! encoder { - ($(#[$attr:meta])* $name:ident<$inner:ident> $({ $($constructor:tt)* })*) => { - pin_project_lite::pin_project! { - $(#[$attr])* - #[derive(Debug)] - /// - /// This structure implements an [`AsyncWrite`](tokio_03::io::AsyncWrite) interface and will - /// take in uncompressed data and write it compressed to an underlying stream. - pub struct $name<$inner> { - #[pin] - inner: crate::tokio_03::write::Encoder<$inner, crate::codec::$name>, - } - } - - impl<$inner: tokio_03::io::AsyncWrite> $name<$inner> { - $( - /// Creates a new encoder which will take in uncompressed data and write it - /// compressed to the given stream. - /// - $($constructor)* - )* - - /// Acquires a reference to the underlying writer that this encoder is wrapping. - pub fn get_ref(&self) -> &$inner { - self.inner.get_ref() - } - - /// Acquires a mutable reference to the underlying writer that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the writer which - /// may otherwise confuse this encoder. - pub fn get_mut(&mut self) -> &mut $inner { - self.inner.get_mut() - } - - /// Acquires a pinned mutable reference to the underlying writer that this encoder is - /// wrapping. - /// - /// Note that care must be taken to avoid tampering with the state of the writer which - /// may otherwise confuse this encoder. - pub fn get_pin_mut(self: std::pin::Pin<&mut Self>) -> std::pin::Pin<&mut $inner> { - self.project().inner.get_pin_mut() - } - - /// Consumes this encoder returning the underlying writer. - /// - /// Note that this may discard internal state of this encoder, so care should be taken - /// to avoid losing resources when this is called. - pub fn into_inner(self) -> $inner { - self.inner.into_inner() - } - } - - impl<$inner: tokio_03::io::AsyncWrite> tokio_03::io::AsyncWrite for $name<$inner> { - fn poll_write( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - self.project().inner.poll_write(cx, buf) - } - - fn poll_flush( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_flush(cx) - } - - fn poll_shutdown( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.project().inner.poll_shutdown(cx) - } - } - - const _: () = { - fn _assert() { - use crate::util::{_assert_send, _assert_sync}; - use core::pin::Pin; - use tokio_03::io::AsyncWrite; - - _assert_send::<$name>>>(); - _assert_sync::<$name>>>(); - } - }; - } -} diff --git a/src/tokio_03/write/macros/mod.rs b/src/tokio_03/write/macros/mod.rs deleted file mode 100644 index 31e1010b..00000000 --- a/src/tokio_03/write/macros/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -#[macro_use] -mod decoder; -#[macro_use] -mod encoder; diff --git a/src/tokio_03/write/mod.rs b/src/tokio_03/write/mod.rs deleted file mode 100644 index 5387da70..00000000 --- a/src/tokio_03/write/mod.rs +++ /dev/null @@ -1,17 +0,0 @@ -//! Types which operate over [`AsyncWrite`](tokio_03::io::AsyncWrite) streams, both encoders and -//! decoders for various formats. - -#[macro_use] -mod macros; -mod generic; - -mod buf_write; -mod buf_writer; - -use self::{ - buf_write::AsyncBufWrite, - buf_writer::BufWriter, - generic::{Decoder, Encoder}, -}; - -algos!(tokio_03::write); diff --git a/tests/gzip.rs b/tests/gzip.rs index b17c7b34..d9cd4eff 100644 --- a/tests/gzip.rs +++ b/tests/gzip.rs @@ -6,99 +6,9 @@ test_cases!(gzip); #[allow(unused)] use utils::{algos::gzip::sync, InputStream}; -#[cfg(feature = "stream")] -use utils::algos::gzip::stream; - #[cfg(feature = "futures-io")] use utils::algos::gzip::futures::bufread; -/// Splits the input bytes into the first 10 bytes, the rest and the last 8 bytes, taking apart the -/// 3 parts of compressed gzip data. -#[allow(unused)] -fn split(mut input: Vec) -> (Vec, Vec, Vec) { - assert!(input.len() >= 18); - - let mut body = input.split_off(10); - let header = input; - let footer = body.split_off(body.len() - 8); - - (header, body, footer) -} - -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn gzip_stream_decompress_single_chunk() { - let compressed = sync::compress(&[1, 2, 3, 4, 5, 6]); - - // The entirety in one chunk - let input = InputStream::new(vec![compressed]); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); -} - -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn gzip_stream_decompress_segmented() { - let (header, body, footer) = split(sync::compress(&[1, 2, 3, 4, 5, 6])); - - // Header, body and footer in separate chunks, similar to how `GzipStream` outputs it. - let input = InputStream::new(vec![header, body, footer]); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); -} - -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn gzip_stream_decompress_split() { - let (header, body, footer) = split(sync::compress(&[1, 2, 3, 4, 5, 6])); - - // Header, body and footer each split across multiple chunks, no mixing - let input = InputStream::from(vec![ - &header[0..5], - &header[5..10], - &body[0..body.len() / 2], - &body[body.len() / 2..], - &footer[0..4], - &footer[4..8], - ]); - - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); -} - -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn gzip_stream_decompress_split_mixed() { - let (header, body, footer) = split(sync::compress(&[1, 2, 3, 4, 5, 6])); - - // Header, body and footer split across multiple chunks and mixed together - let input = InputStream::new(vec![ - header[0..5].into(), - header[5..10] - .iter() - .chain(&body[0..body.len() / 2]) - .cloned() - .collect(), - body[body.len() / 2..] - .iter() - .chain(&footer[0..4]) - .cloned() - .collect(), - footer[4..8].into(), - ]); - - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); -} - #[allow(unused)] fn compress_with_header(data: &[u8]) -> Vec { use flate2::{Compression, GzBuilder}; @@ -118,30 +28,6 @@ fn compress_with_header(data: &[u8]) -> Vec { bytes } -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn gzip_stream_decompress_with_extra_header() { - let bytes = compress_with_header(&[1, 2, 3, 4, 5, 6]); - - let input = InputStream::new(vec![bytes]); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); -} - -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn gzip_stream_chunks_decompress_with_extra_header() { - let bytes = compress_with_header(&[1, 2, 3, 4, 5, 6]); - - let input = InputStream::from(bytes.chunks(2)); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6][..]); -} - #[test] #[ntest::timeout(1000)] #[cfg(feature = "futures-io")] diff --git a/tests/proptest.rs b/tests/proptest.rs index 1a0e23ee..0e5ad998 100644 --- a/tests/proptest.rs +++ b/tests/proptest.rs @@ -106,62 +106,13 @@ macro_rules! io_tests { macro_rules! tests { ($variant:ident) => { mod $variant { - #[cfg(feature = "stream")] - #[allow(deprecated)] - mod stream { - use crate::utils::{algos::$variant::{stream, sync}, InputStream}; - use proptest::{prelude::{any, ProptestConfig}, proptest}; - use std::iter::FromIterator; - - proptest! { - #[test] - fn compress(ref input in any::()) { - let compressed = stream::compress(input.bytes_05_stream()); - let output = sync::decompress(&compressed); - assert_eq!(output, input.bytes()); - } - - #[test] - fn decompress( - ref input in any::>(), - chunk_size in 1..20usize, - ) { - let compressed = sync::compress(input); - let stream = InputStream::from(Vec::from_iter(compressed.chunks(chunk_size).map(Vec::from))); - let output = stream::decompress(stream.bytes_05_stream()); - assert_eq!(&output, input); - } - } - - proptest! { - #![proptest_config(ProptestConfig::with_cases(32))] - - #[test] - fn compress_with_level( - ref input in any::(), - level in crate::any_level(), - ) { - let encoder = stream::Encoder::with_quality(input.bytes_05_stream(), level); - let compressed = stream::to_vec(encoder); - let output = sync::decompress(&compressed); - assert_eq!(output, input.bytes()); - } - } - } - #[cfg(feature = "futures-io")] io_tests!(futures, $variant); - #[cfg(feature = "tokio-02")] - io_tests!(tokio_02, $variant); - - #[cfg(feature = "tokio-03")] - io_tests!(tokio_03, $variant); - #[cfg(feature = "tokio")] io_tests!(tokio, $variant); } - } + }; } mod proptest { diff --git a/tests/utils/algos.rs b/tests/utils/algos.rs index 8cdfeb66..f0afc93f 100644 --- a/tests/utils/algos.rs +++ b/tests/utils/algos.rs @@ -55,35 +55,9 @@ macro_rules! algos { pub mod $name { pub mod sync { $($tt)* } - #[cfg(feature = "stream")] - #[allow(deprecated)] - pub mod stream { - pub use async_compression::stream::{$decoder as Decoder, $encoder as Encoder}; - pub use crate::utils::impls::stream::to_vec; - use bytes_05::Bytes; - - use crate::utils::{Level, pin_mut, Stream, Result}; - - pub fn compress(input: impl Stream>) -> Vec { - pin_mut!(input); - to_vec(Encoder::with_quality(input, Level::Fastest)) - } - - pub fn decompress(input: impl Stream>) -> Vec { - pin_mut!(input); - to_vec(Decoder::new(input)) - } - } - #[cfg(feature = "futures-io")] io_algo!(futures, $name($encoder, $decoder)); - #[cfg(feature = "tokio-02")] - io_algo!(tokio_02, $name($encoder, $decoder)); - - #[cfg(feature = "tokio-03")] - io_algo!(tokio_03, $name($encoder, $decoder)); - #[cfg(feature = "tokio")] io_algo!(tokio, $name($encoder, $decoder)); } diff --git a/tests/utils/impls.rs b/tests/utils/impls.rs index 18cea2eb..31a3da5a 100644 --- a/tests/utils/impls.rs +++ b/tests/utils/impls.rs @@ -78,164 +78,6 @@ pub mod futures { } } -#[cfg(feature = "stream")] -#[allow(deprecated)] -pub mod stream { - use crate::utils::{block_on, pin_mut, Result}; - use bytes_05::Bytes; - use futures::stream::{Stream, TryStreamExt as _}; - - pub fn to_vec(stream: impl Stream>) -> Vec { - pin_mut!(stream); - block_on(stream.try_collect::>()) - .unwrap() - .into_iter() - .flatten() - .collect() - } -} - -#[cfg(feature = "tokio-02")] -pub mod tokio_02 { - pub mod bufread { - pub use tokio_02::io::AsyncBufRead; - - use crate::utils::{InputStream, TrackEof}; - use tokio_02::io::stream_reader; - - pub fn from(input: &InputStream) -> impl AsyncBufRead { - // By using the stream here we ensure that each chunk will require a separate - // read/poll_fill_buf call to process to help test reading multiple chunks. - TrackEof::new(stream_reader(input.bytes_05_stream())) - } - } - - pub mod read { - use crate::utils::{block_on, pin_mut, tokio_02_ext::copy_buf}; - use std::io::Cursor; - use tokio_02::io::{AsyncRead, AsyncReadExt, BufReader}; - - pub fn to_vec(read: impl AsyncRead) -> Vec { - let mut output = Cursor::new(vec![0; 102_400]); - pin_mut!(read); - let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap(); - let mut output = output.into_inner(); - output.truncate(len as usize); - output - } - - pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result { - pin_mut!(reader); - block_on(reader.read(output)) - } - } - - pub mod write { - use crate::utils::{ - block_on, tokio_02_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin, - }; - use std::io::Cursor; - use tokio_02::io::{AsyncWrite, AsyncWriteExt as _}; - - pub fn to_vec( - input: &[Vec], - create_writer: impl for<'a> FnOnce( - &'a mut (dyn AsyncWrite + Unpin), - ) -> Pin>, - limit: usize, - ) -> Vec { - let mut output = Cursor::new(Vec::new()); - { - let mut test_writer = TrackClosed::new( - (&mut output) - .limited_write(limit) - .interleave_pending_write(), - ); - { - let mut writer = create_writer(&mut test_writer); - for chunk in input { - block_on(writer.write_all(chunk)).unwrap(); - block_on(writer.flush()).unwrap(); - } - block_on(writer.shutdown()).unwrap(); - } - assert!(test_writer.is_closed()); - } - output.into_inner() - } - } -} - -#[cfg(feature = "tokio-03")] -pub mod tokio_03 { - pub mod bufread { - use crate::utils::{InputStream, TrackEof}; - pub use tokio_03::io::AsyncBufRead; - use tokio_util_04::io::StreamReader; - - pub fn from(input: &InputStream) -> impl AsyncBufRead { - // By using the stream here we ensure that each chunk will require a separate - // read/poll_fill_buf call to process to help test reading multiple chunks. - TrackEof::new(StreamReader::new(input.bytes_05_stream())) - } - } - - pub mod read { - use crate::utils::{block_on, pin_mut, tokio_03_ext::copy_buf}; - use std::io::Cursor; - use tokio_03::io::{AsyncRead, AsyncReadExt, BufReader}; - - pub fn to_vec(read: impl AsyncRead) -> Vec { - let mut output = Cursor::new(vec![0; 102_400]); - pin_mut!(read); - let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap(); - let mut output = output.into_inner(); - output.truncate(len as usize); - output - } - - pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result { - pin_mut!(reader); - block_on(reader.read(output)) - } - } - - pub mod write { - use crate::utils::{ - block_on, tokio_03_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin, - }; - use std::io::Cursor; - use tokio_03::io::{AsyncWrite, AsyncWriteExt as _}; - - pub fn to_vec( - input: &[Vec], - create_writer: impl for<'a> FnOnce( - &'a mut (dyn AsyncWrite + Unpin), - ) -> Pin>, - limit: usize, - ) -> Vec { - let mut output = Cursor::new(Vec::new()); - { - let mut test_writer = TrackClosed::new( - (&mut output) - .limited_write(limit) - .interleave_pending_write(), - ); - { - let mut writer = create_writer(&mut test_writer); - for chunk in input { - block_on(writer.write_all(chunk)).unwrap(); - block_on(writer.flush()).unwrap(); - } - block_on(writer.shutdown()).unwrap(); - } - assert!(test_writer.is_closed()); - } - output.into_inner() - } - } -} - #[cfg(feature = "tokio")] pub mod tokio { pub mod bufread { diff --git a/tests/utils/input_stream.rs b/tests/utils/input_stream.rs index 9f1fc162..e9a3dce1 100644 --- a/tests/utils/input_stream.rs +++ b/tests/utils/input_stream.rs @@ -1,4 +1,4 @@ -use futures::stream::{Stream, StreamExt as _}; +use futures::stream::Stream; use futures_test::stream::StreamTestExt as _; use proptest_derive::Arbitrary; @@ -28,12 +28,6 @@ impl InputStream { .interleave_pending() } - pub fn bytes_05_stream(&self) -> impl Stream> { - self.stream() - .map(bytes_05::Bytes::from) - .map(std::io::Result::Ok) - } - pub fn bytes(&self) -> Vec { self.0.iter().flatten().cloned().collect() } diff --git a/tests/utils/mod.rs b/tests/utils/mod.rs index 0f6de461..7c7dc8b7 100644 --- a/tests/utils/mod.rs +++ b/tests/utils/mod.rs @@ -1,10 +1,6 @@ #![allow(dead_code, unused_macros)] // Different tests use a different subset of functions mod input_stream; -#[cfg(feature = "tokio-02")] -mod tokio_02_ext; -#[cfg(feature = "tokio-03")] -mod tokio_03_ext; #[cfg(feature = "tokio")] mod tokio_ext; mod track_closed; diff --git a/tests/utils/test_cases.rs b/tests/utils/test_cases.rs index fc61d78a..aa13d45e 100644 --- a/tests/utils/test_cases.rs +++ b/tests/utils/test_cases.rs @@ -447,259 +447,9 @@ macro_rules! io_test_cases { macro_rules! test_cases { ($variant:ident) => { mod $variant { - #[cfg(feature = "stream")] - #[allow(deprecated)] - mod stream { - mod compress { - use crate::utils::{ - algos::$variant::{stream, sync}, - block_on, one_to_six, one_to_six_stream, InputStream, Level, - }; - use futures::stream::StreamExt as _; - - #[test] - #[ntest::timeout(1000)] - fn empty() { - // Can't use InputStream for this as it will inject extra empty chunks - let compressed = stream::compress(futures::stream::empty()); - let output = sync::decompress(&compressed); - - assert_eq!(output, &[][..]); - } - - #[test] - #[ntest::timeout(1000)] - fn empty_chunk() { - let input = InputStream::new(vec![vec![]]); - - let compressed = stream::compress(input.bytes_05_stream()); - let output = sync::decompress(&compressed); - - assert_eq!(output, input.bytes()); - } - - #[test] - #[ntest::timeout(1000)] - fn short() { - let compressed = stream::compress(one_to_six_stream().bytes_05_stream()); - let output = sync::decompress(&compressed); - - assert_eq!(output, one_to_six()); - } - - #[test] - #[ntest::timeout(1000)] - fn long() { - let input = InputStream::new(vec![ - (0..32_768).map(|_| rand::random()).collect(), - (0..32_768).map(|_| rand::random()).collect(), - ]); - - let compressed = stream::compress(input.bytes_05_stream()); - let output = sync::decompress(&compressed); - - assert_eq!(output, input.bytes()); - } - - #[test] - #[ntest::timeout(1000)] - fn error() { - let err = std::io::Error::new(std::io::ErrorKind::Other, "failure"); - let input = futures::stream::iter(vec![Err(err)]); - - let mut stream = stream::Encoder::with_quality(input, Level::Fastest); - - assert!(block_on(stream.next()).unwrap().is_err()); - assert!(block_on(stream.next()).is_none()); - } - - #[test] - fn with_level_best() { - let encoder = stream::Encoder::with_quality( - one_to_six_stream().bytes_05_stream(), - Level::Best, - ); - let compressed = stream::to_vec(encoder); - let output = sync::decompress(&compressed); - - assert_eq!(output, one_to_six()); - } - - #[test] - fn with_level_default() { - let encoder = stream::Encoder::new(one_to_six_stream().bytes_05_stream()); - let compressed = stream::to_vec(encoder); - let output = sync::decompress(&compressed); - - assert_eq!(output, one_to_six()); - } - - #[test] - fn with_level_0() { - let encoder = stream::Encoder::with_quality( - one_to_six_stream().bytes_05_stream(), - Level::Precise(0), - ); - let compressed = stream::to_vec(encoder); - let output = sync::decompress(&compressed); - - assert_eq!(output, one_to_six()); - } - - #[test] - fn with_level_max() { - let encoder = stream::Encoder::with_quality( - one_to_six_stream().bytes_05_stream(), - Level::Precise(u32::max_value()), - ); - let compressed = stream::to_vec(encoder); - let output = sync::decompress(&compressed); - - assert_eq!(output, one_to_six()); - } - } - - mod decompress { - use crate::utils::{ - algos::$variant::{stream, sync}, - block_on, one_to_six, one_to_six_stream, InputStream, - }; - use futures::stream::{StreamExt as _, TryStreamExt as _}; - - #[test] - #[ntest::timeout(1000)] - fn empty() { - let compressed = sync::compress(&[]); - - let input = InputStream::new(vec![compressed]); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, &[][..]); - } - - #[test] - #[ntest::timeout(1000)] - fn short() { - let compressed = sync::compress(&[1, 2, 3, 4, 5, 6]); - - let input = InputStream::new(vec![compressed]); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, one_to_six()); - } - - #[test] - #[ntest::timeout(1000)] - fn long() { - let bytes: Vec = (0..65_536).map(|_| rand::random()).collect(); - let compressed = sync::compress(&bytes); - - let input = InputStream::new(vec![compressed]); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, bytes); - } - - #[test] - #[ntest::timeout(1000)] - fn long_chunks() { - let bytes: Vec = (0..65_536).map(|_| rand::random()).collect(); - let compressed = sync::compress(&bytes); - - let input = InputStream::from(compressed.chunks(1024)); - let output = stream::decompress(input.bytes_05_stream()); - - assert_eq!(output, bytes); - } - - #[test] - #[ntest::timeout(1000)] - fn trailer() { - // Currently there is no way to get any partially consumed stream item from - // the decoder, for now we just guarantee that if the compressed frame - // exactly matches an item boundary we will not read the next item from the - // stream. - let compressed = sync::compress(&[1, 2, 3, 4, 5, 6]); - - let input = InputStream::new(vec![compressed, vec![7, 8, 9, 10]]); - - let mut stream = input.bytes_05_stream(); - let output = stream::decompress(&mut stream); - let trailer = stream::to_vec(stream); - - assert_eq!(output, one_to_six()); - assert_eq!(trailer, &[7, 8, 9, 10][..]); - } - - #[test] - #[ntest::timeout(1000)] - fn multiple_members() { - let compressed = [ - sync::compress(&[1, 2, 3, 4, 5, 6]), - sync::compress(&[6, 5, 4, 3, 2, 1]), - ] - .join(&[][..]); - - let input = InputStream::new(vec![compressed]); - - let mut decoder = stream::Decoder::new(input.bytes_05_stream()); - decoder.multiple_members(true); - let output = stream::to_vec(decoder); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6, 6, 5, 4, 3, 2, 1][..]); - } - - #[test] - #[ntest::timeout(1000)] - fn multiple_members_chunked() { - let compressed = [ - sync::compress(&[1, 2, 3, 4, 5, 6]), - sync::compress(&[6, 5, 4, 3, 2, 1]), - ] - .join(&[][..]); - - let input = InputStream::from(compressed.chunks(1)); - - let mut decoder = stream::Decoder::new(input.bytes_05_stream()); - decoder.multiple_members(true); - let output = stream::to_vec(decoder); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6, 6, 5, 4, 3, 2, 1][..]); - } - - #[test] - #[ntest::timeout(1000)] - fn error() { - let err = std::io::Error::new(std::io::ErrorKind::Other, "failure"); - let input = futures::stream::iter(vec![Err(err)]); - - let mut stream = stream::Decoder::new(input); - - assert!(block_on(stream.by_ref().try_collect::>()).is_err()); - assert!(block_on(stream.next()).is_none()); - } - - #[test] - #[ntest::timeout(1000)] - fn invalid_data() { - let mut stream = - stream::Decoder::new(one_to_six_stream().bytes_05_stream()); - - assert!(block_on(stream.by_ref().try_collect::>()).is_err()); - assert!(block_on(stream.next()).is_none()); - } - } - } - #[cfg(feature = "futures-io")] io_test_cases!(futures, $variant); - #[cfg(feature = "tokio-02")] - io_test_cases!(tokio_02, $variant); - - #[cfg(feature = "tokio-03")] - io_test_cases!(tokio_03, $variant); - #[cfg(feature = "tokio")] io_test_cases!(tokio, $variant); } diff --git a/tests/utils/tokio_02_ext/copy_buf.rs b/tests/utils/tokio_02_ext/copy_buf.rs deleted file mode 100644 index a472c097..00000000 --- a/tests/utils/tokio_02_ext/copy_buf.rs +++ /dev/null @@ -1,52 +0,0 @@ -use core::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; -use futures::ready; -use tokio_02::io::{AsyncBufRead, AsyncWrite}; - -pub fn copy_buf(reader: R, writer: &mut W) -> CopyBuf<'_, R, W> -where - R: AsyncBufRead + Unpin, - W: AsyncWrite + Unpin + ?Sized, -{ - CopyBuf { - reader, - writer, - amt: 0, - } -} - -#[derive(Debug)] -pub struct CopyBuf<'a, R, W: ?Sized> { - reader: R, - writer: &'a mut W, - amt: u64, -} - -impl Future for CopyBuf<'_, R, W> -where - R: AsyncBufRead + Unpin, - W: AsyncWrite + Unpin + ?Sized, -{ - type Output = std::io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; - loop { - let buffer = ready!(Pin::new(&mut this.reader).poll_fill_buf(cx))?; - if buffer.is_empty() { - ready!(Pin::new(&mut this.writer).poll_flush(cx))?; - return Poll::Ready(Ok(this.amt)); - } - - let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; - if i == 0 { - return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); - } - this.amt += i as u64; - Pin::new(&mut this.reader).consume(i); - } - } -} diff --git a/tests/utils/tokio_02_ext/interleave_pending.rs b/tests/utils/tokio_02_ext/interleave_pending.rs deleted file mode 100644 index c675f4f7..00000000 --- a/tests/utils/tokio_02_ext/interleave_pending.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -pub struct InterleavePending { - inner: T, - pended: bool, -} - -impl InterleavePending { - pub(crate) fn new(inner: T) -> Self { - Self { - inner, - pended: false, - } - } -} - -impl tokio_02::io::AsyncWrite for InterleavePending { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_write(cx, buf); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_flush(cx); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_shutdown(cx); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } -} diff --git a/tests/utils/tokio_02_ext/limited.rs b/tests/utils/tokio_02_ext/limited.rs deleted file mode 100644 index f4c94467..00000000 --- a/tests/utils/tokio_02_ext/limited.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -#[derive(Debug)] -pub struct Limited { - io: Io, - limit: usize, -} - -impl Limited { - pub(crate) fn new(io: Io, limit: usize) -> Limited { - Limited { io, limit } - } -} - -impl tokio_02::io::AsyncWrite for Limited { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let limit = self.limit; - Pin::new(&mut self.io).poll_write(cx, &buf[..std::cmp::min(limit, buf.len())]) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.io).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.io).poll_shutdown(cx) - } -} diff --git a/tests/utils/tokio_02_ext/mod.rs b/tests/utils/tokio_02_ext/mod.rs deleted file mode 100644 index ee510b1b..00000000 --- a/tests/utils/tokio_02_ext/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -mod copy_buf; -mod interleave_pending; -mod limited; - -pub use copy_buf::copy_buf; - -pub trait AsyncWriteTestExt: tokio_02::io::AsyncWrite { - fn interleave_pending_write(self) -> interleave_pending::InterleavePending - where - Self: Sized + Unpin, - { - interleave_pending::InterleavePending::new(self) - } - - fn limited_write(self, limit: usize) -> limited::Limited - where - Self: Sized + Unpin, - { - limited::Limited::new(self, limit) - } -} - -impl AsyncWriteTestExt for T {} diff --git a/tests/utils/tokio_03_ext/copy_buf.rs b/tests/utils/tokio_03_ext/copy_buf.rs deleted file mode 100644 index d144fd19..00000000 --- a/tests/utils/tokio_03_ext/copy_buf.rs +++ /dev/null @@ -1,52 +0,0 @@ -use core::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; -use futures::ready; -use tokio_03::io::{AsyncBufRead, AsyncWrite}; - -pub fn copy_buf(reader: R, writer: &mut W) -> CopyBuf<'_, R, W> -where - R: AsyncBufRead + Unpin, - W: AsyncWrite + Unpin + ?Sized, -{ - CopyBuf { - reader, - writer, - amt: 0, - } -} - -#[derive(Debug)] -pub struct CopyBuf<'a, R, W: ?Sized> { - reader: R, - writer: &'a mut W, - amt: u64, -} - -impl Future for CopyBuf<'_, R, W> -where - R: AsyncBufRead + Unpin, - W: AsyncWrite + Unpin + ?Sized, -{ - type Output = std::io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = &mut *self; - loop { - let buffer = ready!(Pin::new(&mut this.reader).poll_fill_buf(cx))?; - if buffer.is_empty() { - ready!(Pin::new(&mut this.writer).poll_flush(cx))?; - return Poll::Ready(Ok(this.amt)); - } - - let i = ready!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; - if i == 0 { - return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); - } - this.amt += i as u64; - Pin::new(&mut this.reader).consume(i); - } - } -} diff --git a/tests/utils/tokio_03_ext/interleave_pending.rs b/tests/utils/tokio_03_ext/interleave_pending.rs deleted file mode 100644 index 4948b6e0..00000000 --- a/tests/utils/tokio_03_ext/interleave_pending.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -pub struct InterleavePending { - inner: T, - pended: bool, -} - -impl InterleavePending { - pub(crate) fn new(inner: T) -> Self { - Self { - inner, - pended: false, - } - } -} - -impl tokio_03::io::AsyncWrite for InterleavePending { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_write(cx, buf); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_flush(cx); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.pended { - let next = Pin::new(&mut self.inner).poll_shutdown(cx); - if next.is_ready() { - self.pended = false; - } - next - } else { - cx.waker().wake_by_ref(); - self.pended = true; - Poll::Pending - } - } -} diff --git a/tests/utils/tokio_03_ext/limited.rs b/tests/utils/tokio_03_ext/limited.rs deleted file mode 100644 index b943f470..00000000 --- a/tests/utils/tokio_03_ext/limited.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -#[derive(Debug)] -pub struct Limited { - io: Io, - limit: usize, -} - -impl Limited { - pub(crate) fn new(io: Io, limit: usize) -> Limited { - Limited { io, limit } - } -} - -impl tokio_03::io::AsyncWrite for Limited { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - let limit = self.limit; - Pin::new(&mut self.io).poll_write(cx, &buf[..std::cmp::min(limit, buf.len())]) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.io).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.io).poll_shutdown(cx) - } -} diff --git a/tests/utils/tokio_03_ext/mod.rs b/tests/utils/tokio_03_ext/mod.rs deleted file mode 100644 index 73d8f414..00000000 --- a/tests/utils/tokio_03_ext/mod.rs +++ /dev/null @@ -1,23 +0,0 @@ -mod copy_buf; -mod interleave_pending; -mod limited; - -pub use copy_buf::copy_buf; - -pub trait AsyncWriteTestExt: tokio_03::io::AsyncWrite { - fn interleave_pending_write(self) -> interleave_pending::InterleavePending - where - Self: Sized + Unpin, - { - interleave_pending::InterleavePending::new(self) - } - - fn limited_write(self, limit: usize) -> limited::Limited - where - Self: Sized + Unpin, - { - limited::Limited::new(self, limit) - } -} - -impl AsyncWriteTestExt for T {} diff --git a/tests/utils/track_closed.rs b/tests/utils/track_closed.rs index 081f3831..ea32640c 100644 --- a/tests/utils/track_closed.rs +++ b/tests/utils/track_closed.rs @@ -56,54 +56,6 @@ impl futures::io::AsyncWrite for TrackClosed } } -#[cfg(feature = "tokio-02")] -impl tokio_02::io::AsyncWrite for TrackClosed { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - assert!(!self.closed); - Pin::new(&mut self.inner).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - assert!(!self.closed); - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - assert!(!self.closed); - match Pin::new(&mut self.inner).poll_shutdown(cx) { - Poll::Ready(Ok(())) => { - self.closed = true; - Poll::Ready(Ok(())) - } - other => other, - } - } -} - -#[cfg(feature = "tokio-03")] -impl tokio_03::io::AsyncWrite for TrackClosed { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - assert!(!self.closed); - Pin::new(&mut self.inner).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - assert!(!self.closed); - Pin::new(&mut self.inner).poll_flush(cx) - } - - fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - assert!(!self.closed); - match Pin::new(&mut self.inner).poll_shutdown(cx) { - Poll::Ready(Ok(())) => { - self.closed = true; - Poll::Ready(Ok(())) - } - other => other, - } - } -} - #[cfg(feature = "tokio")] impl tokio::io::AsyncWrite for TrackClosed { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { diff --git a/tests/utils/track_eof.rs b/tests/utils/track_eof.rs index d2101d13..d0065059 100644 --- a/tests/utils/track_eof.rs +++ b/tests/utils/track_eof.rs @@ -59,87 +59,6 @@ impl futures::io::AsyncBufRead for TrackEo } } -#[cfg(feature = "tokio-02")] -impl tokio_02::io::AsyncRead for TrackEof { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { - let (inner, eof) = self.project(); - assert!(!*eof); - match inner.poll_read(cx, buf) { - Poll::Ready(Ok(0)) => { - if !buf.is_empty() { - *eof = true; - } - Poll::Ready(Ok(0)) - } - other => other, - } - } -} - -#[cfg(feature = "tokio-02")] -impl tokio_02::io::AsyncBufRead for TrackEof { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let (inner, eof) = self.project(); - assert!(!*eof); - match inner.poll_fill_buf(cx) { - Poll::Ready(Ok(buf)) => { - if buf.is_empty() { - *eof = true; - } - Poll::Ready(Ok(buf)) - } - other => other, - } - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().0.consume(amt) - } -} - -#[cfg(feature = "tokio-03")] -impl tokio_03::io::AsyncRead for TrackEof { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut tokio_03::io::ReadBuf, - ) -> Poll> { - let (inner, eof) = self.project(); - assert!(!*eof); - let len = buf.filled().len(); - match inner.poll_read(cx, buf) { - Poll::Ready(Ok(())) => { - if buf.filled().len() == len && buf.remaining() > 0 { - *eof = true; - } - Poll::Ready(Ok(())) - } - other => other, - } - } -} - -#[cfg(feature = "tokio-03")] -impl tokio_03::io::AsyncBufRead for TrackEof { - fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let (inner, eof) = self.project(); - assert!(!*eof); - match inner.poll_fill_buf(cx) { - Poll::Ready(Ok(buf)) => { - if buf.is_empty() { - *eof = true; - } - Poll::Ready(Ok(buf)) - } - other => other, - } - } - - fn consume(self: Pin<&mut Self>, amt: usize) { - self.project().0.consume(amt) - } -} - #[cfg(feature = "tokio")] impl tokio::io::AsyncRead for TrackEof { fn poll_read( diff --git a/tests/xz.rs b/tests/xz.rs index 9cf62b62..f029993c 100644 --- a/tests/xz.rs +++ b/tests/xz.rs @@ -1,5 +1,5 @@ #[allow(unused)] -use futures::{executor::block_on, io::AsyncReadExt, stream::StreamExt}; +use futures::{executor::block_on, io::AsyncReadExt}; #[macro_use] mod utils; @@ -9,58 +9,9 @@ test_cases!(xz); #[allow(unused)] use utils::{algos::xz::sync, InputStream}; -#[cfg(feature = "stream")] -use utils::algos::xz::stream; - #[cfg(feature = "futures-io")] use utils::algos::xz::futures::{bufread, read}; -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn stream_multiple_members_with_padding() { - let compressed = [ - sync::compress(&[1, 2, 3, 4, 5, 6]), - vec![0, 0, 0, 0], - sync::compress(&[6, 5, 4, 3, 2, 1]), - vec![0, 0, 0, 0], - ] - .join(&[][..]); - - let input = InputStream::from(vec![compressed]); - - #[allow(deprecated)] - let mut decoder = stream::Decoder::new(input.bytes_05_stream()); - #[allow(deprecated)] - decoder.multiple_members(true); - let output = stream::to_vec(decoder); - - assert_eq!(output, &[1, 2, 3, 4, 5, 6, 6, 5, 4, 3, 2, 1][..]); -} - -#[test] -#[ntest::timeout(1000)] -#[cfg(feature = "stream")] -fn stream_multiple_members_with_invalid_padding() { - let compressed = [ - sync::compress(&[1, 2, 3, 4, 5, 6]), - vec![0, 0, 0], - sync::compress(&[6, 5, 4, 3, 2, 1]), - vec![0, 0, 0, 0], - ] - .join(&[][..]); - - let input = InputStream::from(vec![compressed]); - - #[allow(deprecated)] - let mut decoder = stream::Decoder::new(input.bytes_05_stream()); - #[allow(deprecated)] - decoder.multiple_members(true); - - assert!(block_on(decoder.next()).unwrap().is_err()); - assert!(block_on(decoder.next()).is_none()); -} - #[test] #[ntest::timeout(1000)] #[cfg(feature = "futures-io")]