Skip to content

Commit

Permalink
rafs: enhance builder to support batch chunk
Browse files Browse the repository at this point in the history
Add `--batch-size` option to nydus-image.
Add build-time support for batch chunks.

Signed-off-by: Wenhao Ren <[email protected]>
  • Loading branch information
hangvane committed Apr 16, 2023
1 parent b853c7f commit fe2adfa
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 15 deletions.
28 changes: 27 additions & 1 deletion rafs/src/builder/core/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,22 @@ impl Blob {
blob_mgr: &mut BlobManager,
blob_writer: &mut ArtifactWriter,
) -> Result<()> {
// Dump buffered batch chunk data if it exists.
if let Some(ref batch) = ctx.blob_batch_generator {
if let Some((_, blob_ctx)) = blob_mgr.get_current_blob() {
let mut batch = batch.lock().unwrap();
if !batch.chunk_data_buf_is_empty() {
let (pre_compressed_offset, compressed_size, _) = Node::write_chunk_data(
&ctx,
blob_ctx,
blob_writer,
batch.chunk_data_buf(),
)?;
batch.add_context(pre_compressed_offset, compressed_size);
batch.clear_chunk_data_buf();
}
}
}
if !ctx.blob_features.contains(BlobFeatures::SEPARATE)
&& (ctx.blob_inline_meta || ctx.features.is_enabled(Feature::BlobToc))
{
Expand Down Expand Up @@ -154,6 +170,14 @@ impl Blob {
header.set_separate_blob(true);
inflate_buf = [ci_data, &inflate_data].concat();
ci_data = &inflate_buf;
} else if let Some(ref batch) = ctx.blob_batch_generator {
let (inflate_data, inflate_count) = batch.lock().unwrap().to_vec()?;
header.set_ci_zran_count(inflate_count);
header.set_ci_zran_offset(ci_data.len() as u64);
header.set_ci_zran_size(inflate_data.len() as u64);
header.set_ci_batch(true);
inflate_buf = [ci_data, &inflate_data].concat();
ci_data = &inflate_buf;
} else if ctx.blob_tar_reader.is_some() {
header.set_separate_blob(true);
};
Expand Down Expand Up @@ -207,7 +231,9 @@ impl Blob {
// Generate ToC entry for `blob.meta` and write chunk digest array.
if ctx.features.is_enabled(Feature::BlobToc) {
let mut hasher = RafsDigest::hasher(digest::Algorithm::Sha256);
let ci_data = if ctx.blob_features.contains(BlobFeatures::ZRAN) {
let ci_data = if ctx.blob_features.contains(BlobFeatures::BATCH)
|| ctx.blob_features.contains(BlobFeatures::ZRAN)
{
inflate_buf.as_slice()
} else {
blob_ctx.blob_meta_info.as_byte_slice()
Expand Down
1 change: 1 addition & 0 deletions rafs/src/builder/core/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ impl Bootstrap {
compressor: ctx.compressor,
digester: ctx.digester,
chunk_size: ctx.chunk_size,
batch_size: ctx.batch_size,
explicit_uidgid: ctx.explicit_uidgid,
version: ctx.fs_version,
is_tarfs_mode: rs.meta.flags.contains(RafsSuperFlags::TARTFS_MODE),
Expand Down
1 change: 1 addition & 0 deletions rafs/src/builder/core/chunk_dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ mod tests {
compressor: compress::Algorithm::Lz4Block,
digester: digest::Algorithm::Blake3,
chunk_size: 0x100000,
batch_size: 0,
explicit_uidgid: true,
is_tarfs_mode: false,
};
Expand Down
19 changes: 17 additions & 2 deletions rafs/src/builder/core/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use nydus_storage::device::{BlobFeatures, BlobInfo};
use nydus_storage::factory::BlobFactory;
use nydus_storage::meta::toc::{TocEntryList, TocLocation};
use nydus_storage::meta::{
toc, BlobChunkInfoV2Ondisk, BlobCompressionContextHeader, BlobMetaChunkArray,
BlobMetaChunkInfo, ZranContextGenerator,
toc, BatchContextGenerator, BlobChunkInfoV2Ondisk, BlobCompressionContextHeader,
BlobMetaChunkArray, BlobMetaChunkInfo, ZranContextGenerator,
};
use nydus_utils::digest::DigestData;
use nydus_utils::{compress, digest, div_round_up, round_down, BufReaderInfo};
Expand Down Expand Up @@ -464,6 +464,9 @@ impl BlobContext {
blob_ctx
.blob_meta_header
.set_chunk_info_v2(features.contains(BlobFeatures::CHUNK_INFO_V2));
blob_ctx
.blob_meta_header
.set_ci_batch(features.contains(BlobFeatures::BATCH));
blob_ctx
.blob_meta_header
.set_ci_zran(features.contains(BlobFeatures::ZRAN));
Expand Down Expand Up @@ -653,6 +656,7 @@ impl BlobContext {
chunk.uncompressed_offset(),
chunk.uncompressed_size(),
chunk.is_compressed(),
chunk.is_batch(),
0,
);
}
Expand Down Expand Up @@ -1074,6 +1078,8 @@ pub struct BuildContext {
pub whiteout_spec: WhiteoutSpec,
/// Chunk slice size.
pub chunk_size: u32,
/// Batch chunk data size.
pub batch_size: u32,
/// Version number of output metadata and data blob.
pub fs_version: RafsVersion,
/// Whether any directory/file has extended attributes.
Expand All @@ -1092,6 +1098,7 @@ pub struct BuildContext {
/// Storage writing blob to single file or a directory.
pub blob_storage: Option<ArtifactStorage>,
pub blob_zran_generator: Option<Mutex<ZranContextGenerator<File>>>,
pub blob_batch_generator: Option<Mutex<BatchContextGenerator>>,
pub blob_tar_reader: Option<BufReaderInfo<File>>,
pub blob_features: BlobFeatures,
pub blob_inline_meta: bool,
Expand Down Expand Up @@ -1141,6 +1148,7 @@ impl BuildContext {
whiteout_spec,

chunk_size: RAFS_DEFAULT_CHUNK_SIZE as u32,
batch_size: 0,
fs_version: RafsVersion::default(),

conversion_type,
Expand All @@ -1149,6 +1157,7 @@ impl BuildContext {
prefetch,
blob_storage,
blob_zran_generator: None,
blob_batch_generator: None,
blob_tar_reader: None,
blob_features,
blob_inline_meta,
Expand All @@ -1167,6 +1176,10 @@ impl BuildContext {
self.chunk_size = chunk_size;
}

/// Set the size of batch data chunks, 0 means batch chunk is disabled.
pub fn set_batch_size(&mut self, batch_size: u32) {
    self.batch_size = batch_size;
}

/// Set the configuration object associated with the build context.
pub fn set_configuration(&mut self, config: Arc<ConfigV2>) {
    self.configuration = config;
}
Expand All @@ -1184,6 +1197,7 @@ impl Default for BuildContext {
whiteout_spec: WhiteoutSpec::default(),

chunk_size: RAFS_DEFAULT_CHUNK_SIZE as u32,
batch_size: 0,
fs_version: RafsVersion::default(),

conversion_type: ConversionType::default(),
Expand All @@ -1192,6 +1206,7 @@ impl Default for BuildContext {
prefetch: Prefetch::default(),
blob_storage: None,
blob_zran_generator: None,
blob_batch_generator: None,
blob_tar_reader: None,
blob_features: BlobFeatures::empty(),
has_xattr: true,
Expand Down
45 changes: 39 additions & 6 deletions rafs/src/builder/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl Node {
};

let chunk_data = &mut data_buf[0..uncompressed_size as usize];
let (mut chunk, chunk_info) = self.read_file_chunk(ctx, reader, chunk_data)?;
let (mut chunk, mut chunk_info) = self.read_file_chunk(ctx, reader, chunk_data)?;
if let Some(h) = inode_hasher.as_mut() {
h.digest_update(chunk.id().as_ref());
}
Expand All @@ -298,8 +298,10 @@ impl Node {
if ctx.conversion_type == ConversionType::TarToTarfs {
chunk.set_uncompressed_offset(chunk.compressed_offset());
chunk.set_uncompressed_size(chunk.compressed_size());
} else {
self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)?;
} else if let Some(info) =
self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)?
{
chunk_info = Some(info);
}

let chunk = Arc::new(chunk);
Expand Down Expand Up @@ -366,14 +368,16 @@ impl Node {
Ok((chunk, chunk_info))
}

/// Dump a chunk from u8 slice into the data blob.
/// Return `BlobChunkInfoV2Ondisk` when the chunk is added into a batch chunk.
fn dump_file_chunk(
&self,
ctx: &BuildContext,
blob_ctx: &mut BlobContext,
blob_writer: &mut ArtifactWriter,
chunk_data: &[u8],
chunk: &mut ChunkWrapper,
) -> Result<()> {
) -> Result<Option<BlobChunkInfoV2Ondisk>> {
let uncompressed_size = chunk_data.len() as u32;
let aligned_chunk_size = if ctx.aligned_chunk {
// Safe to unwrap because `chunk_size` is much less than u32::MAX.
Expand All @@ -387,7 +391,36 @@ impl Node {
chunk.set_uncompressed_offset(pre_uncompressed_offset);
chunk.set_uncompressed_size(uncompressed_size);

if !ctx.blob_features.contains(BlobFeatures::SEPARATE) {
let mut chunk_info = None;

if self.inode.child_count() == 1
&& uncompressed_size < ctx.batch_size / 2
&& ctx.blob_batch_generator.is_some()
{
// This chunk will be added into a batch chunk.
let mut batch = ctx.blob_batch_generator.as_ref().unwrap().lock().unwrap();

if batch.chunk_data_buf_len() as u32 + uncompressed_size < ctx.batch_size {
// Add into current batch chunk directly.
chunk_info =
Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?);
batch.append_chunk_data_buf(chunk_data);
} else {
// Dump current batch chunk if exists, and then add into a new batch chunk.
if !batch.chunk_data_buf_is_empty() {
// Dump current batch chunk.
let (pre_compressed_offset, compressed_size, _) =
Self::write_chunk_data(ctx, blob_ctx, blob_writer, batch.chunk_data_buf())?;
batch.add_context(pre_compressed_offset, compressed_size);
batch.clear_chunk_data_buf();
}

// Add into a new batch chunk.
chunk_info =
Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?);
batch.append_chunk_data_buf(chunk_data);
}
} else if !ctx.blob_features.contains(BlobFeatures::SEPARATE) {
// For other case which needs to write chunk data to data blobs.
let (pre_compressed_offset, compressed_size, is_compressed) =
Self::write_chunk_data(ctx, blob_ctx, blob_writer, chunk_data)
Expand All @@ -399,7 +432,7 @@ impl Node {

event_tracer!("blob_uncompressed_size", +uncompressed_size);

Ok(())
Ok(chunk_info)
}

pub fn write_chunk_data(
Expand Down
21 changes: 21 additions & 0 deletions rafs/src/metadata/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,27 @@ impl ChunkWrapper {
}
}

/// Set or clear the flag indicating whether this chunk belongs to a batch chunk.
///
/// # Panics
/// Panics when invoked on a `ChunkWrapper::Ref` variant, which is not
/// expected to remain after `ensure_owned()`.
pub fn set_batch(&mut self, batch: bool) {
    self.ensure_owned();
    match self {
        ChunkWrapper::Ref(_) => panic!("unexpected"),
        ChunkWrapper::V5(chunk) => chunk.flags.set(BlobChunkFlags::BATCH, batch),
        ChunkWrapper::V6(chunk) => chunk.flags.set(BlobChunkFlags::BATCH, batch),
    }
}

/// Return true when the batch-chunk flag is set on this chunk.
pub fn is_batch(&self) -> bool {
    match self {
        ChunkWrapper::Ref(chunk) => as_blob_v5_chunk_info(chunk.deref())
            .flags()
            .contains(BlobChunkFlags::BATCH),
        ChunkWrapper::V5(chunk) => chunk.flags.contains(BlobChunkFlags::BATCH),
        ChunkWrapper::V6(chunk) => chunk.flags.contains(BlobChunkFlags::BATCH),
    }
}

#[allow(clippy::too_many_arguments)]
/// Set a group of chunk information fields.
pub fn set_chunk_info(
Expand Down
4 changes: 2 additions & 2 deletions rafs/src/metadata/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ impl InodeWrapper {
}
}

/// Set inode content size of regular file, directory and symlink.
/// Get inode content size of regular file, directory and symlink.
pub fn size(&self) -> u64 {
match self {
InodeWrapper::V5(i) => i.i_size,
Expand All @@ -285,7 +285,7 @@ impl InodeWrapper {
}
}

/// Get inode content size.
/// Set inode content size.
pub fn set_size(&mut self, size: u64) {
self.ensure_owned();
match self {
Expand Down
7 changes: 5 additions & 2 deletions rafs/src/metadata/layout/v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1634,7 +1634,8 @@ impl RafsV6Blob {

let count = chunk_count as u64;
if blob_features.contains(BlobFeatures::CHUNK_INFO_V2)
&& blob_features.contains(BlobFeatures::ZRAN)
&& (blob_features.contains(BlobFeatures::BATCH)
|| blob_features.contains(BlobFeatures::ZRAN))
{
if ci_uncompr_size < count * size_of::<BlobChunkInfoV2Ondisk>() as u64 {
error!(
Expand All @@ -1651,7 +1652,9 @@ impl RafsV6Blob {
);
return false;
}
} else if blob_features.contains(BlobFeatures::ZRAN) {
} else if blob_features.contains(BlobFeatures::BATCH)
|| blob_features.contains(BlobFeatures::ZRAN)
{
error!(
"RafsV6Blob: idx {} invalid feature bits {}",
blob_index,
Expand Down
6 changes: 6 additions & 0 deletions rafs/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ pub struct RafsSuperConfig {
pub digester: digest::Algorithm,
/// Size of data chunks.
pub chunk_size: u32,
/// Size of batch data chunks.
pub batch_size: u32,
/// Whether `explicit_uidgid` enabled or not.
pub explicit_uidgid: bool,
/// RAFS in TARFS mode.
Expand Down Expand Up @@ -429,6 +431,8 @@ pub struct RafsSuperMeta {
pub root_inode: Inode,
/// Chunk size.
pub chunk_size: u32,
/// Batch chunk size.
pub batch_size: u32,
/// Number of inodes in the filesystem.
pub inodes_count: u64,
/// V5: superblock flags for Rafs v5.
Expand Down Expand Up @@ -525,6 +529,7 @@ impl RafsSuperMeta {
compressor: self.get_compressor(),
digester: self.get_digester(),
chunk_size: self.chunk_size,
batch_size: self.batch_size,
explicit_uidgid: self.explicit_uidgid(),
is_tarfs_mode: self.flags.contains(RafsSuperFlags::TARTFS_MODE),
}
Expand All @@ -540,6 +545,7 @@ impl Default for RafsSuperMeta {
inodes_count: 0,
root_inode: 0,
chunk_size: 0,
batch_size: 0,
flags: RafsSuperFlags::empty(),
inode_table_entries: 0,
inode_table_offset: 0,
Expand Down
Loading

0 comments on commit fe2adfa

Please sign in to comment.