nydus-image: support small chunks mergence #1202

Merged · 4 commits · Apr 18, 2023

Changes from all commits
46 changes: 34 additions & 12 deletions rafs/src/builder/core/blob.rs
@@ -102,6 +102,22 @@ impl Blob {
blob_mgr: &mut BlobManager,
blob_writer: &mut ArtifactWriter,
) -> Result<()> {
// Dump buffered batch chunk data if it exists.
if let Some(ref batch) = ctx.blob_batch_generator {
if let Some((_, blob_ctx)) = blob_mgr.get_current_blob() {
let mut batch = batch.lock().unwrap();
if !batch.chunk_data_buf_is_empty() {
let (pre_compressed_offset, compressed_size, _) = Node::write_chunk_data(
&ctx,
blob_ctx,
blob_writer,
batch.chunk_data_buf(),
)?;
batch.add_context(pre_compressed_offset, compressed_size);
batch.clear_chunk_data_buf();
}
}
}
if !ctx.blob_features.contains(BlobFeatures::SEPARATE)
&& (ctx.blob_inline_meta || ctx.features.is_enabled(Feature::BlobToc))
{
@@ -143,23 +159,27 @@ impl Blob {
// Prepare blob meta information data.
let blob_meta_info = &blob_ctx.blob_meta_info;
let mut ci_data = blob_meta_info.as_byte_slice();
let mut zran_buf = Vec::new();
let mut inflate_buf = Vec::new();
let mut header = blob_ctx.blob_meta_header;
if let Some(ref zran) = ctx.blob_zran_generator {
let (zran_data, zran_count) = zran.lock().unwrap().to_vec()?;
header.set_ci_zran_count(zran_count);
let (inflate_data, inflate_count) = zran.lock().unwrap().to_vec()?;
header.set_ci_zran_count(inflate_count);
header.set_ci_zran_offset(ci_data.len() as u64);
header.set_ci_zran_size(zran_data.len() as u64);
header.set_ci_zran_size(inflate_data.len() as u64);
header.set_ci_zran(true);
header.set_separate_blob(true);
zran_buf = [ci_data, &zran_data].concat();
ci_data = &zran_buf;
inflate_buf = [ci_data, &inflate_data].concat();
ci_data = &inflate_buf;
} else if let Some(ref batch) = ctx.blob_batch_generator {
let (inflate_data, inflate_count) = batch.lock().unwrap().to_vec()?;
header.set_ci_zran_count(inflate_count);
header.set_ci_zran_offset(ci_data.len() as u64);
header.set_ci_zran_size(inflate_data.len() as u64);
header.set_ci_batch(true);
inflate_buf = [ci_data, &inflate_data].concat();
ci_data = &inflate_buf;
} else if ctx.blob_tar_reader.is_some() {
header.set_separate_blob(true);
header.set_ci_zran(false);
} else {
header.set_separate_blob(false);
header.set_ci_zran(false);
};

let mut compressor = compress::Algorithm::Zstd;
@@ -211,8 +231,10 @@ impl Blob {
// Generate ToC entry for `blob.meta` and write chunk digest array.
if ctx.features.is_enabled(Feature::BlobToc) {
let mut hasher = RafsDigest::hasher(digest::Algorithm::Sha256);
let ci_data = if ctx.blob_features.contains(BlobFeatures::ZRAN) {
zran_buf.as_slice()
let ci_data = if ctx.blob_features.contains(BlobFeatures::BATCH)
|| ctx.blob_features.contains(BlobFeatures::ZRAN)
{
inflate_buf.as_slice()
} else {
blob_ctx.blob_meta_info.as_byte_slice()
};
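Taken together, the blob.rs hunks do two things: flush any small-chunk data still buffered by the batch generator before the blob is finalized, and append the batch inflate context behind the chunk info array, reusing the header's `ci_zran_*` fields for its count, offset, and size. The sketch below illustrates that layout with simplified stand-in types; it is not the real nydus `BlobCompressionContextHeader` API.

```rust
// Illustrative stand-in for the compression context header fields
// touched above; field names follow the setters in the diff.
struct MetaHeader {
    ci_zran_count: u32, // reused as the batch context record count
    ci_zran_offset: u64,
    ci_zran_size: u64,
    ci_batch: bool,
}

// Append inflate (batch) records after the chunk info array and point
// the header at them, mirroring the `set_ci_batch` branch above. The
// ToC digest in the last hunk is computed over exactly this buffer.
fn build_ci_buffer(
    header: &mut MetaHeader,
    chunk_info: &[u8],
    inflate_records: &[u8],
    inflate_count: u32,
) -> Vec<u8> {
    header.ci_zran_count = inflate_count;
    header.ci_zran_offset = chunk_info.len() as u64;
    header.ci_zran_size = inflate_records.len() as u64;
    header.ci_batch = true;
    [chunk_info, inflate_records].concat()
}

fn main() {
    let mut header = MetaHeader {
        ci_zran_count: 0,
        ci_zran_offset: 0,
        ci_zran_size: 0,
        ci_batch: false,
    };
    let ci = build_ci_buffer(&mut header, &[0u8; 24], &[0u8; 16], 1);
    assert_eq!(ci.len(), 40);
    assert_eq!(header.ci_zran_offset, 24);
}
```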
1 change: 1 addition & 0 deletions rafs/src/builder/core/bootstrap.rs
@@ -296,6 +296,7 @@ impl Bootstrap {
compressor: ctx.compressor,
digester: ctx.digester,
chunk_size: ctx.chunk_size,
batch_size: ctx.batch_size,
explicit_uidgid: ctx.explicit_uidgid,
version: ctx.fs_version,
is_tarfs_mode: rs.meta.flags.contains(RafsSuperFlags::TARTFS_MODE),
1 change: 1 addition & 0 deletions rafs/src/builder/core/chunk_dict.rs
@@ -263,6 +263,7 @@ mod tests {
compressor: compress::Algorithm::Lz4Block,
digester: digest::Algorithm::Blake3,
chunk_size: 0x100000,
batch_size: 0,
explicit_uidgid: true,
is_tarfs_mode: false,
};
19 changes: 17 additions & 2 deletions rafs/src/builder/core/context.rs
@@ -26,8 +26,8 @@ use nydus_storage::device::{BlobFeatures, BlobInfo};
use nydus_storage::factory::BlobFactory;
use nydus_storage::meta::toc::{TocEntryList, TocLocation};
use nydus_storage::meta::{
toc, BlobChunkInfoV2Ondisk, BlobCompressionContextHeader, BlobMetaChunkArray,
BlobMetaChunkInfo, ZranContextGenerator,
toc, BatchContextGenerator, BlobChunkInfoV2Ondisk, BlobCompressionContextHeader,
BlobMetaChunkArray, BlobMetaChunkInfo, ZranContextGenerator,
};
use nydus_utils::digest::DigestData;
use nydus_utils::{compress, digest, div_round_up, round_down, BufReaderInfo};
@@ -464,6 +464,9 @@ impl BlobContext {
blob_ctx
.blob_meta_header
.set_chunk_info_v2(features.contains(BlobFeatures::CHUNK_INFO_V2));
blob_ctx
.blob_meta_header
.set_ci_batch(features.contains(BlobFeatures::BATCH));
blob_ctx
.blob_meta_header
.set_ci_zran(features.contains(BlobFeatures::ZRAN));
@@ -653,6 +656,7 @@ impl BlobContext {
chunk.uncompressed_offset(),
chunk.uncompressed_size(),
chunk.is_compressed(),
chunk.is_batch(),
0,
);
}
@@ -1074,6 +1078,8 @@ pub struct BuildContext {
pub whiteout_spec: WhiteoutSpec,
/// Chunk slice size.
pub chunk_size: u32,
/// Batch chunk data size.
pub batch_size: u32,
/// Version number of output metadata and data blob.
pub fs_version: RafsVersion,
/// Whether any directory/file has extended attributes.
@@ -1092,6 +1098,7 @@ pub struct BuildContext {
/// Storage writing blob to single file or a directory.
pub blob_storage: Option<ArtifactStorage>,
pub blob_zran_generator: Option<Mutex<ZranContextGenerator<File>>>,
pub blob_batch_generator: Option<Mutex<BatchContextGenerator>>,
pub blob_tar_reader: Option<BufReaderInfo<File>>,
pub blob_features: BlobFeatures,
pub blob_inline_meta: bool,
@@ -1141,6 +1148,7 @@ impl BuildContext {
whiteout_spec,

chunk_size: RAFS_DEFAULT_CHUNK_SIZE as u32,
batch_size: 0,
fs_version: RafsVersion::default(),

conversion_type,
@@ -1149,6 +1157,7 @@ impl BuildContext {
prefetch,
blob_storage,
blob_zran_generator: None,
blob_batch_generator: None,
blob_tar_reader: None,
blob_features,
blob_inline_meta,
@@ -1167,6 +1176,10 @@ impl BuildContext {
self.chunk_size = chunk_size;
}

pub fn set_batch_size(&mut self, batch_size: u32) {
self.batch_size = batch_size;
}

pub fn set_configuration(&mut self, config: Arc<ConfigV2>) {
self.configuration = config;
}
@@ -1184,6 +1197,7 @@ impl Default for BuildContext {
whiteout_spec: WhiteoutSpec::default(),

chunk_size: RAFS_DEFAULT_CHUNK_SIZE as u32,
batch_size: 0,
fs_version: RafsVersion::default(),

conversion_type: ConversionType::default(),
@@ -1192,6 +1206,7 @@ impl Default for BuildContext {
prefetch: Prefetch::default(),
blob_storage: None,
blob_zran_generator: None,
blob_batch_generator: None,
blob_tar_reader: None,
blob_features: BlobFeatures::empty(),
has_xattr: true,
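The context.rs changes only thread the new knob through the builder: a `batch_size` field on `BuildContext`, a `set_batch_size` setter, and an optional `Mutex<BatchContextGenerator>`. Below is a compilable stand-in; the power-of-two and chunk-size-bound checks are illustrative assumptions, not validation taken from this PR.

```rust
// Simplified stand-in for the new `batch_size` knob on `BuildContext`.
// Validation here is hypothetical: 0 disables batching, and a non-zero
// value is assumed to be a power of two no larger than the chunk size.
struct BuildCtx {
    chunk_size: u32,
    batch_size: u32,
}

impl BuildCtx {
    fn set_batch_size(&mut self, batch_size: u32) -> Result<(), String> {
        if batch_size != 0
            && (!batch_size.is_power_of_two() || batch_size > self.chunk_size)
        {
            return Err(format!("invalid batch size 0x{:x}", batch_size));
        }
        self.batch_size = batch_size;
        Ok(())
    }
}

fn main() {
    let mut ctx = BuildCtx { chunk_size: 0x100000, batch_size: 0 };
    ctx.set_batch_size(0x10000).unwrap(); // 64 KiB batch buffer
    assert!(ctx.set_batch_size(0x300).is_err()); // not a power of two
}
```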
86 changes: 63 additions & 23 deletions rafs/src/builder/core/node.rs
@@ -271,7 +271,7 @@ impl Node {
};

let chunk_data = &mut data_buf[0..uncompressed_size as usize];
let (mut chunk, chunk_info) = self.read_file_chunk(ctx, reader, chunk_data)?;
let (mut chunk, mut chunk_info) = self.read_file_chunk(ctx, reader, chunk_data)?;
if let Some(h) = inode_hasher.as_mut() {
h.digest_update(chunk.id().as_ref());
}
@@ -298,8 +298,10 @@
if ctx.conversion_type == ConversionType::TarToTarfs {
chunk.set_uncompressed_offset(chunk.compressed_offset());
chunk.set_uncompressed_size(chunk.compressed_size());
} else {
self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)?;
} else if let Some(info) =
self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)?
{
chunk_info = Some(info);
}

let chunk = Arc::new(chunk);
@@ -366,14 +368,16 @@ impl Node {
Ok((chunk, chunk_info))
}

/// Dump a chunk from a u8 slice into the data blob.
/// Return `BlobChunkInfoV2Ondisk` when the chunk is added into a batch chunk.
fn dump_file_chunk(
&self,
ctx: &BuildContext,
blob_ctx: &mut BlobContext,
blob_writer: &mut ArtifactWriter,
chunk_data: &[u8],
chunk: &mut ChunkWrapper,
) -> Result<()> {
) -> Result<Option<BlobChunkInfoV2Ondisk>> {
let uncompressed_size = chunk_data.len() as u32;
let aligned_chunk_size = if ctx.aligned_chunk {
// Safe to unwrap because `chunk_size` is much less than u32::MAX.
@@ -387,32 +391,68 @@
chunk.set_uncompressed_offset(pre_uncompressed_offset);
chunk.set_uncompressed_size(uncompressed_size);

let compressed_size = if ctx.blob_features.contains(BlobFeatures::SEPARATE) {
// For `tar-ref`, `targz-ref`, `estargz-ref` and `estargzindex-ref`.
chunk.compressed_size()
} else {
// For other case which needs to write chunk data to data blobs.
let (compressed, is_compressed) = compress::compress(chunk_data, ctx.compressor)
.with_context(|| format!("failed to compress node file {:?}", self.path()))?;
let compressed_size = compressed.len() as u32;
let pre_compressed_offset = blob_ctx.current_compressed_offset;
blob_writer
.write_all(&compressed)
.context("failed to write blob")?;
blob_ctx.blob_hash.update(&compressed);
blob_ctx.current_compressed_offset += compressed_size as u64;
blob_ctx.compressed_blob_size += compressed_size as u64;
let mut chunk_info = None;

if self.inode.child_count() == 1
&& uncompressed_size < ctx.batch_size / 2
&& ctx.blob_batch_generator.is_some()
{
// This chunk will be added into a batch chunk.
let mut batch = ctx.blob_batch_generator.as_ref().unwrap().lock().unwrap();

if batch.chunk_data_buf_len() as u32 + uncompressed_size < ctx.batch_size {
// Add into current batch chunk directly.
chunk_info =
Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?);
batch.append_chunk_data_buf(chunk_data);
} else {
// Dump the current batch chunk if it exists, then add into a new batch chunk.
if !batch.chunk_data_buf_is_empty() {
// Dump current batch chunk.
let (pre_compressed_offset, compressed_size, _) =
Self::write_chunk_data(ctx, blob_ctx, blob_writer, batch.chunk_data_buf())?;
batch.add_context(pre_compressed_offset, compressed_size);
batch.clear_chunk_data_buf();
}

// Add into a new batch chunk.
chunk_info =
Some(batch.generate_chunk_info(pre_uncompressed_offset, uncompressed_size)?);
batch.append_chunk_data_buf(chunk_data);
}
} else if !ctx.blob_features.contains(BlobFeatures::SEPARATE) {
// For other case which needs to write chunk data to data blobs.
let (pre_compressed_offset, compressed_size, is_compressed) =
Self::write_chunk_data(ctx, blob_ctx, blob_writer, chunk_data)
.with_context(|| format!("failed to write chunk data {:?}", self.path()))?;
chunk.set_compressed_offset(pre_compressed_offset);
chunk.set_compressed_size(compressed_size);
chunk.set_compressed(is_compressed);
compressed_size
};
}

event_tracer!("blob_uncompressed_size", +uncompressed_size);
event_tracer!("blob_compressed_size", +compressed_size);

Ok(())
Ok(chunk_info)
}

pub fn write_chunk_data(
ctx: &BuildContext,
blob_ctx: &mut BlobContext,
blob_writer: &mut ArtifactWriter,
chunk_data: &[u8],
) -> Result<(u64, u32, bool)> {
let (compressed, is_compressed) = compress::compress(chunk_data, ctx.compressor)
.with_context(|| "failed to compress node file".to_string())?;
let compressed_size = compressed.len() as u32;
let pre_compressed_offset = blob_ctx.current_compressed_offset;
blob_writer
.write_all(&compressed)
.context("failed to write blob")?;
blob_ctx.blob_hash.update(&compressed);
blob_ctx.current_compressed_offset += compressed_size as u64;
blob_ctx.compressed_blob_size += compressed_size as u64;

Ok((pre_compressed_offset, compressed_size, is_compressed))
}

fn deduplicate_chunk(
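The heart of the change is the routing logic in `dump_file_chunk`: a chunk is merged only when its file consists of a single chunk (`child_count() == 1`) and it is smaller than half of `batch_size`; if appending would overflow the batch buffer, the buffered batch is dumped through `write_chunk_data` first. The self-contained sketch below restates that decision with plain parameters standing in for the builder state.

```rust
// Where a chunk goes, per the conditions in `dump_file_chunk` above.
enum ChunkPlacement {
    /// Append to the in-memory batch buffer; dump the buffer first if
    /// this chunk would overflow it.
    Batch { flush_first: bool },
    /// Handle the chunk individually (written to the blob unless the
    /// blob uses separate data, per `BlobFeatures::SEPARATE`).
    Standalone,
}

fn place_chunk(
    file_chunk_count: u32,  // self.inode.child_count() in the real code
    uncompressed_size: u32, // size of the chunk being dumped
    batch_size: u32,        // 0 means no batch generator is configured
    buffered_len: u32,      // bytes already in the batch buffer
) -> ChunkPlacement {
    if batch_size > 0 && file_chunk_count == 1 && uncompressed_size < batch_size / 2 {
        ChunkPlacement::Batch {
            flush_first: buffered_len + uncompressed_size >= batch_size,
        }
    } else {
        ChunkPlacement::Standalone
    }
}

fn main() {
    // A 4 KiB single-chunk file with a 64 KiB batch size gets merged.
    match place_chunk(1, 0x1000, 0x10000, 0) {
        ChunkPlacement::Batch { flush_first } => assert!(!flush_first),
        ChunkPlacement::Standalone => unreachable!(),
    }
}
```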
21 changes: 21 additions & 0 deletions rafs/src/metadata/chunk.rs
@@ -248,6 +248,27 @@ impl ChunkWrapper {
}
}

/// Set the flag indicating whether the chunk is a batch chunk.
pub fn set_batch(&mut self, batch: bool) {
self.ensure_owned();
match self {
ChunkWrapper::V5(c) => c.flags.set(BlobChunkFlags::BATCH, batch),
ChunkWrapper::V6(c) => c.flags.set(BlobChunkFlags::BATCH, batch),
ChunkWrapper::Ref(_c) => panic!("unexpected"),
}
}

/// Check whether the chunk is a batch chunk.
pub fn is_batch(&self) -> bool {
match self {
ChunkWrapper::V5(c) => c.flags.contains(BlobChunkFlags::BATCH),
ChunkWrapper::V6(c) => c.flags.contains(BlobChunkFlags::BATCH),
ChunkWrapper::Ref(c) => as_blob_v5_chunk_info(c.deref())
.flags()
.contains(BlobChunkFlags::BATCH),
}
}

#[allow(clippy::too_many_arguments)]
/// Set a group of chunk information fields.
pub fn set_chunk_info(
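`ChunkWrapper` gains `set_batch`/`is_batch` accessors over a new `BlobChunkFlags::BATCH` bit. A minimal illustration using the `bitflags` crate (which nydus also uses) is below; the bit values are arbitrary placeholders, not the on-disk encoding.

```rust
use bitflags::bitflags;

bitflags! {
    // Placeholder flag set; only the set/contains pattern matches the
    // accessors added in chunk.rs, not the actual bit assignments.
    struct ChunkFlags: u32 {
        const COMPRESSED = 0b0001;
        const BATCH      = 0b0100;
    }
}

fn main() {
    let mut flags = ChunkFlags::COMPRESSED;
    flags.set(ChunkFlags::BATCH, true); // like ChunkWrapper::set_batch(true)
    assert!(flags.contains(ChunkFlags::BATCH)); // like ChunkWrapper::is_batch()
    flags.set(ChunkFlags::BATCH, false);
    assert!(!flags.contains(ChunkFlags::BATCH));
}
```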
4 changes: 2 additions & 2 deletions rafs/src/metadata/inode.rs
@@ -276,7 +276,7 @@ impl InodeWrapper {
}
}

/// Set inode content size of regular file, directory and symlink.
/// Get inode content size of regular file, directory and symlink.
pub fn size(&self) -> u64 {
match self {
InodeWrapper::V5(i) => i.i_size,
Expand All @@ -285,7 +285,7 @@ impl InodeWrapper {
}
}

/// Get inode content size.
/// Set inode content size.
pub fn set_size(&mut self, size: u64) {
self.ensure_owned();
match self {
7 changes: 5 additions & 2 deletions rafs/src/metadata/layout/v6.rs
@@ -1634,7 +1634,8 @@ impl RafsV6Blob {

let count = chunk_count as u64;
if blob_features.contains(BlobFeatures::CHUNK_INFO_V2)
&& blob_features.contains(BlobFeatures::ZRAN)
&& (blob_features.contains(BlobFeatures::BATCH)
|| blob_features.contains(BlobFeatures::ZRAN))
{
if ci_uncompr_size < count * size_of::<BlobChunkInfoV2Ondisk>() as u64 {
error!(
@@ -1651,7 +1652,9 @@
);
return false;
}
} else if blob_features.contains(BlobFeatures::ZRAN) {
} else if blob_features.contains(BlobFeatures::BATCH)
|| blob_features.contains(BlobFeatures::ZRAN)
{
error!(
"RafsV6Blob: idx {} invalid feature bits {}",
blob_index,
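Finally, the v6.rs validation treats BATCH the same way as ZRAN: either feature is legal only together with CHUNK_INFO_V2, and the uncompressed context must be large enough to hold one v2 chunk record per chunk. A condensed restatement follows; the record size is a placeholder, and the second bounds check truncated above is elided.

```rust
// Feature/size consistency check mirroring the RafsV6Blob hunks above.
fn validate_ci(
    chunk_info_v2: bool,
    batch: bool,
    zran: bool,
    chunk_count: u64,
    ci_uncompr_size: u64,
    record_size: u64, // size_of::<BlobChunkInfoV2Ondisk>() in the real code
) -> bool {
    if chunk_info_v2 && (batch || zran) {
        // The chunk info array must fit, with inflate context behind it.
        ci_uncompr_size >= chunk_count * record_size
    } else if batch || zran {
        // BATCH/ZRAN without CHUNK_INFO_V2 is an invalid feature combination.
        false
    } else {
        // Other combinations are validated elsewhere.
        true
    }
}

fn main() {
    let rec = 24; // placeholder, not the real on-disk record size
    assert!(validate_ci(true, true, false, 100, 100 * rec + 64, rec));
    assert!(!validate_ci(false, true, false, 100, 100 * rec, rec));
}
```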