From e01c78f85434177b78cf1cf876eb9a2fa41bc9c3 Mon Sep 17 00:00:00 2001 From: Kyle Barron Date: Sun, 6 Mar 2022 13:42:00 -0700 Subject: [PATCH 1/2] Use lz4_flex --- Cargo.toml | 1 + src/compression.rs | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 657355c5b..3ea2a91ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ brotli = { version = "^3.3", optional = true } flate2 = { version = "^1.0", optional = true } lz4 = { version = "1.23.3", optional = true } zstd = { version = "^0.11", optional = true, default-features = false } +lz4_flex = { version = "^0.9.2", optional = true } xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] } diff --git a/src/compression.rs b/src/compression.rs index a302ef0d4..99b35caeb 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -62,6 +62,15 @@ pub fn compress( crate::error::Feature::Snappy, "compress to snappy".to_string(), )), + #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))] + Compression::Lz4Raw => { + let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len()); + output_buf.resize(required_len, 0); + + let compressed_size = lz4_flex::block::compress_into(input_buf, output_buf).unwrap(); + output_buf.truncate(compressed_size); + Ok(()) + } #[cfg(feature = "lz4")] Compression::Lz4Raw => { let output_buf_len = output_buf.len(); @@ -76,7 +85,7 @@ pub fn compress( output_buf.truncate(output_buf_len + size); Ok(()) } - #[cfg(not(feature = "lz4"))] + #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))] Compression::Lz4Raw => Err(Error::FeatureNotActive( crate::error::Feature::Lz4, "compress to lz4".to_string(), @@ -153,13 +162,18 @@ pub fn decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [ crate::error::Feature::Snappy, "decompress with snappy".to_string(), )), + #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))] + Compression::Lz4Raw => { + 
lz4_flex::block::decompress_into(input_buf, output_buf).unwrap(); + Ok(()) + } #[cfg(feature = "lz4")] Compression::Lz4Raw => { lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf) .map(|_| {}) .map_err(|e| e.into()) } - #[cfg(not(feature = "lz4"))] + #[cfg(all(not(feature = "lz4"), not(feature = "lz4_flex")))] Compression::Lz4Raw => Err(Error::FeatureNotActive( crate::error::Feature::Lz4, "decompress with lz4".to_string(), From 20c4d96eff5c472cc91784e43111ac92f7187059 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 12 Apr 2022 11:13:49 +0000 Subject: [PATCH 2/2] Fixed LZ4 --- .github/workflows/test.yml | 2 ++ src/compression.rs | 15 ++++++++------- src/error.rs | 14 ++++++++++++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d50281b61..c1bdaeeaa 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -24,6 +24,8 @@ jobs: deactivate - name: Run run: cargo test + - name: Run lz4-flex + run: cargo test --no-default-features --features lz4_flex,bloom_filter,stream,snappy,brotli,zstd,gzip clippy: name: Clippy diff --git a/src/compression.rs b/src/compression.rs index 99b35caeb..2760fcc5d 100644 --- a/src/compression.rs +++ b/src/compression.rs @@ -64,11 +64,13 @@ pub fn compress( )), #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))] Compression::Lz4Raw => { + let output_buf_len = output_buf.len(); let required_len = lz4_flex::block::get_maximum_output_size(input_buf.len()); - output_buf.resize(required_len, 0); + output_buf.resize(output_buf_len + required_len, 0); - let compressed_size = lz4_flex::block::compress_into(input_buf, output_buf).unwrap(); - output_buf.truncate(compressed_size); + let compressed_size = + lz4_flex::block::compress_into(input_buf, &mut output_buf[output_buf_len..])?; + output_buf.truncate(output_buf_len + compressed_size); Ok(()) } #[cfg(feature = "lz4")] @@ -163,10 +165,9 @@ pub fn 
decompress(compression: Compression, input_buf: &[u8], output_buf: &mut [ "decompress with snappy".to_string(), )), #[cfg(all(feature = "lz4_flex", not(feature = "lz4")))] - Compression::Lz4Raw => { - lz4_flex::block::decompress_into(input_buf, output_buf).unwrap(); - Ok(()) - } + Compression::Lz4Raw => lz4_flex::block::decompress_into(input_buf, output_buf) + .map(|_| {}) + .map_err(|e| e.into()), #[cfg(feature = "lz4")] Compression::Lz4Raw => { lz4::block::decompress_to_buffer(input_buf, Some(output_buf.len() as i32), output_buf) diff --git a/src/error.rs b/src/error.rs index c8486d687..37a13dcda 100644 --- a/src/error.rs +++ b/src/error.rs @@ -64,6 +64,20 @@ impl From for Error { } } +#[cfg(feature = "lz4_flex")] +impl From<lz4_flex::block::DecompressError> for Error { + fn from(e: lz4_flex::block::DecompressError) -> Error { + Error::General(format!("underlying lz4_flex error: {}", e)) + } +} + +#[cfg(feature = "lz4_flex")] +impl From<lz4_flex::block::CompressError> for Error { + fn from(e: lz4_flex::block::CompressError) -> Error { + Error::General(format!("underlying lz4_flex error: {}", e)) + } +} + impl From<parquet_format_async_temp::thrift::Error> for Error { fn from(e: parquet_format_async_temp::thrift::Error) -> Error { Error::General(format!("underlying thrift error: {}", e))