From b432cd29e4aa96e7552762cf11fd6d1caff0b348 Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Wed, 14 Sep 2022 22:36:58 +0800 Subject: [PATCH 1/8] fix: cannot serve large file with compression --- src/http/content_encoding.rs | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/src/http/content_encoding.rs b/src/http/content_encoding.rs index efab22f..e8f3024 100644 --- a/src/http/content_encoding.rs +++ b/src/http/content_encoding.rs @@ -7,8 +7,9 @@ // except according to those terms. use std::cmp::Ordering; -use std::io::{self, BufReader}; +use std::io::{self, Read}; +use bytes::{Bytes, BytesMut}; use flate2::read::{DeflateEncoder, GzEncoder}; use flate2::Compression; use hyper::header::HeaderValue; @@ -128,24 +129,16 @@ pub fn get_prior_encoding<'a>(accept_encoding: &'a HeaderValue) -> &'static str /// /// * `data` - Data to be compressed. /// * `encoding` - Only support `br`, `gzip`, `deflate` and `identity`. -pub fn compress(data: &[u8], encoding: &str) -> io::Result> { - use std::io::prelude::*; - let mut buf = Vec::new(); - match encoding { - BR => { - BufReader::new(brotli::CompressorReader::new(data, 4096, 6, 20)) - .read_to_end(&mut buf)?; - } - GZIP => { - BufReader::new(GzEncoder::new(data, Compression::default())).read_to_end(&mut buf)?; - } - DEFLATE => { - BufReader::new(DeflateEncoder::new(data, Compression::default())) - .read_to_end(&mut buf)?; - } +pub fn compress(data: &[u8], encoding: &str) -> io::Result { + let mut buf = BytesMut::zeroed(4_096); + let read_bytes = match encoding { + BR => brotli::CompressorReader::new(data, 4096, 6, 20).read(&mut buf[..])?, + GZIP => GzEncoder::new(data, Compression::default()).read(&mut buf[..])?, + DEFLATE => DeflateEncoder::new(data, Compression::default()).read(&mut buf[..])?, _ => return Err(io::Error::new(io::ErrorKind::Other, "Unsupported Encoding")), }; - Ok(buf) + buf.truncate(read_bytes); + Ok(buf.freeze()) } pub fn should_compress(enc: &str) -> bool { @@ -258,10 +251,10 @@ mod t_compress { #[test] fn compressed() { let buf = compress(b"xxxxx", DEFLATE); - assert!(!buf.unwrap().is_empty()); + assert_eq!(buf.unwrap().len(), 5); let buf = compress(b"xxxxx", GZIP); - assert!(!buf.unwrap().is_empty()); + assert_eq!(buf.unwrap().len(), 15); let buf = compress(b"xxxxx", BR); - assert!(!buf.unwrap().is_empty()); + assert_eq!(buf.unwrap().len(), 9); } } From 83edee6f4a2bf84c5ea0e32dff67c7a05b579378 Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Thu, 15 Sep 2022 22:58:06 +0800 Subject: [PATCH 2/8] refactor: replace compress with async-compression --- Cargo.lock | 29 ++++++++++++++++ Cargo.toml | 7 ++++ src/http/content_encoding.rs | 67 ++++++++++++++++++++---------------- src/server/serve.rs | 17 ++++----- 4 files changed, 81 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa11c76..ab8e6d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,20 @@ dependencies = [ "libc", ] +[[package]] +name = "async-compression" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "345fd392ab01f746c717b1357165b76f0b67a60192007b234058c9045fdcf695" +dependencies = [ + "brotli", + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -932,6 +946,7 @@ dependencies = [ name = "sfz" version = "0.7.1" dependencies = [ + "async-compression", "brotli", "bytes", "chrono", @@ -949,6 +964,7 @@ dependencies = [ "tempfile", "tera", "tokio", + "tokio-util", "zip", ] @@ -1129,6 +1145,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-util" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower-service" version = "0.3.2" diff --git a/Cargo.toml b/Cargo.toml index 4521738..a282f0c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ edition = "2021" clap = { version = "3", default-features = false, features = ["std", "cargo"] } # Server tokio = { version = "1", features = ["rt-multi-thread", "macros"] } +tokio-util = { version = "0.7", features = ["io"] } hyper = { version = "0.14.20", features = ["http1", "server", "tcp", "stream"] } headers = "0.3" mime_guess = "2.0" @@ -39,6 +40,12 @@ zip = { version = "0.6", default-features = false, features = ["deflate"] } futures = "0.3" tempfile = "3" bytes = "1" +async-compression = { version = "0.3.7", features = [ + "brotli", + "deflate", + "gzip", + "tokio", +] } [dev-dependencies] tempfile = "3" diff --git a/src/http/content_encoding.rs b/src/http/content_encoding.rs index e8f3024..1ab951e 100644 --- a/src/http/content_encoding.rs +++ b/src/http/content_encoding.rs @@ -7,12 +7,14 @@ // except according to those terms. use std::cmp::Ordering; -use std::io::{self, Read}; +use std::io; -use bytes::{Bytes, BytesMut}; -use flate2::read::{DeflateEncoder, GzEncoder}; -use flate2::Compression; +use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder}; +use bytes::Bytes; +use futures::Stream; use hyper::header::HeaderValue; +use hyper::Body; +use tokio_util::io::{ReaderStream, StreamReader}; pub const IDENTITY: &str = "identity"; pub const DEFLATE: &str = "deflate"; @@ -123,22 +125,22 @@ pub fn get_prior_encoding<'a>(accept_encoding: &'a HeaderValue) -> &'static str .unwrap_or(IDENTITY) } -/// Compress data. -/// -/// # Parameters -/// -/// * `data` - Data to be compressed. -/// * `encoding` - Only support `br`, `gzip`, `deflate` and `identity`. -pub fn compress(data: &[u8], encoding: &str) -> io::Result { - let mut buf = BytesMut::zeroed(4_096); - let read_bytes = match encoding { - BR => brotli::CompressorReader::new(data, 4096, 6, 20).read(&mut buf[..])?, - GZIP => GzEncoder::new(data, Compression::default()).read(&mut buf[..])?, - DEFLATE => DeflateEncoder::new(data, Compression::default()).read(&mut buf[..])?, - _ => return Err(io::Error::new(io::ErrorKind::Other, "Unsupported Encoding")), - }; - buf.truncate(read_bytes); - Ok(buf.freeze()) +pub fn compress_stream( + input: impl Stream> + std::marker::Send + 'static, + encoding: &str, +) -> io::Result { + match encoding { + BR => Ok(Body::wrap_stream(ReaderStream::new(BrotliEncoder::new( + StreamReader::new(input), + )))), + DEFLATE => Ok(Body::wrap_stream(ReaderStream::new(DeflateEncoder::new( + StreamReader::new(input), + )))), + GZIP => Ok(Body::wrap_stream(ReaderStream::new(GzipEncoder::new( + StreamReader::new(input), + )))), + _ => Err(io::Error::new(io::ErrorKind::Other, "Unsupported Encoding")), + } } pub fn should_compress(enc: &str) -> bool { @@ -241,20 +243,27 @@ mod t_prior { #[cfg(test)] mod t_compress { use super::*; + use bytes::Bytes; #[test] fn failed() { - let error = compress(b"hello", "unrecognized").unwrap_err(); + let s = futures::stream::iter(vec![Ok::<_, io::Error>(Bytes::from_static(b"hello"))]); + let error = compress_stream(s, "unrecognized").unwrap_err(); assert_eq!(error.kind(), io::ErrorKind::Other); } - #[test] - fn compressed() { - let buf = compress(b"xxxxx", DEFLATE); - assert_eq!(buf.unwrap().len(), 5); - let buf = compress(b"xxxxx", GZIP); - assert_eq!(buf.unwrap().len(), 15); - let buf = compress(b"xxxxx", BR); - assert_eq!(buf.unwrap().len(), 9); + #[tokio::test] + async fn compressed() { + let s = futures::stream::iter(vec![Ok::<_, io::Error>(Bytes::from_static(b"xxxxx"))]); + let body = compress_stream(s, BR).unwrap(); + assert_eq!(hyper::body::to_bytes(body).await.unwrap().len(), 10); + + let s = futures::stream::iter(vec![Ok::<_, io::Error>(Bytes::from_static(b"xxxxx"))]); + let body = compress_stream(s, DEFLATE).unwrap(); + assert_eq!(hyper::body::to_bytes(body).await.unwrap().len(), 5); + + let s = futures::stream::iter(vec![Ok::<_, io::Error>(Bytes::from_static(b"xxxxx"))]); + let body = compress_stream(s, GZIP).unwrap(); + assert_eq!(hyper::body::to_bytes(body).await.unwrap().len(), 23); } } diff --git a/src/server/serve.rs b/src/server/serve.rs index 5190c1b..983cd0d 100644 --- a/src/server/serve.rs +++ b/src/server/serve.rs @@ -14,7 +14,7 @@ use std::sync::Arc; use std::time::Duration; use chrono::Local; -use futures::{StreamExt as _, TryStreamExt as _}; +use futures::TryStreamExt as _; use headers::{ AcceptRanges, AccessControlAllowHeaders, AccessControlAllowOrigin, CacheControl, ContentLength, ContentType, ETag, HeaderMapExt, LastModified, Range, Server, @@ -32,7 +32,7 @@ use serde::Serialize; use crate::cli::Args; use crate::extensions::{MimeExt, PathExt, SystemTimeExt}; use crate::http::conditional_requests::{is_fresh, is_precondition_failed}; -use crate::http::content_encoding::{compress, get_prior_encoding, should_compress}; +use crate::http::content_encoding::{compress_stream, get_prior_encoding, should_compress}; use crate::http::range_requests::{is_range_fresh, is_satisfiable_range}; use crate::server::send::{send_dir, send_dir_as_zip, send_file, send_file_with_range}; @@ -411,13 +411,10 @@ impl InnerService { if let Some(encoding) = req.headers().get(hyper::header::ACCEPT_ENCODING) { let content_encoding = get_prior_encoding(encoding); if should_compress(content_encoding) { - let stream = body - .map_err(|_e| io::Error::from(io::ErrorKind::InvalidData)) - .map(|b| match b { - Ok(b) => compress(&b, content_encoding), - Err(e) => Err(e), - }); - let body = Body::wrap_stream(stream); + let b = compress_stream( + body.map_err(|e| io::Error::new(io::ErrorKind::Other, e)), + content_encoding, + )?; res.headers_mut().insert( hyper::header::CONTENT_ENCODING, hyper::header::HeaderValue::from_static(content_encoding), @@ -427,7 +424,7 @@ impl InnerService { hyper::header::VARY, hyper::header::HeaderValue::from_name(hyper::header::ACCEPT_ENCODING), ); - body + b } else { body } From 5aa08e996c380347dae855c32aaa473c59a6ce7e Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Thu, 15 Sep 2022 23:49:55 +0800 Subject: [PATCH 3/8] refactor: set brotli compression level to fastest --- src/http/content_encoding.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/http/content_encoding.rs b/src/http/content_encoding.rs index 1ab951e..d5abcf0 100644 --- a/src/http/content_encoding.rs +++ b/src/http/content_encoding.rs @@ -9,7 +9,10 @@ use std::cmp::Ordering; use std::io; -use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder}; +use async_compression::{ + tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder}, + Level, +}; use bytes::Bytes; use futures::Stream; use hyper::header::HeaderValue; @@ -130,9 +133,9 @@ pub fn compress_stream( encoding: &str, ) -> io::Result { match encoding { - BR => Ok(Body::wrap_stream(ReaderStream::new(BrotliEncoder::new( - StreamReader::new(input), - )))), + BR => Ok(Body::wrap_stream(ReaderStream::new( + BrotliEncoder::with_quality(StreamReader::new(input), Level::Fastest), + ))), DEFLATE => Ok(Body::wrap_stream(ReaderStream::new(DeflateEncoder::new( StreamReader::new(input), )))), From ecd357440cce2a5c942d34c66365998f07bbb5cf Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Fri, 16 Sep 2022 00:24:06 +0800 Subject: [PATCH 4/8] chore: dependency cleanup --- Cargo.lock | 2 -- Cargo.toml | 14 ++++++-------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ab8e6d0..669b2dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -947,11 +947,9 @@ name = "sfz" version = "0.7.1" dependencies = [ "async-compression", - "brotli", "bytes", "chrono", "clap", - "flate2", "futures", "headers", "hyper", diff --git a/Cargo.toml b/Cargo.toml index a282f0c..478c811 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,8 +24,12 @@ headers = "0.3" mime_guess = "2.0" percent-encoding = "2.1" # Compression -brotli = "3" -flate2 = "1" +async-compression = { version = "0.3.7", features = [ + "brotli", + "deflate", + "gzip", + "tokio", +] } # Rendering tera = "1" serde = { version = "1.0", features = [ @@ -40,12 +44,6 @@ zip = { version = "0.6", default-features = false, features = ["deflate"] } futures = "0.3" tempfile = "3" bytes = "1" -async-compression = { version = "0.3.7", features = [ - "brotli", - "deflate", - "gzip", - "tokio", -] } [dev-dependencies] tempfile = "3" From 8fc0a6b02005075cbac32014af5a0fd46415244a Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Fri, 16 Sep 2022 00:25:16 +0800 Subject: [PATCH 5/8] test: fix t_compress --- src/http/content_encoding.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/http/content_encoding.rs b/src/http/content_encoding.rs index d5abcf0..1d94b1c 100644 --- a/src/http/content_encoding.rs +++ b/src/http/content_encoding.rs @@ -259,7 +259,7 @@ mod t_compress { async fn compressed() { let s = futures::stream::iter(vec![Ok::<_, io::Error>(Bytes::from_static(b"xxxxx"))]); let body = compress_stream(s, BR).unwrap(); - assert_eq!(hyper::body::to_bytes(body).await.unwrap().len(), 10); + assert_eq!(hyper::body::to_bytes(body).await.unwrap().len(), 9); let s = futures::stream::iter(vec![Ok::<_, io::Error>(Bytes::from_static(b"xxxxx"))]); let body = compress_stream(s, DEFLATE).unwrap(); From 166e6e7b5aa39f8116608919e0d0147518d8a948 Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Sat, 17 Sep 2022 18:26:33 +0800 Subject: [PATCH 6/8] refactor: nitpick Co-authored-by: Weihang Lo --- src/http/content_encoding.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/http/content_encoding.rs b/src/http/content_encoding.rs index 1d94b1c..49a55e9 100644 --- a/src/http/content_encoding.rs +++ b/src/http/content_encoding.rs @@ -129,7 +129,7 @@ pub fn get_prior_encoding<'a>(accept_encoding: &'a HeaderValue) -> &'static str } pub fn compress_stream( - input: impl Stream> + std::marker::Send + 'static, + input: impl Stream> + Send + 'static, encoding: &str, ) -> io::Result { match encoding { @@ -246,7 +246,6 @@ mod t_prior { #[cfg(test)] mod t_compress { use super::*; - use bytes::Bytes; #[test] fn failed() { From 5c96c8d4835e9ac3bb324ad5ab38c257a928a034 Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Sat, 17 Sep 2022 18:39:33 +0800 Subject: [PATCH 7/8] doc: compress_stream --- src/http/content_encoding.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/http/content_encoding.rs b/src/http/content_encoding.rs index 49a55e9..83a4f6a 100644 --- a/src/http/content_encoding.rs +++ b/src/http/content_encoding.rs @@ -128,6 +128,12 @@ pub fn get_prior_encoding<'a>(accept_encoding: &'a HeaderValue) -> &'static str .unwrap_or(IDENTITY) } +/// Compress data stream. +/// +/// # Parameters +/// +/// * `input` - [`futures::stream::Stream`](futures::stream::Stream) to be compressed e.g. [`hyper::body::Body`](hyper::body::Body). +/// * `encoding` - Only support `br`, `deflate`, `gzip` and `identity`. pub fn compress_stream( input: impl Stream> + Send + 'static, encoding: &str, From 255065409339c3620ca9fdac9ec58dafb015e722 Mon Sep 17 00:00:00 2001 From: Heng-Yi Wu <2316687+henry40408@users.noreply.github.com> Date: Sat, 17 Sep 2022 19:10:24 +0800 Subject: [PATCH 8/8] refactor: intra doc link in compress_data doc Co-authored-by: Weihang Lo --- src/http/content_encoding.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/http/content_encoding.rs b/src/http/content_encoding.rs index 83a4f6a..e62178e 100644 --- a/src/http/content_encoding.rs +++ b/src/http/content_encoding.rs @@ -132,7 +132,7 @@ pub fn get_prior_encoding<'a>(accept_encoding: &'a HeaderValue) -> &'static str /// /// # Parameters /// -/// * `input` - [`futures::stream::Stream`](futures::stream::Stream) to be compressed e.g. [`hyper::body::Body`](hyper::body::Body). +/// * `input` - [`futures::stream::Stream`] to be compressed, e.g. [`hyper::body::Body`]. /// * `encoding` - Only support `br`, `deflate`, `gzip` and `identity`. pub fn compress_stream( input: impl Stream> + Send + 'static,