From aa354d74f9dd0f59d6a39cff60a2481619b2f342 Mon Sep 17 00:00:00 2001
From: Wenhao Ren
Date: Thu, 18 Jan 2024 20:18:35 +0800
Subject: [PATCH] storage: add streaming prefetch support

Signed-off-by: Wenhao Ren
---
 Cargo.lock                             |  31 +--
 builder/src/compact.rs                 |  11 +
 rafs/src/fs.rs                         |  68 +++++-
 rafs/src/metadata/direct_v6.rs         |   1 +
 rafs/src/metadata/md_v6.rs             |  61 +++++
 rafs/src/metadata/mod.rs               | 124 ++++++++++
 src/bin/nydus-image/unpack/pax/test.rs |  11 +
 storage/Cargo.toml                     |   1 +
 storage/src/backend/http_proxy.rs      |  11 +
 storage/src/backend/localdisk.rs       |  12 +
 storage/src/backend/localfs.rs         |  12 +
 storage/src/backend/mod.rs             | 134 ++++++++++-
 storage/src/backend/object_storage.rs  |  11 +
 storage/src/backend/registry.rs        | 158 +++++++++++++
 storage/src/cache/cachedfile.rs        | 184 +++++++++++++++
 storage/src/cache/dummycache.rs        |  25 +-
 storage/src/cache/filecache/mod.rs     |  10 +-
 storage/src/cache/fscache/mod.rs       |  10 +-
 storage/src/cache/mod.rs               |  17 ++
 storage/src/cache/streaming.rs         | 313 +++++++++++++++++++++++++
 storage/src/device.rs                  |  40 +++-
 storage/src/meta/mod.rs                |  11 +
 storage/src/test.rs                    |  11 +
 23 files changed, 1245 insertions(+), 22 deletions(-)
 create mode 100644 storage/src/cache/streaming.rs

diff --git a/Cargo.lock b/Cargo.lock
index 04da1334ec7..9d29d2a716a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -586,53 +586,53 @@ dependencies = [
 
 [[package]]
 name = "futures-channel"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
+checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
 dependencies = [
  "futures-core",
 ]
 
 [[package]]
 name = "futures-core"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
+checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
 
 [[package]]
 name = "futures-io"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
+checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
 
 [[package]]
 name = "futures-macro"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
+checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.98",
+ "syn 2.0.37",
 ]
 
 [[package]]
 name = "futures-sink"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
+checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
 
 [[package]]
 name = "futures-task"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
+checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
 
 [[package]]
 name = "futures-util"
-version = "0.3.21"
+version = "0.3.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
+checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
 dependencies = [
  "futures-core",
"futures-io", @@ -1362,6 +1362,7 @@ dependencies = [ "httpdate", "hyper", "hyperlocal", + "indexmap", "lazy_static", "leaky-bucket", "libc", diff --git a/builder/src/compact.rs b/builder/src/compact.rs index 97cd6594b17..4fb6cf723f1 100644 --- a/builder/src/compact.rs +++ b/builder/src/compact.rs @@ -669,6 +669,7 @@ mod tests { use super::*; use nydus_api::ConfigV2; use nydus_rafs::metadata::RafsSuperConfig; + use nydus_storage::backend::registry::StreamCallback; use nydus_storage::backend::{BackendResult, BlobReader}; use nydus_storage::device::v5::BlobV5ChunkInfo; use nydus_storage::device::{BlobChunkFlags, BlobChunkInfo, BlobFeatures}; @@ -759,6 +760,16 @@ mod tests { Ok(i) } + fn try_read_stream( + &self, + _offset: u64, + _size: u32, + _processed: &mut u64, + _f: &mut StreamCallback, + ) -> BackendResult<()> { + unimplemented!() + } + fn metrics(&self) -> &BackendMetrics { // Safe because nydusd must have backend attached with id, only image builder can no id // but use backend instance to upload blob. diff --git a/rafs/src/fs.rs b/rafs/src/fs.rs index 15a1848c4ce..d4d6b0852cc 100644 --- a/rafs/src/fs.rs +++ b/rafs/src/fs.rs @@ -175,8 +175,10 @@ impl Rafs { } if self.fs_prefetch { // Device should be ready before any prefetch. - self.device.start_prefetch(); - self.prefetch(r, prefetch_files); + // self.device.start_prefetch(); + // self.prefetch(r, prefetch_files); + self.device.start_stream_prefetch(); + self.new_stream_prefetch(r, prefetch_files); } self.initialized = true; @@ -327,6 +329,7 @@ impl Rafs { } impl Rafs { + #[allow(unused)] fn prefetch(&self, reader: RafsIoReader, prefetch_files: Option>) { let sb = self.sb.clone(); let device = self.device.clone(); @@ -338,6 +341,26 @@ impl Rafs { }); } + #[allow(unused)] + fn new_stream_prefetch(&self, reader: RafsIoReader, prefetch_files: Option>) { + debug!("CMDebug: new_stream_prefetch2"); + let sb = self.sb.clone(); + let device = self.device.clone(); + let prefetch_all = self.prefetch_all; + let root_ino = self.root_ino(); + + let _ = std::thread::spawn(move || { + Self::new_do_stream_prefetch( + root_ino, + reader, + prefetch_files, + prefetch_all, + sb, + device, + ); + }); + } + /// for blobfs pub fn fetch_range_synchronous(&self, prefetches: &[BlobPrefetchRequest]) -> Result<()> { self.device.fetch_range_synchronous(prefetches) @@ -347,6 +370,7 @@ impl Rafs { self.sb.superblock.root_ino() } + #[allow(unused)] fn do_prefetch( root_ino: u64, mut reader: RafsIoReader, @@ -472,6 +496,46 @@ impl Rafs { } } + fn new_do_stream_prefetch( + root_ino: u64, + mut reader: RafsIoReader, + prefetch_files: Option>, + _prefetch_all: bool, + sb: Arc, + device: BlobDevice, + ) { + debug!("CMDebug: new_do_stream_prefetch2"); + // Bootstrap has non-empty prefetch table indicating a full prefetch + let inlay_prefetch_all = sb + .is_inlay_prefetch_all(&mut reader) + .map_err(|e| error!("Detect prefetch table error {}", e)) + .unwrap_or_default(); + + // Nydusd has a CLI option indicating a full prefetch + let startup_prefetch_all = prefetch_files + .as_ref() + .map(|f| f.len() == 1 && f[0].as_os_str() == "/") + .unwrap_or(false); + + // User specified prefetch files have high priority to be prefetched. + // Moreover, user specified prefetch files list will override those on-disk prefetch table. 
diff --git a/rafs/src/metadata/direct_v6.rs b/rafs/src/metadata/direct_v6.rs
index 831e2ab41ca..81d7d9f26d4 100644
--- a/rafs/src/metadata/direct_v6.rs
+++ b/rafs/src/metadata/direct_v6.rs
@@ -847,6 +847,7 @@ impl RafsInode for OndiskInodeWrapper {
                 curr_chunk_index == tail_chunk_index,
             )
             .ok_or_else(|| einval!("failed to get chunk information"))?;
+            // TODO: handle the case where an interior chunk has a different blob_index.
             if desc.blob.blob_index() != descs.blob_index() {
                 vec.push(descs);
                 descs = BlobIoVec::new(desc.blob.clone());
diff --git a/rafs/src/metadata/md_v6.rs b/rafs/src/metadata/md_v6.rs
index a0b11d0dd00..f96666087f3 100644
--- a/rafs/src/metadata/md_v6.rs
+++ b/rafs/src/metadata/md_v6.rs
@@ -151,6 +151,67 @@ impl RafsSuper {
 
         Ok(found_root_inode)
     }
+
+    /// Load the on-disk prefetch table and turn the listed inodes into
+    /// compressed blob ranges queued on the streaming prefetch manager.
+    pub(crate) fn new_stream_prefetch_data_v6(
+        &self,
+        device: &BlobDevice,
+        r: &mut RafsIoReader,
+        root_ino: Inode,
+    ) -> RafsResult<bool> {
+        let hint_entries = self.meta.prefetch_table_entries as usize;
+        if hint_entries == 0 {
+            return Ok(false);
+        }
+
+        // Try to prefetch according to the list of files specified by the
+        // builder's `--prefetch-policy fs` option.
+        let mut prefetch_table = RafsV6PrefetchTable::new();
+        prefetch_table
+            .load_prefetch_table_from(r, self.meta.prefetch_table_offset, hint_entries)
+            .map_err(|e| {
+                error!(
+                    "Failed in loading hint prefetch table at offset {}. {:?}",
+                    self.meta.prefetch_table_offset, e
+                );
+                RafsError::Prefetch(format!(
+                    "Failed in loading hint prefetch table at offset {}. {:?}",
+                    self.meta.prefetch_table_offset, e
+                ))
+            })?;
+        debug!("prefetch table contents {:?}", prefetch_table);
+
+        let mut hardlinks: HashSet<u64> = HashSet::new();
+        let mut fetched_ranges: HashMap<u32, HashSet<u64>> = HashMap::new();
+        let mut found_root_inode = false;
+        for ino in prefetch_table.inodes {
+            // Inode number 0 is invalid; it was added because the prefetch table has to be aligned.
+            if ino == 0 {
+                break;
+            }
+            if ino as Inode == root_ino {
+                found_root_inode = true;
+            }
+
+            let ranges = self
+                .get_inode_ranges(ino as u64, &mut hardlinks, &mut fetched_ranges, device)
+                .map_err(|e| {
+                    RafsError::Prefetch(format!("Failed to get inode chunk ranges. {:?}", e))
+                })?;
+
+            for r in ranges {
+                device.new_stream_prefetch(r).map_err(|e| {
+                    RafsError::Prefetch(format!("Failed to add inode prefetch range. {:?}", e))
+                })?;
+            }
+        }
+
{:?}", e)) + })?; + + Ok(found_root_inode) + } } #[cfg(test)] diff --git a/rafs/src/metadata/mod.rs b/rafs/src/metadata/mod.rs index 0c12fc3dd88..24d56437d0e 100644 --- a/rafs/src/metadata/mod.rs +++ b/rafs/src/metadata/mod.rs @@ -17,6 +17,7 @@ use std::path::{Component, Path, PathBuf}; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; +use storage::device::BlobRange; use thiserror::Error; use anyhow::{bail, ensure}; @@ -962,6 +963,30 @@ impl RafsSuper { } } + pub fn new_stream_prefetch_files( + &self, + device: &BlobDevice, + r: &mut RafsIoReader, + root_ino: Inode, + files: Option>, + ) -> RafsResult { + debug!("CMDebug: new_stream_prefetch_files2"); + // Try to prefetch files according to the list specified by the `--prefetch-files` option. + if let Some(_files) = files { + unimplemented!(); + } else if self.meta.is_v5() { + Err(RafsError::Prefetch( + "Unsupported filesystem version, prefetch disabled".to_string(), + )) + } else if self.meta.is_v6() { + self.new_stream_prefetch_data_v6(device, r, root_ino) + } else { + Err(RafsError::Prefetch( + "Unknown filesystem version, prefetch disabled".to_string(), + )) + } + } + #[inline] fn prefetch_inode( device: &BlobDevice, @@ -1020,6 +1045,105 @@ impl RafsSuper { Ok(()) } + + #[inline] + fn get_inode_ranges_inner( + inode: &Arc, + hardlinks: &mut HashSet, + fetched_ranges: &mut HashMap>, + ranges: &mut Vec, + device: &BlobDevice, + ) -> Result<()> { + // Check for duplicated hardlinks. + if inode.is_hardlink() { + if hardlinks.contains(&inode.ino()) { + return Ok(()); + } else { + hardlinks.insert(inode.ino()); + } + } + + let mut bi_vecs = inode.alloc_bio_vecs(device, 0, inode.size() as usize, false)?; + for bi_vec in &mut bi_vecs { + // bi_vec + // .bi_vec + // .sort_by_key(|c| c.chunkinfo.compressed_offset()); + //每个bi_vec是单个blob,但里面可能不连续,需要对里面的每个desc判断是否能合并 + let mut i = 0; + 'vec: while i < bi_vec.len() { + let blob_idx = bi_vec.blob_index(); + let c_offset = bi_vec.bi_vec[i].chunkinfo.compressed_offset(); + let c_end = bi_vec.bi_vec[i].chunkinfo.compressed_end(); + + // 判断这个chunk是否已经下载过 + let fetched_blob_ranges = fetched_ranges.entry(blob_idx).or_insert(HashSet::new()); + if fetched_blob_ranges.contains(&c_offset) { + i += 1; + continue; + } + + // 尝试merge 进已有的range + // TODO:如果chunk在同一个blob,但是乱序的,是否存在这种情况,如何处理 + for r in &mut *ranges { + //先匹配blob + if r.blob_idx == blob_idx { + // 再看is_continuous + // TODO:对特殊格式进行处理 + if r.end == c_offset { + r.end = c_end; + fetched_blob_ranges.insert(c_offset); + i += 1; + continue 'vec; + } + } + } + // 放到一个新range里 + ranges.push(BlobRange { + blob_idx: bi_vec.blob_index(), + offset: c_offset, + end: c_end, + }); + fetched_blob_ranges.insert(c_offset); + i += 1; + } + } + + Ok(()) + } + + fn get_inode_ranges( + &self, + ino: u64, + hardlinks: &mut HashSet, + fetched_ranges: &mut HashMap>, + device: &BlobDevice, + ) -> Result> { + let inode = self + .superblock + .get_inode(ino, self.validate_digest) + .map_err(|_e| enoent!("Can't find inode"))?; + + let mut ranges = Vec::new(); + + if inode.is_dir() { + let mut descendants = Vec::new(); + let _ = inode.collect_descendants_inodes(&mut descendants)?; + for i in descendants.iter() { + Self::get_inode_ranges_inner(&i, hardlinks, fetched_ranges, &mut ranges, device)?; + } + } else if !inode.is_empty_size() && inode.is_reg() { + // An empty regular file will also be packed into nydus image, + // then it has a size of zero. + // Moreover, for rafs v5, symlink has size of zero but non-zero size + // for symlink size. 
diff --git a/src/bin/nydus-image/unpack/pax/test.rs b/src/bin/nydus-image/unpack/pax/test.rs
index fcb86574ad5..b71d984ea18 100644
--- a/src/bin/nydus-image/unpack/pax/test.rs
+++ b/src/bin/nydus-image/unpack/pax/test.rs
@@ -1,6 +1,7 @@
 use std::collections::HashMap;
 use std::{io::Read, sync::Arc};
 
+use nydus_storage::backend::registry::StreamCallback;
 use nydus_storage::backend::{BackendResult, BlobReader};
 use nydus_storage::device::BlobChunkInfo;
 use nydus_utils::compress::{self, Algorithm};
@@ -35,6 +36,16 @@ impl BlobReader for MockBlobReader {
         Ok(end - offset)
     }
 
+    fn try_read_stream(
+        &self,
+        _offset: u64,
+        _size: u32,
+        _processed: &mut u64,
+        _f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!()
+    }
+
     fn metrics(&self) -> &BackendMetrics {
         self.metrics.as_ref()
     }
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index 44197ec108f..1a12bc0b4ea 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -18,6 +18,7 @@ http = { version = "0.2.8", optional = true }
 httpdate = { version = "1.0", optional = true }
 hyper = { version = "0.14.11", optional = true }
 hyperlocal = { version = "0.8.0", optional = true }
+indexmap = "1"
 lazy_static = "1.4.0"
 leaky-bucket = { version = "0.12.1", optional = true }
 libc = "0.2"
diff --git a/storage/src/backend/http_proxy.rs b/storage/src/backend/http_proxy.rs
index c1324fbef78..bccdec92619 100644
--- a/storage/src/backend/http_proxy.rs
+++ b/storage/src/backend/http_proxy.rs
@@ -15,6 +15,7 @@ use reqwest;
 use tokio::runtime::Runtime;
 
 use super::connection::{Connection, ConnectionConfig, ConnectionError};
+use super::registry::StreamCallback;
 use super::{BackendError, BackendResult, BlobBackend, BlobReader};
 use std::path::Path;
 use std::{
@@ -265,6 +266,16 @@ impl BlobReader for HttpProxyReader {
         }
     }
 
+    fn try_read_stream(
+        &self,
+        _: u64,
+        _: u32,
+        _: &mut u64,
+        _: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!()
+    }
+
     fn metrics(&self) -> &nydus_utils::metrics::BackendMetrics {
         &self.metrics
     }
diff --git a/storage/src/backend/localdisk.rs b/storage/src/backend/localdisk.rs
index 4475797d5ad..9ec9514b3f6 100644
--- a/storage/src/backend/localdisk.rs
+++ b/storage/src/backend/localdisk.rs
@@ -20,6 +20,8 @@ use nydus_utils::metrics::BackendMetrics;
 use crate::backend::{BackendError, BackendResult, BlobBackend, BlobReader};
 use crate::utils::{readv, MemSliceCursor};
 
+use super::registry::StreamCallback;
+
 type LocalDiskResult<T> = std::result::Result<T, LocalDiskError>;
 
 /// Error codes related to localdisk storage backend.
@@ -91,6 +93,16 @@ impl BlobReader for LocalDiskBlob {
         })
     }
 
+    fn try_read_stream(
+        &self,
+        _offset: u64,
+        _size: u32,
+        _processed: &mut u64,
+        _f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!()
+    }
+
     fn readv(
         &self,
         bufs: &[FileVolatileSlice],
diff --git a/storage/src/backend/localfs.rs b/storage/src/backend/localfs.rs
index 6168a1903da..6cea09e47d5 100644
--- a/storage/src/backend/localfs.rs
+++ b/storage/src/backend/localfs.rs
@@ -21,6 +21,8 @@ use nydus_utils::metrics::BackendMetrics;
 use crate::backend::{BackendError, BackendResult, BlobBackend, BlobReader};
 use crate::utils::{readv, MemSliceCursor};
 
+use super::registry::StreamCallback;
+
 type LocalFsResult<T> = std::result::Result<T, LocalFsError>;
 
 /// Error codes related to localfs storage backend.
@@ -66,6 +68,16 @@ impl BlobReader for LocalFsEntry {
         })
     }
 
+    fn try_read_stream(
+        &self,
+        _offset: u64,
+        _size: u32,
+        _processed: &mut u64,
+        _f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!()
+    }
+
     fn readv(
         &self,
         bufs: &[FileVolatileSlice],
diff --git a/storage/src/backend/mod.rs b/storage/src/backend/mod.rs
index aec8db9de06..d0889657349 100644
--- a/storage/src/backend/mod.rs
+++ b/storage/src/backend/mod.rs
@@ -14,8 +14,8 @@
 //! prefetching, which is to load data into page cache.
 //! - [LocalDisk](localdisk/struct.LocalDisk.html): backend driver to access blobs on local disk.
 
-use std::fmt;
 use std::io::Read;
+use std::{fmt, thread};
 use std::{sync::Arc, time::Duration};
 
 use fuse_backend_rs::file_buf::FileVolatileSlice;
@@ -23,7 +23,9 @@ use nydus_utils::{
     metrics::{BackendMetrics, ERROR_HOLDER},
     DelayType, Delayer,
 };
+use reqwest::blocking::Response;
 
+use crate::backend::registry::StreamCallback;
 use crate::utils::{alloc_buf, copyv};
 use crate::StorageError;
 
@@ -149,6 +151,56 @@ pub trait BlobReader: Send + Sync {
         }
     }
 
+    /// Read a range of the blob as a stream, feeding the response body to the
+    /// callback `f` and recording the number of bytes handled in `processed`.
+    fn try_read_stream(
+        &self,
+        offset: u64,
+        size: u32,
+        processed: &mut u64,
+        f: &mut StreamCallback,
+    ) -> BackendResult<()>;
+
+    /// Stream a range of the blob with retry, resuming after the bytes that
+    /// the callback has already processed.
+    fn read_stream(&self, offset: u64, size: u32, f: &mut StreamCallback) -> BackendResult<usize> {
+        let mut retry_count = self.retry_limit();
+
+        let mut delayer = Delayer::new(DelayType::BackOff, Duration::from_millis(500));
+
+        let mut processed_all = 0u64;
+        loop {
+            let begin_time = self.metrics().begin();
+
+            let mut processed = 0u64;
+            // Resume after the bytes already processed by earlier attempts.
+            match self.try_read_stream(
+                offset + processed_all,
+                size - processed_all as u32,
+                &mut processed,
+                f,
+            ) {
+                Ok(()) => {
+                    self.metrics().end(&begin_time, processed as usize, false);
+                    return Ok(size as usize);
+                }
+                Err(err) => {
+                    if processed > 0 {
+                        self.metrics().end(&begin_time, processed as usize, true);
+                        processed_all += processed;
+                    }
+                    if retry_count > 0 {
+                        warn!(
+                            "Read from backend failed: {:?}, retry count {}",
+                            err, retry_count
+                        );
+                        retry_count -= 1;
+                        delayer.delay();
+                    } else {
+                        ERROR_HOLDER
+                            .lock()
+                            .unwrap()
+                            .push(&format!("{:?}", err))
+                            .unwrap_or_else(|_| error!("Failed when trying to hold the error"));
+                        return Err(err);
+                    }
+                }
+            }
+        }
+    }
+
     /// Read as much data as possible into the buffer.
     fn read_all(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
         let mut off = 0usize;
@@ -274,3 +326,83 @@ impl Read for BlobBufReader {
         Ok(sz)
     }
 }
+
+struct ResponseBufReader {
+    buf: Option<Vec<u8>>,
+    pos: usize,
+    receiver: std::sync::mpsc::Receiver<Vec<u8>>,
+}
+
+impl ResponseBufReader {
+    pub fn new(mut res: Response, size: u32) -> Self {
+        let (sender, receiver) = std::sync::mpsc::channel();
+
+        // Spawn a worker thread to download the response body in fixed-size
+        // chunks and stream each chunk to the reading side over the channel.
+        thread::spawn(move || {
+            let mut downloaded = 0u32;
+            while downloaded < size {
+                let count = std::cmp::min(102400, size - downloaded);
+                let mut buffer = vec![0u8; count as usize];
+                if res.read_exact(&mut buffer).is_err() {
+                    // Dropping the sender makes the reader observe EOF and error out.
+                    break;
+                }
+                if sender.send(buffer).is_err() {
+                    break;
+                }
+                downloaded += count as u32;
+            }
+        });
+
+        Self {
+            buf: None,
+            pos: 0,
+            receiver,
+        }
+    }
+}
+
+impl Read for ResponseBufReader {
+    fn read(&mut self, target: &mut [u8]) -> std::io::Result<usize> {
+        let t_len = target.len();
+        let mut t_pos = 0;
+        // Serve leftover bytes from the buffered chunk first.
+        if let Some(buf) = self.buf.as_ref() {
+            let copy_size = std::cmp::min(t_len, buf.len() - self.pos);
+            target[..copy_size].copy_from_slice(&buf[self.pos..self.pos + copy_size]);
+            self.pos += copy_size;
+            if self.pos == buf.len() {
+                self.buf = None;
+                self.pos = 0;
+            }
+            if copy_size == t_len {
+                return Ok(t_len);
+            }
+            t_pos += copy_size;
+        }
+
+        // Then pull more chunks from the channel.
+        while let Ok(data) = self.receiver.recv() {
+            let copy_size = std::cmp::min(t_len - t_pos, data.len());
+            target[t_pos..t_pos + copy_size].copy_from_slice(&data[..copy_size]);
+            t_pos += copy_size;
+            if copy_size == data.len() {
+                continue;
+            }
+            // Stash the partially consumed chunk before returning.
+            self.buf = Some(data);
+            self.pos = copy_size;
+            break;
+        }
+        if t_pos < t_len {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::UnexpectedEof,
+                format!("stream ended after {} of {} bytes", t_pos, t_len),
+            ));
+        }
+        Ok(t_len)
+    }
+}
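
ResponseBufReader overlaps the network download with consumption by pulling fixed-size chunks on a worker thread and exposing them through `Read`. A minimal model of the same producer/consumer pattern, std only, with a `Cursor` standing in for the HTTP response body:

```rust
use std::io::{Cursor, Read, Result};
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Adapt a channel of byte chunks into a blocking `Read` stream.
struct ChannelReader {
    rx: Receiver<Vec<u8>>,
    leftover: Vec<u8>, // bytes received but not yet consumed
}

impl Read for ChannelReader {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        if self.leftover.is_empty() {
            match self.rx.recv() {
                Ok(chunk) => self.leftover = chunk,
                Err(_) => return Ok(0), // sender gone: EOF
            }
        }
        let n = buf.len().min(self.leftover.len());
        buf[..n].copy_from_slice(&self.leftover[..n]);
        self.leftover.drain(..n);
        Ok(n)
    }
}

fn main() {
    let (tx, rx) = channel();
    // Producer thread stands in for the HTTP response body.
    thread::spawn(move || {
        let mut src = Cursor::new(b"hello streaming world".to_vec());
        let mut chunk = [0u8; 8];
        loop {
            let n = src.read(&mut chunk).unwrap();
            if n == 0 || tx.send(chunk[..n].to_vec()).is_err() {
                break;
            }
        }
    });
    let mut reader = ChannelReader { rx, leftover: Vec::new() };
    let mut out = String::new();
    reader.read_to_string(&mut out).unwrap();
    assert_eq!(out, "hello streaming world");
}
```
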
diff --git a/storage/src/backend/object_storage.rs b/storage/src/backend/object_storage.rs
index 7c2b8ba655c..3ac6a240ae7 100644
--- a/storage/src/backend/object_storage.rs
+++ b/storage/src/backend/object_storage.rs
@@ -17,6 +17,7 @@ use reqwest::Method;
 use nydus_utils::metrics::BackendMetrics;
 
 use super::connection::{Connection, ConnectionError};
+use super::registry::StreamCallback;
 use super::{BackendError, BackendResult, BlobBackend, BlobReader};
 
 /// Error codes related to object storage backend.
@@ -136,6 +137,16 @@ where
             .map(|size| size as usize)?)
     }
 
+    fn try_read_stream(
+        &self,
+        _offset: u64,
+        _size: u32,
+        _processed: &mut u64,
+        _f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!()
+    }
+
     fn metrics(&self) -> &BackendMetrics {
         &self.metrics
     }
diff --git a/storage/src/backend/registry.rs b/storage/src/backend/registry.rs
index a9947f49efb..d4188930f11 100644
--- a/storage/src/backend/registry.rs
+++ b/storage/src/backend/registry.rs
@@ -27,6 +27,8 @@ use crate::backend::connection::{
 };
 use crate::backend::{BackendError, BackendResult, BlobBackend, BlobReader};
 
+use super::ResponseBufReader;
+
 const REGISTRY_CLIENT_ID: &str = "nydus-registry-client";
 const HEADER_AUTHORIZATION: &str = "Authorization";
 const HEADER_WWW_AUTHENTICATE: &str = "www-authenticate";
@@ -68,6 +70,8 @@ impl From<RegistryError> for BackendError {
 
 type RegistryResult<T> = std::result::Result<T, RegistryError>;
 
+/// Callback invoked with the streaming response body, the request offset and
+/// size, and a counter tracking how many bytes have been processed so far.
+pub type StreamCallback = dyn FnMut(Box<dyn Read>, u64, u64, &mut u64) -> Result<()>;
+
 #[derive(Default)]
 struct Cache(RwLock<String>);
 
@@ -764,6 +768,147 @@ impl RegistryReader {
             .map_err(RegistryError::Transport)
             .map(|size| size as usize)
     }
+
+    fn _try_read_stream(
+        &self,
+        offset: u64,
+        size: u32,
+        processed: &mut u64,
+        f: &mut StreamCallback,
+        allow_retry: bool,
+    ) -> RegistryResult<()> {
+        let url = format!("/blobs/sha256:{}", self.blob_id);
+        let url = self
+            .state
+            .url(url.as_str(), &[])
+            .map_err(|e| RegistryError::Url(url, e))?;
+        let mut headers = HeaderMap::new();
+        let end_at = offset + size as u64 - 1;
+        let range = format!("bytes={}-{}", offset, end_at);
+        headers.insert("Range", range.parse().unwrap());
+
+        let mut resp;
+        let cached_redirect = self.state.cached_redirect.get(&self.blob_id);
+
+        if let Some(cached_redirect) = cached_redirect {
+            resp = self
+                .connection
+                .call::<&[u8]>(
+                    Method::GET,
+                    cached_redirect.as_str(),
+                    None,
+                    None,
+                    &mut headers,
+                    false,
+                )
+                .map_err(RegistryError::Request)?;
+
+            // The request has expired or been denied, so it needs to be re-requested.
+            if allow_retry
+                && [StatusCode::UNAUTHORIZED, StatusCode::FORBIDDEN].contains(&resp.status())
+            {
+                warn!(
+                    "The redirected link has expired: {}, will retry read",
+                    cached_redirect.as_str()
+                );
+                self.state.cached_redirect.remove(&self.blob_id);
+                // Retry the read only once.
+                return self._try_read_stream(offset, size, processed, f, false);
+            }
+        } else {
+            resp = match self.request::<&[u8]>(
+                Method::GET,
+                url.as_str(),
+                None,
+                headers.clone(),
+                false,
+            ) {
+                Ok(res) => res,
+                Err(RegistryError::Request(ConnectionError::Common(e)))
+                    if self.state.needs_fallback_http(&e) =>
+                {
+                    self.state.fallback_http();
+                    let url = format!("/blobs/sha256:{}", self.blob_id);
+                    let url = self
+                        .state
+                        .url(url.as_str(), &[])
+                        .map_err(|e| RegistryError::Url(url, e))?;
+                    self.request::<&[u8]>(Method::GET, url.as_str(), None, headers.clone(), false)?
+                }
+                Err(RegistryError::Request(ConnectionError::Common(e))) => {
+                    if e.to_string().contains("self signed certificate") {
+                        warn!("try to enable \"skip_verify: true\" option");
+                    }
+                    return Err(RegistryError::Request(ConnectionError::Common(e)));
+                }
+                Err(e) => {
+                    return Err(e);
+                }
+            };
+            let status = resp.status();
+
+            // Handle a redirected request and cache the redirect URL.
+            if REDIRECTED_STATUS_CODE.contains(&status) {
+                if let Some(location) = resp.headers().get("location") {
+                    let location = location.to_str().unwrap();
+                    let mut location = Url::parse(location)
+                        .map_err(|e| RegistryError::Url(location.to_string(), e))?;
+                    // Note: some P2P proxy servers only support an origin blob server with a
+                    // specific scheme, so the scheme may need to be changed to `blob_url_scheme`
+                    // here.
+                    if !self.state.blob_url_scheme.is_empty() {
+                        location
+                            .set_scheme(&self.state.blob_url_scheme)
+                            .map_err(|_| {
+                                RegistryError::Scheme(self.state.blob_url_scheme.clone())
+                            })?;
+                    }
+                    if !self.state.blob_redirected_host.is_empty() {
+                        location
+                            .set_host(Some(self.state.blob_redirected_host.as_str()))
+                            .map_err(|e| {
+                                error!(
+                                    "Failed to set blob redirected host to {}: {:?}",
+                                    self.state.blob_redirected_host.as_str(),
+                                    e
+                                );
+                                RegistryError::Url(location.to_string(), e)
+                            })?;
+                        debug!("New redirected location {:?}", location.host_str());
+                    }
+                    let resp_ret = self
+                        .connection
+                        .call::<&[u8]>(
+                            Method::GET,
+                            location.as_str(),
+                            None,
+                            None,
+                            &mut headers,
+                            true,
+                        )
+                        .map_err(RegistryError::Request);
+                    match resp_ret {
+                        Ok(_resp) => {
+                            resp = _resp;
+                            self.state
+                                .cached_redirect
+                                .set(self.blob_id.clone(), location.as_str().to_string())
+                        }
+                        Err(err) => {
+                            return Err(err);
+                        }
+                    }
+                };
+            } else {
+                resp = respond(resp, true).map_err(RegistryError::Request)?;
+            }
+        }
+
+        let buf_reader = ResponseBufReader::new(resp, size);
+
+        f(Box::new(buf_reader), offset, size as u64, processed)
+            .map_err(|e| RegistryError::Common(e.to_string()))?;
+        Ok(())
+    }
 }
 
 impl BlobReader for RegistryReader {
@@ -820,6 +965,19 @@ impl BlobReader for RegistryReader {
         })
     }
 
+    fn try_read_stream(
+        &self,
+        offset: u64,
+        size: u32,
+        processed: &mut u64,
+        f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        self.first.handle_force(&mut || -> BackendResult<()> {
+            self._try_read_stream(offset, size, processed, f, true)
+                .map_err(BackendError::Registry)
+        })
+    }
+
     fn metrics(&self) -> &BackendMetrics {
         &self.metrics
     }
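
try_read_stream hands the callback a streaming body plus the request window, and the callback reports its progress through `processed` so retries can resume. A sketch of that calling contract against a toy in-memory backend (everything here is illustrative; only the callback shape matches the patch):

```rust
use std::io::{Cursor, Read, Result};

type StreamCallback = dyn FnMut(Box<dyn Read>, u64, u64, &mut u64) -> Result<()>;

/// A toy backend serving `try_read_stream` from an in-memory blob.
fn try_read_stream(
    blob: &[u8],
    offset: u64,
    size: u32,
    processed: &mut u64,
    f: &mut StreamCallback,
) -> Result<()> {
    let end = (offset + size as u64).min(blob.len() as u64);
    let body = Cursor::new(blob[offset as usize..end as usize].to_vec());
    f(Box::new(body), offset, end - offset, processed)
}

fn main() -> Result<()> {
    let blob: Vec<u8> = (0..=255).collect();
    let mut processed = 0u64;
    let mut cb = |mut body: Box<dyn Read>, _off: u64, len: u64, processed: &mut u64| -> Result<()> {
        let mut buf = vec![0u8; len as usize];
        body.read_exact(&mut buf)?; // consume the streamed window
        *processed += len;          // report progress for retry accounting
        Ok(())
    };
    try_read_stream(&blob, 16, 64, &mut processed, &mut cb)?;
    assert_eq!(processed, 64);
    Ok(())
}
```
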
diff --git a/storage/src/cache/cachedfile.rs b/storage/src/cache/cachedfile.rs
index a18f8004fd2..232db2b25da 100644
--- a/storage/src/cache/cachedfile.rs
+++ b/storage/src/cache/cachedfile.rs
@@ -9,6 +9,7 @@
 //! performance. It may be used by both the userspace `FileCacheMgr` or the `FsCacheMgr` based
 //! on the in-kernel fscache system.
 
+use std::cmp;
 use std::collections::HashSet;
 use std::fs::File;
 use std::io::{ErrorKind, Read, Result};
@@ -28,6 +29,7 @@ use tokio::runtime::Runtime;
 
 use crate::backend::BlobReader;
 use crate::cache::state::ChunkMap;
+use crate::cache::streaming::StreamPrefetchMgr;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncPrefetchMessage, AsyncWorkerMgr};
 use crate::cache::{BlobCache, BlobIoMergeState};
 use crate::device::{
@@ -38,6 +40,9 @@ use crate::meta::{BlobCompressionContextInfo, BlobMetaChunk};
 use crate::utils::{alloc_buf, copyv, readv, MemSliceCursor};
 use crate::{StorageError, StorageResult, RAFS_BATCH_SIZE_TO_GAP_SHIFT, RAFS_DEFAULT_CHUNK_SIZE};
 
+use super::streaming::StreamingPrefetchMessage;
+use super::ChunkDecompressState;
+
 const DOWNLOAD_META_RETRY_COUNT: u32 = 5;
 const DOWNLOAD_META_RETRY_DELAY: u64 = 400;
 const ENCRYPTION_PAGE_SIZE: usize = 4096;
@@ -141,6 +146,7 @@ pub(crate) struct FileCacheEntry {
     pub(crate) reader: Arc<dyn BlobReader>,
     pub(crate) runtime: Arc<Runtime>,
     pub(crate) workers: Arc<AsyncWorkerMgr>,
+    pub(crate) stream_workers: Arc<StreamPrefetchMgr>,
 
     pub(crate) blob_compressed_size: u64,
     pub(crate) blob_uncompressed_size: u64,
@@ -517,6 +523,10 @@ impl BlobCache for FileCacheEntry {
         Ok(())
     }
 
+    fn start_stream_prefetch(&self, blobs: Vec<Arc<dyn BlobCache>>) {
+        self.stream_workers.start_stream_prefetch(blobs);
+    }
+
     fn stop_prefetch(&self) -> StorageResult<()> {
         loop {
             let val = self.prefetch_state.load(Ordering::Acquire);
@@ -577,6 +587,20 @@ impl BlobCache for FileCacheEntry {
         Ok(0)
     }
 
+    fn new_add_stream_prefetch_range(&self, range: crate::device::BlobRange) -> Result<()> {
+        self.stream_workers.add_prefetch_range(range)
+    }
+
+    fn flush_stream_prefetch(&self) -> Result<()> {
+        self.stream_workers.flush_waiting_queue()
+    }
+
+    // Queue a prefetch range which may be contiguous with an already queued request.
+    fn new_stream_prefetch_range(&self, blob_cache: Arc<dyn BlobCache>, offset: u64, size: u64) {
+        let msg = StreamingPrefetchMessage::new_blob_prefetch(blob_cache.clone(), offset, size);
+        let _ = self.stream_workers.send_prefetch_message(msg);
+    }
+
     fn prefetch_range(&self, range: &BlobIoRange) -> Result<usize> {
         let mut pending = Vec::with_capacity(range.chunks.len());
         if !self.chunk_map.is_persist() {
@@ -693,6 +717,166 @@ impl BlobCache for FileCacheEntry {
             Ok(None)
         }
     }
+
+    fn fetch_range_compressed_stream(
+        self: Arc<Self>,
+        offset: u64,
+        size: u64,
+        prefetch: bool,
+    ) -> std::io::Result<()> {
+        // TODO: risky, verify that chunk data really gets persisted through this clone.
+        let entry = self.clone();
+        let fc_meta = entry.meta.as_ref().ok_or_else(|| enoent!())?;
+        let meta = fc_meta.get_blob_meta().ok_or_else(|| einval!())?;
+        let mut f = move |mut resp: Box<dyn Read>,
+                          offset_f: u64,
+                          size_f: u64,
+                          processed: &mut u64|
+              -> std::io::Result<()> {
+            let mut info_offset = 0u64;
+            while info_offset < size_f {
+                // Pull a batch of chunk infos per iteration and process them.
+                let info_size = cmp::min(size_f - info_offset, 0x2000_0000);
+                let infos =
+                    meta.get_chunks_compressed(offset_f + info_offset, info_size, 0, prefetch)?;
+
+                // TODO: handle batched chunks.
+                let end = &infos[infos.len() - 1];
+                let start = &infos[0];
+                info_offset += end.compressed_end() - start.compressed_offset();
+
+                for info in &infos {
+                    // TODO: improve the error handling here.
+                    entry
+                        .chunk_map
+                        .check_ready_and_mark_pending(info.as_ref())
+                        .ok();
+                }
+
+                // TODO: read and decompress chunks in batches instead of one at a time.
+                for info in &infos {
+                    let c_size = info.compressed_size();
+                    let mut chunk_buf = vec![0u8; c_size as usize];
+                    resp.read_exact(chunk_buf.as_mut_slice())?;
+
+                    let d_buf = ChunkDecompressState::new(
+                        info.compressed_offset(),
+                        entry.as_ref(),
+                        vec![info.as_ref()],
+                        chunk_buf,
+                    )
+                    .next()
+                    .unwrap()
+                    .unwrap();
+                    entry.persist_chunk_data(info.as_ref(), &d_buf);
+                    *processed += c_size as u64;
+                }
+            }
+            Ok(())
+        };
+        self.reader()
+            .read_stream(offset, size as u32, &mut f)
+            .map_err(|e| eio!(e))?;
+        Ok(())
+    }
 }
 
 impl BlobObject for FileCacheEntry {
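
The loop above reads each chunk's compressed bytes straight off the stream, decompresses, and persists immediately, so cache writes begin before the HTTP body has finished downloading. A reduced model of that pipeline (identity "decompression" stands in for the real codec; `ChunkInfo` and `pump` are hypothetical names):

```rust
use std::io::{Read, Result};

/// Per-chunk metadata as carried by the blob's compression context.
struct ChunkInfo {
    compressed_size: usize,
}

/// Placeholder for the real decompressor; assumes identity "compression".
fn decompress(compressed: &[u8]) -> Vec<u8> {
    compressed.to_vec()
}

/// Drain `body` chunk by chunk: read exactly the compressed size, decode,
/// then hand the plain data to the persist callback before reading on.
fn pump(
    mut body: impl Read,
    chunks: &[ChunkInfo],
    mut persist: impl FnMut(usize, Vec<u8>),
) -> Result<u64> {
    let mut processed = 0u64;
    for (idx, info) in chunks.iter().enumerate() {
        let mut buf = vec![0u8; info.compressed_size];
        body.read_exact(&mut buf)?;     // blocks only until this chunk arrives
        persist(idx, decompress(&buf)); // cache write overlaps later downloads
        processed += info.compressed_size as u64;
    }
    Ok(processed)
}

fn main() -> Result<()> {
    let chunks = [ChunkInfo { compressed_size: 4 }, ChunkInfo { compressed_size: 2 }];
    let body: &[u8] = b"abcdef";
    let n = pump(body, &chunks, |idx, data| println!("chunk {}: {:?}", idx, data))?;
    assert_eq!(n, 6);
    Ok(())
}
```
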
diff --git a/storage/src/cache/dummycache.rs b/storage/src/cache/dummycache.rs
index 7a0465f36e2..1906d381dc5 100644
--- a/storage/src/cache/dummycache.rs
+++ b/storage/src/cache/dummycache.rs
@@ -31,7 +31,7 @@ use crate::backend::{BlobBackend, BlobReader};
 use crate::cache::state::{ChunkMap, NoopChunkMap};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{
-    BlobChunkInfo, BlobFeatures, BlobInfo, BlobIoDesc, BlobIoVec, BlobPrefetchRequest,
+    BlobChunkInfo, BlobFeatures, BlobInfo, BlobIoDesc, BlobIoVec, BlobPrefetchRequest, BlobRange,
 };
 use crate::utils::{alloc_buf, copyv};
 use crate::{StorageError, StorageResult};
@@ -104,6 +104,8 @@ impl BlobCache for DummyCache {
         Ok(())
     }
 
+    fn start_stream_prefetch(&self, _blobs: Vec<Arc<dyn BlobCache>>) {}
+
     fn stop_prefetch(&self) -> StorageResult<()> {
         Ok(())
     }
@@ -121,6 +123,18 @@ impl BlobCache for DummyCache {
         Err(StorageError::Unsupported)
     }
 
+    fn new_add_stream_prefetch_range(&self, _range: BlobRange) -> Result<()> {
+        unimplemented!()
+    }
+
+    fn new_stream_prefetch_range(&self, _blob_cache: Arc<dyn BlobCache>, _offset: u64, _size: u64) {
+        unimplemented!()
+    }
+
+    fn flush_stream_prefetch(&self) -> Result<()> {
+        unimplemented!()
+    }
+
     fn read(&self, iovec: &mut BlobIoVec, bufs: &[FileVolatileSlice]) -> Result<usize> {
         let bios = &iovec.bi_vec;
 
@@ -164,6 +178,15 @@ impl BlobCache for DummyCache {
             .map(|(n, _)| n)
             .map_err(|e| eother!(e))
     }
+
+    fn fetch_range_compressed_stream(
+        self: Arc<Self>,
+        _offset: u64,
+        _size: u64,
+        _prefetch: bool,
+    ) -> std::io::Result<()> {
+        unimplemented!();
+    }
 }
 
 /// A dummy implementation of [BlobCacheMgr](../trait.BlobCacheMgr.html), simply reporting each
diff --git a/storage/src/cache/filecache/mod.rs b/storage/src/cache/filecache/mod.rs
index 1e38f3b3072..e89a8f2d005 100644
--- a/storage/src/cache/filecache/mod.rs
+++ b/storage/src/cache/filecache/mod.rs
@@ -20,6 +20,7 @@ use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{
     BlobStateMap, ChunkMap, DigestedChunkMap, IndexedChunkMap, NoopChunkMap,
 };
+use crate::cache::streaming::StreamPrefetchMgr;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo};
@@ -37,6 +38,7 @@ pub struct FileCacheMgr {
     prefetch_config: Arc<AsyncPrefetchConfig>,
     runtime: Arc<Runtime>,
     worker_mgr: Arc<AsyncWorkerMgr>,
+    streaming_prefetch_mgr: Arc<StreamPrefetchMgr>,
     work_dir: String,
     validate: bool,
     disable_indexed_map: bool,
@@ -62,6 +64,7 @@ impl FileCacheMgr {
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
         let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let streaming_prefetch_mgr = StreamPrefetchMgr::new();
 
         Ok(FileCacheMgr {
             blobs: Arc::new(RwLock::new(HashMap::new())),
@@ -70,6 +73,7 @@ impl FileCacheMgr {
             prefetch_config,
             runtime,
             worker_mgr: Arc::new(worker_mgr),
+            streaming_prefetch_mgr: Arc::new(streaming_prefetch_mgr),
             work_dir: work_dir.to_owned(),
             disable_indexed_map: blob_cfg.disable_indexed_map,
             validate: config.cache_validate,
@@ -100,6 +104,7 @@ impl FileCacheMgr {
             self.prefetch_config.clone(),
             self.runtime.clone(),
             self.worker_mgr.clone(),
+            self.streaming_prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -120,7 +125,8 @@ impl FileCacheMgr {
 
 impl BlobCacheMgr for FileCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        AsyncWorkerMgr::start(self.worker_mgr.clone())?;
+        StreamPrefetchMgr::start(self.streaming_prefetch_mgr.clone())
     }
 
     fn destroy(&self) {
@@ -183,6 +189,7 @@ impl FileCacheEntry {
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
         workers: Arc<AsyncWorkerMgr>,
+        stream_workers: Arc<StreamPrefetchMgr>,
     ) -> Result<Self> {
         let is_separate_meta = blob_info.has_feature(BlobFeatures::SEPARATE);
         let is_tarfs = blob_info.features().is_tarfs();
@@ -328,6 +335,7 @@ impl FileCacheEntry {
             reader,
             runtime,
             workers,
+            stream_workers,
 
             blob_compressed_size,
             blob_uncompressed_size,
diff --git a/storage/src/cache/fscache/mod.rs b/storage/src/cache/fscache/mod.rs
index 5b2285c9b0e..bad38cd4069 100644
--- a/storage/src/cache/fscache/mod.rs
+++ b/storage/src/cache/fscache/mod.rs
@@ -16,6 +16,7 @@ use tokio::runtime::Runtime;
 
 use crate::backend::BlobBackend;
 use crate::cache::cachedfile::{FileCacheEntry, FileCacheMeta};
 use crate::cache::state::{BlobStateMap, IndexedChunkMap, RangeMap};
+use crate::cache::streaming::StreamPrefetchMgr;
 use crate::cache::worker::{AsyncPrefetchConfig, AsyncWorkerMgr};
 use crate::cache::{BlobCache, BlobCacheMgr};
 use crate::device::{BlobFeatures, BlobInfo, BlobObject};
@@ -35,6 +36,7 @@ pub struct FsCacheMgr {
     prefetch_config: Arc<AsyncPrefetchConfig>,
     runtime: Arc<Runtime>,
     worker_mgr: Arc<AsyncWorkerMgr>,
+    streaming_prefetch_mgr: Arc<StreamPrefetchMgr>,
     work_dir: String,
     need_validation: bool,
     blobs_check_count: Arc<AtomicU8>,
@@ -60,6 +62,7 @@ impl FsCacheMgr {
         let metrics = BlobcacheMetrics::new(id, work_dir);
         let prefetch_config: Arc<AsyncPrefetchConfig> = Arc::new((&config.prefetch).into());
         let worker_mgr = AsyncWorkerMgr::new(metrics.clone(), prefetch_config.clone())?;
+        let streaming_prefetch_mgr = StreamPrefetchMgr::new();
 
         BLOB_FACTORY.start_mgr_checker();
 
@@ -70,6 +73,7 @@ impl FsCacheMgr {
             prefetch_config,
             runtime,
             worker_mgr: Arc::new(worker_mgr),
+            streaming_prefetch_mgr: Arc::new(streaming_prefetch_mgr),
             work_dir: work_dir.to_owned(),
             need_validation: config.cache_validate,
             blobs_check_count: Arc::new(AtomicU8::new(0)),
@@ -96,6 +100,7 @@ impl FsCacheMgr {
             self.prefetch_config.clone(),
             self.runtime.clone(),
             self.worker_mgr.clone(),
+            self.streaming_prefetch_mgr.clone(),
         )?;
         let entry = Arc::new(entry);
         let mut guard = self.blobs.write().unwrap();
@@ -116,7 +121,8 @@ impl FsCacheMgr {
 
 impl BlobCacheMgr for FsCacheMgr {
     fn init(&self) -> Result<()> {
-        AsyncWorkerMgr::start(self.worker_mgr.clone())
+        AsyncWorkerMgr::start(self.worker_mgr.clone())?;
+        StreamPrefetchMgr::start(self.streaming_prefetch_mgr.clone())
     }
 
     fn destroy(&self) {
@@ -201,6 +207,7 @@ impl FileCacheEntry {
         prefetch_config: Arc<AsyncPrefetchConfig>,
         runtime: Arc<Runtime>,
         workers: Arc<AsyncWorkerMgr>,
+        stream_workers: Arc<StreamPrefetchMgr>,
     ) -> Result<Self> {
         if blob_info.has_feature(BlobFeatures::_V5_NO_EXT_BLOB_TABLE) {
             return Err(einval!("fscache does not support Rafs v5 blobs"));
@@ -279,6 +286,7 @@ impl FileCacheEntry {
             reader,
             runtime,
             workers,
+            stream_workers,
 
             blob_compressed_size,
             blob_uncompressed_size: blob_info.uncompressed_size(),
diff --git a/storage/src/cache/mod.rs b/storage/src/cache/mod.rs
index 1ae6dda497e..7dd6a6c5d7c 100644
--- a/storage/src/cache/mod.rs
+++ b/storage/src/cache/mod.rs
@@ -30,6 +30,7 @@ use crate::backend::{BlobBackend, BlobReader};
 use crate::cache::state::ChunkMap;
 use crate::device::{
     BlobChunkInfo, BlobInfo, BlobIoDesc, BlobIoRange, BlobIoVec, BlobObject, BlobPrefetchRequest,
+    BlobRange,
 };
 use crate::meta::BlobCompressionContextInfo;
 use crate::utils::{alloc_buf, check_digest};
@@ -42,6 +43,7 @@ mod dummycache;
 mod filecache;
 #[cfg(target_os = "linux")]
 mod fscache;
+mod streaming;
 mod worker;
 
 pub mod state;
@@ -215,6 +217,8 @@ pub trait BlobCache: Send + Sync {
     /// It should be paired with stop_prefetch().
     fn start_prefetch(&self) -> StorageResult<()>;
 
+    /// Start the streaming prefetch workers, handing them the full blob list.
+    fn start_stream_prefetch(&self, blobs: Vec<Arc<dyn BlobCache>>);
+
     /// Stop prefetching blob data in background.
     ///
     /// It should be paired with start_prefetch().
@@ -231,6 +235,12 @@ pub trait BlobCache: Send + Sync {
         bios: &[BlobIoDesc],
     ) -> StorageResult<usize>;
 
+    /// Queue a compressed blob range for streaming prefetch, merging it with a
+    /// pending contiguous range when possible.
+    fn new_add_stream_prefetch_range(&self, range: BlobRange) -> Result<()>;
+
+    /// Submit all prefetch ranges still buffered in the waiting queue.
+    fn flush_stream_prefetch(&self) -> Result<()>;
+
+    /// Send a streaming prefetch request for the given range of `blob_cache`.
+    fn new_stream_prefetch_range(&self, blob_cache: Arc<dyn BlobCache>, offset: u64, size: u64);
+
     /// Execute filesystem data prefetch.
     fn prefetch_range(&self, _range: &BlobIoRange) -> Result<usize> {
         Err(enosys!("doesn't support prefetch_range()"))
@@ -400,6 +410,13 @@ pub trait BlobCache: Send + Sync {
     fn get_blob_meta_info(&self) -> Result<Option<Arc<BlobCompressionContextInfo>>> {
         Ok(None)
     }
+
+    /// Fetch a compressed range of the blob as a stream, decompressing and
+    /// persisting each chunk as it arrives.
+    fn fetch_range_compressed_stream(
+        self: Arc<Self>,
+        offset: u64,
+        size: u64,
+        prefetch: bool,
+    ) -> std::io::Result<()>;
 }
 
 /// An iterator to enumerate decompressed data for chunks.
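
The new trait methods are intended to be driven in a fixed order: wire the workers, queue ranges, then flush the remainder. A toy implementation sketching that sequence (the `ToyCache` type is illustrative and not the trait's real shape):

```rust
use std::io::Result;
use std::sync::Mutex;

struct Range {
    blob_idx: u32,
    offset: u64,
    end: u64,
}

/// Toy cache that queues ranges and "submits" them on flush.
#[derive(Default)]
struct ToyCache {
    queued: Mutex<Vec<Range>>,
}

impl ToyCache {
    fn start_stream_prefetch(&self) {
        println!("workers started");
    }
    fn new_add_stream_prefetch_range(&self, r: Range) -> Result<()> {
        self.queued.lock().unwrap().push(r);
        Ok(())
    }
    fn flush_stream_prefetch(&self) -> Result<()> {
        for r in self.queued.lock().unwrap().drain(..) {
            println!("submit blob {} [{}, {})", r.blob_idx, r.offset, r.end);
        }
        Ok(())
    }
}

fn main() -> Result<()> {
    let cache = ToyCache::default();
    cache.start_stream_prefetch(); // 1. start the workers
    cache.new_add_stream_prefetch_range(Range { blob_idx: 0, offset: 0, end: 4096 })?; // 2. queue
    cache.flush_stream_prefetch()?; // 3. submit whatever is left
    Ok(())
}
```
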
diff --git a/storage/src/cache/streaming.rs b/storage/src/cache/streaming.rs
new file mode 100644
index 00000000000..dabfcc7a72b
--- /dev/null
+++ b/storage/src/cache/streaming.rs
@@ -0,0 +1,313 @@
+use crate::cache::BlobCache;
+use crate::device::BlobRange;
+use indexmap::IndexMap;
+use nydus_utils::async_helper::with_runtime;
+use nydus_utils::mpmc::Channel;
+use std::io::Result;
+use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use tokio::runtime::Runtime;
+
+/// Asynchronous service request message.
+pub enum StreamingPrefetchMessage {
+    /// Asynchronous blob layer prefetch request with (offset, size) of blob on storage backend.
+    BlobPrefetch(Arc<dyn BlobCache>, u64, u64),
+}
+
+impl StreamingPrefetchMessage {
+    /// Create a new asynchronous blob prefetch request message.
+    pub fn new_blob_prefetch(blob_cache: Arc<dyn BlobCache>, offset: u64, size: u64) -> Self {
+        StreamingPrefetchMessage::BlobPrefetch(blob_cache, offset, size)
+    }
+}
+
+/// Maximum data debt: 4MB.
+static MAX_DEBT: u64 = 0x400000;
+/// A task below 1MB is classified as a small task.
+static MIN_TASK_SIZE: u64 = 0x100000;
+/// Minimum submittable task size: 512KB.
+static MIN_SUBMITTABLE_TASK_SIZE: u64 = 0x80000;
+
+struct PrefetchBuffer {
+    // start_offset of the most recently updated task (very likely the newest
+    // one), used to speed up coalescing of incoming ranges.
+    last_modified: u64,
+    // Tasks waiting to be coalesced or submitted, keyed by the value of
+    // `total_processed` at the time each task was created.
+    buf: IndexMap<u64, BlobRange>,
+    // Total amount of prefetch data accounted so far.
+    total_processed: u64,
+    blobs: Vec<Arc<dyn BlobCache>>,
+}
+
+pub struct StreamPrefetchMgr {
+    workers: AtomicU32,
+    threads_count: u32,
+    active: AtomicBool,
+    prefetch_channel: Arc<Channel<StreamingPrefetchMessage>>,
+    waiting: Mutex<PrefetchBuffer>,
+    // Queue of regular tasks.
+    new_channel: Arc<Channel<StreamingPrefetchMessage>>,
+    // Queue of small tasks.
+    new_channel_small: Arc<Channel<StreamingPrefetchMessage>>,
+}
+
+impl StreamPrefetchMgr {
+    pub fn new() -> Self {
+        let threads_count = 2;
+        Self {
+            workers: AtomicU32::new(0),
+            threads_count,
+            active: AtomicBool::new(false),
+            prefetch_channel: Arc::new(Channel::new()),
+            waiting: Mutex::new(PrefetchBuffer {
+                last_modified: 0,
+                buf: IndexMap::new(),
+                total_processed: 0,
+                blobs: Vec::new(),
+            }),
+            new_channel: Arc::new(Channel::new()),
+            new_channel_small: Arc::new(Channel::new()),
+        }
+    }
+
+    /// Create working threads and start the event loop.
+    pub fn start(mgr: Arc<StreamPrefetchMgr>) -> Result<()> {
+        Self::start_prefetch_workers(mgr)?;
+
+        Ok(())
+    }
+
+    pub fn start_stream_prefetch(&self, blobs: Vec<Arc<dyn BlobCache>>) {
+        let mut waiting = self.waiting.lock().unwrap();
+        waiting.blobs = blobs;
+    }
+
+    /// Send an asynchronous service request message to the workers.
+    pub fn send_prefetch_message(
+        &self,
+        msg: StreamingPrefetchMessage,
+    ) -> std::result::Result<(), StreamingPrefetchMessage> {
+        self.prefetch_channel.send(msg)
+    }
+
+    // The caller must guarantee that the append is valid; no validity check is
+    // performed here.
+    fn extend_range(
+        &self,
+        start_processed: u64,
+        r_new: BlobRange,
+        waiting: &mut PrefetchBuffer,
+    ) -> Result<()> {
+        let r = waiting.buf.get_mut(&start_processed).unwrap();
+        let r_new_size = r_new.end - r_new.offset;
+        // Submit the task once the data debt exceeds the threshold, i.e. the
+        // globally processed amount since the task was created outruns the
+        // task's current length by more than MAX_DEBT, while the task is still
+        // below the minimum submittable size.
+        if waiting.total_processed - start_processed > r.end - r.offset + MAX_DEBT
+            && r.end - r.offset < MIN_SUBMITTABLE_TASK_SIZE
+        {
+            // The data debt exceeded the threshold, so submit this task.
+            if let Some(r) = waiting.buf.remove(&start_processed) {
+                // Pop the task and push it onto the dispatch queue.
+                debug!(
+                    "streaming prefetch: submitting task on debt, total_processed: {}, start_processed: {}, range: [{}, {})",
+                    waiting.total_processed, start_processed, r.offset, r.end
+                );
+                self.send_msg(r, &waiting.blobs)?;
+                // Append the new range as a fresh task at the tail.
+                waiting.buf.insert(waiting.total_processed, r_new);
+                waiting.last_modified = waiting.total_processed;
+            } else {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::Other,
+                    "append_range: waiting_queue remove failed",
+                ));
+            }
+        } else {
+            // The debt is still within bounds, just extend the task.
+            r.end = r_new.end;
+        }
+
+        // Update total_processed in both cases.
+        waiting.total_processed += r_new_size;
+        Ok(())
+    }
+
+    pub fn add_prefetch_range(&self, r_new: BlobRange) -> Result<()> {
+        let mut waiting = self.waiting.lock().unwrap();
+
+        // The initial value of last_modified is harmless here: the lookup
+        // simply misses and we fall through.
+        if let Some(r_recent) = waiting.buf.get(&waiting.last_modified) {
+            // TODO: refine this continuity check.
+            if r_recent.blob_idx == r_new.blob_idx && r_recent.end == r_new.offset {
+                self.extend_range(waiting.last_modified, r_new, &mut waiting)?;
+                return Ok(());
+            }
+        }
+        // For a non-contiguous range, scan the whole task list:
+        // 1. try to extend an existing task;
+        let mut target = None;
+        for (start_offset, r) in waiting.buf.iter() {
+            if r.blob_idx == r_new.blob_idx && r.end == r_new.offset {
+                target = Some(*start_offset);
+                break;
+            }
+        }
+        if let Some(start_offset) = target {
+            self.extend_range(start_offset, r_new, &mut waiting)?;
+            return Ok(());
+        }
+        // 2. otherwise append it as a new task.
+        let r_new_size = r_new.end - r_new.offset;
+        let p = waiting.total_processed;
+        waiting.buf.insert(p, r_new);
+        waiting.last_modified = waiting.total_processed;
+        waiting.total_processed += r_new_size;
+        Ok(())
+    }
+
+    #[inline]
+    fn send_msg(&self, r: BlobRange, blobs: &[Arc<dyn BlobCache>]) -> Result<()> {
+        let msg = StreamingPrefetchMessage::new_blob_prefetch(
+            blobs[r.blob_idx as usize].clone(),
+            r.offset,
+            r.end - r.offset,
+        );
+        // Small tasks go to the dedicated small-task queue.
+        let channel = if r.end - r.offset < MIN_TASK_SIZE {
+            &self.new_channel_small
+        } else {
+            &self.new_channel
+        };
+        debug!(
+            "streaming prefetch: submit task, offset: {}, size: {}",
+            r.offset,
+            r.end - r.offset
+        );
+        channel.send(msg).map_err(|_| {
+            std::io::Error::new(std::io::ErrorKind::Other, "Send prefetch message failed")
+        })
+    }
+
+    pub fn flush_waiting_queue(&self) -> Result<()> {
+        let mut waiting = self.waiting.lock().unwrap();
+        let mut buf = std::mem::take(&mut waiting.buf);
+
+        for (_, r) in buf.drain(..) {
+            self.send_msg(r, &waiting.blobs)?;
+        }
+
+        Ok(())
+    }
+
+    fn start_prefetch_workers(mgr: Arc<StreamPrefetchMgr>) -> Result<()> {
+        // Spawn threads_count + 1 workers: worker 0 only serves the small-task
+        // queue, so that small requests are not starved by large ones.
+        for num in 0..mgr.threads_count + 1 {
+            let mgr2 = mgr.clone();
+            let res = thread::Builder::new()
+                .name(format!("nydus_storage_worker_{}", num))
+                .spawn(move || {
+                    mgr2.grow_n(1);
+
+                    with_runtime(|rt| {
+                        if num == 0 {
+                            rt.block_on(Self::handle_prefetch_requests_small(mgr2.clone(), rt));
+                        } else {
+                            rt.block_on(Self::handle_prefetch_requests(mgr2.clone(), rt));
+                        }
+                    });
+
+                    mgr2.shrink_n(1);
+                    info!("storage: worker thread {} exits.", num)
+                });
+
+            if let Err(e) = res {
+                error!("storage: failed to create worker thread, {:?}", e);
+                return Err(e);
+            }
+        }
+        mgr.active.store(true, Ordering::Release);
+        Ok(())
+    }
+
+    async fn handle_prefetch_requests(mgr: Arc<StreamPrefetchMgr>, rt: &Runtime) {
+        loop {
+            let msg;
+            tokio::select! {
+                Ok(m) = mgr.new_channel.recv() => msg = m,
+                Ok(m) = mgr.new_channel_small.recv() => msg = m,
+                else => break,
+            }
+            match msg {
+                StreamingPrefetchMessage::BlobPrefetch(blob_cache, offset, size) => {
+                    rt.spawn_blocking(move || {
+                        let _ = Self::handle_blob_prefetch_request(blob_cache, offset, size);
+                    });
+                }
+            }
+        }
+    }
+
+    // Dedicated handler for the small-task queue.
+    async fn handle_prefetch_requests_small(mgr: Arc<StreamPrefetchMgr>, rt: &Runtime) {
+        while let Ok(msg) = mgr.new_channel_small.recv().await {
+            match msg {
+                StreamingPrefetchMessage::BlobPrefetch(blob_cache, offset, size) => {
+                    rt.spawn_blocking(move || {
+                        let _ = Self::handle_blob_prefetch_request(blob_cache, offset, size);
+                    });
+                }
+            }
+        }
+    }
+
+    fn handle_blob_prefetch_request(
+        cache: Arc<dyn BlobCache>,
+        offset: u64,
+        size: u64,
+    ) -> Result<()> {
+        debug!(
+            "storage: streaming prefetch blob {} offset {} size {}",
+            cache.blob_id(),
+            offset,
+            size
+        );
+        if size == 0 {
+            return Ok(());
+        }
+
+        cache.fetch_range_compressed_stream(offset, size, true)?;
+
+        Ok(())
+    }
+
+    fn shrink_n(&self, n: u32) {
+        self.workers.fetch_sub(n, Ordering::Relaxed);
+    }
+
+    fn grow_n(&self, n: u32) {
+        self.workers.fetch_add(n, Ordering::Relaxed);
+    }
+}
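
The worker split dedicates one thread to the small-task channel while the remaining workers drain both queues, so one long-running large range cannot starve many small ones. A condensed model of the two-queue consumption, with tokio's mpsc standing in for the crate's mpmc `Channel`:

```rust
use tokio::sync::mpsc;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (big_tx, mut big_rx) = mpsc::channel::<u64>(16);
    let (small_tx, mut small_rx) = mpsc::channel::<u64>(16);

    big_tx.send(4 << 20).await.unwrap();    // a 4 MiB task
    small_tx.send(64 << 10).await.unwrap(); // a 64 KiB task
    drop(big_tx);
    drop(small_tx);

    // General worker: drains whichever queue is ready, exits when both close.
    loop {
        tokio::select! {
            Some(size) = big_rx.recv() => println!("large task: {} bytes", size),
            Some(size) = small_rx.recv() => println!("small task: {} bytes", size),
            else => break,
        }
    }
}
```
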
diff --git a/storage/src/device.rs b/storage/src/device.rs
index 08c3ee5409a..52f12394a03 100644
--- a/storage/src/device.rs
+++ b/storage/src/device.rs
@@ -775,7 +775,7 @@ pub struct BlobIoVec {
     /// Total size of blob IOs to be performed.
     bi_size: u64,
     /// Array of blob IOs; these IOs should be executed sequentially.
-    pub(crate) bi_vec: Vec<BlobIoDesc>,
+    pub bi_vec: Vec<BlobIoDesc>,
 }
 
 impl BlobIoVec {
@@ -1034,6 +1034,23 @@ pub struct BlobPrefetchRequest {
     pub len: u64,
 }
 
+/// A contiguous range of compressed data inside a blob.
+pub struct BlobRange {
+    pub blob_idx: u32,
+    /// Compressed offset into the blob to prefetch data.
+    pub offset: u64,
+    pub end: u64,
+}
+
+impl Debug for BlobRange {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        f.debug_struct("BlobRange")
+            .field("blob_idx", &self.blob_idx)
+            .field("offset", &self.offset)
+            .field("end", &self.end)
+            .finish()
+    }
+}
+
 /// Trait to provide direct access to underlying uncompressed blob file.
 ///
 /// The suggested flow to make use of an `BlobObject` is as below:
@@ -1186,6 +1203,18 @@ impl BlobDevice {
         Ok(())
     }
 
+    /// Queue a blob range on the streaming prefetch manager.
+    pub fn new_stream_prefetch(&self, range: BlobRange) -> io::Result<()> {
+        let state = self.blobs.load();
+        let blob = &state[range.blob_idx as usize];
+        blob.new_add_stream_prefetch_range(range)
+    }
+
+    /// Submit whatever is still buffered in the waiting queue. The manager is
+    /// shared by all blobs, so delegating through the first blob is enough.
+    pub fn flush_stream_prefetch(&self) -> io::Result<()> {
+        let state = self.blobs.load();
+        let blob = &state[0];
+        blob.flush_stream_prefetch()
+    }
+
     /// Start the background blob data prefetch task.
     pub fn start_prefetch(&self) {
         for blob in self.blobs.load().iter() {
         }
     }
 
+    /// Start the streaming prefetch workers. The manager is shared, so the
+    /// first blob entry wires it up with the full blob list.
+    pub fn start_stream_prefetch(&self) {
+        let blobs: Vec<Arc<dyn BlobCache>> = self.blobs.load().to_vec();
+        if blobs.is_empty() {
+            error!("start_stream_prefetch failed: no blob");
+            return;
+        }
+        blobs[0].clone().start_stream_prefetch(blobs);
+    }
+
     /// Stop the background blob data prefetch task.
     pub fn stop_prefetch(&self) {
         for blob in self.blobs.load().iter() {
diff --git a/storage/src/meta/mod.rs b/storage/src/meta/mod.rs
index 39d9b8beb27..cf0ca3427b7 100644
--- a/storage/src/meta/mod.rs
+++ b/storage/src/meta/mod.rs
@@ -1993,6 +1993,7 @@ fn round_up_4k<T: Add<Output = T> + BitAnd<Output = T> + Not<Output = T> + From<
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
+    use crate::backend::registry::StreamCallback;
     use crate::backend::{BackendResult, BlobReader};
     use crate::device::BlobFeatures;
     use crate::RAFS_DEFAULT_CHUNK_SIZE;
@@ -2018,6 +2019,16 @@ pub(crate) mod tests {
         Ok(ret)
     }
 
+    fn try_read_stream(
+        &self,
+        _offset: u64,
+        _size: u32,
+        _processed: &mut u64,
+        _f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!()
+    }
+
     fn metrics(&self) -> &BackendMetrics {
         &self.metrics
     }
diff --git a/storage/src/test.rs b/storage/src/test.rs
index fc115f3b867..3e75b90f885 100644
--- a/storage/src/test.rs
+++ b/storage/src/test.rs
@@ -9,6 +9,7 @@ use nydus_utils::digest::RafsDigest;
 use nydus_utils::metrics::BackendMetrics;
 
 use super::impl_getter;
+use crate::backend::registry::StreamCallback;
 use crate::backend::{BackendResult, BlobBackend, BlobReader};
 use crate::device::v5::BlobV5ChunkInfo;
 use crate::device::{BlobChunkFlags, BlobChunkInfo};
@@ -32,6 +33,16 @@ impl BlobReader for MockBackend {
         Ok(i)
     }
 
+    fn try_read_stream(
+        &self,
+        _offset: u64,
+        _size: u32,
+        _processed: &mut u64,
+        _f: &mut StreamCallback,
+    ) -> BackendResult<()> {
+        unimplemented!()
+    }
+
     fn metrics(&self) -> &BackendMetrics {
         // Safe because nydusd must have a backend attached with an id; only the image
         // builder may have no id but still use a backend instance to upload blobs.
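
The MAX_DEBT rule in streaming.rs bounds how far global progress may run ahead of a still-open task before the task is force-submitted. A self-contained model of that decision (constants shrunk for readability; the patch uses 4 MiB and 512 KiB):

```rust
const MAX_DEBT: u64 = 64;        // stand-in for the real 4 MiB bound
const MIN_SUBMITTABLE: u64 = 16; // stand-in for the real 512 KiB threshold

/// Decide whether an open task must be submitted now instead of being
/// extended further: global progress has outrun it by more than MAX_DEBT
/// while the task itself is still below the submittable size.
fn must_submit(total_processed: u64, task_start: u64, task_len: u64) -> bool {
    total_processed - task_start > task_len + MAX_DEBT && task_len < MIN_SUBMITTABLE
}

fn main() {
    // Young task, little debt: keep extending.
    assert!(!must_submit(10, 0, 8));
    // Small task left far behind global progress: submit it.
    assert!(must_submit(100, 0, 8));
    // A large task is extended regardless of debt.
    assert!(!must_submit(100, 0, 32));
}
```
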