Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Wenhao Ren <[email protected]>
  • Loading branch information
hangvane committed Jan 18, 2024
1 parent 596492b commit aa354d7
Show file tree
Hide file tree
Showing 23 changed files with 1,245 additions and 22 deletions.
31 changes: 16 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions builder/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ mod tests {
use super::*;
use nydus_api::ConfigV2;
use nydus_rafs::metadata::RafsSuperConfig;
use nydus_storage::backend::registry::StreamCallback;
use nydus_storage::backend::{BackendResult, BlobReader};
use nydus_storage::device::v5::BlobV5ChunkInfo;
use nydus_storage::device::{BlobChunkFlags, BlobChunkInfo, BlobFeatures};
Expand Down Expand Up @@ -759,6 +760,16 @@ mod tests {
Ok(i)
}

fn try_read_stream(
&self,
_offset: u64,
_size: u32,
_processed: &mut u64,
_f: &mut StreamCallback,
) -> BackendResult<()> {
unimplemented!()
}

fn metrics(&self) -> &BackendMetrics {
// Safe because nydusd must have backend attached with id, only image builder can no id
// but use backend instance to upload blob.
Expand Down
68 changes: 66 additions & 2 deletions rafs/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,10 @@ impl Rafs {
}
if self.fs_prefetch {
// Device should be ready before any prefetch.
self.device.start_prefetch();
self.prefetch(r, prefetch_files);
// self.device.start_prefetch();
// self.prefetch(r, prefetch_files);
self.device.start_stream_prefetch();
self.new_stream_prefetch(r, prefetch_files);
}
self.initialized = true;

Expand Down Expand Up @@ -327,6 +329,7 @@ impl Rafs {
}

impl Rafs {
#[allow(unused)]
fn prefetch(&self, reader: RafsIoReader, prefetch_files: Option<Vec<PathBuf>>) {
let sb = self.sb.clone();
let device = self.device.clone();
Expand All @@ -338,6 +341,26 @@ impl Rafs {
});
}

#[allow(unused)]
fn new_stream_prefetch(&self, reader: RafsIoReader, prefetch_files: Option<Vec<PathBuf>>) {
debug!("CMDebug: new_stream_prefetch2");
let sb = self.sb.clone();
let device = self.device.clone();
let prefetch_all = self.prefetch_all;
let root_ino = self.root_ino();

let _ = std::thread::spawn(move || {
Self::new_do_stream_prefetch(
root_ino,
reader,
prefetch_files,
prefetch_all,
sb,
device,
);
});
}

/// for blobfs
pub fn fetch_range_synchronous(&self, prefetches: &[BlobPrefetchRequest]) -> Result<()> {
self.device.fetch_range_synchronous(prefetches)
Expand All @@ -347,6 +370,7 @@ impl Rafs {
self.sb.superblock.root_ino()
}

#[allow(unused)]
fn do_prefetch(
root_ino: u64,
mut reader: RafsIoReader,
Expand Down Expand Up @@ -472,6 +496,46 @@ impl Rafs {
}
}

fn new_do_stream_prefetch(
root_ino: u64,
mut reader: RafsIoReader,
prefetch_files: Option<Vec<PathBuf>>,
_prefetch_all: bool,
sb: Arc<RafsSuper>,
device: BlobDevice,
) {
debug!("CMDebug: new_do_stream_prefetch2");
// Bootstrap has non-empty prefetch table indicating a full prefetch
let inlay_prefetch_all = sb
.is_inlay_prefetch_all(&mut reader)
.map_err(|e| error!("Detect prefetch table error {}", e))
.unwrap_or_default();

// Nydusd has a CLI option indicating a full prefetch
let startup_prefetch_all = prefetch_files
.as_ref()
.map(|f| f.len() == 1 && f[0].as_os_str() == "/")
.unwrap_or(false);

// User specified prefetch files have high priority to be prefetched.
// Moreover, user specified prefetch files list will override those on-disk prefetch table.
if !startup_prefetch_all && !inlay_prefetch_all {
// Then do file based prefetch based on:
// - prefetch listed passed in by user
// - or file prefetch list in metadata
// TODO: change this to iterator
let inodes = prefetch_files.map(|files| Self::convert_file_list(&files, &sb));
let res = sb.new_stream_prefetch_files(&device, &mut reader, root_ino, inodes);
match res {
Ok(true) => {
info!("Root inode was found, but it should not prefetch all files!")
}
Ok(false) => {}
Err(e) => error!("No file to be prefetched {:?}", e),
}
}
}

fn convert_file_list(files: &[PathBuf], sb: &Arc<RafsSuper>) -> Vec<Inode> {
let mut inodes = Vec::<Inode>::with_capacity(files.len());

Expand Down
1 change: 1 addition & 0 deletions rafs/src/metadata/direct_v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ impl RafsInode for OndiskInodeWrapper {
curr_chunk_index == tail_chunk_index,
)
.ok_or_else(|| einval!("failed to get chunk information"))?;
//TODO:这里应该考虑某个中间的chunk的blob_index不同的情况
if desc.blob.blob_index() != descs.blob_index() {
vec.push(descs);
descs = BlobIoVec::new(desc.blob.clone());
Expand Down
61 changes: 61 additions & 0 deletions rafs/src/metadata/md_v6.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,67 @@ impl RafsSuper {

Ok(found_root_inode)
}

pub(crate) fn new_stream_prefetch_data_v6(
&self,
device: &BlobDevice,
r: &mut RafsIoReader,
root_ino: Inode,
) -> RafsResult<bool> {
debug!("CMDebug: new_stream_prefetch_data_v62");
let hint_entries = self.meta.prefetch_table_entries as usize;
if hint_entries == 0 {
return Ok(false);
}

// Try to prefetch according to the list of files specified by the
// builder's `--prefetch-policy fs` option.
let mut prefetch_table = RafsV6PrefetchTable::new();
prefetch_table
.load_prefetch_table_from(r, self.meta.prefetch_table_offset, hint_entries)
.map_err(|e| {
error!("Failed in loading hint prefetch table at offset {}", e);
RafsError::Prefetch(format!(
"Failed in loading hint prefetch table at offset {}. {:?}",
self.meta.prefetch_table_offset, e
))
})?;
debug!("prefetch table contents {:?}", prefetch_table);

let mut hardlinks: HashSet<u64> = HashSet::new();
let mut fetched_ranges: HashMap<u32, HashSet<u64>> = HashMap::new();
let mut found_root_inode = false;
for ino in prefetch_table.inodes {
// Inode number 0 is invalid, it was added because prefetch table has to be aligned.
if ino == 0 {
break;
}
if ino as Inode == root_ino {
found_root_inode = true;
}
// debug!("CMDebug: hint prefetch inode {}", ino);

let ranges = self
.get_inode_ranges(ino as u64, &mut hardlinks, &mut fetched_ranges, device)
.map_err(|e| {
RafsError::Prefetch(format!("Failed in get inode chunk ranges. {:?}", e))
})?;

// debug!("CMDebug: prefetch inode: {}, ranges: {:?}", ino, ranges);

for r in ranges {
device.new_stream_prefetch(r).map_err(|e| {
RafsError::Prefetch(format!("Failed to add inode prefetch range. {:?}", e))
})?;
}
}

device.flush_stream_prefetch().map_err(|e| {
RafsError::Prefetch(format!("Failed to flush inode prefetch range. {:?}", e))
})?;

Ok(found_root_inode)
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit aa354d7

Please sign in to comment.