Skip to content

Commit

Permalink
storage: fix batch chunk runtime error
Browse files Browse the repository at this point in the history
1. Rewrite `_add_more_chunks()` to fix the runtime error triggered by batch chunks with discontinuous chunk types.
2. Change the error type so that runtime error messages are displayed correctly.
3. Add more runtime output for ZRan and batch chunks.
4. Accelerate lazy loading of the blob meta context.
5. Add runtime metadata validation for batch chunks.
6. Add a unit test for `_add_more_chunks()`.

Signed-off-by: Wenhao Ren <[email protected]>
  • Loading branch information
hangvane committed Oct 24, 2023
1 parent 9632d18 commit 62c345e
Show file tree
Hide file tree
Showing 19 changed files with 741 additions and 266 deletions.
4 changes: 2 additions & 2 deletions builder/src/core/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ impl Blob {
if let Some((_, blob_ctx)) = blob_mgr.get_current_blob() {
let mut batch = batch.lock().unwrap();
if !batch.chunk_data_buf_is_empty() {
let (pre_compressed_offset, compressed_size, _) = Node::write_chunk_data(
let (_, compressed_size, _) = Node::write_chunk_data(
&ctx,
blob_ctx,
blob_writer,
batch.chunk_data_buf(),
)?;
batch.add_context(pre_compressed_offset, compressed_size);
batch.add_context(compressed_size);
batch.clear_chunk_data_buf();
}
}
Expand Down
59 changes: 41 additions & 18 deletions builder/src/core/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,23 @@ impl Node {
chunk.set_blob_index(blob_index);
chunk.set_index(chunk_index);
chunk.set_file_offset(file_offset);
let mut dumped_size = chunk.compressed_size();
if ctx.conversion_type == ConversionType::TarToTarfs {
chunk.set_uncompressed_offset(chunk.compressed_offset());
chunk.set_uncompressed_size(chunk.compressed_size());
} else if let Some(info) =
self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)?
{
chunk_info = Some(info);
} else {
let (info, d_size) =
self.dump_file_chunk(ctx, blob_ctx, blob_writer, chunk_data, &mut chunk)?;
if info.is_some() {
chunk_info = info;
}
if let Some(d_size) = d_size {
dumped_size = d_size;
}
}

let chunk = Arc::new(chunk);
blob_size += chunk.compressed_size() as u64;
blob_size += dumped_size as u64;
if ctx.conversion_type != ConversionType::TarToTarfs {
blob_ctx.add_chunk_meta_info(&chunk, chunk_info)?;
blob_mgr
Expand Down Expand Up @@ -388,15 +394,18 @@ impl Node {
}

/// Dump a chunk from u8 slice into the data blob.
/// Return `BlobChunkInfoV2Ondisk` when the chunk is added into a batch chunk.
/// Return `BlobChunkInfoV2Ondisk` iff the chunk is added into a batch chunk.
/// Return dumped size iff not `BlobFeatures::SEPARATE`.
/// Dumped size can be zero if chunk data is cached in Batch Generator,
/// and may contain previous chunk data cached in Batch Generator.
fn dump_file_chunk(
&self,
ctx: &BuildContext,
blob_ctx: &mut BlobContext,
blob_writer: &mut dyn Artifact,
chunk_data: &[u8],
chunk: &mut ChunkWrapper,
) -> Result<Option<BlobChunkInfoV2Ondisk>> {
) -> Result<(Option<BlobChunkInfoV2Ondisk>, Option<u32>)> {
let d_size = chunk_data.len() as u32;
let aligned_d_size = if ctx.aligned_chunk {
// Safe to unwrap because `chunk_size` is much less than u32::MAX.
Expand All @@ -412,51 +421,65 @@ impl Node {

let mut chunk_info = None;
let encrypted = blob_ctx.blob_cipher != crypt::Algorithm::None;
let mut dumped_size = None;

if self.inode.child_count() == 1
if ctx.blob_batch_generator.is_some()
&& self.inode.child_count() == 1
&& d_size < ctx.batch_size / 2
&& ctx.blob_batch_generator.is_some()
{
// This chunk will be added into a batch chunk.
let mut batch = ctx.blob_batch_generator.as_ref().unwrap().lock().unwrap();

if batch.chunk_data_buf_len() as u32 + d_size < ctx.batch_size {
// Add into current batch chunk directly.
chunk_info = Some(batch.generate_chunk_info(pre_d_offset, d_size, encrypted)?);
chunk_info = Some(batch.generate_chunk_info(
blob_ctx.current_compressed_offset,
pre_d_offset,
d_size,
encrypted,
)?);
batch.append_chunk_data_buf(chunk_data);
} else {
// Dump current batch chunk if exists, and then add into a new batch chunk.
if !batch.chunk_data_buf_is_empty() {
// Dump current batch chunk.
let (pre_c_offset, c_size, _) =
let (_, c_size, _) =
Self::write_chunk_data(ctx, blob_ctx, blob_writer, batch.chunk_data_buf())?;
batch.add_context(pre_c_offset, c_size);
dumped_size = Some(c_size);
batch.add_context(c_size);
batch.clear_chunk_data_buf();
}

// Add into a new batch chunk.
chunk_info = Some(batch.generate_chunk_info(pre_d_offset, d_size, encrypted)?);
chunk_info = Some(batch.generate_chunk_info(
blob_ctx.current_compressed_offset,
pre_d_offset,
d_size,
encrypted,
)?);
batch.append_chunk_data_buf(chunk_data);
}
} else if !ctx.blob_features.contains(BlobFeatures::SEPARATE) {
// For other case which needs to write chunk data to data blobs.
// For other case which needs to write chunk data to data blobs. Which means,
// `tar-ref`, `targz-ref`, `estargz-ref`, and `estargzindex-ref`, are excluded.

// Interrupt and dump buffered batch chunks.
// TODO: cancel the interruption.
if let Some(batch) = &ctx.blob_batch_generator {
let mut batch = batch.lock().unwrap();
if !batch.chunk_data_buf_is_empty() {
// Dump current batch chunk.
let (pre_c_offset, c_size, _) =
let (_, c_size, _) =
Self::write_chunk_data(ctx, blob_ctx, blob_writer, batch.chunk_data_buf())?;
batch.add_context(pre_c_offset, c_size);
dumped_size = Some(c_size);
batch.add_context(c_size);
batch.clear_chunk_data_buf();
}
}

let (pre_c_offset, c_size, is_compressed) =
Self::write_chunk_data(ctx, blob_ctx, blob_writer, chunk_data)
.with_context(|| format!("failed to write chunk data {:?}", self.path()))?;
dumped_size = Some(dumped_size.unwrap_or(0) + c_size);
chunk.set_compressed_offset(pre_c_offset);
chunk.set_compressed_size(c_size);
chunk.set_compressed(is_compressed);
Expand All @@ -467,7 +490,7 @@ impl Node {
}
event_tracer!("blob_uncompressed_size", +d_size);

Ok(chunk_info)
Ok((chunk_info, dumped_size))
}

pub fn write_chunk_data(
Expand Down
4 changes: 4 additions & 0 deletions rafs/src/metadata/cached_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,10 @@ impl BlobChunkInfo for CachedChunkInfoV5 {
self.index()
}

/// Returns whether this chunk belongs to a batch chunk.
///
/// Always `false`: this cached v5 chunk representation never marks
/// chunks as batched.
fn is_batch(&self) -> bool {
false
}

/// Returns whether the chunk data is stored in compressed form,
/// as recorded by the `COMPRESSED` flag.
fn is_compressed(&self) -> bool {
self.flags.contains(BlobChunkFlags::COMPRESSED)
}
Expand Down
4 changes: 4 additions & 0 deletions rafs/src/metadata/direct_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,10 @@ impl BlobChunkInfo for DirectChunkInfoV5 {
self.index()
}

/// Returns whether this chunk belongs to a batch chunk.
///
/// Always `false`: this direct-mapped v5 chunk representation never
/// marks chunks as batched.
fn is_batch(&self) -> bool {
false
}

fn is_compressed(&self) -> bool {
self.chunk(self.state().deref())
.flags
Expand Down
9 changes: 9 additions & 0 deletions rafs/src/metadata/direct_v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,11 @@ impl BlobChunkInfo for DirectChunkInfoV6 {
self.index()
}

/// Returns whether this chunk belongs to a batch chunk, as recorded
/// by the `BATCH` flag in the mapped on-disk chunk metadata.
fn is_batch(&self) -> bool {
let state = self.state();
self.v5_chunk(&state).flags.contains(BlobChunkFlags::BATCH)
}

fn is_compressed(&self) -> bool {
let state = self.state();
self.v5_chunk(&state)
Expand Down Expand Up @@ -1535,6 +1540,10 @@ impl BlobChunkInfo for TarfsChunkInfoV6 {
self.size
}

/// Returns whether this chunk belongs to a batch chunk.
///
/// Always `false`: tarfs chunks are never grouped into batch chunks.
fn is_batch(&self) -> bool {
false
}

/// Returns whether the chunk data is stored in compressed form.
///
/// Always `false`: this chunk type stores its data uncompressed.
fn is_compressed(&self) -> bool {
false
}
Expand Down
4 changes: 4 additions & 0 deletions rafs/src/metadata/layout/v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,10 @@ pub mod tests {
self.index
}

/// Returns whether this chunk belongs to a batch chunk.
///
/// Always `false`: this test mock does not model batch chunks.
fn is_batch(&self) -> bool {
false
}

/// Returns whether the chunk data is stored in compressed form,
/// as recorded by the `COMPRESSED` flag.
fn is_compressed(&self) -> bool {
self.flags.contains(BlobChunkFlags::COMPRESSED)
}
Expand Down
4 changes: 4 additions & 0 deletions rafs/src/metadata/md_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ impl BlobChunkInfo for V5IoChunk {
self.index
}

/// Returns whether this chunk belongs to a batch chunk.
///
/// Always `false`: this v5 I/O chunk representation never marks
/// chunks as batched.
fn is_batch(&self) -> bool {
false
}

/// Returns whether the chunk data is stored in compressed form,
/// as recorded by the `COMPRESSED` flag.
fn is_compressed(&self) -> bool {
self.flags.contains(BlobChunkFlags::COMPRESSED)
}
Expand Down
4 changes: 4 additions & 0 deletions rafs/src/mock/mock_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ impl BlobChunkInfo for MockChunkInfo {
self.c_index
}

/// Returns whether this chunk belongs to a batch chunk.
///
/// Always `false`: this mock does not model batch chunks.
fn is_batch(&self) -> bool {
false
}

/// Returns whether the chunk data is stored in compressed form,
/// as recorded by the `COMPRESSED` flag on the mock's chunk flags.
fn is_compressed(&self) -> bool {
self.c_flags.contains(BlobChunkFlags::COMPRESSED)
}
Expand Down
2 changes: 1 addition & 1 deletion src/bin/nydus-image/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,7 @@ impl Command {
u32::from_str_radix(&v[2..], 16).context(format!("invalid batch size {}", v))?
} else {
v.parse::<u32>()
.context(format!("invalid chunk size {}", v))?
.context(format!("invalid batch size {}", v))?
};
if batch_size > 0 {
if version.is_v5() {
Expand Down
4 changes: 4 additions & 0 deletions src/bin/nydus-image/unpack/pax/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ impl BlobChunkInfo for MockChunkInfo {
self.uncompress_size
}

/// Returns whether this chunk belongs to a batch chunk.
///
/// Always `false`: this unpack-test mock does not model batch chunks.
fn is_batch(&self) -> bool {
false
}

/// Returns whether the chunk data is stored in compressed form.
///
/// Mirrors the mock's `is_compressed` field directly.
fn is_compressed(&self) -> bool {
self.is_compressed
}
Expand Down
Loading

0 comments on commit 62c345e

Please sign in to comment.