From 97ee012b5f4c51627e23b4fbdbba93b1ba878859 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Wed, 15 Feb 2023 17:28:28 +0200 Subject: [PATCH 1/7] Rename zstd.rs to zstd_seekable_wrapper.rs Signed-off-by: Ariel Miculas --- compression/src/lib.rs | 4 ++-- compression/src/{zstd.rs => zstd_seekable_wrapper.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename compression/src/{zstd.rs => zstd_seekable_wrapper.rs} (100%) diff --git a/compression/src/lib.rs b/compression/src/lib.rs index 1d55b73..76f1db6 100644 --- a/compression/src/lib.rs +++ b/compression/src/lib.rs @@ -1,8 +1,8 @@ use std::fs; use std::io; -mod zstd; -pub use zstd::*; +mod zstd_seekable_wrapper; +pub use zstd_seekable_wrapper::*; pub trait Compressor: io::Write { fn end(&mut self) -> io::Result<()>; diff --git a/compression/src/zstd.rs b/compression/src/zstd_seekable_wrapper.rs similarity index 100% rename from compression/src/zstd.rs rename to compression/src/zstd_seekable_wrapper.rs From a31e058f5874ed851f3f3b844dca0da4ca0f19bf Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Thu, 16 Feb 2023 13:46:06 +0200 Subject: [PATCH 2/7] Add zstd compression Signed-off-by: Ariel Miculas --- Cargo.lock | 51 +++++++++ Cargo.toml | 3 +- builder/Cargo.toml | 1 + builder/src/lib.rs | 44 +++++--- common/Cargo.toml | 8 ++ common/src/lib.rs | 5 + compression/Cargo.toml | 3 + compression/src/lib.rs | 96 +++++------------ compression/src/noop.rs | 59 +++++++++++ compression/src/zstd_seekable_wrapper.rs | 38 ++++--- compression/src/zstd_wrapper.rs | 128 +++++++++++++++++++++++ exe/Cargo.toml | 1 + exe/src/main.rs | 5 +- extractor/src/lib.rs | 7 +- format/src/types.rs | 12 +++ oci/src/lib.rs | 14 +-- reader/src/walk.rs | 4 +- 17 files changed, 362 insertions(+), 117 deletions(-) create mode 100644 common/Cargo.toml create mode 100644 common/src/lib.rs create mode 100644 compression/src/noop.rs create mode 100644 compression/src/zstd_wrapper.rs diff --git a/Cargo.lock b/Cargo.lock index bb40f15..430105c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -79,6 +79,7 @@ name = "builder" version = "0.1.0" dependencies = [ "anyhow", + "common", "compression", "fastcdc", "fastrand", @@ -105,6 +106,9 @@ name = "cc" version = "1.0.66" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -149,11 +153,18 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "common" +version = "0.1.0" + [[package]] name = "compression" version = "0.1.0" dependencies = [ + "anyhow", + "common", "tempfile", + "zstd", "zstd-seekable", ] @@ -419,6 +430,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4217ad341ebadf8d8e724e264f13e593e0648f5b3e94b3896a5df283be015ecc" +[[package]] +name = "jobserver" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "068b1ee6743e4d11fb9c6a1e6064b3693a1b600e7f5f5988047d98b3dc9fb90b" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -708,6 +728,7 @@ dependencies = [ "assert_cmd", "builder", "clap", + "compression", "ctrlc", "daemonize", "dir-diff", @@ -1233,6 +1254,25 @@ dependencies = [ "syn", ] +[[package]] +name = "zstd" +version = "0.12.3+zstd.1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76eea132fb024e0e13fd9c2f5d5d595d8a967aa72382ac2f9d39fcc95afd0806" +dependencies = [ + "zstd-safe", +] + +[[package]] 
+name = "zstd-safe" +version = "6.0.4+zstd.1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7afb4b54b8910cf5447638cb54bf4e8a65cbedd783af98b98c62ffe91f185543" +dependencies = [ + "libc", + "zstd-sys", +] + [[package]] name = "zstd-seekable" version = "0.1.7" @@ -1245,3 +1285,14 @@ dependencies = [ "thiserror", "threadpool", ] + +[[package]] +name = "zstd-sys" +version = "2.0.7+zstd.1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94509c3ba2fe55294d752b79842c530ccfab760192521df74a081a78d2b3c7f5" +dependencies = [ + "cc", + "libc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index c0a97ca..93d1507 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,6 @@ members = [ "reader", "compression", "extractor", - "fsverity_helpers" + "fsverity_helpers", + "common" ] diff --git a/builder/Cargo.toml b/builder/Cargo.toml index fb88c9c..a59b6b0 100644 --- a/builder/Cargo.toml +++ b/builder/Cargo.toml @@ -10,6 +10,7 @@ format = { path = "../format" } oci = { path = "../oci" } reader = { path = "../reader" } fsverity_helpers = { path = "../fsverity_helpers" } +common = { path = "../common" } walkdir = "2" serde_cbor = "*" fastcdc = "3.0.0" diff --git a/builder/src/lib.rs b/builder/src/lib.rs index f68d93b..e5b6a57 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -1,3 +1,5 @@ +use common::{AVG_CHUNK_SIZE, MAX_CHUNK_SIZE, MIN_CHUNK_SIZE}; +use compression::Compression; use fsverity_helpers::{ check_fs_verity, fsverity_enable, get_fs_verity_digest, InnerHashAlgorithm, FS_VERITY_BLOCK_SIZE_DEFAULT, @@ -30,12 +32,6 @@ use fastcdc::v2020::StreamCDC; mod filesystem; use filesystem::FilesystemStream; -// Quoting from https://github.com/ronomon/deduplication -// An average chunk size of 64 KB is recommended for optimal end-to-end deduplication and compression efficiency -const MIN_CHUNK_SIZE: u32 = 16 * 1024; -const AVG_CHUNK_SIZE: u32 = 64 * 1024; -const MAX_CHUNK_SIZE: u32 = 256 * 1024; - const PUZZLEFS_IMAGE_MANIFEST_VERSION: u64 = 1; fn walker(rootfs: &Path) -> WalkDir { @@ -78,7 +74,7 @@ struct Other { additional: Option, } -fn process_chunks( +fn process_chunks( oci: &Image, mut chunker: StreamCDC, files: &mut [File], @@ -98,7 +94,7 @@ fn process_chunks( let chunk = result.unwrap(); let mut chunk_used: u64 = 0; - let desc = oci.put_blob::<_, compression::Noop, media_types::Chunk>(&*chunk.data)?; + let desc = oci.put_blob::<_, C, media_types::Chunk>(&*chunk.data)?; let blob_kind = BlobRefKind::Other { digest: desc.digest.underlying(), }; @@ -155,7 +151,7 @@ fn inode_encoded_size(num_inodes: usize) -> usize { format::cbor_size_of_list_header(num_inodes) + num_inodes * format::INODE_WIRE_SIZE } -fn build_delta( +fn build_delta( rootfs: &Path, oci: &Image, mut existing: Option, @@ -345,7 +341,7 @@ fn build_delta( AVG_CHUNK_SIZE, MAX_CHUNK_SIZE, ); - process_chunks(oci, fcdc, &mut files, verity_data)?; + process_chunks::(oci, fcdc, &mut files, verity_data)?; // total inode serailized size let num_inodes = pfs_inodes.len() + dirs.len() + files.len() + others.len(); @@ -461,9 +457,9 @@ fn build_delta( Ok(desc) } -pub fn build_initial_rootfs(rootfs: &Path, oci: &Image) -> Result { +pub fn build_initial_rootfs(rootfs: &Path, oci: &Image) -> Result { let mut verity_data: VerityData = BTreeMap::new(); - let desc = build_delta(rootfs, oci, None, &mut verity_data)?; + let desc = build_delta::(rootfs, oci, None, &mut verity_data)?; let metadatas = [BlobRef { offset: 0, kind: BlobRefKind::Other { @@ -486,7 +482,7 @@ pub fn 
build_initial_rootfs(rootfs: &Path, oci: &Image) -> Result { // add_rootfs_delta adds whatever the delta between the current rootfs and the puzzlefs // representation from the tag is. -pub fn add_rootfs_delta( +pub fn add_rootfs_delta( rootfs_path: &Path, oci: Image, tag: &str, @@ -503,7 +499,7 @@ pub fn add_rootfs_delta( )); } - let desc = build_delta(rootfs_path, &oci, Some(pfs), &mut verity_data)?; + let desc = build_delta::(rootfs_path, &oci, Some(pfs), &mut verity_data)?; let br = BlobRef { kind: BlobRefKind::Other { digest: desc.digest.underlying(), @@ -568,7 +564,7 @@ pub fn enable_fs_verity(oci: Image, tag: &str, manifest_root_hash: &str) -> Resu // TODO: figure out how to guard this with #[cfg(test)] pub fn build_test_fs(path: &Path, image: &Image) -> Result { - build_initial_rootfs(path, image) + build_initial_rootfs::(path, image) } #[cfg(test)] @@ -580,10 +576,14 @@ pub mod tests { use tempfile::tempdir; use format::{DirList, InodeMode}; + use oci::Digest; use reader::WalkPuzzleFS; + use std::convert::TryFrom; use std::path::PathBuf; use tempfile::TempDir; + type DefaultCompression = compression::Noop; + #[test] fn test_fs_generation() { // TODO: verify the hash value here since it's only one thing? problem is as we change the @@ -609,6 +609,13 @@ pub mod tests { let md = fs::symlink_metadata(image.blob_path().join(FILE_DIGEST)).unwrap(); assert!(md.is_file()); + let mut decompressor = image + .open_compressed_blob::( + &Digest::try_from(FILE_DIGEST).unwrap(), + None, + ) + .unwrap(); + let metadata_digest = rootfs.metadatas[0].try_into().unwrap(); let mut blob = image.open_metadata_blob(&metadata_digest, None).unwrap(); let inodes = blob.read_inodes().unwrap(); @@ -637,7 +644,10 @@ pub mod tests { if let InodeMode::Reg { offset } = inodes[1].mode { let chunks = blob.read_file_chunks(offset).unwrap(); assert_eq!(chunks.len(), 1); - assert_eq!(chunks[0].len, md.len()); + assert_eq!( + chunks[0].len, + decompressor.get_uncompressed_length().unwrap() + ); } else { panic!("bad inode mode: {:?}", inodes[1].mode); } @@ -659,7 +669,7 @@ pub mod tests { ) .unwrap(); - let (desc, image) = add_rootfs_delta(&delta_dir, image, tag).unwrap(); + let (desc, image) = add_rootfs_delta::(&delta_dir, image, tag).unwrap(); let new_tag = "test2"; image.add_tag(new_tag, desc).unwrap(); let delta = image diff --git a/common/Cargo.toml b/common/Cargo.toml new file mode 100644 index 0000000..b7723d9 --- /dev/null +++ b/common/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "common" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/common/src/lib.rs b/common/src/lib.rs new file mode 100644 index 0000000..6d179f0 --- /dev/null +++ b/common/src/lib.rs @@ -0,0 +1,5 @@ +// Quoting from https://github.com/ronomon/deduplication +// An average chunk size of 64 KB is recommended for optimal end-to-end deduplication and compression efficiency +pub const MIN_CHUNK_SIZE: u32 = 16 * 1024; +pub const AVG_CHUNK_SIZE: u32 = 64 * 1024; +pub const MAX_CHUNK_SIZE: u32 = 256 * 1024; diff --git a/compression/Cargo.toml b/compression/Cargo.toml index 3d827d7..f30d388 100644 --- a/compression/Cargo.toml +++ b/compression/Cargo.toml @@ -8,6 +8,9 @@ edition = "2018" [dependencies] zstd-seekable = "*" +zstd = "0.12.3" +common = {path = "../common"} [dev-dependencies] tempfile = "*" +anyhow = "*" diff --git a/compression/src/lib.rs b/compression/src/lib.rs index 76f1db6..1e63e81 100644 --- a/compression/src/lib.rs 
+++ b/compression/src/lib.rs @@ -1,101 +1,61 @@ use std::fs; use std::io; -mod zstd_seekable_wrapper; -pub use zstd_seekable_wrapper::*; +mod noop; +pub use noop::Noop; + +mod zstd_wrapper; +pub use zstd_wrapper::*; pub trait Compressor: io::Write { - fn end(&mut self) -> io::Result<()>; + // https://users.rust-lang.org/t/how-to-move-self-when-using-dyn-trait/50123 + fn end(self: Box) -> io::Result<()>; } -impl Compressor for fs::File { - fn end(&mut self) -> io::Result<()> { - Ok(()) - } +pub trait Decompressor: io::Read + io::Seek + Send { + fn get_uncompressed_length(&mut self) -> io::Result; } -pub trait Decompressor: io::Read + io::Seek + Send {} - -impl Decompressor for fs::File {} - pub trait Compression { - fn compress(dest: fs::File) -> Box; - fn decompress(source: fs::File) -> Box; + fn compress(dest: fs::File) -> io::Result>; + fn decompress(source: fs::File) -> io::Result>; fn append_extension(media_type: &str) -> String; } -pub struct Noop {} - -impl Compression for Noop { - fn compress(dest: fs::File) -> Box { - Box::new(dest) - } - - fn decompress(source: fs::File) -> Box { - Box::new(source) - } - - fn append_extension(media_type: &str) -> String { - media_type.to_string() - } -} - #[cfg(test)] mod tests { use super::*; use tempfile::NamedTempFile; - const TRUTH: &str = "meshuggah rocks"; + pub const TRUTH: &str = "meshuggah rocks"; - pub fn compress_decompress_noop() { - let f = NamedTempFile::new().unwrap(); - let mut compressed = C::compress(f.reopen().unwrap()); - compressed.write_all(TRUTH.as_bytes()).unwrap(); - compressed.end().unwrap(); + pub fn compress_decompress() -> anyhow::Result<()> { + let f = NamedTempFile::new()?; + let mut compressed = C::compress(f.reopen()?)?; + compressed.write_all(TRUTH.as_bytes())?; + compressed.end()?; let mut buf = vec![0_u8; TRUTH.len()]; - let n = C::decompress(f.reopen().unwrap()).read(&mut buf).unwrap(); + let n = C::decompress(f.reopen()?)?.read(&mut buf)?; assert_eq!(n, TRUTH.len()); assert_eq!(TRUTH.as_bytes(), buf); + Ok(()) } - pub fn compression_is_seekable() { - let f = NamedTempFile::new().unwrap(); - let mut compressed = C::compress(f.reopen().unwrap()); - compressed.write_all(TRUTH.as_bytes()).unwrap(); - compressed.end().unwrap(); + pub fn compression_is_seekable() -> anyhow::Result<()> { + let f = NamedTempFile::new()?; + let mut compressed = C::compress(f.reopen()?)?; + compressed.write_all(TRUTH.as_bytes())?; + compressed.end()?; let mut buf = vec![0_u8; 1024]; - let mut decompressor = C::decompress(f.reopen().unwrap()); - decompressor - .seek(io::SeekFrom::Start("meshuggah ".len() as u64)) - .unwrap(); - let n = decompressor.read(&mut buf).unwrap(); + let mut decompressor = C::decompress(f.reopen()?)?; + decompressor.seek(io::SeekFrom::Start("meshuggah ".len() as u64))?; + let n = decompressor.read(&mut buf)?; assert_eq!(n, 5); assert_eq!("rocks".as_bytes(), &buf[0..5]); - } - - #[test] - fn test_noop_roundtrip() { - compress_decompress_noop::(); - } - - #[test] - fn test_noop_seekable() { - compression_is_seekable::(); - } - - #[test] - fn test_noop_is_noop() { - // shouldn't mangle the file content if in no-op mode - let f = NamedTempFile::new().unwrap(); - Noop::compress(f.reopen().unwrap()) - .write_all(TRUTH.as_bytes()) - .unwrap(); - - let content = fs::read_to_string(f.path()).unwrap(); - assert_eq!(TRUTH, content); + Ok(()) } } diff --git a/compression/src/noop.rs b/compression/src/noop.rs new file mode 100644 index 0000000..95e7090 --- /dev/null +++ b/compression/src/noop.rs @@ -0,0 +1,59 @@ +use 
crate::{Compression, Compressor, Decompressor}; +use std::fs; +use std::io; + +pub struct Noop {} + +impl Compressor for fs::File { + fn end(self: Box) -> io::Result<()> { + Ok(()) + } +} + +impl Decompressor for fs::File { + fn get_uncompressed_length(&mut self) -> io::Result { + Ok(self.metadata()?.len()) + } +} + +impl Compression for Noop { + fn compress(dest: fs::File) -> io::Result> { + Ok(Box::new(dest)) + } + + fn decompress(source: fs::File) -> io::Result> { + Ok(Box::new(source)) + } + + fn append_extension(media_type: &str) -> String { + media_type.to_string() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::{compress_decompress, compression_is_seekable, TRUTH}; + use tempfile::NamedTempFile; + + #[test] + fn test_noop_roundtrip() -> anyhow::Result<()> { + compress_decompress::() + } + + #[test] + fn test_noop_seekable() -> anyhow::Result<()> { + compression_is_seekable::() + } + + #[test] + fn test_noop_is_noop() -> anyhow::Result<()> { + // shouldn't mangle the file content if in no-op mode + let f = NamedTempFile::new()?; + Noop::compress(f.reopen()?)?.write_all(TRUTH.as_bytes())?; + + let content = fs::read_to_string(f.path())?; + assert_eq!(TRUTH, content); + Ok(()) + } +} diff --git a/compression/src/zstd_seekable_wrapper.rs b/compression/src/zstd_seekable_wrapper.rs index 6fd9ebd..0cf3119 100644 --- a/compression/src/zstd_seekable_wrapper.rs +++ b/compression/src/zstd_seekable_wrapper.rs @@ -29,7 +29,7 @@ pub struct ZstdCompressor { } impl Compressor for ZstdCompressor { - fn end(&mut self) -> io::Result<()> { + fn end(mut self: Box) -> io::Result<()> { let size = self.stream.end_stream(&mut self.buf).map_err(err_to_io)?; self.f.write_all(&self.buf[0..size]) } @@ -59,7 +59,11 @@ pub struct ZstdDecompressor { uncompressed_length: u64, } -impl Decompressor for ZstdDecompressor {} +impl Decompressor for ZstdDecompressor { + fn get_uncompressed_length(&mut self) -> io::Result { + Ok(self.uncompressed_length) + } +} impl io::Seek for ZstdDecompressor { fn seek(&mut self, offset: io::SeekFrom) -> io::Result { @@ -100,22 +104,22 @@ impl io::Read for ZstdDecompressor { } } -pub struct Zstd {} +pub struct ZstdSeekable {} -impl Compression for Zstd { - fn compress(dest: fs::File) -> Box { +impl Compression for ZstdSeekable { + fn compress(dest: fs::File) -> io::Result> { // a "pretty high" compression level, since decompression should be nearly the same no // matter what compression level. Maybe we should turn this to 22 or whatever the max is... 
- let stream = SeekableCStream::new(17, FRAME_SIZE).unwrap(); - Box::new(ZstdCompressor { + let stream = SeekableCStream::new(17, FRAME_SIZE).map_err(err_to_io)?; + Ok(Box::new(ZstdCompressor { f: dest, stream, buf: vec![0_u8; FRAME_SIZE], - }) + })) } - fn decompress(source: fs::File) -> Box { - let stream = Seekable::init(Box::new(source)).unwrap(); + fn decompress(source: fs::File) -> io::Result> { + let stream = Seekable::init(Box::new(source)).map_err(err_to_io)?; // zstd-seekable doesn't like it when we pass a buffer past the end of the uncompressed // stream, so let's figure out the size of the uncompressed file so we can implement @@ -123,11 +127,11 @@ impl Compression for Zstd { let uncompressed_length = (0..stream.get_num_frames()) .map(|i| stream.get_frame_decompressed_size(i) as u64) .sum(); - Box::new(ZstdDecompressor { + Ok(Box::new(ZstdDecompressor { stream, offset: 0, uncompressed_length, - }) + })) } fn append_extension(media_type: &str) -> String { @@ -138,15 +142,15 @@ impl Compression for Zstd { #[cfg(test)] mod tests { use super::*; - use crate::tests::{compress_decompress_noop, compression_is_seekable}; + use crate::tests::{compress_decompress, compression_is_seekable}; #[test] - fn test_ztsd_roundtrip() { - compress_decompress_noop::(); + fn test_ztsd_roundtrip() -> anyhow::Result<()> { + compress_decompress::() } #[test] - fn test_zstd_seekable() { - compression_is_seekable::(); + fn test_zstd_seekable() -> anyhow::Result<()> { + compression_is_seekable::() } } diff --git a/compression/src/zstd_wrapper.rs b/compression/src/zstd_wrapper.rs new file mode 100644 index 0000000..60dc793 --- /dev/null +++ b/compression/src/zstd_wrapper.rs @@ -0,0 +1,128 @@ +use common::MAX_CHUNK_SIZE; +use std::cmp::min; +use std::convert::TryFrom; +use std::convert::TryInto; +use std::fs; +use std::io; +use std::io::Read; + +use crate::{Compression, Compressor, Decompressor}; + +const COMPRESSION_LEVEL: i32 = 3; + +fn err_to_io(e: E) -> io::Error { + io::Error::new(io::ErrorKind::Other, e) +} + +pub struct ZstdCompressor { + encoder: zstd::stream::write::Encoder<'static, std::fs::File>, +} + +impl Compressor for ZstdCompressor { + fn end(self: Box) -> io::Result<()> { + self.encoder.finish()?; + Ok(()) + } +} + +impl io::Write for ZstdCompressor { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.encoder.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.encoder.flush() + } +} + +pub struct ZstdDecompressor { + buf: Vec, + offset: u64, + uncompressed_length: u64, +} + +impl Decompressor for ZstdDecompressor { + fn get_uncompressed_length(&mut self) -> io::Result { + Ok(self.uncompressed_length) + } +} + +impl io::Seek for ZstdDecompressor { + fn seek(&mut self, offset: io::SeekFrom) -> io::Result { + match offset { + io::SeekFrom::Start(s) => { + self.offset = s; + } + io::SeekFrom::End(e) => { + if e > 0 { + return Err(io::Error::new(io::ErrorKind::Other, "zstd seek past end")); + } + self.offset = self.uncompressed_length - u64::try_from(-e).map_err(err_to_io)?; + } + io::SeekFrom::Current(c) => { + if c > 0 { + self.offset += u64::try_from(c).map_err(err_to_io)?; + } else { + self.offset -= u64::try_from(-c).map_err(err_to_io)?; + } + } + } + Ok(self.offset) + } +} + +impl io::Read for ZstdDecompressor { + fn read(&mut self, out: &mut [u8]) -> io::Result { + let len = min( + out.len(), + (self.uncompressed_length - self.offset) + .try_into() + .map_err(err_to_io)?, + ); + let offset: usize = self.offset.try_into().map_err(err_to_io)?; + 
out[..len].copy_from_slice(&self.buf[offset..offset + len]); + Ok(len) + } +} +pub struct Zstd {} + +impl Compression for Zstd { + fn compress(dest: fs::File) -> io::Result> { + let encoder = zstd::stream::write::Encoder::new(dest, COMPRESSION_LEVEL)?; + Ok(Box::new(ZstdCompressor { encoder })) + } + + fn decompress(mut source: fs::File) -> io::Result> { + let mut contents = Vec::new(); + source.read_to_end(&mut contents)?; + let mut decompressor = zstd::bulk::Decompressor::new()?; + let decompressed_buffer = + decompressor.decompress(&contents, MAX_CHUNK_SIZE.try_into().map_err(err_to_io)?)?; + let uncompressed_length = decompressed_buffer.len(); + Ok(Box::new(ZstdDecompressor { + buf: decompressed_buffer, + offset: 0, + uncompressed_length: uncompressed_length.try_into().map_err(err_to_io)?, + })) + } + + fn append_extension(media_type: &str) -> String { + format!("{media_type}+zstd") + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::tests::{compress_decompress, compression_is_seekable}; + + #[test] + fn test_ztsd_roundtrip() -> anyhow::Result<()> { + compress_decompress::() + } + + #[test] + fn test_zstd_seekable() -> anyhow::Result<()> { + compression_is_seekable::() + } +} diff --git a/exe/Cargo.toml b/exe/Cargo.toml index 4e18751..4c80e22 100644 --- a/exe/Cargo.toml +++ b/exe/Cargo.toml @@ -20,6 +20,7 @@ oci = { path = "../oci" } reader = { path = "../reader" } extractor = { path = "../extractor" } fsverity_helpers = { path = "../fsverity_helpers" } +compression = { path = "../compression" } hex = "*" [dev-dependencies] diff --git a/exe/src/main.rs b/exe/src/main.rs index 42b238b..7ded09a 100644 --- a/exe/src/main.rs +++ b/exe/src/main.rs @@ -1,5 +1,6 @@ use builder::{add_rootfs_delta, build_initial_rootfs, enable_fs_verity}; use clap::{Args, Parser, Subcommand}; +use compression::Noop; use daemonize::Daemonize; use env_logger::Env; use extractor::extract_rootfs; @@ -153,12 +154,12 @@ fn main() -> anyhow::Result<()> { let image = Image::new(oci_dir)?; let new_image = match b.base_layer { Some(base_layer) => { - let (desc, image) = add_rootfs_delta(rootfs, image, &base_layer)?; + let (desc, image) = add_rootfs_delta::(rootfs, image, &base_layer)?; image.add_tag(&b.tag, desc)?; image } None => { - let desc = build_initial_rootfs(rootfs, &image)?; + let desc = build_initial_rootfs::(rootfs, &image)?; image.add_tag(&b.tag, desc)?; Arc::new(image) } diff --git a/extractor/src/lib.rs b/extractor/src/lib.rs index 92622c3..de5f55f 100644 --- a/extractor/src/lib.rs +++ b/extractor/src/lib.rs @@ -157,7 +157,6 @@ mod tests { use std::fs; use std::fs::File; - use builder::build_initial_rootfs; use builder::build_test_fs; use oci::Image; use std::collections::HashMap; @@ -195,7 +194,7 @@ mod tests { } } - let rootfs_desc = build_initial_rootfs(&rootfs, &image).unwrap(); + let rootfs_desc = build_test_fs(&rootfs, &image).unwrap(); image.add_tag("test", rootfs_desc).unwrap(); @@ -246,7 +245,7 @@ mod tests { std::fs::set_permissions(foo, Permissions::from_mode(TESTED_PERMISSION)).unwrap(); - let rootfs_desc = build_initial_rootfs(&rootfs, &image).unwrap(); + let rootfs_desc = build_test_fs(&rootfs, &image).unwrap(); image.add_tag("test", rootfs_desc).unwrap(); @@ -285,7 +284,7 @@ mod tests { fs::metadata(&bar).unwrap().ino() ); - let rootfs_desc = build_initial_rootfs(&rootfs, &image).unwrap(); + let rootfs_desc = build_test_fs(&rootfs, &image).unwrap(); image.add_tag("test", rootfs_desc).unwrap(); diff --git a/format/src/types.rs b/format/src/types.rs index 75ca810..601696d 100644 
--- a/format/src/types.rs +++ b/format/src/types.rs @@ -21,6 +21,7 @@ use serde::de::Visitor; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::error::{Result, WireFormatError}; +use hex::FromHexError; mod cbor_helpers; use cbor_helpers::cbor_get_array_size; @@ -710,6 +711,17 @@ impl Serialize for Digest { } } +impl TryFrom<&str> for Digest { + type Error = FromHexError; + fn try_from(s: &str) -> std::result::Result { + let digest = hex::decode(s)?; + let digest: [u8; SHA256_BLOCK_SIZE] = digest + .try_into() + .map_err(|_| FromHexError::InvalidStringLength)?; + Ok(Digest(digest)) + } +} + impl TryFrom for Digest { type Error = WireFormatError; fn try_from(v: BlobRef) -> std::result::Result { diff --git a/oci/src/lib.rs b/oci/src/lib.rs index 7af8c49..911a729 100644 --- a/oci/src/lib.rs +++ b/oci/src/lib.rs @@ -88,11 +88,12 @@ impl Image { buf: R, ) -> Result { let tmp = NamedTempFile::new_in(&self.oci_dir)?; - let mut compressed = C::compress(tmp.reopen()?); + let mut compressed = C::compress(tmp.reopen()?)?; let mut hasher = Sha256::new(); let mut t = TeeReader::new(buf, &mut hasher); let size = io::copy(&mut t, &mut compressed)?; + compressed.end()?; let digest = hasher.finalize(); let media_type = C::append_extension(MT::name()); @@ -134,7 +135,7 @@ impl Image { verity: Option<&[u8]>, ) -> io::Result> { let f = self.open_raw_blob(digest, verity)?; - Ok(C::decompress(f)) + C::decompress(f) } pub fn open_metadata_blob( @@ -189,7 +190,7 @@ impl Image { } else { file_verity = None; } - let mut blob = self.open_raw_blob(digest, file_verity)?; + let mut blob = self.open_compressed_blob::(digest, file_verity)?; blob.seek(io::SeekFrom::Start(chunk.offset + addl_offset))?; let n = blob.read(buf)?; Ok(n) @@ -229,6 +230,7 @@ impl Image { mod tests { use super::*; use tempfile::tempdir; + type DefaultCompression = compression::Noop; #[test] fn test_put_blob_correct_hash() { @@ -257,7 +259,7 @@ mod tests { let dir = tempdir().unwrap(); let image = Image::new(dir.path()).unwrap(); let mut desc = image - .put_blob::<_, compression::Noop, media_types::Chunk>("meshuggah rocks".as_bytes()) + .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) .unwrap(); desc.set_name("foo"); let mut index = Index::default(); @@ -275,10 +277,10 @@ mod tests { let dir = tempdir().unwrap(); let image = Image::new(dir.path()).unwrap(); let desc1 = image - .put_blob::<_, compression::Noop, media_types::Chunk>("meshuggah rocks".as_bytes()) + .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) .unwrap(); let desc2 = image - .put_blob::<_, compression::Noop, media_types::Chunk>("meshuggah rocks".as_bytes()) + .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) .unwrap(); assert_eq!(desc1, desc2); } diff --git a/reader/src/walk.rs b/reader/src/walk.rs index 6aaecbe..9a4bafb 100644 --- a/reader/src/walk.rs +++ b/reader/src/walk.rs @@ -77,7 +77,7 @@ mod tests { use std::fs; use std::path::Path; - use builder::{build_initial_rootfs, build_test_fs}; + use builder::build_test_fs; use oci::Image; use super::*; @@ -126,7 +126,7 @@ mod tests { xattr::set(f, "user.meshuggah", b"rocks").unwrap(); } - let rootfs_desc = build_initial_rootfs(&rootfs, &image).unwrap(); + let rootfs_desc = build_test_fs(&rootfs, &image).unwrap(); image.add_tag("test", rootfs_desc).unwrap(); let mut pfs = PuzzleFS::open(image, "test", None).unwrap(); From 947b39192c55aa6a109bea515c4ef57235195c03 Mon Sep 17 00:00:00 2001 From: Ariel Miculas 
Date: Thu, 16 Mar 2023 15:53:53 +0200 Subject: [PATCH 3/7] Fix hash and fs-verity computation for compressed blobs Signed-off-by: Ariel Miculas --- Cargo.lock | 7 ------- builder/src/lib.rs | 13 ++++++++----- oci/Cargo.toml | 1 - oci/src/descriptor.rs | 2 +- oci/src/lib.rs | 27 ++++++++++++++++----------- 5 files changed, 25 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 430105c..c79fed4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -557,7 +557,6 @@ dependencies = [ "serde", "serde_json", "sha2", - "tee", "tempfile", ] @@ -963,12 +962,6 @@ dependencies = [ "time", ] -[[package]] -name = "tee" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37c12559dba7383625faaff75be24becf35bfc885044375bcab931111799a3da" - [[package]] name = "tempfile" version = "3.2.0" diff --git a/builder/src/lib.rs b/builder/src/lib.rs index e5b6a57..a801468 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -94,12 +94,12 @@ fn process_chunks( let chunk = result.unwrap(); let mut chunk_used: u64 = 0; - let desc = oci.put_blob::<_, C, media_types::Chunk>(&*chunk.data)?; + let (desc, fs_verity_digest) = oci.put_blob::<_, C, media_types::Chunk>(&*chunk.data)?; let blob_kind = BlobRefKind::Other { digest: desc.digest.underlying(), }; - let verity_hash = get_fs_verity_digest(&chunk.data)?; + let verity_hash = fs_verity_digest; verity_data.insert(desc.digest.underlying(), verity_hash); while chunk_used < chunk.length as u64 { @@ -450,7 +450,7 @@ fn build_delta( md_buf.append(&mut files_buf); md_buf.append(&mut others_buf); - let desc = oci.put_blob::<_, compression::Noop, media_types::Inodes>(md_buf.as_slice())?; + let (desc, _) = oci.put_blob::<_, compression::Noop, media_types::Inodes>(md_buf.as_slice())?; let verity_hash = get_fs_verity_digest(md_buf.as_slice())?; verity_data.insert(desc.digest.underlying(), verity_hash); @@ -477,7 +477,9 @@ pub fn build_initial_rootfs(rootfs: &Path, oci: &Image) -> Resul manifest_version: PUZZLEFS_IMAGE_MANIFEST_VERSION, }, )?; - oci.put_blob::<_, compression::Noop, media_types::Rootfs>(rootfs_buf.as_slice()) + Ok(oci + .put_blob::<_, compression::Noop, media_types::Rootfs>(rootfs_buf.as_slice())? + .0) } // add_rootfs_delta adds whatever the delta between the current rootfs and the puzzlefs @@ -515,7 +517,8 @@ pub fn add_rootfs_delta( let mut rootfs_buf = Vec::new(); serde_cbor::to_writer(&mut rootfs_buf, &rootfs)?; Ok(( - oci.put_blob::<_, compression::Noop, media_types::Rootfs>(rootfs_buf.as_slice())?, + oci.put_blob::<_, compression::Noop, media_types::Rootfs>(rootfs_buf.as_slice())? 
+ .0, oci, )) } diff --git a/oci/Cargo.toml b/oci/Cargo.toml index a44b06c..5594c57 100644 --- a/oci/Cargo.toml +++ b/oci/Cargo.toml @@ -9,7 +9,6 @@ edition = "2018" [dependencies] hex = "*" sha2 = "*" -tee = "*" tempfile = "*" compression = { path = "../compression" } format = { path = "../format" } diff --git a/oci/src/descriptor.rs b/oci/src/descriptor.rs index 63bf7e9..91940b8 100644 --- a/oci/src/descriptor.rs +++ b/oci/src/descriptor.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; -pub use format::Digest; +pub use format::{Digest, SHA256_BLOCK_SIZE}; const NAME_ANNOTATION: &str = "org.opencontainers.image.ref.name"; diff --git a/oci/src/lib.rs b/oci/src/lib.rs index 911a729..cb3c58f 100644 --- a/oci/src/lib.rs +++ b/oci/src/lib.rs @@ -8,13 +8,13 @@ use std::io; use std::io::{Read, Seek}; use std::path::{Path, PathBuf}; +use fsverity_helpers::get_fs_verity_digest; use serde::{Deserialize, Serialize}; use sha2::{Digest as Sha2Digest, Sha256}; -use tee::TeeReader; use tempfile::NamedTempFile; use compression::{Compression, Decompressor}; -use format::{MetadataBlob, Result, Rootfs, VerityData, WireFormatError}; +use format::{MetadataBlob, Result, Rootfs, VerityData, WireFormatError, SHA256_BLOCK_SIZE}; use openat::Dir; use std::io::{Error, ErrorKind}; @@ -85,19 +85,23 @@ impl Image { pub fn put_blob( &self, - buf: R, - ) -> Result { - let tmp = NamedTempFile::new_in(&self.oci_dir)?; + mut buf: R, + ) -> Result<(Descriptor, [u8; SHA256_BLOCK_SIZE])> { + let mut tmp = NamedTempFile::new_in(&self.oci_dir)?; let mut compressed = C::compress(tmp.reopen()?)?; let mut hasher = Sha256::new(); - let mut t = TeeReader::new(buf, &mut hasher); - let size = io::copy(&mut t, &mut compressed)?; + let size = io::copy(&mut buf, &mut compressed)?; compressed.end()?; + let mut compressed_data = Vec::new(); + tmp.read_to_end(&mut compressed_data)?; + + hasher.update(&compressed_data[..]); let digest = hasher.finalize(); let media_type = C::append_extension(MT::name()); let descriptor = Descriptor::new(digest.into(), size, media_type); + let fs_verity_digest = get_fs_verity_digest(&compressed_data[..])?; let path = self.blob_path().join(descriptor.digest.to_string()); // avoid replacing the data blob so we don't drop fsverity data @@ -109,14 +113,15 @@ impl Image { if existing_digest != digest { return Err(Error::new( ErrorKind::AlreadyExists, - "blob already exists and it's not content addressable", + format!("blob already exists and it's not content addressable existing digest {}, new digest {}", + hex::encode(existing_digest), hex::encode(digest)) ) .into()); } } else { tmp.persist(path).map_err(|e| e.error)?; } - Ok(descriptor) + Ok((descriptor, fs_verity_digest)) } fn open_raw_blob(&self, digest: &Digest, verity: Option<&[u8]>) -> io::Result { @@ -236,7 +241,7 @@ mod tests { fn test_put_blob_correct_hash() { let dir = tempdir().unwrap(); let image: Image = Image::new(dir.path()).unwrap(); - let desc = image + let (desc, _) = image .put_blob::<_, compression::Noop, media_types::Chunk>("meshuggah rocks".as_bytes()) .unwrap(); @@ -258,7 +263,7 @@ mod tests { fn test_put_get_index() { let dir = tempdir().unwrap(); let image = Image::new(dir.path()).unwrap(); - let mut desc = image + let (mut desc, _) = image .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) .unwrap(); desc.set_name("foo"); From 1b901a9619dd55de84a1291a40898515127731e8 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Thu, 16 Feb 2023 21:55:32 +0200 Subject: [PATCH 
4/7] Default to Zstd compression in puzzlefs image builds and in tests Signed-off-by: Ariel Miculas --- builder/src/lib.rs | 6 +++--- exe/src/main.rs | 6 +++--- oci/src/lib.rs | 4 ++-- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/builder/src/lib.rs b/builder/src/lib.rs index a801468..b9fbcd6 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -567,7 +567,7 @@ pub fn enable_fs_verity(oci: Image, tag: &str, manifest_root_hash: &str) -> Resu // TODO: figure out how to guard this with #[cfg(test)] pub fn build_test_fs(path: &Path, image: &Image) -> Result { - build_initial_rootfs::(path, image) + build_initial_rootfs::(path, image) } #[cfg(test)] @@ -585,7 +585,7 @@ pub mod tests { use std::path::PathBuf; use tempfile::TempDir; - type DefaultCompression = compression::Noop; + type DefaultCompression = compression::Zstd; #[test] fn test_fs_generation() { @@ -607,7 +607,7 @@ pub mod tests { // there should be a blob that matches the hash of the test data, since it all gets input // as one chunk and there's only one file const FILE_DIGEST: &str = - "d9e749d9367fc908876749d6502eb212fee88c9a94892fb07da5ef3ba8bc39ed"; + "a7b1fbc3c77f9ffc40c051e3608d607d63eebcd23c559958043eccb64bdab7ff"; let md = fs::symlink_metadata(image.blob_path().join(FILE_DIGEST)).unwrap(); assert!(md.is_file()); diff --git a/exe/src/main.rs b/exe/src/main.rs index 7ded09a..328e26f 100644 --- a/exe/src/main.rs +++ b/exe/src/main.rs @@ -1,6 +1,6 @@ use builder::{add_rootfs_delta, build_initial_rootfs, enable_fs_verity}; use clap::{Args, Parser, Subcommand}; -use compression::Noop; +use compression::Zstd; use daemonize::Daemonize; use env_logger::Env; use extractor::extract_rootfs; @@ -154,12 +154,12 @@ fn main() -> anyhow::Result<()> { let image = Image::new(oci_dir)?; let new_image = match b.base_layer { Some(base_layer) => { - let (desc, image) = add_rootfs_delta::(rootfs, image, &base_layer)?; + let (desc, image) = add_rootfs_delta::(rootfs, image, &base_layer)?; image.add_tag(&b.tag, desc)?; image } None => { - let desc = build_initial_rootfs::(rootfs, &image)?; + let desc = build_initial_rootfs::(rootfs, &image)?; image.add_tag(&b.tag, desc)?; Arc::new(image) } diff --git a/oci/src/lib.rs b/oci/src/lib.rs index cb3c58f..5a83bbc 100644 --- a/oci/src/lib.rs +++ b/oci/src/lib.rs @@ -195,7 +195,7 @@ impl Image { } else { file_verity = None; } - let mut blob = self.open_compressed_blob::(digest, file_verity)?; + let mut blob = self.open_compressed_blob::(digest, file_verity)?; blob.seek(io::SeekFrom::Start(chunk.offset + addl_offset))?; let n = blob.read(buf)?; Ok(n) @@ -235,7 +235,7 @@ impl Image { mod tests { use super::*; use tempfile::tempdir; - type DefaultCompression = compression::Noop; + type DefaultCompression = compression::Zstd; #[test] fn test_put_blob_correct_hash() { From b79560bf82bbeebe6468b6036ad2bc3deaef2366 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Tue, 28 Mar 2023 22:32:18 +0300 Subject: [PATCH 5/7] Add support for optional compression via command line parameter Default to no compression in puzzlefs builds and zstd compression in tests. This commit adds a new field in BlobRef which stores whether the blob is compressed or not. Per blob information is useful if we want to skip compressing the blob in cases where the compressed version has a larger size than the uncompressed version (e.g. when the blob is already compressed or it has a high enough entropy that it cannot be compressed further). 
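To make the wire encoding concrete, here is a minimal standalone sketch (illustrative only, not part of the diff below) of how the flag shares a byte with the BlobRefKind discriminant; the COMPRESSED_BIT name mirrors the patch:

    const COMPRESSED_BIT: u8 = 1 << 7;

    // serialize: the low bits carry the enum variant, the high bit the flag
    fn pack(kind: u8, compressed: bool) -> u8 {
        kind | if compressed { COMPRESSED_BIT } else { 0 }
    }

    // deserialize: mask the flag off before matching on the variant
    fn unpack(byte: u8) -> (u8, bool) {
        (byte & !COMPRESSED_BIT, (byte & COMPRESSED_BIT) != 0)
    }

    fn main() {
        assert_eq!(unpack(pack(1, true)), (1, true));   // Other, compressed
        assert_eq!(unpack(pack(0, false)), (0, false)); // Local, uncompressed
    }

This works because the BlobRefKind byte only ever serializes to 0 or 1, so its top bit is free to carry the flag.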
Signed-off-by: Ariel Miculas --- builder/src/lib.rs | 25 ++++++++++++++++++------- exe/src/main.rs | 16 +++++++++++++--- format/src/types.rs | 19 +++++++++++++++++-- oci/src/lib.rs | 22 ++++++++++++++++------ 4 files changed, 64 insertions(+), 18 deletions(-) diff --git a/builder/src/lib.rs b/builder/src/lib.rs index b9fbcd6..010d285 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -5,6 +5,7 @@ use fsverity_helpers::{ FS_VERITY_BLOCK_SIZE_DEFAULT, }; use oci::Digest; +use std::any::Any; use std::backtrace::Backtrace; use std::cmp::min; use std::collections::{BTreeMap, HashMap}; @@ -74,7 +75,7 @@ struct Other { additional: Option, } -fn process_chunks( +fn process_chunks( oci: &Image, mut chunker: StreamCDC, files: &mut [File], @@ -94,7 +95,8 @@ fn process_chunks( let chunk = result.unwrap(); let mut chunk_used: u64 = 0; - let (desc, fs_verity_digest) = oci.put_blob::<_, C, media_types::Chunk>(&*chunk.data)?; + let (desc, fs_verity_digest, compressed) = + oci.put_blob::<_, C, media_types::Chunk>(&*chunk.data)?; let blob_kind = BlobRefKind::Other { digest: desc.digest.underlying(), }; @@ -111,6 +113,7 @@ fn process_chunks( let blob = BlobRef { offset: chunk_used, kind: blob_kind, + compressed, }; file.as_mut() @@ -151,7 +154,7 @@ fn inode_encoded_size(num_inodes: usize) -> usize { format::cbor_size_of_list_header(num_inodes) + num_inodes * format::INODE_WIRE_SIZE } -fn build_delta( +fn build_delta( rootfs: &Path, oci: &Image, mut existing: Option, @@ -369,6 +372,7 @@ fn build_delta( Ok(BlobRef { offset: offset as u64, kind: BlobRefKind::Local, + compressed: false, }) }) .transpose()?; @@ -400,6 +404,7 @@ fn build_delta( Ok(BlobRef { offset: offset as u64, kind: BlobRefKind::Local, + compressed: false, }) }) .transpose()?; @@ -429,6 +434,7 @@ fn build_delta( Ok(BlobRef { offset: offset as u64, kind: BlobRefKind::Local, + compressed: false, }) }) .transpose()?; @@ -450,14 +456,18 @@ fn build_delta( md_buf.append(&mut files_buf); md_buf.append(&mut others_buf); - let (desc, _) = oci.put_blob::<_, compression::Noop, media_types::Inodes>(md_buf.as_slice())?; + let (desc, ..) = + oci.put_blob::<_, compression::Noop, media_types::Inodes>(md_buf.as_slice())?; let verity_hash = get_fs_verity_digest(md_buf.as_slice())?; verity_data.insert(desc.digest.underlying(), verity_hash); Ok(desc) } -pub fn build_initial_rootfs(rootfs: &Path, oci: &Image) -> Result { +pub fn build_initial_rootfs( + rootfs: &Path, + oci: &Image, +) -> Result { let mut verity_data: VerityData = BTreeMap::new(); let desc = build_delta::(rootfs, oci, None, &mut verity_data)?; let metadatas = [BlobRef { @@ -465,6 +475,7 @@ pub fn build_initial_rootfs(rootfs: &Path, oci: &Image) -> Resul kind: BlobRefKind::Other { digest: desc.digest.underlying(), }, + compressed: false, }] .to_vec(); @@ -484,7 +495,7 @@ pub fn build_initial_rootfs(rootfs: &Path, oci: &Image) -> Resul // add_rootfs_delta adds whatever the delta between the current rootfs and the puzzlefs // representation from the tag is. 
-pub fn add_rootfs_delta( +pub fn add_rootfs_delta( rootfs_path: &Path, oci: Image, tag: &str, @@ -493,7 +504,6 @@ pub fn add_rootfs_delta( let pfs = PuzzleFS::open(oci, tag, None)?; let oci = Arc::clone(&pfs.oci); let mut rootfs = oci.open_rootfs_blob::(tag, None)?; - if rootfs.manifest_version != PUZZLEFS_IMAGE_MANIFEST_VERSION { return Err(WireFormatError::InvalidImageVersion( rootfs.manifest_version.to_string(), @@ -507,6 +517,7 @@ pub fn add_rootfs_delta( digest: desc.digest.underlying(), }, offset: 0, + compressed: false, }; if !rootfs.metadatas.iter().any(|&x| x == br) { diff --git a/exe/src/main.rs b/exe/src/main.rs index 328e26f..87e2dc4 100644 --- a/exe/src/main.rs +++ b/exe/src/main.rs @@ -1,6 +1,6 @@ use builder::{add_rootfs_delta, build_initial_rootfs, enable_fs_verity}; use clap::{Args, Parser, Subcommand}; -use compression::Zstd; +use compression::{Noop, Zstd}; use daemonize::Daemonize; use env_logger::Env; use extractor::extract_rootfs; @@ -40,6 +40,8 @@ struct Build { tag: String, #[arg(short, long, value_name = "base-layer")] base_layer: Option, + #[arg(short, long, value_name = "compressed")] + compression: bool, } #[derive(Args)] @@ -154,12 +156,20 @@ fn main() -> anyhow::Result<()> { let image = Image::new(oci_dir)?; let new_image = match b.base_layer { Some(base_layer) => { - let (desc, image) = add_rootfs_delta::(rootfs, image, &base_layer)?; + let (desc, image) = if b.compression { + add_rootfs_delta::(rootfs, image, &base_layer)? + } else { + add_rootfs_delta::(rootfs, image, &base_layer)? + }; image.add_tag(&b.tag, desc)?; image } None => { - let desc = build_initial_rootfs::(rootfs, &image)?; + let desc = if b.compression { + build_initial_rootfs::(rootfs, &image)? + } else { + build_initial_rootfs::(rootfs, &image)? + }; image.add_tag(&b.tag, desc)?; Arc::new(image) } diff --git a/format/src/types.rs b/format/src/types.rs index 601696d..5fbe9f7 100644 --- a/format/src/types.rs +++ b/format/src/types.rs @@ -94,8 +94,11 @@ const BLOB_REF_SIZE: usize = 1 /* mode */ + 32 /* digest */ + 8 /* offset */; pub struct BlobRef { pub offset: u64, pub kind: BlobRefKind, + pub compressed: bool, } +const COMPRESSED_BIT: u8 = 1 << 7; + impl BlobRef { fn fixed_length_serialize(&self, state: &mut [u8; BLOB_REF_SIZE]) { state[0..8].copy_from_slice(&self.offset.to_le_bytes()); @@ -106,6 +109,10 @@ impl BlobRef { state[9..41].copy_from_slice(digest); } }; + // reuse state[8] for compression, since it only stores the BlobRefKind enum variant + if self.compressed { + state[8] |= COMPRESSED_BIT; + } } fn fixed_length_deserialize( @@ -113,7 +120,8 @@ impl BlobRef { ) -> std::result::Result { let offset = u64::from_le_bytes(state[0..8].try_into().unwrap()); - let kind = match state[8] { + let compressed = (state[8] & COMPRESSED_BIT) != 0; + let kind = match state[8] & !COMPRESSED_BIT { 0 => BlobRefKind::Local, 1 => BlobRefKind::Other { digest: state[9..41].try_into().unwrap(), @@ -126,7 +134,11 @@ impl BlobRef { } }; - Ok(BlobRef { offset, kind }) + Ok(BlobRef { + offset, + kind, + compressed, + }) } } @@ -522,6 +534,7 @@ mod tests { additional: Some(BlobRef { offset: 42, kind: BlobRefKind::Local, + compressed: false, }), }, ]; @@ -548,6 +561,7 @@ mod tests { let local = BlobRef { offset: 42, kind: BlobRefKind::Local, + compressed: true, }; blobref_roundtrip(local) } @@ -560,6 +574,7 @@ mod tests { let other = BlobRef { offset: 42, kind: BlobRefKind::Other { digest }, + compressed: true, }; blobref_roundtrip(other) } diff --git a/oci/src/lib.rs b/oci/src/lib.rs index 5a83bbc..4abb538 
100644 --- a/oci/src/lib.rs +++ b/oci/src/lib.rs @@ -1,6 +1,7 @@ extern crate hex; use fsverity_helpers::check_fs_verity; +use std::any::Any; use std::backtrace::Backtrace; use std::convert::TryFrom; use std::fs; @@ -83,13 +84,18 @@ impl Image { PathBuf::from("blobs/sha256") } - pub fn put_blob( + pub fn put_blob( &self, mut buf: R, - ) -> Result<(Descriptor, [u8; SHA256_BLOCK_SIZE])> { + ) -> Result<(Descriptor, [u8; SHA256_BLOCK_SIZE], bool)> { let mut tmp = NamedTempFile::new_in(&self.oci_dir)?; let mut compressed = C::compress(tmp.reopen()?)?; let mut hasher = Sha256::new(); + // generics may not be the best way to implement compression, alternatives: + // trait objects, but they add runtime overhead + // an enum together with enum_dispatch + let compressed_blob = + std::any::TypeId::of::() != std::any::TypeId::of::(); let size = io::copy(&mut buf, &mut compressed)?; compressed.end()?; @@ -121,7 +127,7 @@ impl Image { } else { tmp.persist(path).map_err(|e| e.error)?; } - Ok((descriptor, fs_verity_digest)) + Ok((descriptor, fs_verity_digest, compressed_blob)) } fn open_raw_blob(&self, digest: &Digest, verity: Option<&[u8]>) -> io::Result { @@ -195,7 +201,11 @@ impl Image { } else { file_verity = None; } - let mut blob = self.open_compressed_blob::(digest, file_verity)?; + let mut blob = if chunk.compressed { + self.open_compressed_blob::(digest, file_verity)? + } else { + self.open_compressed_blob::(digest, file_verity)? + }; blob.seek(io::SeekFrom::Start(chunk.offset + addl_offset))?; let n = blob.read(buf)?; Ok(n) @@ -241,7 +251,7 @@ mod tests { fn test_put_blob_correct_hash() { let dir = tempdir().unwrap(); let image: Image = Image::new(dir.path()).unwrap(); - let (desc, _) = image + let (desc, ..) = image .put_blob::<_, compression::Noop, media_types::Chunk>("meshuggah rocks".as_bytes()) .unwrap(); @@ -263,7 +273,7 @@ mod tests { fn test_put_get_index() { let dir = tempdir().unwrap(); let image = Image::new(dir.path()).unwrap(); - let (mut desc, _) = image + let (mut desc, ..) = image .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) .unwrap(); desc.set_name("foo"); From 92c3635d32a7f1339367d0bebe7734ea3b847152 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Wed, 29 Mar 2023 15:46:41 +0300 Subject: [PATCH 6/7] Replace File with generic read/write + seek traits in Compression The main reason for this change is to be able to use std::io::Cursor, an in-memory buffer which can be used as a replacement for File. This is useful because we want to compress blobs in-memory and only write them to disk if they take up less space than the uncompressed blob. 
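The trait bounds are what make this work: std::io::Cursor<Vec<u8>> implements both Write and Seek, so it can stand in for the temporary file. A minimal sketch of the idea (illustrative only, with copy_through standing in for a compressor):

    use std::io::{Cursor, Read, Seek, SeekFrom, Write};

    // any Write + Seek sink will do, file-backed or in-memory
    fn copy_through<R: Read, W: Write + Seek>(mut src: R, mut dest: W) -> std::io::Result<u64> {
        std::io::copy(&mut src, &mut dest)?;
        dest.seek(SeekFrom::End(0)) // total size of what was written
    }

    fn main() -> std::io::Result<()> {
        let mut sink = Cursor::new(Vec::<u8>::new());
        let written = copy_through(&b"meshuggah rocks"[..], &mut sink)?;
        assert_eq!(written as usize, sink.get_ref().len());
        Ok(())
    }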
Signed-off-by: Ariel Miculas --- builder/src/lib.rs | 17 +++++----- compression/src/lib.rs | 15 +++++---- compression/src/noop.rs | 55 +++++++++++++++++++++++++++------ compression/src/zstd_wrapper.rs | 17 +++++----- oci/src/lib.rs | 36 +++++++++++---------- 5 files changed, 90 insertions(+), 50 deletions(-) diff --git a/builder/src/lib.rs b/builder/src/lib.rs index 010d285..70323a6 100644 --- a/builder/src/lib.rs +++ b/builder/src/lib.rs @@ -75,7 +75,7 @@ struct Other { additional: Option, } -fn process_chunks( +fn process_chunks Compression<'a> + Any>( oci: &Image, mut chunker: StreamCDC, files: &mut [File], @@ -96,7 +96,7 @@ fn process_chunks( let mut chunk_used: u64 = 0; let (desc, fs_verity_digest, compressed) = - oci.put_blob::<_, C, media_types::Chunk>(&*chunk.data)?; + oci.put_blob::(&chunk.data)?; let blob_kind = BlobRefKind::Other { digest: desc.digest.underlying(), }; @@ -154,7 +154,7 @@ fn inode_encoded_size(num_inodes: usize) -> usize { format::cbor_size_of_list_header(num_inodes) + num_inodes * format::INODE_WIRE_SIZE } -fn build_delta( +fn build_delta Compression<'a> + Any>( rootfs: &Path, oci: &Image, mut existing: Option, @@ -456,15 +456,14 @@ fn build_delta( md_buf.append(&mut files_buf); md_buf.append(&mut others_buf); - let (desc, ..) = - oci.put_blob::<_, compression::Noop, media_types::Inodes>(md_buf.as_slice())?; + let (desc, ..) = oci.put_blob::(md_buf.as_slice())?; let verity_hash = get_fs_verity_digest(md_buf.as_slice())?; verity_data.insert(desc.digest.underlying(), verity_hash); Ok(desc) } -pub fn build_initial_rootfs( +pub fn build_initial_rootfs Compression<'a> + Any>( rootfs: &Path, oci: &Image, ) -> Result { @@ -489,13 +488,13 @@ pub fn build_initial_rootfs( }, )?; Ok(oci - .put_blob::<_, compression::Noop, media_types::Rootfs>(rootfs_buf.as_slice())? + .put_blob::(rootfs_buf.as_slice())? .0) } // add_rootfs_delta adds whatever the delta between the current rootfs and the puzzlefs // representation from the tag is. -pub fn add_rootfs_delta( +pub fn add_rootfs_delta Compression<'a> + Any>( rootfs_path: &Path, oci: Image, tag: &str, @@ -528,7 +527,7 @@ pub fn add_rootfs_delta( let mut rootfs_buf = Vec::new(); serde_cbor::to_writer(&mut rootfs_buf, &rootfs)?; Ok(( - oci.put_blob::<_, compression::Noop, media_types::Rootfs>(rootfs_buf.as_slice())? + oci.put_blob::(rootfs_buf.as_slice())? 
.0, oci, )) diff --git a/compression/src/lib.rs b/compression/src/lib.rs index 1e63e81..2268bc0 100644 --- a/compression/src/lib.rs +++ b/compression/src/lib.rs @@ -1,5 +1,6 @@ -use std::fs; +#![feature(seek_stream_len)] use std::io; +use std::io::Seek; mod noop; pub use noop::Noop; @@ -16,9 +17,11 @@ pub trait Decompressor: io::Read + io::Seek + Send { fn get_uncompressed_length(&mut self) -> io::Result; } -pub trait Compression { - fn compress(dest: fs::File) -> io::Result>; - fn decompress(source: fs::File) -> io::Result>; +pub trait Compression<'a> { + fn compress(dest: W) -> io::Result>; + fn decompress( + source: R, + ) -> io::Result>; fn append_extension(media_type: &str) -> String; } @@ -29,7 +32,7 @@ mod tests { pub const TRUTH: &str = "meshuggah rocks"; - pub fn compress_decompress() -> anyhow::Result<()> { + pub fn compress_decompress Compression<'a>>() -> anyhow::Result<()> { let f = NamedTempFile::new()?; let mut compressed = C::compress(f.reopen()?)?; compressed.write_all(TRUTH.as_bytes())?; @@ -43,7 +46,7 @@ mod tests { Ok(()) } - pub fn compression_is_seekable() -> anyhow::Result<()> { + pub fn compression_is_seekable Compression<'a>>() -> anyhow::Result<()> { let f = NamedTempFile::new()?; let mut compressed = C::compress(f.reopen()?)?; compressed.write_all(TRUTH.as_bytes())?; diff --git a/compression/src/noop.rs b/compression/src/noop.rs index 95e7090..cac021e 100644 --- a/compression/src/noop.rs +++ b/compression/src/noop.rs @@ -1,28 +1,64 @@ use crate::{Compression, Compressor, Decompressor}; -use std::fs; use std::io; +use std::io::{Read, Seek, Write}; pub struct Noop {} -impl Compressor for fs::File { +pub struct NoopCompressor { + encoder: Box, +} + +impl io::Write for NoopCompressor { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.encoder.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.encoder.flush() + } +} + +impl Compressor for NoopCompressor { fn end(self: Box) -> io::Result<()> { Ok(()) } } -impl Decompressor for fs::File { +pub struct NoopDecompressor { + decoder: Box, +} + +impl Seek for NoopDecompressor { + fn seek(&mut self, offset: io::SeekFrom) -> io::Result { + self.decoder.seek(offset) + } +} + +impl Read for NoopDecompressor { + fn read(&mut self, out: &mut [u8]) -> io::Result { + self.decoder.read(out) + } +} + +impl Decompressor for NoopDecompressor { fn get_uncompressed_length(&mut self) -> io::Result { - Ok(self.metadata()?.len()) + self.decoder.stream_len() } } -impl Compression for Noop { - fn compress(dest: fs::File) -> io::Result> { - Ok(Box::new(dest)) +impl<'a> Compression<'a> for Noop { + fn compress(dest: W) -> io::Result> { + Ok(Box::new(NoopCompressor { + encoder: Box::new(dest), + })) } - fn decompress(source: fs::File) -> io::Result> { - Ok(Box::new(source)) + fn decompress( + source: R, + ) -> io::Result> { + Ok(Box::new(NoopDecompressor { + decoder: Box::new(source), + })) } fn append_extension(media_type: &str) -> String { @@ -34,6 +70,7 @@ impl Compression for Noop { mod tests { use super::*; use crate::tests::{compress_decompress, compression_is_seekable, TRUTH}; + use std::fs; use tempfile::NamedTempFile; #[test] diff --git a/compression/src/zstd_wrapper.rs b/compression/src/zstd_wrapper.rs index 60dc793..d083e21 100644 --- a/compression/src/zstd_wrapper.rs +++ b/compression/src/zstd_wrapper.rs @@ -2,9 +2,8 @@ use common::MAX_CHUNK_SIZE; use std::cmp::min; use std::convert::TryFrom; use std::convert::TryInto; -use std::fs; use std::io; -use std::io::Read; +use std::io::{Read, Write}; use 
crate::{Compression, Compressor, Decompressor}; @@ -14,18 +13,18 @@ fn err_to_io(e: E) -> io::Error { io::Error::new(io::ErrorKind::Other, e) } -pub struct ZstdCompressor { - encoder: zstd::stream::write::Encoder<'static, std::fs::File>, +pub struct ZstdCompressor { + encoder: zstd::stream::write::Encoder<'static, W>, } -impl Compressor for ZstdCompressor { +impl Compressor for ZstdCompressor { fn end(self: Box) -> io::Result<()> { self.encoder.finish()?; Ok(()) } } -impl io::Write for ZstdCompressor { +impl io::Write for ZstdCompressor { fn write(&mut self, buf: &[u8]) -> io::Result { self.encoder.write(buf) } @@ -86,13 +85,13 @@ impl io::Read for ZstdDecompressor { } pub struct Zstd {} -impl Compression for Zstd { - fn compress(dest: fs::File) -> io::Result> { +impl<'a> Compression<'a> for Zstd { + fn compress(dest: W) -> io::Result> { let encoder = zstd::stream::write::Encoder::new(dest, COMPRESSION_LEVEL)?; Ok(Box::new(ZstdCompressor { encoder })) } - fn decompress(mut source: fs::File) -> io::Result> { + fn decompress(mut source: R) -> io::Result> { let mut contents = Vec::new(); source.read_to_end(&mut contents)?; let mut decompressor = zstd::bulk::Decompressor::new()?; diff --git a/oci/src/lib.rs b/oci/src/lib.rs index 4abb538..9405d7f 100644 --- a/oci/src/lib.rs +++ b/oci/src/lib.rs @@ -24,6 +24,8 @@ pub use descriptor::{Descriptor, Digest}; mod index; pub use index::Index; +use std::io::Cursor; +use std::io::Write; pub mod media_types; @@ -84,12 +86,12 @@ impl Image { PathBuf::from("blobs/sha256") } - pub fn put_blob( + pub fn put_blob Compression<'a> + Any, MT: media_types::MediaType>( &self, - mut buf: R, + mut buf: &[u8], ) -> Result<(Descriptor, [u8; SHA256_BLOCK_SIZE], bool)> { - let mut tmp = NamedTempFile::new_in(&self.oci_dir)?; - let mut compressed = C::compress(tmp.reopen()?)?; + let mut compressed_data = Cursor::new(Vec::::new()); + let mut compressed = C::compress(&mut compressed_data)?; let mut hasher = Sha256::new(); // generics may not be the best way to implement compression, alternatives: // trait objects, but they add runtime overhead @@ -97,17 +99,15 @@ impl Image { let compressed_blob = std::any::TypeId::of::() != std::any::TypeId::of::(); - let size = io::copy(&mut buf, &mut compressed)?; + let _uncompressed_size = io::copy(&mut buf, &mut compressed)?; compressed.end()?; + let compressed_size = compressed_data.get_ref().len() as u64; - let mut compressed_data = Vec::new(); - tmp.read_to_end(&mut compressed_data)?; - - hasher.update(&compressed_data[..]); + hasher.update(&compressed_data.get_ref()[..]); let digest = hasher.finalize(); let media_type = C::append_extension(MT::name()); - let descriptor = Descriptor::new(digest.into(), size, media_type); - let fs_verity_digest = get_fs_verity_digest(&compressed_data[..])?; + let descriptor = Descriptor::new(digest.into(), compressed_size, media_type); + let fs_verity_digest = get_fs_verity_digest(&compressed_data.get_ref()[..])?; let path = self.blob_path().join(descriptor.digest.to_string()); // avoid replacing the data blob so we don't drop fsverity data @@ -125,6 +125,8 @@ impl Image { .into()); } } else { + let mut tmp = NamedTempFile::new_in(&self.oci_dir)?; + tmp.write_all(compressed_data.get_ref())?; tmp.persist(path).map_err(|e| e.error)?; } Ok((descriptor, fs_verity_digest, compressed_blob)) @@ -140,7 +142,7 @@ impl Image { Ok(file) } - pub fn open_compressed_blob( + pub fn open_compressed_blob Compression<'a>>( &self, digest: &Digest, verity: Option<&[u8]>, @@ -167,7 +169,7 @@ impl Image { Ok(file) } - 
pub fn open_rootfs_blob( + pub fn open_rootfs_blob Compression<'a>>( &self, tag: &str, verity: Option<&[u8]>, @@ -252,7 +254,7 @@ mod tests { let dir = tempdir().unwrap(); let image: Image = Image::new(dir.path()).unwrap(); let (desc, ..) = image - .put_blob::<_, compression::Noop, media_types::Chunk>("meshuggah rocks".as_bytes()) + .put_blob::("meshuggah rocks".as_bytes()) .unwrap(); const DIGEST: &str = "3abd5ce0f91f640d88dca1f26b37037b02415927cacec9626d87668a715ec12d"; @@ -274,7 +276,7 @@ mod tests { let dir = tempdir().unwrap(); let image = Image::new(dir.path()).unwrap(); let (mut desc, ..) = image - .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) + .put_blob::("meshuggah rocks".as_bytes()) .unwrap(); desc.set_name("foo"); let mut index = Index::default(); @@ -292,10 +294,10 @@ mod tests { let dir = tempdir().unwrap(); let image = Image::new(dir.path()).unwrap(); let desc1 = image - .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) + .put_blob::("meshuggah rocks".as_bytes()) .unwrap(); let desc2 = image - .put_blob::<_, DefaultCompression, media_types::Chunk>("meshuggah rocks".as_bytes()) + .put_blob::("meshuggah rocks".as_bytes()) .unwrap(); assert_eq!(desc1, desc2); } From 53f62f811d150f7fd5b1b45b1bd04f1ec819f127 Mon Sep 17 00:00:00 2001 From: Ariel Miculas Date: Wed, 29 Mar 2023 18:07:29 +0300 Subject: [PATCH 7/7] Store the uncompressed blob if the compressed blob has a larger size Signed-off-by: Ariel Miculas --- oci/src/lib.rs | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/oci/src/lib.rs b/oci/src/lib.rs index 9405d7f..01cb619 100644 --- a/oci/src/lib.rs +++ b/oci/src/lib.rs @@ -88,7 +88,7 @@ impl Image { pub fn put_blob Compression<'a> + Any, MT: media_types::MediaType>( &self, - mut buf: &[u8], + buf: &[u8], ) -> Result<(Descriptor, [u8; SHA256_BLOCK_SIZE], bool)> { let mut compressed_data = Cursor::new(Vec::::new()); let mut compressed = C::compress(&mut compressed_data)?; @@ -96,17 +96,28 @@ impl Image { // generics may not be the best way to implement compression, alternatives: // trait objects, but they add runtime overhead // an enum together with enum_dispatch - let compressed_blob = + let mut compressed_blob = std::any::TypeId::of::() != std::any::TypeId::of::(); - let _uncompressed_size = io::copy(&mut buf, &mut compressed)?; + // without the clone, the io::copy leaves us with an empty slice + // we're only cloning the reference, which is ok because the slice itself gets mutated + // i.e. 
the slice advances through the buffer as it is being read + let uncompressed_size = io::copy(&mut <&[u8]>::clone(&buf), &mut compressed)?; compressed.end()?; let compressed_size = compressed_data.get_ref().len() as u64; - hasher.update(&compressed_data.get_ref()[..]); + // store the uncompressed blob if the compressed version has bigger size + let final_data = if compressed_blob && compressed_size >= uncompressed_size { + compressed_blob = false; + buf + } else { + compressed_data.get_ref() + }; + + hasher.update(final_data); let digest = hasher.finalize(); let media_type = C::append_extension(MT::name()); - let descriptor = Descriptor::new(digest.into(), compressed_size, media_type); + let descriptor = Descriptor::new(digest.into(), uncompressed_size, media_type); let fs_verity_digest = get_fs_verity_digest(&compressed_data.get_ref()[..])?; let path = self.blob_path().join(descriptor.digest.to_string()); @@ -126,7 +137,7 @@ impl Image { } } else { let mut tmp = NamedTempFile::new_in(&self.oci_dir)?; - tmp.write_all(compressed_data.get_ref())?; + tmp.write_all(final_data)?; tmp.persist(path).map_err(|e| e.error)?; } Ok((descriptor, fs_verity_digest, compressed_blob))
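Taken together, the series means a put_blob call now compresses into memory, keeps whichever representation is smaller, and records that choice in the BlobRef. A quick standalone check (illustrative only) of that rule and of the zstd crate the series depends on — encode_all/decode_all are the crate's one-shot helpers, whereas the wrappers above use the streaming Encoder and bulk Decompressor:

    use std::io;

    fn main() -> io::Result<()> {
        let data = b"meshuggah rocks".repeat(1024);
        let compressed = zstd::encode_all(&data[..], 3)?; // level 3, as in the wrapper
        assert_eq!(zstd::decode_all(&compressed[..])?, data);

        // the final patch's rule: fall back to the uncompressed bytes
        // if compression didn't actually shrink the blob
        let (stored, compressed_flag) = if compressed.len() >= data.len() {
            (&data, false)
        } else {
            (&compressed, true)
        };
        assert!(compressed_flag && stored.len() < data.len());
        Ok(())
    }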