From 15ef1ab1126c9579d960933a760457f762949765 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Sun, 22 Sep 2024 19:33:08 +0200 Subject: [PATCH 1/7] Update Cargo.toml files --- Cargo.toml | 3 +++ isal-sys/Cargo.toml | 2 ++ 2 files changed, 5 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 1f739f7..305befe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,9 @@ name = "isal-rs" version = "0.2.0+496255c" edition = "2021" +description = "isa-l Rust bindings" +license = "MIT" +readme = "README.md" [lib] name = "isal" diff --git a/isal-sys/Cargo.toml b/isal-sys/Cargo.toml index d5f1961..37f4917 100644 --- a/isal-sys/Cargo.toml +++ b/isal-sys/Cargo.toml @@ -2,6 +2,8 @@ name = "isal-sys" version = "0.2.0+496255c" edition = "2021" +description = "isa-l sys crate" +license = "MIT" [features] default = ["static"] From c1c993791f975524a5b2d7e40338879ecefe8fd2 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Sun, 22 Sep 2024 19:34:32 +0200 Subject: [PATCH 2/7] Add version for isal-sys crate --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 305befe..c947823 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ shared = ["isal-sys/shared"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -isal-sys = { path = "isal-sys"} +isal-sys = { path = "isal-sys", version = "0.2.0+496255c" } [dev-dependencies] criterion = "0.3" From b775a930a1e4df092864da9ec6d569ab6aa4ee0d Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Mon, 23 Sep 2024 05:13:08 +0200 Subject: [PATCH 3/7] Impl of write::Encoder --- src/{igzip.rs => igzip/mod.rs} | 481 +++++---------------------------- src/igzip/read.rs | 348 ++++++++++++++++++++++++ src/igzip/write.rs | 206 ++++++++++++++ 3 files changed, 619 insertions(+), 416 deletions(-) rename src/{igzip.rs => igzip/mod.rs} (54%) create mode 100644 src/igzip/read.rs create mode 100644 src/igzip/write.rs diff --git 
a/src/igzip.rs b/src/igzip/mod.rs similarity index 54% rename from src/igzip.rs rename to src/igzip/mod.rs index 3fcc15a..56ba1cc 100644 --- a/src/igzip.rs +++ b/src/igzip/mod.rs @@ -1,3 +1,6 @@ +pub mod read; +pub mod write; + use std::io; use std::mem; @@ -7,6 +10,68 @@ use isal_sys::igzip_lib as isal; /// Buffer size pub const BUF_SIZE: usize = 16 * 1024; +/// Compress `input` directly into `output`. This is the fastest possible compression available. +#[inline(always)] +pub fn compress_into( + input: &[u8], + output: &mut [u8], + level: CompressionLevel, + is_gzip: bool, +) -> Result { + let mut zstream = ZStream::new(level, ZStreamKind::Stateless); + + zstream.stream.flush = FlushFlags::NoFlush as _; + zstream.stream.gzip_flag = is_gzip as _; + zstream.stream.end_of_stream = 1; + + // read input into buffer + zstream.stream.avail_in = input.len() as _; + zstream.stream.next_in = input.as_ptr() as *mut _; + + // compress this block in its entirety + zstream.stream.avail_out = output.len() as _; + zstream.stream.next_out = output.as_mut_ptr(); + + zstream.deflate()?; + Ok(zstream.stream.total_out as _) +} + +/// Compress `input` +#[inline(always)] +pub fn compress( + input: R, + level: CompressionLevel, + is_gzip: bool, +) -> Result> { + let mut out = vec![]; + let mut encoder = read::Encoder::new(input, level, is_gzip); + io::copy(&mut encoder, &mut out)?; + Ok(out) +} + +#[inline(always)] +pub fn decompress(input: R) -> Result> { + let mut out = vec![]; + let mut decoder = read::Decoder::new(input); + io::copy(&mut decoder, &mut out)?; + Ok(out) +} + +#[inline(always)] +pub fn decompress_into(input: &[u8], output: &mut [u8]) -> Result { + let mut zst = InflateState::new(); + zst.0.avail_in = input.len() as _; + zst.0.next_in = input.as_ptr() as *mut _; + zst.0.crc_flag = 1; + + zst.0.avail_out = output.len() as _; + zst.0.next_out = output.as_mut_ptr(); + + zst.inflate_stateless()?; + + Ok(zst.0.total_out as _) +} + /// Flush Flags #[derive(Copy, Clone)] 
#[repr(i8)] @@ -124,360 +189,6 @@ impl TryFrom for CompressionLevel { } } -pub mod read { - - use mem::MaybeUninit; - - use super::*; - - /// Streaming compression for input streams implementing `std::io::Read`. - /// - /// Notes - /// ----- - /// One should consider using `crate::igzip::compress` or `crate::igzip::compress_into` if possible. - /// In that context, we do not need to hold and maintain intermediate buffers for reading and writing. - /// - /// Example - /// ------- - /// ``` - /// use std::{io, io::Read}; - /// use isal::igzip::{read::Encoder, CompressionLevel, decompress}; - /// let data = b"Hello, World!".to_vec(); - /// - /// let mut encoder = Encoder::new(data.as_slice(), CompressionLevel::Three, true); - /// let mut compressed = vec![]; - /// - /// // Numbeer of compressed bytes written to `output` - /// let n = io::copy(&mut encoder, &mut compressed).unwrap(); - /// assert_eq!(n as usize, compressed.len()); - /// - /// let decompressed = decompress(io::Cursor::new(compressed)).unwrap(); - /// assert_eq!(decompressed.as_slice(), data); - /// ``` - pub struct Encoder { - inner: R, - stream: ZStream, - in_buf: [u8; BUF_SIZE], - out_buf: Vec, - dsts: usize, - dste: usize, - } - - impl Encoder { - /// Create a new `Encoder` which implements the `std::io::Read` trait. - pub fn new(reader: R, level: CompressionLevel, is_gzip: bool) -> Encoder { - let in_buf = [0_u8; BUF_SIZE]; - let out_buf = Vec::with_capacity(BUF_SIZE); - - let mut zstream = ZStream::new(level, ZStreamKind::Stateful); - - zstream.stream.end_of_stream = 0; - zstream.stream.flush = FlushFlags::SyncFlush as _; - zstream.stream.gzip_flag = is_gzip as _; - - Self { - inner: reader, - stream: zstream, - in_buf, - out_buf, - dste: 0, - dsts: 0, - } - } - - /// Mutable reference to underlying reader, not advisable to modify during reading. 
- pub fn get_ref_mut(&mut self) -> &mut R { - &mut self.inner - } - - // Reference to underlying reader - pub fn get_ref(&self) -> &R { - &self.inner - } - - // Read data from intermediate output buffer holding compressed output. - // It's unknown if the output from igzip will fit into buffer passed during read - // so we hold it here and empty as read calls pass. - // thanks to: https://github.com/BurntSushi/rust-snappy/blob/f9eb8d49c713adc48732fb95682a201a7b74d39a/src/read.rs#L327 - #[inline(always)] - fn read_from_out_buf(&mut self, buf: &mut [u8]) -> usize { - let available_bytes = self.dste - self.dsts; - let count = std::cmp::min(available_bytes, buf.len()); - buf[..count].copy_from_slice(&self.out_buf[self.dsts..self.dsts + count]); - self.dsts += count; - count - } - } - - impl io::Read for Encoder { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - // Check if there is data left in out_buf, otherwise refill; if end state, return 0 - let count = self.read_from_out_buf(buf); - if count > 0 { - Ok(count) - } else if self.stream.stream.internal_state.state != isal::isal_zstate_state_ZSTATE_END - { - // Read out next buf len worth to compress; filling intermediate out_buf - self.stream.stream.avail_in = self.inner.read(&mut self.in_buf)? 
as _; - if self.stream.stream.avail_in < self.in_buf.len() as _ { - self.stream.stream.end_of_stream = 1; - } - self.stream.stream.next_in = self.in_buf.as_mut_ptr(); - - let mut n_bytes = 0; - self.out_buf.truncate(0); - - // compress this chunk into out_buf - while self.stream.stream.avail_in > 0 { - self.out_buf.resize(self.out_buf.len() + BUF_SIZE, 0); - - self.stream.stream.avail_out = BUF_SIZE as _; - self.stream.stream.next_out = - self.out_buf[n_bytes..n_bytes + BUF_SIZE].as_mut_ptr(); - - self.stream.deflate()?; - - n_bytes += BUF_SIZE - self.stream.stream.avail_out as usize; - } - self.out_buf.truncate(n_bytes); - self.dste = n_bytes; - self.dsts = 0; - - Ok(self.read_from_out_buf(buf)) - } else { - Ok(0) - } - } - } - - /// Streaming compression for input streams implementing `std::io::Read`. - /// - /// Notes - /// ----- - /// One should consider using `crate::igzip::decompress` or `crate::igzip::decompress_into` if possible. - /// In that context, we do not need to hold and maintain intermediate buffers for reading and writing. 
- /// - /// Example - /// ------- - /// ``` - /// use std::{io, io::Read}; - /// use isal::igzip::{read::Decoder, CompressionLevel, compress}; - /// let data = b"Hello, World!".to_vec(); - /// - /// let compressed = compress(data.as_slice(), CompressionLevel::Three, true).unwrap(); - /// let mut decoder = Decoder::new(compressed.as_slice()); - /// let mut decompressed = vec![]; - /// - /// // Numbeer of compressed bytes written to `output` - /// let n = io::copy(&mut decoder, &mut decompressed).unwrap(); - /// assert_eq!(n as usize, data.len()); - /// assert_eq!(decompressed.as_slice(), data); - /// ``` - pub struct Decoder { - inner: R, - zst: InflateState, - in_buf: [u8; BUF_SIZE], - out_buf: Vec, - dsts: usize, - dste: usize, - } - - impl Decoder { - pub fn new(reader: R) -> Decoder { - let mut zst = InflateState::new(); - zst.0.crc_flag = isal::IGZIP_GZIP; - - Self { - inner: reader, - zst, - in_buf: [0u8; BUF_SIZE], - out_buf: Vec::with_capacity(BUF_SIZE), - dste: 0, - dsts: 0, - } - } - - /// Mutable reference to underlying reader, not advisable to modify during reading. - pub fn get_ref_mut(&mut self) -> &mut R { - &mut self.inner - } - - // Reference to underlying reader - pub fn get_ref(&self) -> &R { - &self.inner - } - - // Read data from intermediate output buffer holding compressed output. - // It's unknown if the output from igzip will fit into buffer passed during read - // so we hold it here and empty as read calls pass. 
- // thanks to: https://github.com/BurntSushi/rust-snappy/blob/f9eb8d49c713adc48732fb95682a201a7b74d39a/src/read.rs#L327 - #[inline(always)] - fn read_from_out_buf(&mut self, buf: &mut [u8]) -> usize { - let available_bytes = self.dste - self.dsts; - let count = std::cmp::min(available_bytes, buf.len()); - buf[..count].copy_from_slice(&self.out_buf[self.dsts..self.dsts + count]); - self.dsts += count; - count - } - } - - impl io::Read for Decoder { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - // Check if there is data left in out_buf, otherwise refill; if end state, return 0 - let count = self.read_from_out_buf(buf); - if count > 0 { - Ok(count) - } else { - // Read out next buf len worth to compress; filling intermediate out_buf - self.zst.0.avail_in = self.inner.read(&mut self.in_buf)? as _; - self.zst.0.next_in = self.in_buf.as_mut_ptr(); - - let mut n_bytes = 0; - while self.zst.0.avail_in != 0 { - if self.zst.block_state() == isal::isal_block_state_ISAL_BLOCK_NEW_HDR { - // Read this member's gzip header - let mut gz_hdr: MaybeUninit = MaybeUninit::uninit(); - unsafe { isal::isal_gzip_header_init(gz_hdr.as_mut_ptr()) }; - let mut gz_hdr = unsafe { gz_hdr.assume_init() }; - read_gzip_header(&mut self.zst.0, &mut gz_hdr)?; - } - - // decompress member - loop { - self.out_buf.resize(n_bytes + BUF_SIZE, 0); - - self.zst.0.next_out = - self.out_buf[n_bytes..n_bytes + BUF_SIZE].as_mut_ptr(); - self.zst.0.avail_out = BUF_SIZE as _; - - self.zst.step_inflate()?; - - n_bytes += BUF_SIZE - self.zst.0.avail_out as usize; - - let state = self.zst.block_state(); - if state == isal::isal_block_state_ISAL_BLOCK_CODED - || state == isal::isal_block_state_ISAL_BLOCK_TYPE0 - || state == isal::isal_block_state_ISAL_BLOCK_HDR - || state == isal::isal_block_state_ISAL_BLOCK_FINISH - { - break; - } - } - if self.zst.0.block_state == isal::isal_block_state_ISAL_BLOCK_FINISH { - self.zst.reset(); - } - } - self.out_buf.truncate(n_bytes); - self.dste = n_bytes; - self.dsts = 
0; - - Ok(self.read_from_out_buf(buf)) - } - } - } - - #[cfg(test)] - mod tests { - - use super::*; - use std::io::{self, Cursor}; - - fn gen_large_data() -> Vec { - (0..1_000_000) - .map(|_| b"oh what a beautiful morning, oh what a beautiful day!!".to_vec()) - .flat_map(|v| v) - .collect() - } - - #[test] - fn large_roundtrip() { - let input = gen_large_data(); - let mut encoder = Encoder::new(Cursor::new(&input), CompressionLevel::Three, true); - let mut output = vec![]; - - let n = io::copy(&mut encoder, &mut output).unwrap(); - assert!(n < input.len() as u64); - - let mut decoder = Decoder::new(Cursor::new(output)); - let mut decompressed = vec![]; - let nbytes = io::copy(&mut decoder, &mut decompressed).unwrap(); - - assert_eq!(nbytes as usize, input.len()); - } - - #[test] - fn basic_compress() -> Result<()> { - let input = b"hello, world!"; - let mut encoder = Encoder::new(Cursor::new(input), CompressionLevel::Three, true); - let mut output = vec![]; - - let n = io::copy(&mut encoder, &mut output)? as usize; - let decompressed = decompress(&output[..n])?; - - assert_eq!(input, decompressed.as_slice()); - Ok(()) - } - - #[test] - fn longer_compress() -> Result<()> { - // Build input which is greater than BUF_SIZE - let mut input = vec![]; - for chunk in std::iter::repeat(b"Hello, World!") { - input.extend_from_slice(chunk); - if input.len() > BUF_SIZE * 2 { - break; - } - } - - let mut encoder = Encoder::new(Cursor::new(&input), CompressionLevel::Three, true); - let mut output = vec![]; - - let n = io::copy(&mut encoder, &mut output)? as usize; - let decompressed = decompress(&output[..n])?; - - assert_eq!(input, decompressed.as_slice()); - Ok(()) - } - - #[test] - fn basic_decompress() -> Result<()> { - let input = b"hello, world!"; - let compressed = compress(Cursor::new(input), CompressionLevel::Three, true)?; - - let mut decoder = Decoder::new(compressed.as_slice()); - let mut decompressed = vec![]; - - let n = io::copy(&mut decoder, &mut decompressed)? 
as usize; - assert_eq!(n, decompressed.len()); - assert_eq!(input, decompressed.as_slice()); - Ok(()) - } - - #[test] - fn longer_decompress() -> Result<()> { - // Build input which is greater than BUF_SIZE - let mut input = vec![]; - for chunk in std::iter::repeat(b"Hello, World!") { - input.extend_from_slice(chunk); - if input.len() > BUF_SIZE * 3 { - break; - } - } - - let compressed = compress(input.as_slice(), CompressionLevel::Three, true)?; - - let mut decoder = Decoder::new(compressed.as_slice()); - let mut decompressed = vec![]; - - let n = io::copy(&mut decoder, &mut decompressed)? as usize; - assert_eq!(n, decompressed.len()); - assert_eq!(input, decompressed.as_slice()); - - Ok(()) - } - } -} - pub enum ZStreamKind { Stateful, Stateless, @@ -533,45 +244,6 @@ impl ZStream { } } -/// Compress `input` directly into `output`. This is the fastest possible compression available. -#[inline(always)] -pub fn compress_into( - input: &[u8], - output: &mut [u8], - level: CompressionLevel, - is_gzip: bool, -) -> Result { - let mut zstream = ZStream::new(level, ZStreamKind::Stateless); - - zstream.stream.flush = FlushFlags::NoFlush as _; - zstream.stream.gzip_flag = is_gzip as _; - zstream.stream.end_of_stream = 1; - - // read input into buffer - zstream.stream.avail_in = input.len() as _; - zstream.stream.next_in = input.as_ptr() as *mut _; - - // compress this block in its entirety - zstream.stream.avail_out = output.len() as _; - zstream.stream.next_out = output.as_mut_ptr(); - - zstream.deflate()?; - Ok(zstream.stream.total_out as _) -} - -/// Compress `input` -#[inline(always)] -pub fn compress( - input: R, - level: CompressionLevel, - is_gzip: bool, -) -> Result> { - let mut out = vec![]; - let mut encoder = read::Encoder::new(input, level, is_gzip); - io::copy(&mut encoder, &mut out)?; - Ok(out) -} - pub struct InflateState(isal::inflate_state); impl InflateState { @@ -627,29 +299,6 @@ pub fn read_gzip_header( } } -#[inline(always)] -pub fn decompress(input: 
R) -> Result> { - let mut out = vec![]; - let mut decoder = read::Decoder::new(input); - io::copy(&mut decoder, &mut out)?; - Ok(out) -} - -#[inline(always)] -pub fn decompress_into(input: &[u8], output: &mut [u8]) -> Result { - let mut zst = InflateState::new(); - zst.0.avail_in = input.len() as _; - zst.0.next_in = input.as_ptr() as *mut _; - zst.0.crc_flag = 1; - - zst.0.avail_out = output.len() as _; - zst.0.next_out = output.as_mut_ptr(); - - zst.inflate_stateless()?; - - Ok(zst.0.total_out as _) -} - #[cfg(test)] mod tests { diff --git a/src/igzip/read.rs b/src/igzip/read.rs new file mode 100644 index 0000000..b891474 --- /dev/null +++ b/src/igzip/read.rs @@ -0,0 +1,348 @@ +use crate::igzip::*; +use mem::MaybeUninit; +use std::io; + +/// Streaming compression for input streams implementing `std::io::Read`. +/// +/// Notes +/// ----- +/// One should consider using `crate::igzip::compress` or `crate::igzip::compress_into` if possible. +/// In that context, we do not need to hold and maintain intermediate buffers for reading and writing. +/// +/// Example +/// ------- +/// ``` +/// use std::{io, io::Read}; +/// use isal::igzip::{read::Encoder, CompressionLevel, decompress}; +/// let data = b"Hello, World!".to_vec(); +/// +/// let mut encoder = Encoder::new(data.as_slice(), CompressionLevel::Three, true); +/// let mut compressed = vec![]; +/// +/// // Numbeer of compressed bytes written to `output` +/// let n = io::copy(&mut encoder, &mut compressed).unwrap(); +/// assert_eq!(n as usize, compressed.len()); +/// +/// let decompressed = decompress(io::Cursor::new(compressed)).unwrap(); +/// assert_eq!(decompressed.as_slice(), data); +/// ``` +pub struct Encoder { + inner: R, + stream: ZStream, + in_buf: [u8; BUF_SIZE], + out_buf: Vec, + dsts: usize, + dste: usize, +} + +impl Encoder { + /// Create a new `Encoder` which implements the `std::io::Read` trait. 
+ pub fn new(reader: R, level: CompressionLevel, is_gzip: bool) -> Encoder { + let in_buf = [0_u8; BUF_SIZE]; + let out_buf = Vec::with_capacity(BUF_SIZE); + + let mut zstream = ZStream::new(level, ZStreamKind::Stateful); + + zstream.stream.end_of_stream = 0; + zstream.stream.flush = FlushFlags::SyncFlush as _; + zstream.stream.gzip_flag = is_gzip as _; + + Self { + inner: reader, + stream: zstream, + in_buf, + out_buf, + dste: 0, + dsts: 0, + } + } + + /// Mutable reference to underlying reader, not advisable to modify during reading. + pub fn get_ref_mut(&mut self) -> &mut R { + &mut self.inner + } + + // Reference to underlying reader + pub fn get_ref(&self) -> &R { + &self.inner + } + + // Read data from intermediate output buffer holding compressed output. + // It's unknown if the output from igzip will fit into buffer passed during read + // so we hold it here and empty as read calls pass. + // thanks to: https://github.com/BurntSushi/rust-snappy/blob/f9eb8d49c713adc48732fb95682a201a7b74d39a/src/read.rs#L327 + #[inline(always)] + fn read_from_out_buf(&mut self, buf: &mut [u8]) -> usize { + let available_bytes = self.dste - self.dsts; + let count = std::cmp::min(available_bytes, buf.len()); + buf[..count].copy_from_slice(&self.out_buf[self.dsts..self.dsts + count]); + self.dsts += count; + count + } +} + +impl io::Read for Encoder { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + // Check if there is data left in out_buf, otherwise refill; if end state, return 0 + let count = self.read_from_out_buf(buf); + if count > 0 { + Ok(count) + } else if self.stream.stream.internal_state.state != isal::isal_zstate_state_ZSTATE_END { + // Read out next buf len worth to compress; filling intermediate out_buf + self.stream.stream.avail_in = self.inner.read(&mut self.in_buf)? 
as _; + if self.stream.stream.avail_in < self.in_buf.len() as _ { + self.stream.stream.end_of_stream = 1; + } + self.stream.stream.next_in = self.in_buf.as_mut_ptr(); + + let mut n_bytes = 0; + self.out_buf.truncate(0); + + // compress this chunk into out_buf + while self.stream.stream.avail_in > 0 { + self.out_buf.resize(self.out_buf.len() + BUF_SIZE, 0); + + self.stream.stream.avail_out = BUF_SIZE as _; + self.stream.stream.next_out = + self.out_buf[n_bytes..n_bytes + BUF_SIZE].as_mut_ptr(); + + self.stream.deflate()?; + + n_bytes += BUF_SIZE - self.stream.stream.avail_out as usize; + } + self.out_buf.truncate(n_bytes); + self.dste = n_bytes; + self.dsts = 0; + + Ok(self.read_from_out_buf(buf)) + } else { + Ok(0) + } + } +} + +/// Streaming compression for input streams implementing `std::io::Read`. +/// +/// Notes +/// ----- +/// One should consider using `crate::igzip::decompress` or `crate::igzip::decompress_into` if possible. +/// In that context, we do not need to hold and maintain intermediate buffers for reading and writing. 
+/// +/// Example +/// ------- +/// ``` +/// use std::{io, io::Read}; +/// use isal::igzip::{read::Decoder, CompressionLevel, compress}; +/// let data = b"Hello, World!".to_vec(); +/// +/// let compressed = compress(data.as_slice(), CompressionLevel::Three, true).unwrap(); +/// let mut decoder = Decoder::new(compressed.as_slice()); +/// let mut decompressed = vec![]; +/// +/// // Numbeer of compressed bytes written to `output` +/// let n = io::copy(&mut decoder, &mut decompressed).unwrap(); +/// assert_eq!(n as usize, data.len()); +/// assert_eq!(decompressed.as_slice(), data); +/// ``` +pub struct Decoder { + inner: R, + zst: InflateState, + in_buf: [u8; BUF_SIZE], + out_buf: Vec, + dsts: usize, + dste: usize, +} + +impl Decoder { + pub fn new(reader: R) -> Decoder { + let mut zst = InflateState::new(); + zst.0.crc_flag = isal::IGZIP_GZIP; + + Self { + inner: reader, + zst, + in_buf: [0u8; BUF_SIZE], + out_buf: Vec::with_capacity(BUF_SIZE), + dste: 0, + dsts: 0, + } + } + + /// Mutable reference to underlying reader, not advisable to modify during reading. + pub fn get_ref_mut(&mut self) -> &mut R { + &mut self.inner + } + + // Reference to underlying reader + pub fn get_ref(&self) -> &R { + &self.inner + } + + // Read data from intermediate output buffer holding compressed output. + // It's unknown if the output from igzip will fit into buffer passed during read + // so we hold it here and empty as read calls pass. 
+ // thanks to: https://github.com/BurntSushi/rust-snappy/blob/f9eb8d49c713adc48732fb95682a201a7b74d39a/src/read.rs#L327 + #[inline(always)] + fn read_from_out_buf(&mut self, buf: &mut [u8]) -> usize { + let available_bytes = self.dste - self.dsts; + let count = std::cmp::min(available_bytes, buf.len()); + buf[..count].copy_from_slice(&self.out_buf[self.dsts..self.dsts + count]); + self.dsts += count; + count + } +} + +impl io::Read for Decoder { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + // Check if there is data left in out_buf, otherwise refill; if end state, return 0 + let count = self.read_from_out_buf(buf); + if count > 0 { + Ok(count) + } else { + // Read out next buf len worth to compress; filling intermediate out_buf + self.zst.0.avail_in = self.inner.read(&mut self.in_buf)? as _; + self.zst.0.next_in = self.in_buf.as_mut_ptr(); + + let mut n_bytes = 0; + while self.zst.0.avail_in != 0 { + if self.zst.block_state() == isal::isal_block_state_ISAL_BLOCK_NEW_HDR { + // Read this member's gzip header + let mut gz_hdr: MaybeUninit = MaybeUninit::uninit(); + unsafe { isal::isal_gzip_header_init(gz_hdr.as_mut_ptr()) }; + let mut gz_hdr = unsafe { gz_hdr.assume_init() }; + read_gzip_header(&mut self.zst.0, &mut gz_hdr)?; + } + + // decompress member + loop { + self.out_buf.resize(n_bytes + BUF_SIZE, 0); + + self.zst.0.next_out = self.out_buf[n_bytes..n_bytes + BUF_SIZE].as_mut_ptr(); + self.zst.0.avail_out = BUF_SIZE as _; + + self.zst.step_inflate()?; + + n_bytes += BUF_SIZE - self.zst.0.avail_out as usize; + + let state = self.zst.block_state(); + if state == isal::isal_block_state_ISAL_BLOCK_CODED + || state == isal::isal_block_state_ISAL_BLOCK_TYPE0 + || state == isal::isal_block_state_ISAL_BLOCK_HDR + || state == isal::isal_block_state_ISAL_BLOCK_FINISH + { + break; + } + } + if self.zst.0.block_state == isal::isal_block_state_ISAL_BLOCK_FINISH { + self.zst.reset(); + } + } + self.out_buf.truncate(n_bytes); + self.dste = n_bytes; + self.dsts = 0; 
+ + Ok(self.read_from_out_buf(buf)) + } + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use std::io::{self, Cursor}; + + fn gen_large_data() -> Vec { + (0..1_000_000) + .map(|_| b"oh what a beautiful morning, oh what a beautiful day!!".to_vec()) + .flat_map(|v| v) + .collect() + } + + #[test] + fn large_roundtrip() { + let input = gen_large_data(); + let mut encoder = Encoder::new(Cursor::new(&input), CompressionLevel::Three, true); + let mut output = vec![]; + + let n = io::copy(&mut encoder, &mut output).unwrap(); + assert!(n < input.len() as u64); + + let mut decoder = Decoder::new(Cursor::new(output)); + let mut decompressed = vec![]; + let nbytes = io::copy(&mut decoder, &mut decompressed).unwrap(); + + assert_eq!(nbytes as usize, input.len()); + } + + #[test] + fn basic_compress() -> Result<()> { + let input = b"hello, world!"; + let mut encoder = Encoder::new(Cursor::new(input), CompressionLevel::Three, true); + let mut output = vec![]; + + let n = io::copy(&mut encoder, &mut output)? as usize; + let decompressed = decompress(&output[..n])?; + + assert_eq!(input, decompressed.as_slice()); + Ok(()) + } + + #[test] + fn longer_compress() -> Result<()> { + // Build input which is greater than BUF_SIZE + let mut input = vec![]; + for chunk in std::iter::repeat(b"Hello, World!") { + input.extend_from_slice(chunk); + if input.len() > BUF_SIZE * 2 { + break; + } + } + + let mut encoder = Encoder::new(Cursor::new(&input), CompressionLevel::Three, true); + let mut output = vec![]; + + let n = io::copy(&mut encoder, &mut output)? as usize; + let decompressed = decompress(&output[..n])?; + + assert_eq!(input, decompressed.as_slice()); + Ok(()) + } + + #[test] + fn basic_decompress() -> Result<()> { + let input = b"hello, world!"; + let compressed = compress(Cursor::new(input), CompressionLevel::Three, true)?; + + let mut decoder = Decoder::new(compressed.as_slice()); + let mut decompressed = vec![]; + + let n = io::copy(&mut decoder, &mut decompressed)? 
as usize; + assert_eq!(n, decompressed.len()); + assert_eq!(input, decompressed.as_slice()); + Ok(()) + } + + #[test] + fn longer_decompress() -> Result<()> { + // Build input which is greater than BUF_SIZE + let mut input = vec![]; + for chunk in std::iter::repeat(b"Hello, World!") { + input.extend_from_slice(chunk); + if input.len() > BUF_SIZE * 3 { + break; + } + } + + let compressed = compress(input.as_slice(), CompressionLevel::Three, true)?; + + let mut decoder = Decoder::new(compressed.as_slice()); + let mut decompressed = vec![]; + + let n = io::copy(&mut decoder, &mut decompressed)? as usize; + assert_eq!(n, decompressed.len()); + assert_eq!(input, decompressed.as_slice()); + + Ok(()) + } +} diff --git a/src/igzip/write.rs b/src/igzip/write.rs new file mode 100644 index 0000000..4238494 --- /dev/null +++ b/src/igzip/write.rs @@ -0,0 +1,206 @@ +use crate::igzip::*; +use std::io; + +/// Streaming compression for input streams implementing `std::io::Read`. +/// +/// Notes +/// ----- +/// One should consider using `crate::igzip::compress` or `crate::igzip::compress_into` if possible. +/// In that context, we do not need to hold and maintain intermediate buffers for reading and writing. 
+/// +/// Example +/// ------- +/// ``` +/// use std::{io, io::Write}; +/// use isal::igzip::{write::Encoder, CompressionLevel, decompress}; +/// +/// let data = b"Hello, World!".to_vec(); +/// let mut compressed = vec![]; +/// +/// let mut encoder = Encoder::new(&mut compressed, CompressionLevel::Three, true); +/// +/// // Numbeer of compressed bytes written to `output` +/// io::copy(&mut io::Cursor::new(&data), &mut encoder).unwrap(); +/// +/// // call .flush to finish the stream +/// encoder.flush().unwrap(); +/// +/// let decompressed = decompress(io::Cursor::new(&compressed)).unwrap(); +/// assert_eq!(decompressed.as_slice(), data); +/// +/// ``` +pub struct Encoder { + inner: W, + stream: ZStream, + out_buf: Vec, + dsts: usize, + dste: usize, + total_in: usize, + total_out: usize, +} + +impl Encoder { + /// Create a new `Encoder` which implements the `std::io::Read` trait. + pub fn new(writer: W, level: CompressionLevel, is_gzip: bool) -> Encoder { + let out_buf = Vec::with_capacity(BUF_SIZE); + + let mut zstream = ZStream::new(level, ZStreamKind::Stateful); + + zstream.stream.end_of_stream = 0; + zstream.stream.flush = FlushFlags::NoFlush as _; + zstream.stream.gzip_flag = is_gzip as _; + + Self { + inner: writer, + stream: zstream, + out_buf, + dste: 0, + dsts: 0, + total_in: 0, + total_out: 0, + } + } + + /// Mutable reference to underlying reader, not advisable to modify during reading. 
+ pub fn get_ref_mut(&mut self) -> &mut W { + &mut self.inner + } + + // Reference to underlying reader + pub fn get_ref(&self) -> &W { + &self.inner + } + + #[inline(always)] + fn write_from_out_buf(&mut self) -> io::Result { + let count = self.dste - self.dsts; + self.inner + .write_all(&mut self.out_buf[self.dsts..self.dste])?; + self.out_buf.truncate(0); + self.dsts = 0; + self.dste = 0; + Ok(count) + } + + pub fn total_out(&self) -> usize { + self.stream.stream.total_out as usize + self.total_out + } + pub fn total_in(&self) -> usize { + self.stream.stream.total_in as usize + self.total_in + } +} + +impl io::Write for Encoder { + fn write(&mut self, buf: &[u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + self.stream.stream.avail_in = buf.len() as _; + self.stream.stream.next_in = buf.as_ptr() as *mut _; + + while self.stream.stream.avail_in > 0 { + self.out_buf.resize(self.dste + BUF_SIZE, 0); + + self.stream.stream.avail_out = BUF_SIZE as _; + self.stream.stream.next_out = + self.out_buf[self.dste..self.dste + BUF_SIZE].as_mut_ptr(); + + self.stream.deflate()?; + + self.dste += BUF_SIZE - self.stream.stream.avail_out as usize; + } + + self.write_from_out_buf()?; + + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + // Write footer and flush to inner + self.stream.stream.end_of_stream = 1; + self.stream.stream.flush = FlushFlags::FullFlush as _; + while self.stream.stream.internal_state.state != isal::isal_zstate_state_ZSTATE_END { + self.out_buf.resize(self.dste + BUF_SIZE, 0); + self.stream.stream.avail_out = BUF_SIZE as _; + self.stream.stream.next_out = + self.out_buf[self.dste..self.dste + BUF_SIZE].as_mut_ptr(); + self.stream.deflate()?; + self.dste += BUF_SIZE - self.stream.stream.avail_out as usize; + } + self.write_from_out_buf()?; + self.inner.flush()?; + + // Prep for next stream should user call 'write' again after flush. 
+ // needs to store total_in/out separately as checksum is calculated + // from these values per stream + self.total_in += self.stream.stream.total_in as usize; + self.total_out += self.stream.stream.total_out as usize; + unsafe { isal::isal_deflate_reset(&mut self.stream.stream) }; + + // TODO: when doing something other than gzip, or flush flags on init + // store these and re-initialize + self.stream.stream.flush = FlushFlags::NoFlush as _; + self.stream.stream.end_of_stream = 0; + self.stream.stream.gzip_flag = 1; + Ok(()) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use std::io::Write; + + fn gen_large_data() -> Vec { + (0..1_000_000) + .map(|_| b"oh what a beautiful morning, oh what a beautiful day!!".to_vec()) + .flat_map(|v| v) + .collect() + } + + #[test] + fn test_encoder_basic() { + let data = gen_large_data(); + + let mut compressed = vec![]; + let mut encoder = Encoder::new(&mut compressed, CompressionLevel::Three, true); + let nbytes = io::copy(&mut io::Cursor::new(&data), &mut encoder).unwrap(); + + // Footer isn't written until .flush is called + let before_flush_bytes_out = encoder.total_out(); + encoder.flush().unwrap(); + let after_flush_bytes_out = encoder.total_out(); + assert!(before_flush_bytes_out < after_flush_bytes_out); + + // nbytes read equals data lenth; compressed is between 0 and data length + assert_eq!(nbytes, data.len() as _); + assert!(compressed.len() > 0); + assert!(compressed.len() < data.len()); + + // total out after flush should equal compressed length + assert_eq!(after_flush_bytes_out, compressed.len()); + + // and can be decompressed + let decompressed = crate::igzip::decompress(io::Cursor::new(&compressed)).unwrap(); + assert_eq!(decompressed, data); + } + + #[test] + fn test_encoder_multi_stream() { + let first = b"foo"; + let second = b"bar"; + + let mut compressed = vec![]; + let mut encoder = Encoder::new(&mut compressed, CompressionLevel::Three, true); + + encoder.write_all(first).unwrap(); + 
encoder.flush().unwrap(); + assert_eq!(encoder.total_in(), first.len()); + + encoder.write_all(second).unwrap(); + encoder.flush().unwrap(); + assert_eq!(encoder.total_in(), first.len() + second.len()); + + let decompressed = crate::igzip::decompress(io::Cursor::new(&compressed)).unwrap(); + assert_eq!(&decompressed, b"foobar"); + } +} From 61dc314ac6f48674bb66afe2837987a065eb9534 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Mon, 23 Sep 2024 10:26:39 +0200 Subject: [PATCH 4/7] Small refactors, docs --- src/igzip/write.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/igzip/write.rs b/src/igzip/write.rs index 4238494..59daa55 100644 --- a/src/igzip/write.rs +++ b/src/igzip/write.rs @@ -1,5 +1,6 @@ use crate::igzip::*; use std::io; +use std::io::Write; /// Streaming compression for input streams implementing `std::io::Read`. /// @@ -82,9 +83,18 @@ impl Encoder { Ok(count) } + /// Call flush and return the inner writer + pub fn finish(mut self) -> io::Result { + self.flush()?; + Ok(self.inner) + } + + /// total bytes written to the writer, inclusive of all streams if `flush` has been called before pub fn total_out(&self) -> usize { self.stream.stream.total_out as usize + self.total_out } + + /// total bytes processed, inclusive of all streams if `flush` has been called before pub fn total_in(&self) -> usize { self.stream.stream.total_in as usize + self.total_in } @@ -181,7 +191,7 @@ pub mod tests { // and can be decompressed let decompressed = crate::igzip::decompress(io::Cursor::new(&compressed)).unwrap(); - assert_eq!(decompressed, data); + assert_eq!(decompressed.len(), data.len()); } #[test] From 744ef1938d5f4669305861add38fc0fe007f4428 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Mon, 23 Sep 2024 13:17:53 +0200 Subject: [PATCH 5/7] impl write::Decoder --- src/igzip/write.rs | 156 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/src/igzip/write.rs b/src/igzip/write.rs index 
59daa55..3d8850d 100644 --- a/src/igzip/write.rs +++ b/src/igzip/write.rs @@ -155,6 +155,128 @@ impl io::Write for Encoder { } } +/// Streaming decompression for output streams implementing `std::io::Write`. +/// +/// Notes +/// ----- +/// One should consider using `crate::igzip::decompress` or `crate::igzip::decompress_into` if possible. +/// In that context, we do not need to hold and maintain intermediate buffers for reading and writing. +/// +/// Example +/// ------- +/// ``` +/// use std::{io, io::Write}; +/// use isal::igzip::{write::Decoder, CompressionLevel, compress}; +/// let data = b"Hello, World!".to_vec(); +/// +/// let compressed = compress(io::Cursor::new(data.as_slice()), CompressionLevel::Three, true).unwrap(); +/// +/// let mut decompressed = vec![]; +/// let mut decoder = Decoder::new(&mut decompressed); +/// +/// // Number of compressed bytes written to `decoder` +/// let n = io::copy(&mut io::Cursor::new(&compressed), &mut decoder).unwrap(); +/// assert_eq!(n as usize, compressed.len()); +/// assert_eq!(decompressed.as_slice(), data); +/// ``` +pub struct Decoder { + inner: W, + zst: InflateState, + out_buf: Vec, + dsts: usize, + dste: usize, +} + +impl Decoder { + pub fn new(writer: W) -> Decoder { + let mut zst = InflateState::new(); + zst.0.crc_flag = isal::IGZIP_GZIP; + + Self { + inner: writer, + zst, + out_buf: Vec::with_capacity(BUF_SIZE), + dste: 0, + dsts: 0, + } + } + + /// Mutable reference to underlying writer, not advisable to modify during writing. 
+ pub fn get_ref_mut(&mut self) -> &mut W { + &mut self.inner + } + + // Reference to underlying writer + pub fn get_ref(&self) -> &W { + &self.inner + } + + #[inline(always)] + fn write_from_out_buf(&mut self) -> io::Result { + let count = self.dste - self.dsts; + self.inner + .write_all(&mut self.out_buf[self.dsts..self.dste])?; + self.out_buf.truncate(0); + self.dsts = 0; + self.dste = 0; + Ok(count) + } +} + +impl io::Write for Decoder { + fn write(&mut self, buf: &[u8]) -> io::Result { + // Hand the caller's compressed bytes to the inflate state as its input; + // decompressed output is staged in out_buf and then written to the inner writer + self.zst.0.avail_in = buf.len() as _; + self.zst.0.next_in = buf.as_ptr() as *mut _; + + let mut n_bytes = 0; + while self.zst.0.avail_in > 0 { + if self.zst.block_state() == isal::isal_block_state_ISAL_BLOCK_NEW_HDR { + // Read this member's gzip header + let mut gz_hdr: mem::MaybeUninit = + mem::MaybeUninit::uninit(); + unsafe { isal::isal_gzip_header_init(gz_hdr.as_mut_ptr()) }; + let mut gz_hdr = unsafe { gz_hdr.assume_init() }; + read_gzip_header(&mut self.zst.0, &mut gz_hdr)?; + } + + // decompress member + loop { + self.out_buf.resize(n_bytes + BUF_SIZE, 0); + + self.zst.0.next_out = self.out_buf[n_bytes..n_bytes + BUF_SIZE].as_mut_ptr(); + self.zst.0.avail_out = BUF_SIZE as _; + + self.zst.step_inflate()?; + + n_bytes += BUF_SIZE - self.zst.0.avail_out as usize; + + let state = self.zst.block_state(); + if state == isal::isal_block_state_ISAL_BLOCK_CODED + || state == isal::isal_block_state_ISAL_BLOCK_TYPE0 + || state == isal::isal_block_state_ISAL_BLOCK_HDR + || state == isal::isal_block_state_ISAL_BLOCK_FINISH + { + break; + } + } + if self.zst.0.block_state == isal::isal_block_state_ISAL_BLOCK_FINISH { + self.zst.reset(); + } + } + self.out_buf.truncate(n_bytes); + self.dste = n_bytes; + self.dsts = 0; + self.write_from_out_buf()?; + + Ok(buf.len()) + } + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } 
+} + #[cfg(test)] pub mod tests { use super::*; @@ -213,4 +335,38 @@ pub mod tests { let decompressed = crate::igzip::decompress(io::Cursor::new(&compressed)).unwrap(); assert_eq!(&decompressed, b"foobar"); } + + #[test] + fn test_decoder_basic() { + let data = gen_large_data(); + + let compressed = + crate::igzip::compress(io::Cursor::new(&data), CompressionLevel::Three, true).unwrap(); + + let mut decompressed = vec![]; + let mut decoder = Decoder::new(&mut decompressed); + let nbytes = io::copy(&mut io::Cursor::new(&compressed), &mut decoder).unwrap(); + assert_eq!(nbytes, compressed.len() as u64); + assert_eq!(decompressed.len(), data.len()); + } + + #[test] + fn test_decoder_multi_stream() { + let first = b"foo"; + let second = b"bar"; + + let mut compressed = + crate::igzip::compress(io::Cursor::new(&first), CompressionLevel::Three, true).unwrap(); + compressed.extend( + crate::igzip::compress(io::Cursor::new(&second), CompressionLevel::Three, true) + .unwrap(), + ); + + let mut decompressed = vec![]; + let mut decoder = Decoder::new(&mut decompressed); + + let nbytes = io::copy(&mut io::Cursor::new(&compressed), &mut decoder).unwrap(); + assert_eq!(nbytes, compressed.len() as _); + assert_eq!(&decompressed, b"foobar"); + } } From c1f6b212386ae47ced6c4700de7502ee012e6860 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Mon, 23 Sep 2024 13:19:18 +0200 Subject: [PATCH 6/7] Bump version --- Cargo.toml | 4 ++-- isal-sys/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c947823..ffae4b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "isal-rs" -version = "0.2.0+496255c" +version = "0.3.0+496255c" edition = "2021" description = "isa-l Rust bindings" license = "MIT" @@ -16,7 +16,7 @@ shared = ["isal-sys/shared"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -isal-sys = { path = "isal-sys", version = "0.2.0+496255c" 
} +isal-sys = { path = "isal-sys", version = "0.3.0+496255c" } [dev-dependencies] criterion = "0.3" diff --git a/isal-sys/Cargo.toml b/isal-sys/Cargo.toml index 37f4917..9846913 100644 --- a/isal-sys/Cargo.toml +++ b/isal-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "isal-sys" -version = "0.2.0+496255c" +version = "0.3.0+496255c" edition = "2021" description = "isa-l sys crate" license = "MIT" From a6356a1bf1a13fdd3f20f24ca5fcab3568892819 Mon Sep 17 00:00:00 2001 From: Miles Granger Date: Mon, 23 Sep 2024 13:32:45 +0200 Subject: [PATCH 7/7] Update README --- README.md | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 26b646b..901a07f 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,11 @@ -## Rust sys and wrapper crates for [isa-l](https://github.com/intel/isa-l) +# isal-rs -[![CI](https://github.com/milesgranger/isal-rs/actions/workflows/ci.yml/badge.svg)](https://github.com/milesgranger/isal-rs/actions/workflows/ci.yml) +[![CI](https://github.com/milesgranger/isal-rs/actions/workflows/CI.yml/badge.svg?branch=main)](https://github.com/milesgranger/isal-rs/actions/workflows/CI.yml) +[![Latest version](https://img.shields.io/crates/v/isal-rs.svg)](https://crates.io/crates/isal-rs) +[![Documentation](https://docs.rs/isal-rs/badge.svg)](https://docs.rs/isal-rs) +![License](https://img.shields.io/crates/l/isal-rs.svg) ---- - -### [isal-sys](./isal-sys/) - - _sys crate, raw Rust bindings to the isa-l library. [only bindings to lib_gzip]_ -### [isal-rs](./isal-rs/) - - _high level wrapper, including gzip/deflate API. [_WIP_]_ +Rust bindings for [isa-l](https://github.com/intel/isa-l) ---