From fe2adfa70a373805b23fd5b8c97a64667c2e5afb Mon Sep 17 00:00:00 2001 From: Wenhao Ren Date: Sun, 16 Apr 2023 12:22:39 +0800 Subject: [PATCH] rafs: enhance builder to support batch chunk Add `--batch-size` subcommand on nydus-image. Add build time support of batch chunk. Signed-off-by: Wenhao Ren --- rafs/src/builder/core/blob.rs | 28 +++++++++++- rafs/src/builder/core/bootstrap.rs | 1 + rafs/src/builder/core/chunk_dict.rs | 1 + rafs/src/builder/core/context.rs | 19 +++++++- rafs/src/builder/core/node.rs | 45 ++++++++++++++++--- rafs/src/metadata/chunk.rs | 21 +++++++++ rafs/src/metadata/inode.rs | 4 +- rafs/src/metadata/layout/v6.rs | 7 ++- rafs/src/metadata/mod.rs | 6 +++ src/bin/nydus-image/main.rs | 68 ++++++++++++++++++++++++++++- storage/src/meta/mod.rs | 3 ++ 11 files changed, 188 insertions(+), 15 deletions(-) diff --git a/rafs/src/builder/core/blob.rs b/rafs/src/builder/core/blob.rs index 340fd368b86..ba766829a3e 100644 --- a/rafs/src/builder/core/blob.rs +++ b/rafs/src/builder/core/blob.rs @@ -102,6 +102,22 @@ impl Blob { blob_mgr: &mut BlobManager, blob_writer: &mut ArtifactWriter, ) -> Result<()> { + // Dump buffered batch chunk data if exists. 
+ if let Some(ref batch) = ctx.blob_batch_generator { + if let Some((_, blob_ctx)) = blob_mgr.get_current_blob() { + let mut batch = batch.lock().unwrap(); + if !batch.chunk_data_buf_is_empty() { + let (pre_compressed_offset, compressed_size, _) = Node::write_chunk_data( + &ctx, + blob_ctx, + blob_writer, + batch.chunk_data_buf(), + )?; + batch.add_context(pre_compressed_offset, compressed_size); + batch.clear_chunk_data_buf(); + } + } + } if !ctx.blob_features.contains(BlobFeatures::SEPARATE) && (ctx.blob_inline_meta || ctx.features.is_enabled(Feature::BlobToc)) { @@ -154,6 +170,14 @@ impl Blob { header.set_separate_blob(true); inflate_buf = [ci_data, &inflate_data].concat(); ci_data = &inflate_buf; + } else if let Some(ref batch) = ctx.blob_batch_generator { + let (inflate_data, inflate_count) = batch.lock().unwrap().to_vec()?; + header.set_ci_zran_count(inflate_count); + header.set_ci_zran_offset(ci_data.len() as u64); + header.set_ci_zran_size(inflate_data.len() as u64); + header.set_ci_batch(true); + inflate_buf = [ci_data, &inflate_data].concat(); + ci_data = &inflate_buf; } else if ctx.blob_tar_reader.is_some() { header.set_separate_blob(true); }; @@ -207,7 +231,9 @@ impl Blob { // Generate ToC entry for `blob.meta` and write chunk digest array. 
if ctx.features.is_enabled(Feature::BlobToc) { let mut hasher = RafsDigest::hasher(digest::Algorithm::Sha256); - let ci_data = if ctx.blob_features.contains(BlobFeatures::ZRAN) { + let ci_data = if ctx.blob_features.contains(BlobFeatures::BATCH) + || ctx.blob_features.contains(BlobFeatures::ZRAN) + { inflate_buf.as_slice() } else { blob_ctx.blob_meta_info.as_byte_slice() diff --git a/rafs/src/builder/core/bootstrap.rs b/rafs/src/builder/core/bootstrap.rs index de497e919ed..9e82a4c9173 100644 --- a/rafs/src/builder/core/bootstrap.rs +++ b/rafs/src/builder/core/bootstrap.rs @@ -296,6 +296,7 @@ impl Bootstrap { compressor: ctx.compressor, digester: ctx.digester, chunk_size: ctx.chunk_size, + batch_size: ctx.batch_size, explicit_uidgid: ctx.explicit_uidgid, version: ctx.fs_version, is_tarfs_mode: rs.meta.flags.contains(RafsSuperFlags::TARTFS_MODE), diff --git a/rafs/src/builder/core/chunk_dict.rs b/rafs/src/builder/core/chunk_dict.rs index f0376e352c5..c65858300d7 100644 --- a/rafs/src/builder/core/chunk_dict.rs +++ b/rafs/src/builder/core/chunk_dict.rs @@ -263,6 +263,7 @@ mod tests { compressor: compress::Algorithm::Lz4Block, digester: digest::Algorithm::Blake3, chunk_size: 0x100000, + batch_size: 0, explicit_uidgid: true, is_tarfs_mode: false, }; diff --git a/rafs/src/builder/core/context.rs b/rafs/src/builder/core/context.rs index 9c427db450e..d5f534d8799 100644 --- a/rafs/src/builder/core/context.rs +++ b/rafs/src/builder/core/context.rs @@ -26,8 +26,8 @@ use nydus_storage::device::{BlobFeatures, BlobInfo}; use nydus_storage::factory::BlobFactory; use nydus_storage::meta::toc::{TocEntryList, TocLocation}; use nydus_storage::meta::{ - toc, BlobChunkInfoV2Ondisk, BlobCompressionContextHeader, BlobMetaChunkArray, - BlobMetaChunkInfo, ZranContextGenerator, + toc, BatchContextGenerator, BlobChunkInfoV2Ondisk, BlobCompressionContextHeader, + BlobMetaChunkArray, BlobMetaChunkInfo, ZranContextGenerator, }; use nydus_utils::digest::DigestData; use nydus_utils::{compress, 
digest, div_round_up, round_down, BufReaderInfo}; @@ -464,6 +464,9 @@ impl BlobContext { blob_ctx .blob_meta_header .set_chunk_info_v2(features.contains(BlobFeatures::CHUNK_INFO_V2)); + blob_ctx + .blob_meta_header + .set_ci_batch(features.contains(BlobFeatures::BATCH)); blob_ctx .blob_meta_header .set_ci_zran(features.contains(BlobFeatures::ZRAN)); @@ -653,6 +656,7 @@ impl BlobContext { chunk.uncompressed_offset(), chunk.uncompressed_size(), chunk.is_compressed(), + chunk.is_batch(), 0, ); } @@ -1074,6 +1078,8 @@ pub struct BuildContext { pub whiteout_spec: WhiteoutSpec, /// Chunk slice size. pub chunk_size: u32, + /// Batch chunk data size. + pub batch_size: u32, /// Version number of output metadata and data blob. pub fs_version: RafsVersion, /// Whether any directory/file has extended attributes. @@ -1092,6 +1098,7 @@ pub struct BuildContext { /// Storage writing blob to single file or a directory. pub blob_storage: Option, pub blob_zran_generator: Option>>, + pub blob_batch_generator: Option>, pub blob_tar_reader: Option>, pub blob_features: BlobFeatures, pub blob_inline_meta: bool, @@ -1141,6 +1148,7 @@ impl BuildContext { whiteout_spec, chunk_size: RAFS_DEFAULT_CHUNK_SIZE as u32, + batch_size: 0, fs_version: RafsVersion::default(), conversion_type, @@ -1149,6 +1157,7 @@ impl BuildContext { prefetch, blob_storage, blob_zran_generator: None, + blob_batch_generator: None, blob_tar_reader: None, blob_features, blob_inline_meta, @@ -1167,6 +1176,10 @@ impl BuildContext { self.chunk_size = chunk_size; } + pub fn set_batch_size(&mut self, batch_size: u32) { + self.batch_size = batch_size; + } + pub fn set_configuration(&mut self, config: Arc) { self.configuration = config; } @@ -1184,6 +1197,7 @@ impl Default for BuildContext { whiteout_spec: WhiteoutSpec::default(), chunk_size: RAFS_DEFAULT_CHUNK_SIZE as u32, + batch_size: 0, fs_version: RafsVersion::default(), conversion_type: ConversionType::default(), @@ -1192,6 +1206,7 @@ impl Default for BuildContext { 
prefetch: Prefetch::default(), blob_storage: None, blob_zran_generator: None, + blob_batch_generator: None, blob_tar_reader: None, blob_features: BlobFeatures::empty(), has_xattr: true, diff --git a/rafs/src/builder/core/node.rs b/rafs/src/builder/core/node.rs index 27499c628a4..5fa68577a63 100644 --- a/rafs/src/builder/core/node.rs +++ b/rafs/src/builder/core/node.rs @@ -271,7 +271,7 @@ impl Node { }; let chunk_data = &mut data_buf[0..uncompressed_size as usize]; - let (mut chunk, chunk_info) = self.read_file_chunk(ctx, reader, chunk_data)?; + let (mut chunk, mut chunk_info) = self.read_file_chunk(ctx, reader, chunk_data)?; if let Some(h) = inode_hasher.as_mut() { h.digest_update(chunk.id().as_ref()); } @@ -298,8 +298,10 @@ impl Node { if ctx.conversion_type == ConversionType::TarToTarfs { chunk.set_uncompressed_offset(chunk.compressed_offset()); chunk.set_uncompressed_size(chunk.compressed_size()); - } else { - self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)?; + } else if let Some(info) = + self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)? + { + chunk_info = Some(info); } let chunk = Arc::new(chunk); @@ -366,6 +368,8 @@ impl Node { Ok((chunk, chunk_info)) } + /// Dump a chunk from u8 slice into the data blob. + /// Return `BlobChunkInfoV2Ondisk` when the chunk is added into a batch chunk. fn dump_file_chunk( &self, ctx: &BuildContext, @@ -373,7 +377,7 @@ impl Node { blob_writer: &mut ArtifactWriter, chunk_data: &[u8], chunk: &mut ChunkWrapper, - ) -> Result<()> { + ) -> Result> { let uncompressed_size = chunk_data.len() as u32; let aligned_chunk_size = if ctx.aligned_chunk { // Safe to unwrap because `chunk_size` is much less than u32::MAX. 
@@ -387,7 +391,36 @@ impl Node { chunk.set_uncompressed_offset(pre_uncompressed_offset); chunk.set_uncompressed_size(uncompressed_size); - if !ctx.blob_features.contains(BlobFeatures::SEPARATE) { + let mut chunk_info = None; + + if self.inode.child_count() == 1 + && uncompressed_size < ctx.batch_size / 2 + && ctx.blob_batch_generator.is_some() + { + // This chunk will be added into a batch chunk. + let mut batch = ctx.blob_batch_generator.as_ref().unwrap().lock().unwrap(); + + if batch.chunk_data_buf_len() as u32 + uncompressed_size < ctx.batch_size { + // Add into current batch chunk directly. + chunk_info = + Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?); + batch.append_chunk_data_buf(chunk_data); + } else { + // Dump current batch chunk if exists, and then add into a new batch chunk. + if !batch.chunk_data_buf_is_empty() { + // Dump current batch chunk. + let (pre_compressed_offset, compressed_size, _) = + Self::write_chunk_data(ctx, blob_ctx, blob_writer, batch.chunk_data_buf())?; + batch.add_context(pre_compressed_offset, compressed_size); + batch.clear_chunk_data_buf(); + } + + // Add into a new batch chunk. + chunk_info = + Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?); + batch.append_chunk_data_buf(chunk_data); + } + } else if !ctx.blob_features.contains(BlobFeatures::SEPARATE) { // For other case which needs to write chunk data to data blobs. let (pre_compressed_offset, compressed_size, is_compressed) = Self::write_chunk_data(ctx, blob_ctx, blob_writer, chunk_data) @@ -399,7 +432,7 @@ impl Node { event_tracer!("blob_uncompressed_size", +uncompressed_size); - Ok(()) + Ok(chunk_info) } pub fn write_chunk_data( diff --git a/rafs/src/metadata/chunk.rs b/rafs/src/metadata/chunk.rs index b82ea884af7..cb9a9277b3a 100644 --- a/rafs/src/metadata/chunk.rs +++ b/rafs/src/metadata/chunk.rs @@ -248,6 +248,27 @@ impl ChunkWrapper { } } + /// Set flag for whether chunk is batch chunk. 
+ pub fn set_batch(&mut self, batch: bool) { + self.ensure_owned(); + match self { + ChunkWrapper::V5(c) => c.flags.set(BlobChunkFlags::BATCH, batch), + ChunkWrapper::V6(c) => c.flags.set(BlobChunkFlags::BATCH, batch), + ChunkWrapper::Ref(_c) => panic!("unexpected"), + } + } + + /// Check whether the chunk is batch chunk or not. + pub fn is_batch(&self) -> bool { + match self { + ChunkWrapper::V5(c) => c.flags.contains(BlobChunkFlags::BATCH), + ChunkWrapper::V6(c) => c.flags.contains(BlobChunkFlags::BATCH), + ChunkWrapper::Ref(c) => as_blob_v5_chunk_info(c.deref()) + .flags() + .contains(BlobChunkFlags::BATCH), + } + } + #[allow(clippy::too_many_arguments)] /// Set a group of chunk information fields. pub fn set_chunk_info( diff --git a/rafs/src/metadata/inode.rs b/rafs/src/metadata/inode.rs index 32217aba902..22657b5776f 100644 --- a/rafs/src/metadata/inode.rs +++ b/rafs/src/metadata/inode.rs @@ -276,7 +276,7 @@ impl InodeWrapper { } } - /// Set inode content size of regular file, directory and symlink. + /// Get inode content size of regular file, directory and symlink. pub fn size(&self) -> u64 { match self { InodeWrapper::V5(i) => i.i_size, @@ -285,7 +285,7 @@ impl InodeWrapper { } } - /// Get inode content size. + /// Set inode content size. 
pub fn set_size(&mut self, size: u64) { self.ensure_owned(); match self { diff --git a/rafs/src/metadata/layout/v6.rs b/rafs/src/metadata/layout/v6.rs index 777a8dee04f..e74aa6bdb69 100644 --- a/rafs/src/metadata/layout/v6.rs +++ b/rafs/src/metadata/layout/v6.rs @@ -1634,7 +1634,8 @@ impl RafsV6Blob { let count = chunk_count as u64; if blob_features.contains(BlobFeatures::CHUNK_INFO_V2) - && blob_features.contains(BlobFeatures::ZRAN) + && (blob_features.contains(BlobFeatures::BATCH) + || blob_features.contains(BlobFeatures::ZRAN)) { if ci_uncompr_size < count * size_of::() as u64 { error!( @@ -1651,7 +1652,9 @@ impl RafsV6Blob { ); return false; } - } else if blob_features.contains(BlobFeatures::ZRAN) { + } else if blob_features.contains(BlobFeatures::BATCH) + || blob_features.contains(BlobFeatures::ZRAN) + { error!( "RafsV6Blob: idx {} invalid feature bits {}", blob_index, diff --git a/rafs/src/metadata/mod.rs b/rafs/src/metadata/mod.rs index 072fca1ce42..a317f74197b 100644 --- a/rafs/src/metadata/mod.rs +++ b/rafs/src/metadata/mod.rs @@ -367,6 +367,8 @@ pub struct RafsSuperConfig { pub digester: digest::Algorithm, /// Size of data chunks. pub chunk_size: u32, + /// Size of batch data chunks. + pub batch_size: u32, /// Whether `explicit_uidgid` enabled or not. pub explicit_uidgid: bool, /// RAFS in TARFS mode. @@ -429,6 +431,8 @@ pub struct RafsSuperMeta { pub root_inode: Inode, /// Chunk size. pub chunk_size: u32, + /// Batch chunk size. + pub batch_size: u32, /// Number of inodes in the filesystem. pub inodes_count: u64, /// V5: superblock flags for Rafs v5. 
@@ -525,6 +529,7 @@ impl RafsSuperMeta { compressor: self.get_compressor(), digester: self.get_digester(), chunk_size: self.chunk_size, + batch_size: self.batch_size, explicit_uidgid: self.explicit_uidgid(), is_tarfs_mode: self.flags.contains(RafsSuperFlags::TARTFS_MODE), } @@ -540,6 +545,7 @@ impl Default for RafsSuperMeta { inodes_count: 0, root_inode: 0, chunk_size: 0, + batch_size: 0, flags: RafsSuperFlags::empty(), inode_table_entries: 0, inode_table_offset: 0, diff --git a/src/bin/nydus-image/main.rs b/src/bin/nydus-image/main.rs index 51a3f1fb49f..4548ad85437 100644 --- a/src/bin/nydus-image/main.rs +++ b/src/bin/nydus-image/main.rs @@ -18,7 +18,7 @@ use std::convert::TryFrom; use std::fs::{self, metadata, DirEntry, File, OpenOptions}; use std::os::unix::fs::FileTypeExt; use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use anyhow::{bail, Context, Result}; use clap::parser::ValueSource; @@ -37,7 +37,7 @@ use nydus_storage::backend::localfs::LocalFs; use nydus_storage::backend::BlobBackend; use nydus_storage::device::BlobFeatures; use nydus_storage::factory::BlobFactory; -use nydus_storage::meta::format_blob_features; +use nydus_storage::meta::{format_blob_features, BatchContextGenerator}; use nydus_storage::{RAFS_DEFAULT_CHUNK_SIZE, RAFS_MAX_CHUNK_SIZE}; use nydus_utils::trace::{EventTracerClass, TimingTracerClass, TraceClass}; use nydus_utils::{compress, digest, event_tracer, register_tracer, root_tracer, timing_tracer}; @@ -264,6 +264,13 @@ fn prepare_cmd_args(bti_string: &'static str) -> App { .help("Set the size of data chunks, must be power of two and between 0x1000-0x1000000:") .required(false), ) + .arg( + Arg::new("batch-size") + .long("batch-size") + .help("Set the batch size to merge small chunks, must be power of two, between 0x1000-0x1000000 or be zero:") + .required(false) + .default_value("0"), + ) .arg( Arg::new("compressor") .long("compressor") @@ -718,6 +725,7 @@ impl Command { let repeatable = 
matches.get_flag("repeatable"); let version = Self::get_fs_version(matches)?; let chunk_size = Self::get_chunk_size(matches, conversion_type)?; + let batch_size = Self::get_batch_size(matches, version, conversion_type, chunk_size)?; let aligned_chunk = if version.is_v6() && conversion_type != ConversionType::TarToTarfs { true } else { @@ -937,6 +945,7 @@ impl Command { ); build_ctx.set_fs_version(version); build_ctx.set_chunk_size(chunk_size); + build_ctx.set_batch_size(batch_size); let mut config = Self::get_configuration(matches)?; if let Some(cache) = Arc::get_mut(&mut config).unwrap().cache.as_mut() { @@ -952,6 +961,7 @@ impl Command { compressor, digester, chunk_size, + batch_size, explicit_uidgid: !repeatable, is_tarfs_mode: false, }; @@ -971,6 +981,14 @@ impl Command { BootstrapManager::new(Some(bootstrap_path), parent_path) }; + // Legality has been checked and filtered by `get_batch_size()`. + if build_ctx.batch_size > 0 { + let generator = BatchContextGenerator::new(build_ctx.batch_size)?; + build_ctx.blob_batch_generator = Some(Mutex::new(generator)); + build_ctx.blob_features.insert(BlobFeatures::BATCH); + build_ctx.blob_features.insert(BlobFeatures::CHUNK_INFO_V2); + } + let mut builder: Box = match conversion_type { ConversionType::DirectoryToRafs => Box::new(DirectoryBuilder::new()), ConversionType::EStargzIndexToRef => Box::new(StargzBuilder::new(blob_data_size)), @@ -1452,6 +1470,52 @@ impl Command { } } + fn get_batch_size( + matches: &ArgMatches, + version: RafsVersion, + ty: ConversionType, + chunk_size: u32, + ) -> Result { + match matches.get_one::("batch-size") { + None => Ok(0), + Some(v) => { + let batch_size = if v.starts_with("0x") || v.starts_with("0X") { + u32::from_str_radix(&v[2..], 16).context(format!("invalid batch size {}", v))? + } else { + v.parse::() + .context(format!("invalid batch size {}", v))? 
+ }; + if batch_size > 0 { + if version.is_v5() { + bail!("`--batch-size` with non-zero value conflicts with `--fs-version 5`"); + } + match ty { + ConversionType::DirectoryToRafs + | ConversionType::EStargzToRafs + | ConversionType::TargzToRafs + | ConversionType::TarToRafs => { + if batch_size as u64 > RAFS_MAX_CHUNK_SIZE + || batch_size < 0x1000 + || !batch_size.is_power_of_two() + { + bail!("invalid batch size: {}", batch_size); + } + if batch_size > chunk_size { + bail!( + "batch size 0x{:x} is bigger than chunk size 0x{:x}", + batch_size, + chunk_size + ); + } + } + _ => bail!("unsupported ConversionType for batch chunk: {}", ty), + } + } + Ok(batch_size) + } + } + } + fn get_prefetch(matches: &ArgMatches) -> Result { let prefetch_policy = matches .get_one::("prefetch-policy") diff --git a/storage/src/meta/mod.rs b/storage/src/meta/mod.rs index a060f1e66fd..d83e333795d 100644 --- a/storage/src/meta/mod.rs +++ b/storage/src/meta/mod.rs @@ -1005,6 +1005,7 @@ impl BlobMetaChunkArray { } /// Add an entry of v2 chunk compression information into the array. + #[allow(clippy::too_many_arguments)] pub fn add_v2( &mut self, compressed_offset: u64, @@ -1012,6 +1013,7 @@ impl BlobMetaChunkArray { uncompressed_offset: u64, uncompressed_size: u32, compressed: bool, + is_batch: bool, data: u64, ) { match self { @@ -1022,6 +1024,7 @@ impl BlobMetaChunkArray { meta.set_uncompressed_offset(uncompressed_offset); meta.set_uncompressed_size(uncompressed_size); meta.set_compressed(compressed); + meta.set_batch(is_batch); meta.set_data(data); v.push(meta); }