From 748c12e578503245857e597210dda87017804b3 Mon Sep 17 00:00:00 2001
From: Jiang Liu
Date: Thu, 16 Feb 2023 14:06:26 +0800
Subject: [PATCH 1/7] rafs: refine v6 related code

Refine v6 related code and add two fields to meta info.

Signed-off-by: Jiang Liu
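[Annotation: the constants reorganized below encode EROFS on-disk address arithmetic. The doc comment on `s_root_nid` gives the formula; a minimal standalone sketch of it, not part of the patch, with constant values copied from the definitions below:

// Illustration only: how an EROFS nid maps to a byte offset in the metadata blob.
const EROFS_BLOCK_SIZE: u64 = 1u64 << 12;
const EROFS_INODE_SLOT_SIZE: u64 = 1u64 << 5;

fn nid_to_offset(meta_blkaddr: u32, nid: u64) -> u64 {
    // `root inode offset = s_meta_blkaddr * 4096 + s_root_nid * 32`
    meta_blkaddr as u64 * EROFS_BLOCK_SIZE + nid * EROFS_INODE_SLOT_SIZE
}

fn main() {
    // With meta_blkaddr = 1 and nid = 36: 1 * 4096 + 36 * 32 = 5248.
    assert_eq!(nid_to_offset(1, 36), 5248);
}
]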
---
 rafs/src/metadata/layout/v6.rs | 109 +++++++++++++++++++++------------
 rafs/src/metadata/md_v6.rs     |   8 ++-
 rafs/src/metadata/mod.rs       |   6 ++
 3 files changed, 82 insertions(+), 41 deletions(-)

diff --git a/rafs/src/metadata/layout/v6.rs b/rafs/src/metadata/layout/v6.rs
index 6cf13afd01b..f36804881ca 100644
--- a/rafs/src/metadata/layout/v6.rs
+++ b/rafs/src/metadata/layout/v6.rs
@@ -21,48 +21,53 @@ use nydus_storage::{RAFS_MAX_CHUNKS_PER_BLOB, RAFS_MAX_CHUNK_SIZE};
 use nydus_utils::{compress, digest, round_up, ByteSize};
 
 use crate::metadata::layout::v5::RafsV5ChunkInfo;
-use crate::metadata::layout::MetaRange;
-use crate::metadata::{layout::RafsXAttrs, RafsStore, RafsSuperFlags};
+use crate::metadata::layout::{MetaRange, RafsXAttrs};
+use crate::metadata::{RafsStore, RafsSuperFlags, RafsSuperMeta};
 use crate::{impl_bootstrap_converter, impl_pub_getter_setter, RafsIoReader, RafsIoWrite};
 
 /// EROFS metadata slot size.
 pub const EROFS_INODE_SLOT_SIZE: usize = 1 << EROFS_INODE_SLOT_BITS;
+/// Bits of EROFS logical block size.
+pub const EROFS_BLOCK_BITS: u8 = 12;
 /// EROFS logical block size.
 pub const EROFS_BLOCK_SIZE: u64 = 1u64 << EROFS_BLOCK_BITS;
-/// EROFS plain inode.
-pub const EROFS_INODE_FLAT_PLAIN: u16 = 0;
-/// EROFS inline inode.
-pub const EROFS_INODE_FLAT_INLINE: u16 = 2;
-/// EROFS chunked inode.
-pub const EROFS_INODE_CHUNK_BASED: u16 = 4;
+
+/// Offset of EROFS super block.
+pub const EROFS_SUPER_OFFSET: u16 = 1024;
+/// Size of EROFS super block.
+pub const EROFS_SUPER_BLOCK_SIZE: u16 = 128;
+/// Size of extended super block, used for rafs v6 specific fields
+pub const EROFS_EXT_SUPER_BLOCK_SIZE: u16 = 256;
 /// EROFS device table offset.
 pub const EROFS_DEVTABLE_OFFSET: u16 =
     EROFS_SUPER_OFFSET + EROFS_SUPER_BLOCK_SIZE + EROFS_EXT_SUPER_BLOCK_SIZE;
 
+/// Offset for inode format flags: compact or extended.
 pub const EROFS_I_VERSION_BIT: u16 = 0;
+/// Number of bits for inode format flags.
 pub const EROFS_I_VERSION_BITS: u16 = 1;
+/// 32-byte on-disk inode
+pub const EROFS_INODE_LAYOUT_COMPACT: u16 = 0;
+/// 64-byte on-disk inode
+pub const EROFS_INODE_LAYOUT_EXTENDED: u16 = 1;
+/// Number of bits for inode data layout.
 pub const EROFS_I_DATALAYOUT_BITS: u16 = 3;
+/// EROFS plain inode.
+pub const EROFS_INODE_FLAT_PLAIN: u16 = 0;
+/// EROFS inline inode.
+pub const EROFS_INODE_FLAT_INLINE: u16 = 2;
+/// EROFS chunked inode.
+pub const EROFS_INODE_CHUNK_BASED: u16 = 4;
 
-// Offset of EROFS super block.
-pub const EROFS_SUPER_OFFSET: u16 = 1024;
-// Size of EROFS super block.
-pub const EROFS_SUPER_BLOCK_SIZE: u16 = 128;
-// Size of extended super block, used for rafs v6 specific fields
-const EROFS_EXT_SUPER_BLOCK_SIZE: u16 = 256;
 // Magic number for EROFS super block.
 const EROFS_SUPER_MAGIC_V1: u32 = 0xE0F5_E1E2;
-// Bits of EROFS logical block size.
-const EROFS_BLOCK_BITS: u8 = 12;
 // Bits of EROFS metadata slot size.
 const EROFS_INODE_SLOT_BITS: u8 = 5;
-// 32-byte on-disk inode
-const EROFS_INODE_LAYOUT_COMPACT: u16 = 0;
-// 64-byte on-disk inode
-const EROFS_INODE_LAYOUT_EXTENDED: u16 = 1;
 // Bit flag indicating whether the inode is chunked or not.
 const EROFS_CHUNK_FORMAT_INDEXES_FLAG: u16 = 0x0020;
 // Encoded chunk size (log2(chunk_size) - EROFS_BLOCK_BITS).
 const EROFS_CHUNK_FORMAT_SIZE_MASK: u16 = 0x001F;
+
 /// Checksum of superblock, compatible with EROFS versions prior to Linux kernel 5.5.
 #[allow(dead_code)]
 const EROFS_FEATURE_COMPAT_SB_CHKSUM: u32 = 0x0000_0001;
@@ -72,6 +77,7 @@ const EROFS_FEATURE_COMPAT_RAFS_V6: u32 = 0x4000_0000;
 const EROFS_FEATURE_INCOMPAT_CHUNKED_FILE: u32 = 0x0000_0004;
 /// Multi-devices, incompatible with EROFS versions prior to Linux kernel 5.16.
 const EROFS_FEATURE_INCOMPAT_DEVICE_TABLE: u32 = 0x0000_0008;
+
 /// Size of SHA256 digest string.
 const BLOB_SHA256_LEN: usize = 64;
 const BLOB_MAX_SIZE_UNCOMPRESSED: u64 = 1u64 << 44;
@@ -97,7 +103,7 @@ pub struct RafsV6SuperBlock {
     s_extslots: u8,
     /// Nid of the root directory.
     /// `root inode offset = s_meta_blkaddr * 4096 + s_root_nid * 32`.
-    pub s_root_nid: u16,
+    s_root_nid: u16,
     /// Total valid ino #
     s_inos: u64,
     /// Timestamp of filesystem creation.
@@ -107,7 +113,7 @@ pub struct RafsV6SuperBlock {
     /// Total size of file system in blocks, used for statfs
     s_blocks: u32,
     /// Start block address of the metadata area.
-    pub s_meta_blkaddr: u32,
+    s_meta_blkaddr: u32,
     /// Start block address of the shared xattr area.
     s_xattr_blkaddr: u32,
     /// 128-bit uuid for volume
@@ -150,13 +156,19 @@ impl RafsV6SuperBlock {
                 meta_size
             )));
         }
-
         if meta_size & (EROFS_BLOCK_SIZE - 1) != 0 {
             return Err(einval!(format!(
                 "invalid Rafs v6 metadata size: bootstrap size {} is not aligned",
                 meta_size
             )));
         }
+        let meta_addr = u32::from_le(self.s_meta_blkaddr) as u64 * EROFS_BLOCK_SIZE;
+        if meta_addr > meta_size {
+            return Err(einval!(format!(
+                "invalid Rafs v6 meta block address 0x{:x}, meta file size 0x{:x}",
+                meta_addr, meta_size
+            )));
+        }
 
         if u32::from_le(self.s_magic) != EROFS_SUPER_MAGIC_V1 {
             return Err(einval!(format!(
@@ -179,14 +191,14 @@ impl RafsV6SuperBlock {
             )));
         }
 
-        if self.s_inos == 0 {
-            return Err(einval!("invalid inode number in Rafsv6 superblock"));
-        }
-
         if self.s_extslots != 0 {
             return Err(einval!("invalid extended slots in Rafsv6 superblock"));
         }
 
+        if self.s_inos == 0 {
+            return Err(einval!("invalid inode number in Rafsv6 superblock"));
+        }
+
         if self.s_u != 0 {
             return Err(einval!("invalid union field in Rafsv6 superblock"));
         }
@@ -206,9 +218,9 @@ impl RafsV6SuperBlock {
             return Err(einval!("invalid extra device count in Rafsv6 superblock"));
         }
 
-        if u16::from_le(self.s_devt_slotoff)
-            != (EROFS_DEVTABLE_OFFSET / size_of::<RafsV6Device>() as u16)
-        {
+        let devtable_off =
+            u16::from_le(self.s_devt_slotoff) as u64 * size_of::<RafsV6Device>() as u64;
+        if devtable_off != EROFS_DEVTABLE_OFFSET as u64 {
             return Err(einval!(format!(
                 "invalid device table slot offset {} in Rafsv6 superblock",
                 u16::from_le(self.s_devt_slotoff)
@@ -259,23 +271,34 @@ impl RafsV6SuperBlock {
         self.s_blocks = blocks.to_le();
     }
 
+    /// Get root nid.
+    pub fn root_nid(&self) -> u16 {
+        u16::from_le(self.s_root_nid)
+    }
+
     /// Set EROFS root nid.
     pub fn set_root_nid(&mut self, nid: u16) {
         self.s_root_nid = nid.to_le();
     }
 
+    /// Get meta block address.
+    pub fn meta_addr(&self) -> u32 {
+        u32::from_le(self.s_meta_blkaddr)
+    }
+
     /// Set EROFS meta block address.
     pub fn set_meta_addr(&mut self, meta_addr: u64) {
        assert!((meta_addr / EROFS_BLOCK_SIZE) <= u32::MAX as u64);
        self.s_meta_blkaddr = u32::to_le((meta_addr / EROFS_BLOCK_SIZE) as u32);
     }
 
-    /// Set number of extra devices.
-    pub fn set_extra_devices(&mut self, count: u16) {
-        self.s_extra_devices = count.to_le();
+    /// Get device table offset.
+    pub fn device_table_offset(&self) -> u64 {
+        u16::from_le(self.s_devt_slotoff) as u64 * size_of::<RafsV6Device>() as u64
     }
 
     impl_pub_getter_setter!(magic, set_magic, s_magic, u32);
+    impl_pub_getter_setter!(extra_devices, set_extra_devices, s_extra_devices, u16);
 }
 
 impl RafsStore for RafsV6SuperBlock {
@@ -365,7 +388,7 @@ impl RafsV6SuperBlockExt {
     }
 
     /// Validate the Rafs v6 super block.
-    pub fn validate(&self, meta_size: u64) -> Result<()> {
+    pub fn validate(&self, meta_size: u64, meta: &RafsSuperMeta) -> Result<()> {
         let mut flags = self.flags();
         flags &= RafsSuperFlags::COMPRESSION_NONE.bits()
             | RafsSuperFlags::COMPRESSION_LZ4.bits()
@@ -394,10 +417,13 @@ impl RafsV6SuperBlockExt {
             return Err(einval!("invalid chunk size in Rafs v6 extended superblock"));
         }
 
+        let devslot_end = meta.blob_device_table_offset + meta.blob_table_size as u64;
+
         let blob_offset = self.blob_table_offset();
         let blob_size = self.blob_table_size() as u64;
         if blob_offset & (EROFS_BLOCK_SIZE - 1) != 0
             || blob_offset < EROFS_BLOCK_SIZE
+            || blob_offset < devslot_end
             || blob_size % size_of::<RafsV6Blob>() as u64 != 0
             || blob_offset.checked_add(blob_size).is_none()
             || blob_offset + blob_size > meta_size
@@ -409,11 +435,13 @@ impl RafsV6SuperBlockExt {
         }
         let blob_range = MetaRange::new(blob_offset, blob_size, true)?;
 
+        let mut chunk_info_tbl_range = None;
         if self.chunk_table_size() > 0 {
             let chunk_tbl_offset = self.chunk_table_offset();
             let chunk_tbl_size = self.chunk_table_size();
             if chunk_tbl_offset < EROFS_BLOCK_SIZE
                 || chunk_tbl_offset % EROFS_BLOCK_SIZE != 0
+                || chunk_tbl_offset < devslot_end
                 || chunk_tbl_size % size_of::<RafsV5ChunkInfo>() as u64 != 0
                 || chunk_tbl_offset.checked_add(chunk_tbl_size).is_none()
                 || chunk_tbl_offset + chunk_tbl_size > meta_size
@@ -429,6 +457,7 @@ impl RafsV6SuperBlockExt {
                     "blob table intersects with chunk table in Rafs v6 extended superblock",
                 )));
             }
+            chunk_info_tbl_range = Some(chunk_range);
         }
 
         // Legacy RAFS may have zero prefetch table offset but non-zero prefetch table size for
@@ -438,6 +467,7 @@ impl RafsV6SuperBlockExt {
             let tbl_size = self.prefetch_table_size() as u64;
             if tbl_offset < EROFS_BLOCK_SIZE
                 || tbl_size % size_of::<u32>() as u64 != 0
+                || tbl_offset < devslot_end
                 || tbl_offset.checked_add(tbl_size).is_none()
                 || tbl_offset + tbl_size > meta_size
             {
@@ -452,6 +482,13 @@ impl RafsV6SuperBlockExt {
                     "blob table intersects with prefetch table in Rafs v6 extended superblock",
                 )));
             }
+            if let Some(chunk_range) = chunk_info_tbl_range.as_ref() {
+                if chunk_range.intersect_with(&prefetch_range) {
+                    return Err(einval!(format!(
+                        "chunk information table intersects with prefetch table in Rafs v6 extended superblock",
+                    )));
+                }
+            }
         }
 
         Ok(())
@@ -1224,10 +1261,6 @@ impl RafsV6Device {
             return Err(einval!(msg));
         }
 
-        if u32::from_le(self.mapped_blkaddr) != 0 {
-            return Err(einval!("invalid mapped_addr in Rafs v6 Device"));
-        }
-
         Ok(())
     }
 }
diff --git a/rafs/src/metadata/md_v6.rs b/rafs/src/metadata/md_v6.rs
index 5d2abb829ae..a6f9e95d941 100644
--- a/rafs/src/metadata/md_v6.rs
+++ b/rafs/src/metadata/md_v6.rs
@@ -31,12 +31,14 @@ impl RafsSuper {
         sb.validate(end)?;
         self.meta.version = RAFS_SUPER_VERSION_V6;
         self.meta.magic = sb.magic();
-        self.meta.meta_blkaddr = sb.s_meta_blkaddr;
-        self.meta.root_nid = sb.s_root_nid;
+        self.meta.meta_blkaddr = sb.meta_addr();
+        self.meta.root_nid = sb.root_nid();
+        self.meta.blob_device_table_count = sb.extra_devices() as u32;
+        self.meta.blob_device_table_offset = sb.device_table_offset();
 
         let mut ext_sb = RafsV6SuperBlockExt::new();
         ext_sb.load(r)?;
-        ext_sb.validate(end)?;
+        ext_sb.validate(end, &self.meta)?;
         self.meta.chunk_size = ext_sb.chunk_size();
         self.meta.blob_table_offset = ext_sb.blob_table_offset();
         self.meta.blob_table_size = ext_sb.blob_table_size();
diff --git a/rafs/src/metadata/mod.rs b/rafs/src/metadata/mod.rs
index af1775e8dd0..893f2e82576 100644
--- a/rafs/src/metadata/mod.rs
+++ b/rafs/src/metadata/mod.rs
@@ -423,6 +423,10 @@ pub struct RafsSuperMeta {
     pub extended_blob_table_offset: u64,
     /// Offset of the extended blob information table into the metadata blob.
     pub extended_blob_table_entries: u32,
+    /// Number of RAFS v6 blob device entries in the devslot table.
+    pub blob_device_table_count: u32,
+    /// Offset of the RAFS v6 devslot table.
+    pub blob_device_table_offset: u64,
     /// Offset of the inode prefetch table into the metadata blob.
     pub prefetch_table_offset: u64,
     /// Size of the inode prefetch table.
@@ -520,6 +524,8 @@ impl Default for RafsSuperMeta {
             blob_table_offset: 0,
             extended_blob_table_offset: 0,
             extended_blob_table_entries: 0,
+            blob_device_table_count: 0,
+            blob_device_table_offset: 0,
             prefetch_table_offset: 0,
             prefetch_table_entries: 0,
             attr_timeout: Duration::from_secs(RAFS_DEFAULT_ATTR_TIMEOUT),
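[Annotation: the table checks added in patch 1 all follow one pattern: each metadata table must sit inside the metadata blob and must not overlap any other table. A self-contained sketch of the overlap rule; `MetaRange` here is a stand-in for the crate's type, shown only to illustrate the check:

struct MetaRange {
    start: u64,
    size: u64,
}

impl MetaRange {
    // Two half-open ranges [start, start + size) intersect iff each starts
    // before the other ends.
    fn intersect_with(&self, other: &MetaRange) -> bool {
        self.start < other.start + other.size && other.start < self.start + self.size
    }
}

fn main() {
    let blob_table = MetaRange { start: 0x1000, size: 0x200 };
    let chunk_table = MetaRange { start: 0x1100, size: 0x100 };
    // Overlapping metadata tables indicate a corrupted or malicious bootstrap.
    assert!(blob_table.intersect_with(&chunk_table));
}
]
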
From c8b13ebef569e8ce40dd8ebe9c6e679918cea1d0 Mon Sep 17 00:00:00 2001
From: Jiang Liu
Date: Wed, 15 Feb 2023 21:38:45 +0800
Subject: [PATCH 2/7] rafs: load mapped-blkaddr for each data blob

Load the mapped_blkaddr field for each data blob; later it will be used to
compose a RAFS v6 image into a block device.

Signed-off-by: Jiang Liu
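[Annotation: an illustration of what `mapped_blkaddr` means, not part of the patch. Once each data blob records the block address it is mapped at, the blob occupies a fixed window of the device's block address space:

const EROFS_BLOCK_BITS: u8 = 12;

// Block window [start, end) occupied by a data blob on the composed device.
fn blob_window(mapped_blkaddr: u32, uncompressed_size: u64) -> (u32, u32) {
    let blocks = (uncompressed_size >> EROFS_BLOCK_BITS) as u32;
    (mapped_blkaddr, mapped_blkaddr + blocks)
}

fn main() {
    // A blob mapped at block 0x205 with 16 KiB of uncompressed data covers
    // blocks [0x205, 0x209).
    assert_eq!(blob_window(0x205, 0x4000), (0x205, 0x209));
}
]
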
---
 rafs/src/metadata/direct_v6.rs | 21 +++++---
 rafs/src/metadata/layout/v6.rs | 65 +++++++++++++++++--------
 rafs/src/metadata/mod.rs       | 15 +++++-
 service/src/blob_cache.rs      | 87 ++++++++++++++++++++++------------
 4 files changed, 131 insertions(+), 57 deletions(-)

diff --git a/rafs/src/metadata/direct_v6.rs b/rafs/src/metadata/direct_v6.rs
index 2725c515419..8c71777e03e 100644
--- a/rafs/src/metadata/direct_v6.rs
+++ b/rafs/src/metadata/direct_v6.rs
@@ -38,15 +38,16 @@ use nydus_utils::{digest::RafsDigest, div_round_up, round_up};
 
 use crate::metadata::layout::v5::RafsV5ChunkInfo;
 use crate::metadata::layout::v6::{
-    recover_namespace, RafsV6BlobTable, RafsV6Dirent, RafsV6InodeChunkAddr, RafsV6InodeCompact,
-    RafsV6InodeExtended, RafsV6OndiskInode, RafsV6XattrEntry, RafsV6XattrIbodyHeader,
-    EROFS_BLOCK_SIZE, EROFS_INODE_CHUNK_BASED, EROFS_INODE_FLAT_INLINE, EROFS_INODE_FLAT_PLAIN,
-    EROFS_INODE_SLOT_SIZE, EROFS_I_DATALAYOUT_BITS, EROFS_I_VERSION_BIT, EROFS_I_VERSION_BITS,
+    rafsv6_load_blob_extra_info, recover_namespace, RafsV6BlobTable, RafsV6Dirent,
+    RafsV6InodeChunkAddr, RafsV6InodeCompact, RafsV6InodeExtended, RafsV6OndiskInode,
+    RafsV6XattrEntry, RafsV6XattrIbodyHeader, EROFS_BLOCK_SIZE, EROFS_INODE_CHUNK_BASED,
+    EROFS_INODE_FLAT_INLINE, EROFS_INODE_FLAT_PLAIN, EROFS_INODE_SLOT_SIZE,
+    EROFS_I_DATALAYOUT_BITS, EROFS_I_VERSION_BIT, EROFS_I_VERSION_BITS,
 };
 use crate::metadata::layout::{bytes_to_os_str, MetaRange, XattrName, XattrValue};
 use crate::metadata::{
-    Attr, Entry, Inode, RafsInode, RafsInodeWalkAction, RafsInodeWalkHandler, RafsSuperBlock,
-    RafsSuperInodes, RafsSuperMeta, RAFS_ATTR_BLOCK_SIZE, RAFS_MAX_NAME,
+    Attr, Entry, Inode, RafsBlobExtraInfo, RafsInode, RafsInodeWalkAction, RafsInodeWalkHandler,
+    RafsSuperBlock, RafsSuperInodes, RafsSuperMeta, RAFS_ATTR_BLOCK_SIZE, RAFS_MAX_NAME,
 };
 use crate::{MetaType, RafsError, RafsInodeExt, RafsIoReader, RafsResult};
 
@@ -63,6 +64,7 @@ fn err_invalidate_data(rafs_err: RafsError) -> std::io::Error {
 struct DirectMappingState {
     meta: Arc<RafsSuperMeta>,
     blob_table: RafsV6BlobTable,
+    blob_extra_infos: HashMap<String, RafsBlobExtraInfo>,
     map: FileMapState,
 }
 
@@ -71,6 +73,7 @@ impl DirectMappingState {
         DirectMappingState {
             meta: Arc::new(*meta),
             blob_table: RafsV6BlobTable::default(),
+            blob_extra_infos: HashMap::new(),
             map: FileMapState::default(),
         }
     }
@@ -187,11 +190,13 @@ impl DirectSuperBlockV6 {
         let meta = &old_state.meta;
         r.seek(SeekFrom::Start(meta.blob_table_offset))?;
         blob_table.load(r, meta.blob_table_size, meta.chunk_size, meta.flags)?;
+        let blob_extra_infos = rafsv6_load_blob_extra_info(meta, r)?;
 
         let file_map = FileMapState::new(file, 0, len as usize, false)?;
         let state = DirectMappingState {
             meta: old_state.meta.clone(),
             blob_table,
+            blob_extra_infos,
             map: file_map,
         };
 
@@ -283,6 +288,10 @@ impl RafsSuperBlock for DirectSuperBlockV6 {
         self.state.load().blob_table.get_all()
     }
 
+    fn get_blob_extra_infos(&self) -> Result<HashMap<String, RafsBlobExtraInfo>> {
+        Ok(self.state.load().blob_extra_infos.clone())
+    }
+
     fn root_ino(&self) -> u64 {
         self.info.root_ino
     }
diff --git a/rafs/src/metadata/layout/v6.rs b/rafs/src/metadata/layout/v6.rs
index f36804881ca..a7e88dd2714 100644
--- a/rafs/src/metadata/layout/v6.rs
+++ b/rafs/src/metadata/layout/v6.rs
@@ -3,6 +3,7 @@
 //
 // SPDX-License-Identifier: Apache-2.0
 
+use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::ffi::{OsStr, OsString};
 use std::fmt::Debug;
@@ -22,7 +23,7 @@ use nydus_utils::{compress, digest, round_up, ByteSize};
 
 use crate::metadata::layout::v5::RafsV5ChunkInfo;
 use crate::metadata::layout::{MetaRange, RafsXAttrs};
-use crate::metadata::{RafsStore, RafsSuperFlags, RafsSuperMeta};
+use crate::metadata::{RafsBlobExtraInfo, RafsStore, RafsSuperFlags, RafsSuperMeta};
 use crate::{impl_bootstrap_converter, impl_pub_getter_setter, RafsIoReader, RafsIoWrite};
 
 /// EROFS metadata slot size.
@@ -226,6 +227,13 @@ impl RafsV6SuperBlock {
                 u16::from_le(self.s_devt_slotoff)
             )));
         }
+        let devtable_end = devtable_off + u16::from_le(self.s_extra_devices) as u64;
+        if devtable_end > meta_size {
+            return Err(einval!(format!(
+                "invalid device table slot count {} in Rafsv6 superblock",
+                u16::from_le(self.s_extra_devices)
+            )));
+        }
 
         // s_build_time may be used as compact_inode's timestamp in the future.
         // if u64::from_le(self.s_build_time) != 0 || u32::from_le(self.s_build_time_nsec) != 0 {
@@ -1225,21 +1233,6 @@ impl RafsV6Device {
         self.blob_id.copy_from_slice(id);
     }
 
-    /// Get number of blocks.
-    pub fn blocks(&self) -> u32 {
-        u32::from_le(self.blocks)
-    }
-
-    /// Set number of blocks.
-    pub fn set_blocks(&mut self, blocks: u32) {
-        self.blocks = blocks.to_le();
-    }
-
-    /// Set mapped block address.
-    pub fn set_mapped_blkaddr(&mut self, addr: u32) {
-        self.mapped_blkaddr = addr.to_le();
-    }
-
     /// Load a `RafsV6Device` from a reader.
     pub fn load(&mut self, r: &mut RafsIoReader) -> Result<()> {
         r.read_exact(self.as_mut())
@@ -1250,19 +1243,25 @@ impl RafsV6Device {
         match String::from_utf8(self.blob_id.to_vec()) {
             Ok(v) => {
                 if v.len() != BLOB_SHA256_LEN {
-                    return Err(einval!(format!("v.len {} is invalid", v.len())));
+                    return Err(einval!(format!(
+                        "Length of blob_id {} in RAFS v6 device entry is invalid",
+                        v.len()
+                    )));
                 }
             }
-            Err(_) => return Err(einval!("blob_id from_utf8 is invalid")),
+            Err(_) => return Err(einval!("blob_id in RAFS v6 device entry is invalid")),
         }
 
         if self.blocks() == 0 {
-            let msg = format!("invalid blocks {} in Rafs v6 Device", self.blocks());
+            let msg = format!("invalid blocks {} in Rafs v6 device entry", self.blocks());
             return Err(einval!(msg));
         }
 
         Ok(())
     }
+
+    impl_pub_getter_setter!(blocks, set_blocks, blocks, u32);
+    impl_pub_getter_setter!(mapped_blkaddr, set_mapped_blkaddr, mapped_blkaddr, u32);
 }
 
 impl_bootstrap_converter!(RafsV6Device);
@@ -1275,6 +1274,34 @@ impl RafsStore for RafsV6Device {
     }
 }
 
+/// Load blob information table from a reader.
+pub fn rafsv6_load_blob_extra_info(
+    meta: &RafsSuperMeta,
+    r: &mut RafsIoReader,
+) -> Result<HashMap<String, RafsBlobExtraInfo>> {
+    let mut infos = HashMap::new();
+    if meta.blob_device_table_count == 0 {
+        return Ok(infos);
+    }
+    r.seek_to_offset(meta.blob_device_table_offset)?;
+    for _idx in 0..meta.blob_device_table_count {
+        let mut devslot = RafsV6Device::new();
+        r.read_exact(devslot.as_mut())?;
+        devslot.validate()?;
+        let id = String::from_utf8(devslot.blob_id.to_vec())
+            .map_err(|e| einval!(format!("invalid blob id, {}", e)))?;
+        let info = RafsBlobExtraInfo {
+            mapped_blkaddr: devslot.mapped_blkaddr(),
+        };
+        if infos.contains_key(&id) {
+            return Err(einval!("duplicated blob id in RAFS v6 device table"));
+        }
+        infos.insert(id, info);
+    }
+
+    Ok(infos)
+}
+
 #[inline]
 pub fn align_offset(offset: u64, aligned_size: u64) -> u64 {
     round_up(offset, aligned_size)
diff --git a/rafs/src/metadata/mod.rs b/rafs/src/metadata/mod.rs
index 893f2e82576..7949c26c9e3 100644
--- a/rafs/src/metadata/mod.rs
+++ b/rafs/src/metadata/mod.rs
@@ -6,7 +6,7 @@
 //! Enums, Structs and Traits to access and manage Rafs filesystem metadata.
 
 use std::any::Any;
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::convert::{TryFrom, TryInto};
 use std::ffi::{OsStr, OsString};
 use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
@@ -68,6 +68,14 @@ pub const DOTDOT: &str = "..";
 /// Type for RAFS filesystem inode number.
 pub type Inode = u64;
 
+#[derive(Debug, Clone)]
+pub struct RafsBlobExtraInfo {
+    /// Mapped block address from RAFS v6 devslot table.
+    ///
+    /// It's the offset of the uncompressed blob used to convert an image into a disk.
+    pub mapped_blkaddr: u32,
+}
+
 /// Trait to access filesystem inodes managed by a RAFS filesystem.
 pub trait RafsSuperInodes {
     /// Get the maximum inode number managed by the RAFS filesystem.
@@ -95,6 +103,11 @@ pub trait RafsSuperBlock: RafsSuperInodes + Send + Sync {
     /// Get all blob objects referenced by the RAFS filesystem.
     fn get_blob_infos(&self) -> Vec<Arc<BlobInfo>>;
 
+    /// Get extra information associated with blob objects.
+    fn get_blob_extra_infos(&self) -> Result<HashMap<String, RafsBlobExtraInfo>> {
+        Ok(HashMap::new())
+    }
+
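[Annotation: a hedged caller-side sketch for the new query; `print_mapped_addrs` is a hypothetical helper, and a local stand-in trait is used so the sketch is self-contained:

use std::collections::HashMap;
use std::io::Result;

// Stand-ins mirroring the definitions in this patch.
pub struct RafsBlobExtraInfo { pub mapped_blkaddr: u32 }
pub trait ExtraInfos {
    fn get_blob_extra_infos(&self) -> Result<HashMap<String, RafsBlobExtraInfo>>;
}

// Hypothetical helper: list where each data blob is mapped in the block address space.
pub fn print_mapped_addrs(sb: &dyn ExtraInfos) -> Result<()> {
    for (blob_id, info) in sb.get_blob_extra_infos()? {
        println!("blob {} mapped at block 0x{:x}", blob_id, info.mapped_blkaddr);
    }
    Ok(())
}
]
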
     /// Get the inode number of the RAFS filesystem root.
     fn root_ino(&self) -> u64;
diff --git a/service/src/blob_cache.rs b/service/src/blob_cache.rs
index e9547c7dbcd..e05e15c91eb 100644
--- a/service/src/blob_cache.rs
+++ b/service/src/blob_cache.rs
@@ -14,7 +14,7 @@ use nydus_api::{
     BlobCacheEntry, BlobCacheList, BlobCacheObjectId, ConfigV2, BLOB_CACHE_TYPE_DATA_BLOB,
     BLOB_CACHE_TYPE_META_BLOB,
 };
-use nydus_rafs::metadata::RafsSuper;
+use nydus_rafs::metadata::{RafsBlobExtraInfo, RafsSuper};
 use nydus_storage::device::BlobInfo;
 
 const ID_SPLITTER: &str = "/";
@@ -34,7 +34,8 @@ pub struct BlobCacheConfigMetaBlob {
     scoped_blob_id: String,
     path: PathBuf,
     config: Arc<ConfigV2>,
-    data_blobs: Mutex<Vec<Arc<BlobCacheConfigDataBlob>>>,
+    blobs: Mutex<Vec<Arc<BlobCacheConfigDataBlob>>>,
+    blob_extra_infos: HashMap<String, RafsBlobExtraInfo>,
 }
 
 impl BlobCacheConfigMetaBlob {
@@ -48,15 +49,20 @@ impl BlobCacheConfigMetaBlob {
         &self.config
     }
 
+    /// Get optional extra information associated with a blob object.
+    pub fn get_blob_extra_info(&self, blob_id: &str) -> Option<&RafsBlobExtraInfo> {
+        self.blob_extra_infos.get(blob_id)
+    }
+
     fn add_data_blob(&self, blob: Arc<BlobCacheConfigDataBlob>) {
-        self.data_blobs.lock().unwrap().push(blob);
+        self.blobs.lock().unwrap().push(blob);
     }
 }
 
 /// Configuration information for cached data blob objects.
 pub struct BlobCacheConfigDataBlob {
-    blob_info: Arc<BlobInfo>,
     scoped_blob_id: String,
+    blob_info: Arc<BlobInfo>,
     config: Arc<ConfigV2>,
     ref_count: AtomicU32,
 }
@@ -107,6 +113,7 @@ impl BlobCacheObjectConfig {
         blob_id: String,
         path: PathBuf,
         config: Arc<ConfigV2>,
+        blob_extra_infos: HashMap<String, RafsBlobExtraInfo>,
     ) -> Self {
         let scoped_blob_id = generate_blob_key(&domain_id, &blob_id);
 
@@ -115,7 +122,8 @@ impl BlobCacheObjectConfig {
             scoped_blob_id,
             path,
             config,
-            data_blobs: Mutex::new(Vec::new()),
+            blobs: Mutex::new(Vec::new()),
+            blob_extra_infos,
         }))
     }
 
@@ -191,7 +199,7 @@ impl BlobCacheState {
             None => return Err(enoent!("blob_cache: cache entry not found")),
             Some(BlobCacheObjectConfig::MetaBlob(o)) => {
                 is_meta = true;
-                data_blobs = o.data_blobs.lock().unwrap().clone();
+                data_blobs = o.blobs.lock().unwrap().clone();
             }
             Some(BlobCacheObjectConfig::DataBlob(o)) => {
                 data_blobs.push(o.clone());
@@ -253,18 +261,20 @@ impl BlobCacheMgr {
                     e
                 })
             }
-            BLOB_CACHE_TYPE_DATA_BLOB => {
-                warn!("blob_cache: invalid data blob cache entry: {:?}", entry);
-                Err(einval!("blob_cache: invalid data blob cache entry"))
-            }
-            _ => {
-                warn!("blob_cache: invalid blob cache entry: {:?}", entry);
-                Err(einval!("blob_cache: invalid blob cache entry"))
-            }
+            BLOB_CACHE_TYPE_DATA_BLOB => Err(einval!(format!(
+                "blob_cache: invalid data blob cache entry: {:?}",
+                entry
+            ))),
+            _ => Err(einval!(format!(
+                "blob_cache: invalid blob cache entry, {:?}",
+                entry
+            ))),
         }
     }
 
     /// Add a list of meta/data blobs to be cached by the cache manager.
+    ///
+    /// If it fails to add one of the blobs, the blobs already added won't be rolled back.
     pub fn add_blob_list(&self, blobs: &BlobCacheList) -> Result<()> {
         for entry in blobs.blobs.iter() {
             self.add_blob_entry(entry)?;
         }
 
@@ -289,17 +299,10 @@ impl BlobCacheMgr {
     }
 
     fn get_meta_info(&self, entry: &BlobCacheEntry) -> Result<(PathBuf, Arc<ConfigV2>)> {
-        // Validate type of backend and cache.
         let config = entry
             .blob_config
             .as_ref()
             .ok_or_else(|| einval!("missing blob cache configuration information"))?;
-        if config.cache.cache_type != "fscache" {
-            return Err(einval!(
-                "blob_cache: `config.cache_type` for meta blob is invalid"
-            ));
-        }
-        let cache_config = config.cache.get_fscache_config()?;
 
         if entry.blob_id.contains(ID_SPLITTER) {
             return Err(einval!("blob_cache: `blob_id` for meta blob is invalid"));
@@ -322,15 +325,33 @@ impl BlobCacheMgr {
             ));
         }
 
-        // Validate the working directory for fscache
-        let path2 = Path::new(&cache_config.work_dir);
-        let path2 = path2
-            .canonicalize()
-            .map_err(|_e| eio!("blob_cache: `config.cache_config.work_dir` is invalid"))?;
-        if !path2.is_dir() {
-            return Err(einval!(
-                "blob_cache: `config.cache_config.work_dir` is not a directory"
-            ));
+        // Validate type of backend and cache.
+        if config.cache.is_fscache() {
+            // Validate the working directory for fscache
+            let cache_config = config.cache.get_fscache_config()?;
+            let path2 = Path::new(&cache_config.work_dir);
+            let path2 = path2
+                .canonicalize()
+                .map_err(|_e| eio!("blob_cache: `config.cache_config.work_dir` is invalid"))?;
+            if !path2.is_dir() {
+                return Err(einval!(
+                    "blob_cache: `config.cache_config.work_dir` is not a directory"
+                ));
+            }
+        } else if config.cache.is_filecache() {
+            // Validate the working directory for filecache
+            let cache_config = config.cache.get_filecache_config()?;
+            let path2 = Path::new(&cache_config.work_dir);
+            let path2 = path2
+                .canonicalize()
+                .map_err(|_e| eio!("blob_cache: `config.cache_config.work_dir` is invalid"))?;
+            if !path2.is_dir() {
+                return Err(einval!(
+                    "blob_cache: `config.cache_config.work_dir` is not a directory"
+                ));
+            }
+        } else {
+            return Err(einval!("blob_cache: unknown cache type"));
         }
 
         let config: Arc<ConfigV2> = Arc::new(config.into());
@@ -347,12 +368,15 @@ impl BlobCacheMgr {
         config: Arc<ConfigV2>,
     ) -> Result<()> {
         let (rs, _) = RafsSuper::load_from_file(&path, config.clone(), true, false)?;
+        let blob_extra_infos = rs.superblock.get_blob_extra_infos()?;
         let meta = BlobCacheObjectConfig::new_meta_blob(
             domain_id.to_string(),
             id.to_string(),
             path,
             config,
+            blob_extra_infos,
         );
+        // Safe to unwrap because it's a meta blob object.
         let meta_obj = meta.meta_config().unwrap();
         let mut state = self.get_state();
         state.try_add(meta)?;
@@ -461,7 +485,8 @@ mod tests {
             scoped_blob_id: "domain1".to_string(),
             path: path.clone(),
             config,
-            data_blobs: Mutex::new(Vec::new()),
+            blobs: Mutex::new(Vec::new()),
+            blob_extra_infos: HashMap::new(),
         };
         assert_eq!(blob.path(), &path);
     }

From e4dc7f87645e64cc0c4f1ba789ad22acaa5e9403 Mon Sep 17 00:00:00 2001
From: Jiang Liu
Date: Sun, 19 Feb 2023 14:39:38 +0800
Subject: [PATCH 3/7] service: add common code to compose a block device from a RAFSv6 image

Add common code to compose a block device from a RAFS image, which can then
be exposed through nbd/ublk/virtio-blk/vhost-user-blk etc.
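[Annotation: the new dbs-allocator dependency does the address-space bookkeeping for this. A minimal sketch of the calls this patch relies on, assuming the dbs-allocator 0.1 API as exercised below; the string payload is a stand-in for the real `BlockRange` enum:

use dbs_allocator::{Constraint, IntervalTree, NodeState, Range};

fn main() {
    // Cover the whole 32-bit block address space, initially unassigned.
    let mut ranges: IntervalTree<&'static str> = IntervalTree::new();
    ranges.insert(Range::new(0u32, u32::MAX), None);

    // Reserve blocks [0, 9] for the metadata blob.
    let constraint = Constraint::new(10u32).min(0u32).max(9u32);
    let range = ranges.allocate(&constraint).expect("allocation failed");
    ranges.update(&range, "meta-blob");

    // A point query then resolves a block number to its backing object.
    match ranges.get_superset(&Range::new_point(5u32)) {
        Some((_, NodeState::Valued(who))) => println!("block 5 -> {}", who),
        _ => println!("block 5 is unassigned"),
    }
}
]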
Signed-off-by: Jiang Liu
---
 Cargo.lock                  |  10 ++++
 service/Cargo.toml          |   2 +
 service/src/blob_cache.rs   |   8 +++
 service/src/block_device.rs | 108 ++++++++++++++++++++++++++++++++++++
 service/src/fs_cache.rs     |  10 +++-
 service/src/lib.rs          |   2 +
 6 files changed, 139 insertions(+), 1 deletion(-)
 create mode 100644 service/src/block_device.rs

diff --git a/Cargo.lock b/Cargo.lock
index 80709f97e57..afd545160f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -352,6 +352,15 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "dbs-allocator"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "543711b94b4bc1437d2ebb45f856452e96a45a67ab39f8dcf8c887c2a3701004"
+dependencies = [
+ "thiserror",
+]
+
 [[package]]
 name = "dbs-uhttp"
 version = "0.3.2"
@@ -1254,6 +1263,7 @@ dependencies = [
 name = "nydus-service"
 version = "0.2.0"
 dependencies = [
+ "dbs-allocator",
  "fuse-backend-rs",
  "libc",
  "log",
diff --git a/service/Cargo.toml b/service/Cargo.toml
index 9499988badc..b320f739a61 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -10,6 +10,7 @@ edition = "2018"
 resolver = "2"
 
 [dependencies]
+dbs-allocator = { version = "0.1.1", optional = true }
 fuse-backend-rs = "0.10.1"
 libc = "0.2"
 log = "0.4.8"
@@ -48,3 +49,4 @@ virtiofs = [
     "virtio-bindings",
 ]
 
+block-device = [ "dbs-allocator", "tokio/fs"]
diff --git a/service/src/blob_cache.rs b/service/src/blob_cache.rs
index e05e15c91eb..04b0cc2ceb1 100644
--- a/service/src/blob_cache.rs
+++ b/service/src/blob_cache.rs
@@ -49,6 +49,10 @@ impl BlobCacheConfigMetaBlob {
         &self.config
     }
 
+    pub fn get_blobs(&self) -> Vec<Arc<BlobCacheConfigDataBlob>> {
+        self.blobs.lock().unwrap().clone()
+    }
+
     /// Get optional extra information associated with a blob object.
     pub fn get_blob_extra_info(&self, blob_id: &str) -> Option<&RafsBlobExtraInfo> {
         self.blob_extra_infos.get(blob_id)
@@ -368,6 +372,10 @@ impl BlobCacheMgr {
         config: Arc<ConfigV2>,
     ) -> Result<()> {
         let (rs, _) = RafsSuper::load_from_file(&path, config.clone(), true, false)?;
+        if rs.meta.is_v5() {
+            return Err(einval!("blob_cache: RAFSv5 image is not supported"));
+        }
+
         let blob_extra_infos = rs.superblock.get_blob_extra_infos()?;
         let meta = BlobCacheObjectConfig::new_meta_blob(
             domain_id.to_string(),
diff --git a/service/src/block_device.rs b/service/src/block_device.rs
new file mode 100644
index 00000000000..308d4ebb139
--- /dev/null
+++ b/service/src/block_device.rs
@@ -0,0 +1,108 @@
+// Copyright (C) 2023 Alibaba Cloud. All rights reserved.
+//
+// SPDX-License-Identifier: (Apache-2.0)
+
+//! Represent a RAFSv6 image as a block device or a group of block devices.
+
+use std::cmp::min;
+use std::io::Result;
+use std::sync::Arc;
+
+use dbs_allocator::{Constraint, IntervalTree, NodeState, Range};
+use nydus_rafs::metadata::layout::v6::EROFS_BLOCK_BITS;
+
+use crate::blob_cache::{
+    BlobCacheConfigDataBlob, BlobCacheConfigMetaBlob, BlobCacheMgr, BlobCacheObjectConfig,
+};
+
+enum BlockRange {
+    Hole,
+    MetaBlob(Arc<BlobCacheConfigMetaBlob>),
+    DataBlob(Arc<BlobCacheConfigDataBlob>),
+}
+
+pub struct BlockDevice {
+    blob_id: String,
+    //blob_cache_mgr: Arc<BlobCacheMgr>,
+    //meta_blob: Arc<BlobCacheConfigMetaBlob>,
+    //data_blobs: Vec<Arc<BlobCacheConfigDataBlob>>,
+    ranges: IntervalTree<BlockRange>,
+}
+
+impl BlockDevice {
+    pub fn new(blob_id: String, blob_cache_mgr: Arc<BlobCacheMgr>) -> Result<Self> {
+        let mut ranges = IntervalTree::new();
+        //ranges.insert(Range::new(0, u32::MAX), None);
+
+        /*
+        let meta_blob = match blob_cache_mgr.get_config(&blob_id) {
+            None => return Err(enoent!("block_device: can not find blob {} in blob cache manager", blob_id)),
+            Some(BlobCacheObjectConfig::DataBlob(_v)) => return Err(einval!("block_device: blob {} is not a metadata blob", blob_id)),
+            Some(BlobCacheObjectConfig::MetaBlob(v)) => v,
+        };
+        let constraint = Constraint::new(meta_blob.blocks()).min(0).max(meta_blob.blocks());
+        let range = ranges.allocate(&constraint)?;
+        ranges.update(&range, BlockRange::MetaBlob(meta_blob.clone()));
+
+        let data_blobs = meta_blob.get_blobs();
+        for blob in data_blobs.iter() {
+            let blob_info = blob.blob_info();
+            let blob_id = blob_info.blob_id();
+            let extra_info = meta_blob.get_blob_extra_info(&blob_id).ok_or_else(||
+                enoent!("block_device: can not get extra information for blob {}", blob_id)
+            )?;
+            if extra_info.mapped_blkaddr == 0 {
+                return Err(einval!("block_device: mapped block address for blob {} is zero", blob_id));
+            }
+            let constraint = Constraint::new(blob.blocks()).min(extra_info.mapped_blkaddr).max(extra_info.mapped_blkaddr + blob.blocks());
+            let range = ranges.allocate(&constraint)?;
+            ranges.update(&range, BlockRange::DataBlob(blob.clone()));
+        }
+        */
+
+        Ok(BlockDevice { blob_id, ranges })
+    }
+
+    /// Read block range [start, start + blocks) from the block device.
+    pub async fn read(&self, mut start: u32, mut blocks: u32, buf: &mut [u8]) -> Result<()> {
+        if start.checked_add(blocks).is_none()
+            || (blocks as u64) << EROFS_BLOCK_BITS > buf.len() as u64
+        {
+            return Err(einval!("block_device: invalid parameters to read()"));
+        }
+
+        let mut pos = 0;
+        while blocks > 0 {
+            let (range, node) = match self.ranges.get_superset(&Range::new_point(start)) {
+                Some(v) => v,
+                None => {
+                    return Err(eio!(format!(
+                        "block_device: can not locate block 0x{:x} for meta blob {}",
+                        start, self.blob_id
+                    )))
+                }
+            };
+
+            let count = min(range.len() as u32, blocks);
+            let sz = (count as usize) << EROFS_BLOCK_BITS as usize;
+            if let NodeState::Valued(r) = node {
+                match r {
+                    BlockRange::Hole => buf[pos..pos + sz].fill(0),
+                    BlockRange::MetaBlob(_m) => unimplemented!(),
+                    BlockRange::DataBlob(_d) => unimplemented!(),
+                }
+            } else {
+                return Err(eio!(format!(
+                    "block_device: block range 0x{:x}/0x{:x} of meta blob {} is unhandled",
+                    start, blocks, self.blob_id,
+                )));
+            }
+
+            start += count;
+            blocks -= count;
+            pos += sz;
+        }
+
+        Ok(())
+    }
+}
diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs
index b70f9acaa57..228bab8c8b9 100644
--- a/service/src/fs_cache.rs
+++ b/service/src/fs_cache.rs
@@ -2,7 +2,15 @@
 //
 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
 
-//! FsCache manager to cooperate with the Linux fscache subsystem to support EROFS.
+//! Handler to expose RAFSv6 images through EROFS/fscache.
+//!
+//! The [`FsCacheHandler`] is the inter-connection between the in-kernel EROFS/fscache drivers
+//! and the user space [BlobCacheMgr](https://docs.rs/nydus-service/latest/nydus_service/blob_cache/struct.BlobCacheMgr.html).
+//! The workflow is as below:
+//! - EROFS presents a filesystem structure by parsing a RAFS image metadata blob.
+//! - EROFS sends requests to the fscache subsystem when users read data from files.
+//! - The fscache subsystem sends requests to [FsCacheHandler] if the requested data hasn't been
+//!   cached yet.
+//! - [FsCacheHandler] reads blob data from the [BlobCacheMgr] and sends back reply messages.
 
 use std::collections::hash_map::Entry::Vacant;
 use std::collections::HashMap;
diff --git a/service/src/lib.rs b/service/src/lib.rs
index 4b44d19a391..dfa1538b81f 100644
--- a/service/src/lib.rs
+++ b/service/src/lib.rs
@@ -31,6 +31,8 @@ use serde::{Deserialize, Serialize};
 use serde_json::Error as SerdeError;
 
 pub mod blob_cache;
+#[cfg(feature = "block-device")]
+pub mod block_device;
 pub mod daemon;
 #[cfg(target_os = "linux")]
 mod fs_cache;

From 10a2fef0cbd94549186c4d2299b83ee357972fbf Mon Sep 17 00:00:00 2001
From: Jiang Liu
Date: Mon, 20 Feb 2023 13:45:07 +0800
Subject: [PATCH 4/7] service: compose a block device from a RAFSv6 image

Compose a block device from a RAFSv6 image, so all metadata/data content
can be accessed by block address. The EROFS fs driver can be used to
directly mount the block device. It depends on the blob_cache subsystem
and can be used to implement nbd/ublk/virtio-blk/vhost-user-blk servers.

Signed-off-by: Jiang Liu
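[Annotation: a rough sketch of how the resulting device is meant to be driven. Illustrative only; module paths assume the nydus-service crate layout in this series, and error handling is elided:

use std::sync::Arc;

use nydus_service::blob_cache::BlobCacheMgr;
use nydus_service::block_device::BlockDevice;

// Read the first block of a composed device; `blob_id` must name a metadata
// blob already registered with the cache manager.
fn read_first_block(blob_id: String, mgr: Arc<BlobCacheMgr>) -> std::io::Result<Vec<u8>> {
    let device = BlockDevice::new(blob_id, mgr)?;
    tokio_uring::start(async move {
        let buf = vec![0u8; 4096];
        let (res, buf) = device.async_read(0, 1, buf).await;
        res.map(|_| buf)
    })
}
]
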
---
 Cargo.lock                                    |   1 +
 service/Cargo.toml                            |   1 +
 service/src/blob_cache.rs                     | 249 ++++++++++---
 service/src/block_device.rs                   | 344 +++++++++++++++---
 service/src/fs_cache.rs                       |  21 +-
 ...1aa800ed0fb09d701aaec469964e9d54325f0d5fef | Bin 0 -> 14554 bytes
 tests/texture/bootstrap/rafs-v6-2.2.boot      | Bin 0 -> 20480 bytes
 7 files changed, 491 insertions(+), 125 deletions(-)
 create mode 100644 tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef
 create mode 100644 tests/texture/bootstrap/rafs-v6-2.2.boot

diff --git a/Cargo.lock b/Cargo.lock
index afd545160f6..5c23bb348b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1280,6 +1280,7 @@ dependencies = [
  "thiserror",
  "time",
  "tokio",
+ "tokio-uring",
  "vhost",
  "vhost-user-backend",
  "virtio-bindings",
diff --git a/service/Cargo.toml b/service/Cargo.toml
index b320f739a61..54df0f7337b 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -22,6 +22,7 @@ serde_json = "1.0.51"
 thiserror = "1.0"
 time = { version = "0.3.14", features = ["serde-human-readable"] }
 tokio = { version = "1.24", features = ["macros"] }
+tokio-uring = "0.4"
 
 nydus-api = { version = "0.2.2", path = "../api", features = ["handler"] }
 nydus-error = { version = "0.2.3", path = "../error" }
diff --git a/service/src/blob_cache.rs b/service/src/blob_cache.rs
index 04b0cc2ceb1..e677a9ee468 100644
--- a/service/src/blob_cache.rs
+++ b/service/src/blob_cache.rs
@@ -5,7 +5,9 @@
 //! Blob cache manager to cache RAFS meta/data blob objects.
 
 use std::collections::HashMap;
+use std::fs::OpenOptions;
 use std::io::{Error, ErrorKind, Result};
+use std::os::fd::FromRawFd;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Mutex, MutexGuard};
@@ -14,8 +16,13 @@ use nydus_api::{
     BlobCacheEntry, BlobCacheList, BlobCacheObjectId, ConfigV2, BLOB_CACHE_TYPE_DATA_BLOB,
     BLOB_CACHE_TYPE_META_BLOB,
 };
+use nydus_rafs::metadata::layout::v6::{EROFS_BLOCK_BITS, EROFS_BLOCK_SIZE};
 use nydus_rafs::metadata::{RafsBlobExtraInfo, RafsSuper};
+use nydus_storage::cache::BlobCache;
 use nydus_storage::device::BlobInfo;
+use nydus_storage::factory::BLOB_FACTORY;
+use tokio_uring::buf::IoBufMut;
+use tokio_uring::fs::File;
 
 const ID_SPLITTER: &str = "/";
 
@@ -28,17 +35,22 @@ pub fn generate_blob_key(domain_id: &str, blob_id: &str) -> String {
     }
 }
 
-/// Configuration information for cached meta blob objects.
-pub struct BlobCacheConfigMetaBlob {
-    _blob_id: String,
+/// Configuration information for a cached metadata blob.
+pub struct MetaBlobConfig {
+    blob_id: String,
     scoped_blob_id: String,
     path: PathBuf,
     config: Arc<ConfigV2>,
-    blobs: Mutex<Vec<Arc<BlobCacheConfigDataBlob>>>,
+    blobs: Mutex<Vec<Arc<DataBlobConfig>>>,
     blob_extra_infos: HashMap<String, RafsBlobExtraInfo>,
 }
 
-impl BlobCacheConfigMetaBlob {
+impl MetaBlobConfig {
+    /// Get blob id.
+    pub fn blob_id(&self) -> &str {
+        &self.blob_id
+    }
+
     /// Get file path to access the meta blob.
     pub fn path(&self) -> &Path {
         &self.path
@@ -48,7 +61,7 @@ impl MetaBlobConfig {
         &self.config
     }
 
-    pub fn get_blobs(&self) -> Vec<Arc<BlobCacheConfigDataBlob>> {
+    pub fn get_blobs(&self) -> Vec<Arc<DataBlobConfig>> {
         self.blobs.lock().unwrap().clone()
     }
 
     /// Get optional extra information associated with a blob object.
     pub fn get_blob_extra_info(&self, blob_id: &str) -> Option<&RafsBlobExtraInfo> {
         self.blob_extra_infos.get(blob_id)
     }
 
-    fn add_data_blob(&self, blob: Arc<BlobCacheConfigDataBlob>) {
+    fn add_data_blob(&self, blob: Arc<DataBlobConfig>) {
         self.blobs.lock().unwrap().push(blob);
     }
 }
 
-/// Configuration information for cached data blob objects.
-pub struct BlobCacheConfigDataBlob {
+/// Configuration information for a cached data blob.
+pub struct DataBlobConfig {
     scoped_blob_id: String,
     blob_info: Arc<BlobInfo>,
     config: Arc<ConfigV2>,
     ref_count: AtomicU32,
 }
 
-impl BlobCacheConfigDataBlob {
+impl DataBlobConfig {
     /// Get the [`BlobInfo`](https://docs.rs/nydus-storage/latest/nydus_storage/device/struct.BlobInfo.html) object associated with the cached data blob.
     pub fn blob_info(&self) -> &Arc<BlobInfo> {
         &self.blob_info
@@ -83,28 +95,28 @@ impl DataBlobConfig {
     }
 }
 
-/// Configuration information for cached blob objects.
+/// Configuration information for a cached metadata/data blob.
 #[derive(Clone)]
-pub enum BlobCacheObjectConfig {
+pub enum BlobConfig {
     /// Configuration information for cached meta blob objects.
-    MetaBlob(Arc<BlobCacheConfigMetaBlob>),
+    MetaBlob(Arc<MetaBlobConfig>),
     /// Configuration information for cached data blob objects.
-    DataBlob(Arc<BlobCacheConfigDataBlob>),
+    DataBlob(Arc<DataBlobConfig>),
 }
 
-impl BlobCacheObjectConfig {
+impl BlobConfig {
     /// Get the ['ConfigV2'] object associated with the cached data blob.
     pub fn config_v2(&self) -> &Arc<ConfigV2> {
         match self {
-            BlobCacheObjectConfig::MetaBlob(v) => v.config_v2(),
-            BlobCacheObjectConfig::DataBlob(v) => v.config_v2(),
+            BlobConfig::MetaBlob(v) => v.config_v2(),
+            BlobConfig::DataBlob(v) => v.config_v2(),
         }
     }
 
     fn new_data_blob(domain_id: String, blob_info: Arc<BlobInfo>, config: Arc<ConfigV2>) -> Self {
         let scoped_blob_id = generate_blob_key(&domain_id, &blob_info.blob_id());
 
-        BlobCacheObjectConfig::DataBlob(Arc::new(BlobCacheConfigDataBlob {
+        BlobConfig::DataBlob(Arc::new(DataBlobConfig {
             blob_info,
             scoped_blob_id,
             config,
             ref_count: AtomicU32::new(1),
@@ -121,8 +133,8 @@ impl BlobCacheObjectConfig {
     ) -> Self {
         let scoped_blob_id = generate_blob_key(&domain_id, &blob_id);
 
-        BlobCacheObjectConfig::MetaBlob(Arc::new(BlobCacheConfigMetaBlob {
-            _blob_id: blob_id,
+        BlobConfig::MetaBlob(Arc::new(MetaBlobConfig {
+            blob_id,
             scoped_blob_id,
             path,
             config,
@@ -133,22 +145,22 @@ impl BlobCacheObjectConfig {
 
     fn key(&self) -> &str {
         match self {
-            BlobCacheObjectConfig::MetaBlob(o) => &o.scoped_blob_id,
-            BlobCacheObjectConfig::DataBlob(o) => &o.scoped_blob_id,
+            BlobConfig::MetaBlob(o) => &o.scoped_blob_id,
+            BlobConfig::DataBlob(o) => &o.scoped_blob_id,
         }
     }
 
-    fn meta_config(&self) -> Option<Arc<BlobCacheConfigMetaBlob>> {
+    fn meta_config(&self) -> Option<Arc<MetaBlobConfig>> {
         match self {
-            BlobCacheObjectConfig::MetaBlob(o) => Some(o.clone()),
-            BlobCacheObjectConfig::DataBlob(_o) => None,
+            BlobConfig::MetaBlob(o) => Some(o.clone()),
+            BlobConfig::DataBlob(_o) => None,
         }
     }
 }
 
 #[derive(Default)]
 struct BlobCacheState {
-    id_to_config_map: HashMap<String, BlobCacheObjectConfig>,
+    id_to_config_map: HashMap<String, BlobConfig>,
 }
 
 impl BlobCacheState {
@@ -158,19 +170,19 @@ impl BlobCacheState {
         }
     }
 
-    fn try_add(&mut self, config: BlobCacheObjectConfig) -> Result<()> {
+    fn try_add(&mut self, config: BlobConfig) -> Result<()> {
         let key = config.key();
 
         if let Some(entry) = self.id_to_config_map.get(key) {
             match entry {
-                BlobCacheObjectConfig::MetaBlob(_o) => {
+                BlobConfig::MetaBlob(_o) => {
                     // Meta blob must be unique.
                     return Err(Error::new(
                         ErrorKind::AlreadyExists,
                         "blob_cache: bootstrap blob already exists",
                     ));
                 }
-                BlobCacheObjectConfig::DataBlob(o) => {
+                BlobConfig::DataBlob(o) => {
                     // Data blob is reference counted.
                     o.ref_count.fetch_add(1, Ordering::AcqRel);
                 }
@@ -187,12 +199,8 @@ impl BlobCacheState {
             // Remove all blobs associated with the domain.
             let scoped_blob_prefix = format!("{}{}", param.domain_id, ID_SPLITTER);
             self.id_to_config_map.retain(|_k, v| match v {
-                BlobCacheObjectConfig::MetaBlob(o) => {
-                    !o.scoped_blob_id.starts_with(&scoped_blob_prefix)
-                }
-                BlobCacheObjectConfig::DataBlob(o) => {
-                    !o.scoped_blob_id.starts_with(&scoped_blob_prefix)
-                }
+                BlobConfig::MetaBlob(o) => !o.scoped_blob_id.starts_with(&scoped_blob_prefix),
+                BlobConfig::DataBlob(o) => !o.scoped_blob_id.starts_with(&scoped_blob_prefix),
             });
         } else {
             let mut data_blobs = Vec::new();
@@ -201,11 +209,11 @@ impl BlobCacheState {
 
             match self.id_to_config_map.get(&scoped_blob_prefix) {
                 None => return Err(enoent!("blob_cache: cache entry not found")),
-                Some(BlobCacheObjectConfig::MetaBlob(o)) => {
+                Some(BlobConfig::MetaBlob(o)) => {
                     is_meta = true;
                     data_blobs = o.blobs.lock().unwrap().clone();
                 }
-                Some(BlobCacheObjectConfig::DataBlob(o)) => {
+                Some(BlobConfig::DataBlob(o)) => {
                     data_blobs.push(o.clone());
                 }
             }
@@ -224,7 +232,7 @@ impl BlobCacheState {
         Ok(())
     }
 
-    fn get(&self, key: &str) -> Option<BlobCacheObjectConfig> {
+    fn get(&self, key: &str) -> Option<BlobConfig> {
         self.id_to_config_map.get(key).cloned()
     }
 }
@@ -293,7 +301,7 @@ impl BlobCacheMgr {
     }
 
     /// Get configuration information of the cached blob with specified `key`.
-    pub fn get_config(&self, key: &str) -> Option<BlobCacheObjectConfig> {
+    pub fn get_config(&self, key: &str) -> Option<BlobConfig> {
         self.get_state().get(key)
     }
 
@@ -377,7 +385,7 @@ impl BlobCacheMgr {
         }
 
         let blob_extra_infos = rs.superblock.get_blob_extra_infos()?;
-        let meta = BlobCacheObjectConfig::new_meta_blob(
+        let meta = BlobConfig::new_meta_blob(
             domain_id.to_string(),
             id.to_string(),
             path,
@@ -396,13 +404,10 @@ impl BlobCacheMgr {
                 &bi.blob_id(),
                 domain_id
             );
-            let data_blob = BlobCacheObjectConfig::new_data_blob(
-                domain_id.to_string(),
-                bi,
-                meta_obj.config.clone(),
-            );
+            let data_blob =
+                BlobConfig::new_data_blob(domain_id.to_string(), bi, meta_obj.config.clone());
             let data_blob_config = match &data_blob {
-                BlobCacheObjectConfig::DataBlob(entry) => entry.clone(),
+                BlobConfig::DataBlob(entry) => entry.clone(),
                 _ => panic!("blob_cache: internal error"),
             };
 
@@ -424,6 +429,115 @@ impl BlobCacheMgr {
     }
 }
 
+/// Structure representing a cached metadata blob.
+pub struct MetaBlob {
+    file: File,
+    size: u64,
+}
+
+impl MetaBlob {
+    /// Create a new [MetaBlob] object from a metadata blob file.
+    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
+        let file = OpenOptions::new()
+            .read(true)
+            .write(false)
+            .open(path.as_ref())
+            .map_err(|e| {
+                warn!(
+                    "blob_cache: failed to open metadata blob {}",
+                    path.as_ref().display()
+                );
+                e
+            })?;
+        let md = file.metadata().map_err(|e| {
+            warn!(
+                "blob_cache: failed to get metadata about metadata blob {}",
+                path.as_ref().display()
+            );
+            e
+        })?;
+        let size = md.len();
+        if size % EROFS_BLOCK_SIZE != 0 || (size >> EROFS_BLOCK_BITS) > u32::MAX as u64 {
+            return Err(einval!(format!(
+                "blob_cache: metadata blob size (0x{:x}) is invalid",
+                size
+            )));
+        }
+
+        Ok(MetaBlob {
+            file: File::from_std(file),
+            size,
+        })
+    }
+
+    /// Get number of blocks in unit of EROFS_BLOCK_SIZE.
+    pub fn blocks(&self) -> u32 {
+        (self.size >> EROFS_BLOCK_BITS) as u32
+    }
+
+    /// Read data from the cached metadata blob in asynchronous mode.
+    pub async fn async_read<T: IoBufMut>(&self, pos: u64, buf: T) -> (Result<usize>, T) {
+        self.file.read_at(buf, pos).await
+    }
+}
+
+/// Structure representing a cached data blob.
+pub struct DataBlob {
+    blob_id: String,
+    blob: Arc<dyn BlobCache>,
+    file: File,
+}
+
+impl DataBlob {
+    /// Create a new instance of [DataBlob].
+    pub fn new(config: &Arc<DataBlobConfig>) -> Result<Self> {
+        let blob_id = config.blob_info().blob_id();
+        let blob = BLOB_FACTORY
+            .new_blob_cache(config.config_v2(), &config.blob_info)
+            .map_err(|e| {
+                warn!(
+                    "blob_cache: failed to create cache object for blob {}",
+                    blob_id
+                );
+                e
+            })?;
+
+        match blob.get_blob_object() {
+            Some(obj) => {
+                let fd = nix::unistd::dup(obj.as_raw_fd())?;
+                // Safe because the `fd` is valid.
+                let file = unsafe { File::from_raw_fd(fd) };
+                Ok(DataBlob {
+                    blob_id,
+                    blob,
+                    file,
+                })
+            }
+            None => Err(eio!(format!(
+                "blob_cache: failed to get BlobObject for blob {}",
+                blob_id
+            ))),
+        }
+    }
+
+    /// Read data from the cached data blob in asynchronous mode.
+    pub async fn async_read<T: IoBufMut>(&self, pos: u64, buf: T) -> (Result<usize>, T) {
+        match self.blob.get_blob_object() {
+            Some(obj) => match obj.fetch_range_uncompressed(pos, buf.bytes_total() as u64) {
+                Ok(_) => self.file.read_at(buf, pos).await,
+                Err(e) => (Err(e), buf),
+            },
+            None => (
+                Err(eio!(format!(
+                    "blob_cache: failed to get BlobObject for blob {}",
+                    self.blob_id
+                ))),
+                buf,
+            ),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -488,8 +602,8 @@ mod tests {
         assert_eq!(&backend_cfg.backend_type, "localfs");
         assert_eq!(&cache_cfg.cache_type, "fscache");
 
-        let blob = BlobCacheConfigMetaBlob {
-            _blob_id: "123456789-123".to_string(),
+        let blob = MetaBlobConfig {
+            blob_id: "123456789-123".to_string(),
             scoped_blob_id: "domain1".to_string(),
             path: path.clone(),
             config,
@@ -573,12 +687,12 @@ mod tests {
         let tmpdir = TempDir::new().unwrap();
         let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
         let mut source_path = PathBuf::from(root_dir);
-        source_path.push("../tests/texture/bootstrap/rafs-v5.boot");
+        source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot");
 
         let config = r#"
         {
             "type": "bootstrap",
-            "id": "rafs-v5",
+            "id": "rafs-v6",
             "domain_id": "domain2",
             "config_v2": {
                 "version": 2,
@@ -612,35 +726,56 @@ mod tests {
         // Check existence of data blob referenced by the bootstrap.
         let key = generate_blob_key(
             &entry.domain_id,
-            "7fe907a0c9c7f35538f23f40baae5f2e8d148a3a6186f0f443f62d04b5e2d731",
+            "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef",
         );
         assert!(mgr.get_config(&key).is_some());
-        assert_eq!(mgr.get_state().id_to_config_map.len(), 19);
+        assert_eq!(mgr.get_state().id_to_config_map.len(), 2);
 
-        entry.blob_id = "rafs-v5-cloned".to_string();
+        entry.blob_id = "rafs-v6-cloned".to_string();
         let blob_id_cloned = generate_blob_key(&entry.domain_id, &entry.blob_id);
         mgr.add_blob_entry(&entry).unwrap();
-        assert_eq!(mgr.get_state().id_to_config_map.len(), 20);
+        assert_eq!(mgr.get_state().id_to_config_map.len(), 3);
         assert!(mgr.get_config(&blob_id).is_some());
         assert!(mgr.get_config(&blob_id_cloned).is_some());
 
         mgr.remove_blob_entry(&BlobCacheObjectId {
             domain_id: entry.domain_id.clone(),
-            blob_id: "rafs-v5".to_string(),
+            blob_id: "rafs-v6".to_string(),
         })
         .unwrap();
-        assert_eq!(mgr.get_state().id_to_config_map.len(), 19);
+        assert_eq!(mgr.get_state().id_to_config_map.len(), 2);
         assert!(mgr.get_config(&blob_id).is_none());
         assert!(mgr.get_config(&blob_id_cloned).is_some());
 
         mgr.remove_blob_entry(&BlobCacheObjectId {
             domain_id: entry.domain_id,
-            blob_id: "rafs-v5-cloned".to_string(),
+            blob_id: "rafs-v6-cloned".to_string(),
         })
         .unwrap();
         assert_eq!(mgr.get_state().id_to_config_map.len(), 0);
         assert!(mgr.get_config(&blob_id).is_none());
         assert!(mgr.get_config(&blob_id_cloned).is_none());
     }
+
+    #[test]
+    fn test_meta_blob() {
+        let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR");
+        let mut source_path = PathBuf::from(root_dir);
+        source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot");
+
+        tokio_uring::start(async move {
+            let meta_blob = MetaBlob::new(&source_path).unwrap();
+            assert_eq!(meta_blob.blocks(), 5);
+            let buf = vec![0u8; 4096];
+            let (res, buf) = meta_blob.async_read(0, buf).await;
+            assert_eq!(res.unwrap(), 4096);
+            assert_eq!(buf[0], 0);
+            assert_eq!(buf[1023], 0);
+            assert_eq!(buf[1024], 0xe2);
+            assert_eq!(buf[1027], 0xe0);
+            let (res, _buf) = meta_blob.async_read(0x6000, buf).await;
+            assert_eq!(res.unwrap(), 0);
+        });
+    }
 }
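[Annotation: `DataBlob::async_read` above is a read-through: it first asks the storage backend to make the uncompressed range resident in the local cache file, then reads the bytes back via io-uring. A schematic sketch under the assumption that `fetch_range_uncompressed` behaves as used above; `ensure_cached` is a hypothetical stand-in for that call:

use std::io::Result;
use tokio_uring::fs::File;

async fn read_through<F>(ensure_cached: F, file: &File, pos: u64, buf: Vec<u8>) -> (Result<usize>, Vec<u8>)
where
    F: Fn(u64, u64) -> Result<()>,
{
    // 1. Fault the uncompressed range into the cache file.
    if let Err(e) = ensure_cached(pos, buf.len() as u64) {
        return (Err(e), buf);
    }
    // 2. Read it back from the cache file with io-uring.
    file.read_at(buf, pos).await
}
]
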
diff --git a/service/src/block_device.rs b/service/src/block_device.rs
index 308d4ebb139..ae099f5c465 100644
--- a/service/src/block_device.rs
+++ b/service/src/block_device.rs
@@ -2,7 +2,14 @@
 //
 // SPDX-License-Identifier: (Apache-2.0)
 
-//! Represent a RAFSv6 image as a block device or a group of block devices.
+//! Represent a RAFSv6 image as a block device.
+//!
+//! Metadata of RAFSv6 image has two address encoding schemes:
+//! - blob address: data is located by (blob_index, chunk_index)
+//! - block address: data is located by (block_addr)
+//!
+//! Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block
+//! device, so it can be directly mounted by Linux EROFS fs driver.
 
 use std::cmp::min;
 use std::io::Result;
@@ -10,99 +17,322 @@ use std::sync::Arc;
 
 use dbs_allocator::{Constraint, IntervalTree, NodeState, Range};
 use nydus_rafs::metadata::layout::v6::EROFS_BLOCK_BITS;
+use tokio_uring::buf::IoBufMut;
 
-use crate::blob_cache::{
-    BlobCacheConfigDataBlob, BlobCacheConfigMetaBlob, BlobCacheMgr, BlobCacheObjectConfig,
-};
+use crate::blob_cache::{BlobCacheMgr, BlobConfig, DataBlob, MetaBlob};
 
 enum BlockRange {
     Hole,
-    MetaBlob(Arc<BlobCacheConfigMetaBlob>),
-    DataBlob(Arc<BlobCacheConfigDataBlob>),
+    MetaBlob(Arc<MetaBlob>),
+    DataBlob(Arc<DataBlob>),
 }
 
+/// A block device composed up from a RAFSv6 image.
+///
+/// RAFSv6 metadata has two encoding schemes:
+/// - blob address: data is located by (blob_index, chunk_index)
+/// - block address: data is located by (block_addr)
+///
+/// Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block
+/// device, so it can be directly mounted by Linux EROFS fs driver.
 pub struct BlockDevice {
+    blocks: u32,
     blob_id: String,
-    //blob_cache_mgr: Arc<BlobCacheMgr>,
-    //meta_blob: Arc<BlobCacheConfigMetaBlob>,
-    //data_blobs: Vec<Arc<BlobCacheConfigDataBlob>>,
+    cache_mgr: Arc<BlobCacheMgr>,
     ranges: IntervalTree<BlockRange>,
 }
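[Annotation: for intuition, the address-space layout built by `new()` below can be pictured as: metadata blob at block 0, every data blob at its `mapped_blkaddr`, and explicit holes in between. A toy walk, illustrative only:

// Toy layout walk over (mapped_blkaddr, blocks) pairs, mirroring new() below.
fn print_layout(meta_blocks: u32, blobs: &[(u32, u32)]) {
    let mut pos = meta_blocks;
    println!("meta: blocks [0, {:#x})", meta_blocks);
    for (addr, blocks) in blobs {
        if pos < *addr {
            println!("hole: blocks [{:#x}, {:#x})", pos, addr);
        }
        println!("blob: blocks [{:#x}, {:#x})", addr, addr + blocks);
        pos = addr + blocks;
    }
    println!("device size: {:#x} blocks", pos);
}

fn main() {
    // A 0x200-block meta blob and a 4-block data blob mapped at 0x205
    // leave a 5-block hole in between.
    print_layout(0x200, &[(0x205, 4)]);
}
]
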
 impl BlockDevice {
-    pub fn new(blob_id: String, blob_cache_mgr: Arc<BlobCacheMgr>) -> Result<Self> {
+    /// Create a new instance of [BlockDevice].
+    pub fn new(blob_id: String, cache_mgr: Arc<BlobCacheMgr>) -> Result<Self> {
         let mut ranges = IntervalTree::new();
-        //ranges.insert(Range::new(0, u32::MAX), None);
+        ranges.insert(Range::new(0, u32::MAX), None);
 
-        /*
-        let meta_blob = match blob_cache_mgr.get_config(&blob_id) {
-            None => return Err(enoent!("block_device: can not find blob {} in blob cache manager", blob_id)),
-            Some(BlobCacheObjectConfig::DataBlob(_v)) => return Err(einval!("block_device: blob {} is not a metadata blob", blob_id)),
-            Some(BlobCacheObjectConfig::MetaBlob(v)) => v,
+        let meta_blob_config = match cache_mgr.get_config(&blob_id) {
+            None => {
+                return Err(enoent!(format!(
+                    "block_device: can not find blob {} in blob cache manager",
+                    blob_id
+                )))
+            }
+            Some(BlobConfig::DataBlob(_v)) => {
+                return Err(einval!(format!(
+                    "block_device: blob {} is not a metadata blob",
+                    blob_id
+                )))
+            }
+            Some(BlobConfig::MetaBlob(v)) => v,
         };
-        let constraint = Constraint::new(meta_blob.blocks()).min(0).max(meta_blob.blocks());
-        let range = ranges.allocate(&constraint)?;
-        ranges.update(&range, BlockRange::MetaBlob(meta_blob.clone()));
-
-        let data_blobs = meta_blob.get_blobs();
-        for blob in data_blobs.iter() {
-            let blob_info = blob.blob_info();
-            let blob_id = blob_info.blob_id();
-            let extra_info = meta_blob.get_blob_extra_info(&blob_id).ok_or_else(||
-                enoent!("block_device: can not get extra information for blob {}", blob_id)
-            )?;
-            if extra_info.mapped_blkaddr == 0 {
-                return Err(einval!("block_device: mapped block address for blob {} is zero", blob_id));
-            }
-            let constraint = Constraint::new(blob.blocks()).min(extra_info.mapped_blkaddr).max(extra_info.mapped_blkaddr + blob.blocks());
-            let range = ranges.allocate(&constraint)?;
-            ranges.update(&range, BlockRange::DataBlob(blob.clone()));
+        let meta_blob = MetaBlob::new(meta_blob_config.path())?;
+        let meta_blob = Arc::new(meta_blob);
+        let constraint = Constraint::new(meta_blob.blocks())
+            .min(0u32)
+            .max(meta_blob.blocks() - 1);
+        let range = ranges.allocate(&constraint).ok_or_else(|| {
+            enoent!(format!(
+                "block_device: failed to allocate address range for meta blob {}",
+                meta_blob_config.blob_id()
+            ))
+        })?;
+        ranges.update(&range, BlockRange::MetaBlob(meta_blob.clone()));
+
+        let mut pos = meta_blob.blocks();
+        let data_blobs = meta_blob_config.get_blobs();
+        for blob in data_blobs.iter() {
+            let blob_info = blob.blob_info();
+            let blob_id = blob_info.blob_id();
+            let extra_info = meta_blob_config
+                .get_blob_extra_info(&blob_id)
+                .ok_or_else(|| {
+                    let msg = format!(
+                        "block_device: can not get extra information for blob {}",
+                        blob_id
+                    );
+                    enoent!(msg)
+                })?;
+            if extra_info.mapped_blkaddr == 0 {
+                let msg = format!(
+                    "block_device: mapped block address for blob {} is zero",
+                    blob_id
+                );
+                return Err(einval!(msg));
+            }
+
+            if pos < extra_info.mapped_blkaddr {
+                let constraint = Constraint::new(extra_info.mapped_blkaddr - pos)
+                    .min(pos)
+                    .max(extra_info.mapped_blkaddr - 1);
+                let range = ranges.allocate(&constraint).ok_or_else(|| {
+                    enoent!("block_device: failed to allocate address range for hole between blobs")
+                })?;
+                ranges.update(&range, BlockRange::Hole);
+            }
+
+            let blocks = blob_info.uncompressed_size() >> EROFS_BLOCK_BITS;
+            if blocks > u32::MAX as u64
+                || blocks + extra_info.mapped_blkaddr as u64 > u32::MAX as u64
+            {
+                return Err(einval!(format!(
+                    "block_device: uncompressed size 0x{:x} of blob {} is invalid",
+                    blob_info.uncompressed_size(),
+                    blob_info.blob_id()
+                )));
             }
+            let data_blob = DataBlob::new(blob)?;
+            let constraint =
                Constraint::new(blocks as u32)
+                .min(extra_info.mapped_blkaddr)
+                .max(extra_info.mapped_blkaddr + blocks as u32 - 1);
+            let range = ranges.allocate(&constraint).ok_or_else(|| {
+                enoent!(format!(
+                    "block_device: can not allocate address range for blob {}",
+                    blob_info.blob_id()
+                ))
+            })?;
+            ranges.update(&range, BlockRange::DataBlob(Arc::new(data_blob)));
+            pos = extra_info.mapped_blkaddr + blocks as u32;
+        }
+
+        Ok(BlockDevice {
+            blocks: pos,
+            blob_id,
+            cache_mgr,
+            ranges,
+        })
+    }
+
+    /// Get blob id of the metadata blob.
+    pub fn meta_blob_id(&self) -> &str {
+        &self.blob_id
+    }
 
+    /// Get the [BlobCacheMgr](../blob_cache/struct.BlobCacheMgr.html) associated with the block device.
+    pub fn cache_mgr(&self) -> Arc<BlobCacheMgr> {
+        self.cache_mgr.clone()
+    }
+
+    /// Get number of blocks of the block device.
+    pub fn blocks(&self) -> u32 {
+        self.blocks
     }
 
     /// Read block range [start, start + blocks) from the block device.
-    pub async fn read(&self, mut start: u32, mut blocks: u32, buf: &mut [u8]) -> Result<()> {
+    pub async fn async_read<T: IoBufMut>(
+        &self,
+        mut start: u32,
+        mut blocks: u32,
+        mut buf: T,
+    ) -> (Result<usize>, T) {
         if start.checked_add(blocks).is_none()
-            || (blocks as u64) << EROFS_BLOCK_BITS > buf.len() as u64
+            || (blocks as u64) << EROFS_BLOCK_BITS > buf.bytes_total() as u64
         {
-            return Err(einval!("block_device: invalid parameters to read()"));
+            return (
+                Err(einval!("block_device: invalid parameters to read()")),
+                buf,
+            );
         }
 
+        let total_size = (blocks as usize) << EROFS_BLOCK_BITS;
         let mut pos = 0;
         while blocks > 0 {
             let (range, node) = match self.ranges.get_superset(&Range::new_point(start)) {
                 Some(v) => v,
                 None => {
-                    return Err(eio!(format!(
-                        "block_device: can not locate block 0x{:x} for meta blob {}",
-                        start, self.blob_id
-                    )))
+                    return (
+                        Err(eio!(format!(
+                            "block_device: can not locate block 0x{:x} for meta blob {}",
+                            start, self.blob_id
+                        ))),
+                        buf,
+                    );
                 }
             };
 
-            let count = min(range.len() as u32, blocks);
-            let sz = (count as usize) << EROFS_BLOCK_BITS as usize;
             if let NodeState::Valued(r) = node {
-                match r {
-                    BlockRange::Hole => buf[pos..pos + sz].fill(0),
-                    BlockRange::MetaBlob(_m) => unimplemented!(),
-                    BlockRange::DataBlob(_d) => unimplemented!(),
+                let count = min(range.max as u32 - start + 1, blocks);
+                let sz = (count as usize) << EROFS_BLOCK_BITS as usize;
+                let mut s = buf.slice(pos..pos + sz);
+                let (res, s) = match r {
+                    BlockRange::Hole => {
+                        s.fill(0);
+                        (Ok(sz), s)
+                    }
+                    BlockRange::MetaBlob(m) => {
+                        m.async_read((start as u64) << EROFS_BLOCK_BITS, s).await
+                    }
+                    BlockRange::DataBlob(d) => {
+                        let offset = start as u64 - range.min;
+                        d.async_read(offset << EROFS_BLOCK_BITS, s).await
+                    }
+                };
+
+                buf = s.into_inner();
+                if res.is_err() {
+                    return (res, buf);
                 }
+                start += count;
+                blocks -= count;
+                pos += sz;
             } else {
-                return Err(eio!(format!(
-                    "block_device: block range 0x{:x}/0x{:x} of meta blob {} is unhandled",
-                    start, blocks, self.blob_id,
-                )));
+                return (
+                    Err(eio!(format!(
+                        "block_device: block range 0x{:x}/0x{:x} of meta blob {} is unhandled",
+                        start, blocks, self.blob_id,
+                    ))),
+                    buf,
+                );
             }
-
-            start += count;
-            blocks -= count;
-            pos += sz;
         }
 
-        Ok(())
+        (Ok(total_size), buf)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::blob_cache::generate_blob_key;
+    use nydus_api::BlobCacheEntry;
+    use std::fs;
+    use std::path::PathBuf;
+    use vmm_sys_util::tempdir::TempDir;
+
+    #[test]
+    fn test_block_device() {
+        let tmpdir = TempDir::new().unwrap();
+        let root_dir =
&std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR"); + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + let mut dest_path = tmpdir.as_path().to_path_buf(); + dest_path.push("be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + fs::copy(&source_path, &dest_path).unwrap(); + + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot"); + let config = r#" + { + "type": "bootstrap", + "id": "rafs-v6", + "domain_id": "domain2", + "config_v2": { + "version": 2, + "id": "factory1", + "backend": { + "type": "localfs", + "localfs": { + "dir": "/tmp/nydus" + } + }, + "cache": { + "type": "filecache", + "filecache": { + "work_dir": "/tmp/nydus" + } + }, + "metadata_path": "RAFS_V5" + } + }"#; + let content = config + .replace("/tmp/nydus", tmpdir.as_path().to_str().unwrap()) + .replace("RAFS_V5", &source_path.display().to_string()); + let mut entry: BlobCacheEntry = serde_json::from_str(&content).unwrap(); + assert!(entry.prepare_configuration_info()); + + let mgr = BlobCacheMgr::new(); + mgr.add_blob_entry(&entry).unwrap(); + let blob_id = generate_blob_key(&entry.domain_id, &entry.blob_id); + assert!(mgr.get_config(&blob_id).is_some()); + + // Check existence of data blob referenced by the bootstrap. + let key = generate_blob_key( + &entry.domain_id, + "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef", + ); + assert!(mgr.get_config(&key).is_some()); + + let mgr = Arc::new(mgr); + let device = BlockDevice::new(blob_id.clone(), mgr).unwrap(); + assert_eq!(device.blocks(), 0x209); + + tokio_uring::start(async move { + let buf = vec![0u8; 8192]; + let (res, buf) = device.async_read(u32::MAX, u32::MAX, buf).await; + assert!(res.is_err()); + assert_eq!(buf.len(), 8192); + let (res, _buf) = device.async_read(0, 1, vec![0u8]).await; + assert!(res.is_err()); + + let (res, buf) = device.async_read(0, 1, buf).await; + assert_eq!(buf.len(), 8192); + assert_eq!(res.unwrap(), 4096); + assert_eq!(buf[0], 0); + assert_eq!(buf[1023], 0); + assert_eq!(buf[1024], 0xe2); + assert_eq!(buf[1027], 0xe0); + + let (res, buf) = device.async_read(4, 2, buf).await; + assert_eq!(res.unwrap(), 8192); + assert_eq!(buf[4096], 0); + assert_eq!(buf[5119], 0); + assert_eq!(buf[5120], 0); + assert_eq!(buf[5123], 0); + assert_eq!(buf[5372], 0); + assert_eq!(buf[8191], 0); + + let (res, buf) = device.async_read(0x200, 2, buf).await; + assert_eq!(buf.len(), 8192); + assert_eq!(res.unwrap(), 8192); + + let (res, buf) = device.async_read(0x208, 2, buf).await; + assert_eq!(buf.len(), 8192); + assert!(res.is_err()); + + let (res, buf) = device.async_read(0x208, 1, buf).await; + assert_eq!(buf.len(), 8192); + assert_eq!(res.unwrap(), 4096); + + let (res, buf) = device.async_read(0x209, 1, buf).await; + assert_eq!(buf.len(), 8192); + assert!(res.is_err()); + }); } } diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index 228bab8c8b9..a98fb1c5cf3 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -33,8 +33,7 @@ use nydus_storage::device::BlobPrefetchRequest; use nydus_storage::factory::{ASYNC_RUNTIME, BLOB_FACTORY}; use crate::blob_cache::{ - generate_blob_key, BlobCacheConfigDataBlob, BlobCacheConfigMetaBlob, BlobCacheMgr, - BlobCacheObjectConfig, + generate_blob_key, BlobCacheMgr, BlobConfig, DataBlobConfig, MetaBlobConfig, }; nix::ioctl_write_int!(fscache_cread, 0x98, 1); @@ -216,7 +215,7 @@ struct 
FsCacheBootstrap { struct FsCacheBlobCache { cache: Option>, - config: Arc, + config: Arc, file: Arc, } @@ -240,7 +239,7 @@ enum FsCacheObject { #[derive(Default)] struct FsCacheState { id_to_object_map: HashMap, - id_to_config_map: HashMap>, + id_to_config_map: HashMap>, blob_cache_mgr: Arc, } @@ -442,11 +441,11 @@ impl FsCacheHandler { self.reply(&format!("copen {},{}", hdr.msg_id, -libc::ENOENT)); } Some(cfg) => match cfg { - BlobCacheObjectConfig::DataBlob(config) => { + BlobConfig::DataBlob(config) => { let reply = self.handle_open_data_blob(hdr, msg, config); self.reply(&reply); } - BlobCacheObjectConfig::MetaBlob(config) => { + BlobConfig::MetaBlob(config) => { self.handle_open_bootstrap(hdr, msg, config); } }, @@ -457,7 +456,7 @@ impl FsCacheHandler { &self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgOpen, - config: Arc, + config: Arc, ) -> String { let mut state = self.state.lock().unwrap(); if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) { @@ -509,7 +508,7 @@ impl FsCacheHandler { }); } - fn do_prefetch(cfg: &BlobCacheConfigDataBlob, blob: Arc) -> Result<()> { + fn do_prefetch(cfg: &DataBlobConfig, blob: Arc) -> Result<()> { let blob_info = cfg.blob_info().deref(); let cache_cfg = cfg.config_v2().get_cache_config()?; if !cache_cfg.prefetch.enable { @@ -554,7 +553,7 @@ impl FsCacheHandler { /// the chunk map file will be managed by the userspace daemon. We need to figure out the /// way to share blob/chunkamp files with filecache manager. fn create_data_blob_object( - config: &BlobCacheConfigDataBlob, + config: &DataBlobConfig, file: Arc, ) -> Result> { let mut blob_info = config.blob_info().deref().clone(); @@ -580,7 +579,7 @@ impl FsCacheHandler { &self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgOpen, - config: Arc, + config: Arc, ) { let path = config.path().display(); let condvar = Arc::new((Mutex::new(false), Condvar::new())); @@ -936,7 +935,7 @@ impl FsCacheHandler { } #[inline] - fn get_config(&self, key: &str) -> Option { + fn get_config(&self, key: &str) -> Option { self.get_state().blob_cache_mgr.get_config(key) } } diff --git a/tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef b/tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef new file mode 100644 index 0000000000000000000000000000000000000000..35509f7c7511f1ec5484b3ddd04c1b2ea6ee9f3f GIT binary patch literal 14554 zcmeI11yCIA*5?P;;E=&>a2?!&ySof7gA;-iAPMg7?(S}ZNrDsH2DjjlK!6Y+K-j$R zeQ)i4_wH7GU)9~ayR~_$y3g~pozuTQr=RM7Gq-U)SkOzQr2|;!T0!8#5$8Ol6*q!9 z`#Te7Ntv4E;X&^X`iB4(bB+F%`gCA%MSzwsMCK=DU&zFbMR z-?ly`K^hKZl|}!1TYG9vYgUo(bEgHG)yzDmsO|QVZ$EO`yhZ_0A z>y-_%+>C2l;c=CC;P1^!Zk8a`q8Hv3bh+P(`9~S75kt`turOv&%ld{sD!RQG4T9^- z17{&UYmdCNb#8GRYYC%DGd!s8V!_xba(f-7dEzGmBRo`NhO#HSwKz!Fmxj;p$Vm?z zx*a+B9GnH^fYfdIdTaU(A`sBCFDQ@(1)7?^R`oy|~CLUZZQZ&-1aavoRg|A~WKGT;Y}{WuC6; z6C}EH=n>g)32R#C^XS?;@nR-nmzOE7Xs1+ikiG11YSY7|T7A7UkZ|#A+ ztl{H0r0f(2XMhXX-KXKhin$>*w%&^jtYuu*se+Pt{bFMH=yUFx)&g2N(90z>|cSVmvRR-L|d4Sy_ zx1Kb!e7Y2N^EZ6F-b9RF2x;7-_@?Xv=EPi~o{4@=x#5pfi64y8$@R!yP20))ZD;iu zs8hMYi^(6U-M;r<5ejehWUm+z;UBm!@H8=f3Q*M*$4wn82=KFp`$GqU+?PKbmF+** z#pD$}()Zbj?($9m3)X#euqB_)dF!@fekgz=g|D;FCnYx+n;HnOj*i9*S1*wYLDgv! 
zlBsA)P@nO*qrgg=^#T@d5ovu?vokHe=;6}0@11PYI`_NVP=@F4P+GiCO24FriEGBFner6>59#DSQ&uD!^hM`O|>UKI+_2m&UhLK(wbxC#M91g|&{j zgr7{}W!CiA0u{faU$B&p|vQiWn){cxX$@uL!lM$bTp z@Vz|&qAHBRl_U#_bj207gNDwm#hBsYNa4k0=Q&SQ5r^#kByba%1lEO}p9t;nvVeWD z@(wzmnR)@QzAB-_qS?qsCQ6NF;t_KrNF!rlINRI+C`gFtaAJmPBiH9l?xL5T;mVU3 z(0eEBWLr9=3>Yt36)pGKt;H|UL0bdzI$BQ=>)O7L=Fm^BM#&N6Xf4bToO?4aY;^iY z?lC_0J*Zs|me%xCyiLipU+m%0rr0Q)t6#ny>6E4WBbz~P-LqrSxH1MT-m@6~O4iG_ zN#~EWILZ42Xv57q))$JV7`DP^X`7gBxejhmHN@$Ar@8fFG+^#bX>&*8iL_SPlW!}I ztJnL_sps_PXb;E59Vr0-m{ilWkWblSnA=>y?AKTGe$0oe>K|CF3|VT&mrmu5SW^Tj zh9ZJf`k5k9v={oe`@Xu1^W?)B?RX@cCL~43bQ|k<<|s2#j^gb(t*Luwfj>2^i$hf zfJQ7N!c%^@^1okMy(goBkfcx{SN9yv;RlJ)PH6ZH#qIJOu;xg9UW4 zb7le8s_H-L=A%pYzEtmbJHPMRq5Yy%;d5{Hmb!uh4;ehAZ#5p)Wz#Bf<`3k#Z#M4o zs_#Lj`}Bm2;#100+F2TILGDYi6;hyKBLXPW5WSOtAQTY@2tmfhqyZ|cB=m?9-nYHRFTD* zrgG?dY6RdZ_I9y5d7o^X_7MB>5qrLeU#-8fJ=CHYQA$x)ISa*24j@IHs*SGYB(n(h zo|@Y?<16MZZg3E{O+_!<=O}*Q_9R2i9_tSc29PLs9k}| zf?CTx{Ql7Odo`zQK~&bkeg@I5k28?<^d!2DW_(CVWe;*NyJ(4hpG=~{rFTS-jOO4Z zw&T!=J6ylAjjWD8v%SyQ^S~WGAillY0%C0JFbhxYs|1g2yTh5VU)mZig(x%$#lgQ@ zx}V5t>B>KakJXuG)ByFZYer4E%i8L{eL8aFjfKP{mk3Qm*0G!t6lE_3Qpc8CZ^rXw z1WX4ef6Wou6>goo2%(Ym^fEgHiTSx$cZIsPZ3pden$?t$Zy<1xY4SookWjpnM|*3v z>#3J1&N~f1CUD1==PXy?>O`wS~0(mkGl_=)q$PR>;>% zL=zm(Xcj&?Iy!o#WY1u9ATp9=E<-Sn4}s)XB)f7ljUu=-7A*T$11TkAqw!-^>k*oO zMy)`I@W{bU7{KIe*P6Poh#2l{2jk7uia`dSQl)hgnSUSYT8R=iD^{3a!xQ6=zboL% zI#kgfuwE?k6Y!c$Y{t{O_Kh1T;p;vyxut*iIjdk?>8lhsiz4*iY@V!Ubb`%7w4r0t&DU*zm~=xoszjHRlgx za=*KACZrsXrGI0e1&iL`S1i2W40ku1JDcV_x{|Jg^ut(NpdSMFrqpGog_|&MKLCzY z2z;xyz{`Gg>XTYk&kgx2!*lp67$3=WIyC*Nq_nffZd$D&1kb`_mo;58;>TRRIQ z_Re`9qWdBVzPdeL?h`frJUo|=g_9}YGGo#rwoceOIP_v6Hm>!J(OFEE;r_W#ztVo{*p^g>$5)ZTeYLARg~c#pjQ&LS ztV$oO9!D!yFA(b)IV198kS!{fHpnGVg`>%sHv2s~7EXe95XGSEj{t=H_q@rNH!mFh z$KA_{g5PmpdX01ra``5w_!2(R5;7(v=14(F>JGe2LF zix*I2<%`8dO4b>qD?Dj;2_Up{5+U-POK%w-34MN9YIcmkVP^$lXw{&nAh?M1FIS>1 zVjikn7{e@?T%^F-0*{oK(XgnF@Z|PoG#cYGBxqwZ0$$&6uqTn3(s|n4f^o+%t(9;h z8FRcL^6|t!O>)_!N@qG-Hn=*(cf(lKYNdg!X;pDoS9>6pZ#Axwh(DZ%+U?$@LUieJ zY+CZpr)p4@#o;{SNNJMoEkgx!g+k;u)=#(sgd!jN%oeK&uo> z45??eK~sddV^j{p4v_?&hW$*NRTanbi-592aM~J+<|cO9xfvy6i5YC>-;IuS=I9?k zbX7f|wx=aHY8!WdqNMRXjOeS|BNc7II5D*TPR7;5@;pVb5x8p{w+D3=X3*vL%8Sh%LwJJOP|#K4B0!XJ{!8Fl&UU@`mFP8AyWIkTgEqHDa@~Edm z;#cxaw{oC7L>K?0to)lrouh4``iBt9t{svcVgZA$}GMPHn@FF z*(v>HFL5|Hq(JBOcKv#oxfr@K1Cm4H-cs{F3vvz1r+#uEeQ@+0+?P|T2Hb-W`Y=qRKx(wn4`g|Rv4eEhaTyq)<1Pq%Rp3kinJV$L4^BxKj> zRYoJxw%)Bm)-B1al|$Dp_tf<3Z!(C?JiR&_)r2FX@h*g;KPa-M2eff&cqJW zdge)=9^kU%YZ-fHV4y!avRJoIE_`XA353J5%7#BiB{fuk0Ekqszw0e+?Ng{gMtlp( znF$9tnVM>`4b_G%7}La{p%eFoq`@ChDhnidgHiZJ>Wyv9FDP~I1=+;ocM2K6JkjSc zpVfj-B>qu%u@7mr3vGw^%@#2=KAHi7Gj-C){Ahwva|#kq-#YJV?f-ZeFFI;JOhOZ@&sL&}@>_ zkUQbN#OlnuO%ykEY8h@zb(K5oh40p#QMohU=UiG-8J^@gzt)*-a7hd<-o+We$-e2} z%xx0m*^6V5Yd^0Y>LGd`HowTta>Fdzj;kJ&#cnYi^Oelb$KH%(M+LQQlabUvh(xQTIb4%|o#wW<}6UifBvFEH&r zzJiC1ZCAeW7-iptcta5*#=f6WL#miHgraYUka>J%}9O$d`k7 z29ZpB^lZt08^*{v5?x1cr{Bmiu+RY!LNe|c%jeuO0* z4*20pxn%L8lUOlXKl)4wi}??2%aFIs1h8U`j?X?_??~=w5gJwMJm^$c5Nf*_c2jyB zxFiE<(#Q;Ke+n6+<;O2TFc*_0o_vakfQW#Mu!_U2!)W1L(YZXlWNR{YoyXV_>vDPG z)oCE^jU@nYp5J9uxBTo6nXKQmYyn77Ox{om;-wvazk0r%5~PCyNI)95*mThV#sG&i zef)c}M=e)=px~lg)Gl`4pC^jU&({lT`xW&l#OrxyZ!fio>XYf+ZMc>3EIJ>r=gG~W zqDKzPKuC+~&DGVZ(8VlF$ShQWHiy~`6j$f`+g7RTrDv6kUK=dRrpqtibasL?D=1@O zdm9tul?v;EOzTw*N5@HiVLFSnz8c3|SiQQf-B$cL*@xP=^X~%{2W0%-9$mS3rOis0 z`KSqNMYv@$bMO)1M`_w$9O`5(Q97v;A<_{ax-pq5W}0ZOAYlzLs;2h9ZLmM~R?5;I zm&hmPnYeq@Xyo*|hyFoE=xr*%;4~WeYm;!T^3Ne5;OUd=z^gtI0Ad z+ED^5TuZO_0jnreL?_Vl zHeU`3wA6$qn6@>XJJ)Z=WUqMWNau(9=xN;WDRyjkY6ynAo4=#f!VGId^kX(NgOLo` 
z&PSt#ff$Q<;wVykm$-GT@_X@mdj%(2=S#8m^r(UJzmyYJaVj+5A)i;K-0gY+$9YIY zl|Jy;ho|Vm`msg}155sIHl3_F=D%Sfo$HAa&B<%-f8h?8P%7C})mS>Ri~SiR4=>Bo z5yxR*qN~Vjxct2g zRjJQsht<7St={2m&yVCm>V*R)4w$791PfeXpk62B<&SPBOM1}63IuoGjox=MZ%L`f z^t5e)!3ZVE;3Un_kxEV0Cp?G2Xe2n0gm|Y8Fy@}-_0Y`|`hL-d9!kidq-sL+(JzZP$2OZlK$uCMbT1>6BRX6DwUSD8 zr-+%@8wYN=uFG!?AElPtuXA~7+4UMwcO3#1dmfvFyMYg*AD>%YI;`m#)IglFkL%0> zp5?NX)c|O$T-NK>_S;y=^fblrkvXCHZUo*T=rrhPJ6$wN zBGd$h#moxHKn1st+%l2iq1edCNMt&?qq}(pDZUKfC z_a_^uEBSJUvK({^DfO=R+7eZ@XEIi!?~&U{7L6@rq{(qrhC2yOWgEjbB-^B&mU63k zR8lIQxJv6~Vt-Kl5;|eR#Un&5bDBVf%+*5KU-@Jjw1e5FIfB2c1AonLtX0xU%Z>4j zJf1qx={jJIh6}WWYv<8pokfqvs2wirBPG7gG0eM%Tc`W%htBb^qoy(`u~wT`(dG&; zwc?;@mpxyj{^eeYU~^jl8qxdoJxEfgIH&>^AFWc#ZM`RSTx~n(RMj#vn$*;@EAbGK z?rDw}2cE8_mNj<7ig$62!Kp&65=wWTTlH;-lb43_D;r!Nn^F>6WpQZw*m8}WetTJW zT^7Y7`yFIt!H6m&-w8nRb7IeDF#iB}=1Nkl-959%)buPNXTZ#ZEtKJY48c*v?cwj{ z{G;MIh~pG0+0WjMQk@Gnd~`6UnBvU76KUC;23;fyguoDkdlD}(s0@8#1IOC#@=5$Z zWrHEv6y;4cL%5ik1U8(Wlv?107qDppQr$wTsD8!JLxsMw(Q?8ViN%XT_0D64PfL6_ zAR&9=x_}`oU+`*+bYpi2T$~CUHIE$)96)C zT$}P982X8~l;TClS!T#iUVo{a)LkxmXdL4b&?5AwDI*g_8%1v;6&H{q6se>EB-)zU z?1n5KirP}-6C?B9pGY;A!zwkeJ2V&n{{@6s9O=o*KvNRhNVL zv{=Xfg)ToTr^3d3>ea0AN`LgXL-hI%g++9&*?6TM8-6jhX?u;iwi+zQYKPq=P zbygjd_QCZ?B?Q`=mM2?h<@AWfR(y;U(a*_oAfACD@fc*=dl^+JPdu04YC;;zB|guX z@Qht42+c-F0p0k+HWZ)_oipGOGdZ^uad7 zsV`z_e^gZ5MkIV{OTi0T<&YMr0LByI#kZ*RWr%KMy$nVXu5aE`SI*I{_P)7;CjFY_ z=cCF3{Z;SdId&sQp0F}eaN}0GL?CtJRVmbM-lTFAi8IwG=jN~DT->?&@Jc%w`6OL- z6c6bplX^;XUr@-Y{~SZ!0)o_MJ|(sW2{mf2AnKC(JvS2cjVM9#-IQ1%LL6>WeL$WM z8ovkV-j5E`q|Do2#z)q$M$HNG2v3xbGd~Pb7;Q0|`WZD-7NobH463Ll%Id!M&NZV& zBsP@Fx)rIPryKcH?(rZ}a^h}VPw^(i!P2$C^8B0Sz7jbU zEXx#}jX9b3r&ej}scH_ENwb5jL@EM%GkGXI+?)6%0`kSven{nFCvS08Gvlab<&XhN zzcV3*5~{mtS;fP0i_uAXQ+zDR+RN>dh!1lnXD)mV4VTQg1k|uEl2nh$XSM}TSY5^gNvmdl+()D)e7)$Z`^-2upn5F2k^K4 zpX)#R`~rMn0QbKmA}i_{~G=_zX|*%@SDJI0>26TCh(iUZvwvw{3h_5z;6P- z3H&DT{}_QkzWx7DhB#fIo|gXyV*i^a1m@=Ewtq)V|DWqa*PlGhDIAX` zj;M4zNgDHFI>@;lr720r@d(POQ!0Ks1c9JEqlY0WMj`lX=A$CrpC`WpjVH2hUr;_h z+00&(_%nZO9PFSTp8t-g^&fovT~qv(LI0RP0^DH!-}&<&X#MXQApCXNf7F?+Iq-*{ zLm6c4)jUu{ehf3W`r4FBZ+f2hMGTo05>h3*N(89FN5 zE(*t6q)YYnsShB4hM4*F;k)&0p1(Pg{LA0p?O((Hl=-9n+%0WA{`|eb>FH|ypE~Pb achFz;|Ev7}x&C?hf7k!Nj`aWfO8*U-qmNYp literal 0 HcmV?d00001 diff --git a/tests/texture/bootstrap/rafs-v6-2.2.boot b/tests/texture/bootstrap/rafs-v6-2.2.boot new file mode 100644 index 0000000000000000000000000000000000000000..71932987aa8d0ffb6a5d5be97553681a9c75cfeb GIT binary patch literal 20480 zcmeI(J!lj`7=YnfjYTlUnZh4T{6Ph)W6tblcO_tqy$FIR7RTM~EL;$co`niRLJSBt zLSkXmV4(ydh*m-nYz-K-*28EjgAs!uVk6Gn&O(qW0wPF!x7csyo4L7}XPX^vP6#dp z5cppO=0ALUZ`a=2CI-b?2SlsRt#RaCskUa)6YE`#j@+1MlQ|okbzSLA>vwH>wT6lc z;vi6}5)`5&kYVTt#V{_E%b}DimPtiMaUgYFRsHTL>h@I>7yYh%#YD!%L?!0gtoY%q zRz%i?00IagfB*srAh2SAKYjm9zbMyy1A&nI5j z$G;qY{H(QQY3^;u$XS1JvM{EmAKrLy>&n!-^i$jsKmY**5I_I{1Q0*~0R#|0009IL zKmY**5I_I{1Q0*~0R#|0z$LIwCqEw);*f33e#3Y5b)5kov>l~B0g#mox;$d@UBVMr zZ8_5Ad_I4wR>_~M*Ds!~mTG5o$+o(KIp2D3NQhm1Y5QIsmd~Ty^R;SPj@`bn$4k5X zc6Vp9PTBa!{WN37Uv7|%-!paDIliaYWDL0wKmY**5I_I{1Q0*~0R#~Emj#A)tEV#` zheuBgZ)lWe_D?UqT-tnc_U4a)&P|`@#v2FB+|3SpLjR%H?2#Mob~Q(Av?1GNf1j_H zI{Fu87q%TezwyzXYfrYy{=UvFFLLeow~tQ#teNhnuNf!X)7v2k2q1s}0tg_000Iag NfB*srAn=z2egROVdZhpW literal 0 HcmV?d00001 From 1e9b2f39959981a047a6fd01ed0f8ecc4f8beae3 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Tue, 21 Feb 2023 15:20:22 +0800 Subject: [PATCH 5/7] service: add nbd service to export RAFSv6 images as block devices Implement NbdService which cooperates with the Linux nbd driver to expose RAFSv6 images as block devices. 
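Once attached, the exported device holds an EROFS-compatible on-disk
layout, so it is expected to be mountable read-only with the in-kernel
EROFS driver.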
To simplify the implementation, the NbdService will directly talk
with the nbd driver, instead of following a typical nbd-server and
nbd-client architecture.

Signed-off-by: Jiang Liu
---
 Cargo.lock               |   1 +
 service/Cargo.toml       |   2 +
 service/src/block_nbd.rs | 350 +++++++++++++++++++++++++++++++++++++++
 service/src/lib.rs       |   2 +
 4 files changed, 355 insertions(+)
 create mode 100644 service/src/block_nbd.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5c23bb348b3..6d48739a20f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1263,6 +1263,7 @@ dependencies = [
 name = "nydus-service"
 version = "0.2.0"
 dependencies = [
+ "bytes",
  "dbs-allocator",
  "fuse-backend-rs",
  "libc",
diff --git a/service/Cargo.toml b/service/Cargo.toml
index 54df0f7337b..22bff2906c1 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -10,6 +10,7 @@ edition = "2018"
 resolver = "2"
 
 [dependencies]
+bytes = { version = "1", optional = true }
 dbs-allocator = { version = "0.1.1", optional = true }
 fuse-backend-rs = "0.10.1"
 libc = "0.2"
@@ -51,3 +52,4 @@ virtiofs = [
 ]
 
 block-device = [ "dbs-allocator", "tokio/fs"]
+block-nbd = ["block-device", "bytes"]
diff --git a/service/src/block_nbd.rs b/service/src/block_nbd.rs
new file mode 100644
index 00000000000..64f264123c8
--- /dev/null
+++ b/service/src/block_nbd.rs
@@ -0,0 +1,350 @@
+// Copyright (C) 2023 Alibaba Cloud. All rights reserved.
+//
+// SPDX-License-Identifier: (Apache-2.0)
+
+//! Export a RAFSv6 image as a block device through the NBD (Network Block Device) protocol.
+//!
+//! The [Network Block Device](https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md)
+//! is a Linux-originated lightweight block access protocol that allows one to export a block device
+//! to a client. RAFSv6 images have a block-address-based encoding, so an RAFSv6 image can be
+//! exposed as a block device. The [NbdService] exposes a RAFSv6 image as a block device based on
+//! the Linux Network Block Device driver.
+
+use std::fs::{self, OpenOptions};
+use std::io::Result;
+use std::os::fd::{AsRawFd, FromRawFd, RawFd};
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+
+use bytes::{Buf, BufMut};
+use nydus_rafs::metadata::layout::v6::{EROFS_BLOCK_BITS, EROFS_BLOCK_SIZE};
+use nydus_storage::utils::alloc_buf;
+use tokio::sync::broadcast::{channel, Sender};
+use tokio_uring::buf::IoBuf;
+use tokio_uring::net::UnixStream;
+
+use crate::blob_cache::BlobCacheMgr;
+use crate::block_device::BlockDevice;
+
+const NBD_SET_SOCK: u32 = 0;
+const NBD_SET_BLOCK_SIZE: u32 = 1;
+const NBD_DO_IT: u32 = 3;
+const NBD_CLEAR_SOCK: u32 = 4;
+const NBD_SET_BLOCKS: u32 = 7;
+//const NBD_DISCONNECT: u32 = 8;
+const NBD_SET_TIMEOUT: u32 = 9;
+const NBD_SET_FLAGS: u32 = 10;
+const NBD_FLAG_HAS_FLAGS: u32 = 0x1;
+const NBD_FLAG_READ_ONLY: u32 = 0x2;
+const NBD_FLAG_CAN_MULTI_CONN: u32 = 0x100;
+const NBD_CMD_READ: u32 = 0;
+const NBD_CMD_DISC: u32 = 2;
+const NBD_REQUEST_HEADER_SIZE: usize = 28;
+const NBD_REQUEST_MAGIC: u32 = 0x25609513;
+const NBD_REPLY_MAGIC: u32 = 0x67446698;
+const NBD_OK: u32 = 0;
+const NBD_EIO: u32 = 5;
+const NBD_EINVAL: u32 = 22;
+
+fn nbd_ioctl(fd: RawFd, cmd: u32, arg: u64) -> nix::Result<libc::c_int> {
+    let code = nix::request_code_none!(0xab, cmd);
+    unsafe { nix::convert_ioctl_res!(libc::ioctl(fd, code, arg)) }
+}
+
+/// Network Block Device server to expose RAFSv6 images as block devices.
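+///
+/// A minimal wiring sketch (hypothetical caller code mirroring the unit test in
+/// this file; `blob_id` and `cache_mgr` are assumed to refer to an already
+/// registered bootstrap blob):
+///
+/// ```ignore
+/// let device = Arc::new(BlockDevice::new(blob_id, cache_mgr)?);
+/// let nbd = Arc::new(NbdService::new(device, "/dev/nbd0".to_string())?);
+/// let worker = nbd.create_worker()?;
+/// let nbd2 = nbd.clone();
+/// // The control loop blocks in the NBD_DO_IT ioctl until the session ends.
+/// std::thread::spawn(move || nbd2.run());
+/// // Serve block read requests from the kernel on the current thread.
+/// tokio_uring::start(async move { worker.run().await });
+/// ```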
+pub struct NbdService {
+    active: Arc<AtomicBool>,
+    blob_id: String,
+    cache_mgr: Arc<BlobCacheMgr>,
+    nbd_dev: fs::File,
+    sender: Arc<Sender<u32>>,
+}
+
+impl NbdService {
+    /// Create a new instance of [NbdService] to expose a RAFSv6 image as a block device.
+    ///
+    /// It opens the NBD device at `nbd_path` and initializes it according to information from
+    /// the block device composed from a RAFSv6 image. The caller needs to ensure that the NBD
+    /// device is available.
+    pub fn new(device: Arc<BlockDevice>, nbd_path: String) -> Result<Self> {
+        // Initialize the NBD device: set block size, block count and flags.
+        let nbd_dev = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .open(&nbd_path)
+            .map_err(|e| {
+                error!("block_nbd: failed to open NBD device {}", nbd_path);
+                e
+            })?;
+        nbd_ioctl(nbd_dev.as_raw_fd(), NBD_SET_BLOCK_SIZE, EROFS_BLOCK_SIZE)?;
+        nbd_ioctl(nbd_dev.as_raw_fd(), NBD_SET_BLOCKS, device.blocks() as u64)?;
+        nbd_ioctl(nbd_dev.as_raw_fd(), NBD_SET_TIMEOUT, 60)?;
+        nbd_ioctl(nbd_dev.as_raw_fd(), NBD_CLEAR_SOCK, 0)?;
+        nbd_ioctl(
+            nbd_dev.as_raw_fd(),
+            NBD_SET_FLAGS,
+            (NBD_FLAG_HAS_FLAGS | NBD_FLAG_READ_ONLY | NBD_FLAG_CAN_MULTI_CONN) as u64,
+        )?;
+
+        let (sender, _receiver) = channel(4);
+
+        Ok(NbdService {
+            active: Arc::new(AtomicBool::new(true)),
+            blob_id: device.meta_blob_id().to_string(),
+            cache_mgr: device.cache_mgr().clone(),
+            nbd_dev,
+            sender: Arc::new(sender),
+        })
+    }
+
+    /// Create a [NbdWorker] to run the event loop handling NBD requests from the kernel.
+    pub fn create_worker(&self) -> Result<NbdWorker> {
+        // Let the NBD driver go.
+        let (sock1, sock2) = std::os::unix::net::UnixStream::pair()?;
+        nbd_ioctl(
+            self.nbd_dev.as_raw_fd(),
+            NBD_SET_SOCK,
+            sock1.as_raw_fd() as u64,
+        )?;
+
+        Ok(NbdWorker {
+            active: self.active.clone(),
+            blob_id: self.blob_id.clone(),
+            cache_mgr: self.cache_mgr.clone(),
+            _sock_kern: sock1,
+            sock_user: sock2,
+            sender: self.sender.clone(),
+        })
+    }
+
+    /// Run the event loop to handle incoming NBD requests.
+    ///
+    /// The caller will be blocked until the NBD device gets destroyed or `NbdService::stop()`
+    /// gets called.
+    pub fn run(&self) -> Result<()> {
+        let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_DO_IT, 0);
+        self.active.store(false, Ordering::Release);
+        let _ = self.sender.send(1);
+        let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_CLEAR_SOCK, 0);
+
+        Ok(())
+    }
+
+    /// Shut down the NBD session and send an exit notification to workers.
+    pub fn stop(&self) {
+        self.active.store(false, Ordering::Release);
+        let _ = self.sender.send(0);
+        //let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_DISCONNECT, 0);
+        let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_CLEAR_SOCK, 0);
+    }
+}
+
+/// A worker to handle NBD requests in asynchronous mode.
+pub struct NbdWorker {
+    active: Arc<AtomicBool>,
+    blob_id: String,
+    cache_mgr: Arc<BlobCacheMgr>,
+    _sock_kern: std::os::unix::net::UnixStream,
+    sock_user: std::os::unix::net::UnixStream,
+    sender: Arc<Sender<u32>>,
+}
+
+impl NbdWorker {
+    /// Run the event loop to handle NBD requests from the kernel in asynchronous mode.
+    pub async fn run(self) {
+        let device = match BlockDevice::new(self.blob_id.clone(), self.cache_mgr.clone()) {
+            Ok(v) => v,
+            Err(e) => {
+                error!(
+                    "block_nbd: failed to create block device for {}, {}",
+                    self.blob_id, e
+                );
+                return;
+            }
+        };
+
+        // Safe because the RawFd is valid during the lifetime of run().
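+        // The kernel half of the socket pair was registered with the NBD driver
+        // via NBD_SET_SOCK, so requests arrive on the user half below. Request
+        // headers are NBD_REQUEST_HEADER_SIZE (28) bytes and may arrive in
+        // partial reads, hence `pos` tracks the accumulated header bytes.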
+ let mut sock = unsafe { UnixStream::from_raw_fd(self.sock_user.as_raw_fd()) }; + let mut receiver = self.sender.subscribe(); + let mut buf = vec![0u8; NBD_REQUEST_HEADER_SIZE]; + let mut pos = 0; + + while self.active.load(Ordering::Acquire) { + tokio::select! { + (res, s) = sock.read(buf.slice(pos..)) => { + match res { + Err(e) => { + warn!("block_nbd: failed to get request from kernel for {}, {}", self.blob_id, e); + break; + } + Ok(sz) => { + buf = s.into_inner(); + pos += sz; + if pos == NBD_REQUEST_HEADER_SIZE { + match self.handle_request(&buf, &mut sock, &device).await { + Ok(true) => {} + Ok(false) => break, + Err(e) => { + warn!("block_nbd: failed to handle request for {}, {}", self.blob_id, e); + break; + } + } + pos = 0; + } + } + } + } + _ = receiver.recv() => { + break; + } + } + } + } + + async fn handle_request( + &self, + mut request: &[u8], + sock: &mut UnixStream, + device: &BlockDevice, + ) -> Result { + let magic = request.get_u32(); + let ty = request.get_u32(); + let handle = request.get_u64(); + let pos = request.get_u64(); + let len = request.get_u32(); + + let mut code = NBD_OK; + let mut data_buf = alloc_buf(len as usize); + if magic != NBD_REQUEST_MAGIC + || pos % EROFS_BLOCK_SIZE != 0 + || len as u64 % EROFS_BLOCK_SIZE != 0 + { + warn!( + "block_nbd: invalid request magic 0x{:x}, type {}, pos 0x{:x}, len 0x{:x}", + magic, ty, pos, len + ); + code = NBD_EINVAL; + } else if ty == NBD_CMD_READ { + let start = (pos >> EROFS_BLOCK_BITS) as u32; + let count = len >> EROFS_BLOCK_BITS; + let (res, buf) = device.async_read(start, count, data_buf).await; + data_buf = buf; + match res { + Ok(sz) => { + if sz != len as usize { + warn!("block_nbd: got 0x{:x} bytes, expect 0x{:x}", sz, len); + code = NBD_EIO; + } + } + Err(e) => { + warn!("block_nbd: failed to read data from block device, {}", e); + code = NBD_EIO; + } + } + } else if ty == NBD_CMD_DISC { + return Ok(false); + } + + let mut reply = Vec::with_capacity(16); + reply.put_u32(NBD_REPLY_MAGIC); + reply.put_u32(code); + reply.put_u64(handle); + assert_eq!(reply.len(), 16); + assert_eq!(data_buf.len(), len as usize); + sock.write_all(reply).await.0?; + if code == NBD_OK { + sock.write_all(data_buf).await.0?; + } + + Ok(true) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::blob_cache::{generate_blob_key, BlobCacheMgr}; + use nydus_api::BlobCacheEntry; + use std::path::PathBuf; + use std::time::Duration; + use vmm_sys_util::tempdir::TempDir; + + fn create_block_device(tmpdir: PathBuf) -> Result> { + let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR"); + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + let mut dest_path = tmpdir.clone(); + dest_path.push("be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + fs::copy(&source_path, &dest_path).unwrap(); + + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot"); + let config = r#" + { + "type": "bootstrap", + "id": "rafs-v6", + "domain_id": "domain2", + "config_v2": { + "version": 2, + "id": "factory1", + "backend": { + "type": "localfs", + "localfs": { + "dir": "/tmp/nydus" + } + }, + "cache": { + "type": "filecache", + "filecache": { + "work_dir": "/tmp/nydus" + } + }, + "metadata_path": "RAFS_V5" + } + }"#; + let content = config + .replace("/tmp/nydus", tmpdir.as_path().to_str().unwrap()) + .replace("RAFS_V5", 
&source_path.display().to_string()); + let mut entry: BlobCacheEntry = serde_json::from_str(&content).unwrap(); + assert!(entry.prepare_configuration_info()); + + let mgr = BlobCacheMgr::new(); + mgr.add_blob_entry(&entry).unwrap(); + let blob_id = generate_blob_key(&entry.domain_id, &entry.blob_id); + assert!(mgr.get_config(&blob_id).is_some()); + + // Check existence of data blob referenced by the bootstrap. + let key = generate_blob_key( + &entry.domain_id, + "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef", + ); + assert!(mgr.get_config(&key).is_some()); + + let mgr = Arc::new(mgr); + let device = BlockDevice::new(blob_id.clone(), mgr).unwrap(); + + Ok(Arc::new(device)) + } + + #[ignore] + #[test] + fn test_nbd_device() { + tokio_uring::start(async { + let tmpdir = TempDir::new().unwrap(); + let device = create_block_device(tmpdir.as_path().to_path_buf()).unwrap(); + let nbd = NbdService::new(device, "/dev/nbd15".to_string()).unwrap(); + let nbd = Arc::new(nbd); + let nbd2 = nbd.clone(); + let worker1 = nbd.create_worker().unwrap(); + let worker2 = nbd.create_worker().unwrap(); + + tokio_uring::spawn(async move { worker1.run().await }); + tokio_uring::spawn(async move { worker2.run().await }); + std::thread::spawn(move || { + nbd2.run().unwrap(); + }); + tokio::time::sleep(Duration::from_micros(100000)).await; + nbd.stop(); + }) + } +} diff --git a/service/src/lib.rs b/service/src/lib.rs index dfa1538b81f..ceed2de57b5 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -33,6 +33,8 @@ use serde_json::Error as SerdeError; pub mod blob_cache; #[cfg(feature = "block-device")] pub mod block_device; +#[cfg(feature = "block-nbd")] +pub mod block_nbd; pub mod daemon; #[cfg(target_os = "linux")] mod fs_cache; From eceeefd74c5a3ded9552ea8bbe97a95f27ef2994 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Sat, 25 Feb 2023 00:27:16 +0800 Subject: [PATCH 6/7] nydusd: add subcommand nbd to export nydus images as block devices Add subcommand nbd to export nydus images as block devices through NBD. Signed-off-by: Jiang Liu --- Cargo.toml | 3 + service/src/block_nbd.rs | 281 ++++++++++++++++++++++++++++++++++++++- service/src/daemon.rs | 4 +- src/bin/nydusd/main.rs | 133 ++++++++++++++++++ 4 files changed, 416 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a1351089858..96b774ae3b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,9 @@ virtiofs = [ "virtio-queue", "vm-memory", ] +block-nbd = [ + "nydus-service/block-nbd" +] backend-http-proxy = ["nydus-storage/backend-http-proxy"] backend-localdisk = ["nydus-storage/backend-localdisk"] diff --git a/service/src/block_nbd.rs b/service/src/block_nbd.rs index 64f264123c8..63ca6f7709a 100644 --- a/service/src/block_nbd.rs +++ b/service/src/block_nbd.rs @@ -10,21 +10,30 @@ //! exposed as a block device. The [NbdService] exposes a RAFSv6 image as a block device based on //! the Linux Network Block Device driver. 
+use std::any::Any; use std::fs::{self, OpenOptions}; -use std::io::Result; +use std::io::{Error, Result}; use std::os::fd::{AsRawFd, FromRawFd, RawFd}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; use bytes::{Buf, BufMut}; +use mio::Waker; +use nydus_api::{BlobCacheEntry, BuildTimeInfo}; use nydus_rafs::metadata::layout::v6::{EROFS_BLOCK_BITS, EROFS_BLOCK_SIZE}; use nydus_storage::utils::alloc_buf; use tokio::sync::broadcast::{channel, Sender}; use tokio_uring::buf::IoBuf; use tokio_uring::net::UnixStream; -use crate::blob_cache::BlobCacheMgr; +use crate::blob_cache::{generate_blob_key, BlobCacheMgr}; use crate::block_device::BlockDevice; +use crate::daemon::{ + DaemonState, DaemonStateMachineContext, DaemonStateMachineInput, DaemonStateMachineSubscriber, + NydusDaemon, +}; +use crate::{Error as NydusError, Result as NydusResult}; const NBD_SET_SOCK: u32 = 0; const NBD_SET_BLOCK_SIZE: u32 = 1; @@ -260,6 +269,270 @@ impl NbdWorker { } } +/// A [NydusDaemon] implementation to expose RAFS v6 images as block devices through NBD. +pub struct NbdDaemon { + cache_mgr: Arc, + service: Arc, + + bti: BuildTimeInfo, + id: Option, + supervisor: Option, + + nbd_threads: u32, + nbd_control_thread: Mutex>>, + nbd_service_threads: Mutex>>>, + request_sender: Arc>>, + result_receiver: Mutex>>, + state: AtomicI32, + state_machine_thread: Mutex>>>, + waker: Arc, +} + +impl NbdDaemon { + fn new( + nbd_path: String, + threads: u32, + blob_entry: BlobCacheEntry, + trigger: std::sync::mpsc::Sender, + receiver: std::sync::mpsc::Receiver>, + waker: Arc, + bti: BuildTimeInfo, + id: Option, + supervisor: Option, + ) -> Result { + let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id); + let cache_mgr = Arc::new(BlobCacheMgr::new()); + cache_mgr.add_blob_entry(&blob_entry)?; + let block_device = BlockDevice::new(blob_id.clone(), cache_mgr.clone())?; + let nbd_service = NbdService::new(Arc::new(block_device), nbd_path)?; + + Ok(NbdDaemon { + cache_mgr, + service: Arc::new(nbd_service), + + bti, + id, + supervisor, + + nbd_threads: threads, + nbd_control_thread: Mutex::new(None), + nbd_service_threads: Mutex::new(Vec::new()), + state: AtomicI32::new(DaemonState::INIT as i32), + request_sender: Arc::new(Mutex::new(trigger)), + result_receiver: Mutex::new(receiver), + state_machine_thread: Mutex::new(None), + waker, + }) + } +} + +impl DaemonStateMachineSubscriber for NbdDaemon { + fn on_event(&self, event: DaemonStateMachineInput) -> NydusResult<()> { + self.request_sender + .lock() + .expect("block_nbd: failed to lock request sender!") + .send(event) + .map_err(NydusError::ChannelSend)?; + + self.result_receiver + .lock() + .expect("block_nbd: failed to lock result receiver!") + .recv() + .map_err(NydusError::ChannelReceive)? 
+ } +} + +impl NydusDaemon for NbdDaemon { + fn as_any(&self) -> &dyn Any { + self + } + + fn id(&self) -> Option { + self.id.clone() + } + + fn version(&self) -> BuildTimeInfo { + self.bti.clone() + } + + fn get_state(&self) -> DaemonState { + self.state.load(Ordering::Relaxed).into() + } + + fn set_state(&self, state: DaemonState) { + self.state.store(state as i32, Ordering::Relaxed); + } + + fn start(&self) -> NydusResult<()> { + info!("start NBD service with {} worker threads", self.nbd_threads); + for _ in 0..self.nbd_threads { + let waker = self.waker.clone(); + let worker = self + .service + .create_worker() + .map_err(|e| NydusError::StartService(format!("{}", e)))?; + let thread = std::thread::Builder::new() + .name("nbd_worker".to_string()) + .spawn(move || { + tokio_uring::start(async move { + worker.run().await; + // Notify the daemon controller that one working thread has exited. + if let Err(err) = waker.wake() { + error!("block_nbd: fail to exit daemon, error: {:?}", err); + } + }); + Ok(()) + }) + .map_err(NydusError::ThreadSpawn)?; + self.nbd_service_threads.lock().unwrap().push(thread); + } + + let nbd = self.service.clone(); + let thread = std::thread::spawn(move || { + if let Err(e) = nbd.run() { + error!("block_nbd: failed to run NBD control loop, {e}"); + } + }); + *self.nbd_control_thread.lock().unwrap() = Some(thread); + + Ok(()) + } + + fn umount(&self) -> NydusResult<()> { + Ok(()) + } + + fn stop(&self) { + self.service.stop(); + } + + fn wait(&self) -> NydusResult<()> { + self.wait_state_machine()?; + self.wait_service() + } + + fn wait_service(&self) -> NydusResult<()> { + loop { + let handle = self.nbd_service_threads.lock().unwrap().pop(); + if let Some(handle) = handle { + handle + .join() + .map_err(|e| { + let e = *e + .downcast::() + .unwrap_or_else(|e| Box::new(eother!(e))); + NydusError::WaitDaemon(e) + })? + .map_err(NydusError::WaitDaemon)?; + } else { + // No more handles to wait + break; + } + } + + Ok(()) + } + + fn wait_state_machine(&self) -> NydusResult<()> { + let mut guard = self.state_machine_thread.lock().unwrap(); + if let Some(handler) = guard.take() { + let result = handler.join().map_err(|e| { + let e = *e + .downcast::() + .unwrap_or_else(|e| Box::new(eother!(e))); + NydusError::WaitDaemon(e) + })?; + result.map_err(NydusError::WaitDaemon) + } else { + Ok(()) + } + } + + fn supervisor(&self) -> Option { + self.supervisor.clone() + } + + fn save(&self) -> NydusResult<()> { + unimplemented!() + } + + fn restore(&self) -> NydusResult<()> { + unimplemented!() + } + + fn get_blob_cache_mgr(&self) -> Option> { + Some(self.cache_mgr.clone()) + } +} + +/// Create and start a [NbdDaemon] instance to expose a RAFS v6 image as a block device through NBD. 
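+///
+/// The daemon is driven through the `Mount` and `Start` state-machine events
+/// before this function returns, so its NBD workers are already serving
+/// requests on success. A hypothetical invocation (argument values are
+/// illustrative only):
+///
+/// ```ignore
+/// let daemon = create_nbd_daemon(
+///     "/dev/nbd15".to_string(), // NBD device node
+///     4,                        // number of NBD worker threads
+///     blob_entry,               // BlobCacheEntry describing the bootstrap blob
+///     bti,                      // BuildTimeInfo
+///     None,                     // id
+///     None,                     // supervisor
+///     waker,                    // Arc<Waker> from the daemon controller
+/// )?;
+/// daemon.wait()?;
+/// ```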
+#[allow(clippy::too_many_arguments)] +pub fn create_nbd_daemon( + device: String, + threads: u32, + blob_entry: BlobCacheEntry, + bti: BuildTimeInfo, + id: Option, + supervisor: Option, + waker: Arc, +) -> Result> { + let (trigger, events_rx) = std::sync::mpsc::channel::(); + let (result_sender, result_receiver) = std::sync::mpsc::channel::>(); + let daemon = NbdDaemon::new( + device, + threads, + blob_entry, + trigger, + result_receiver, + waker, + bti, + id, + supervisor, + )?; + let daemon = Arc::new(daemon); + let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender); + let machine_thread = machine.kick_state_machine()?; + *daemon.state_machine_thread.lock().unwrap() = Some(machine_thread); + daemon + .on_event(DaemonStateMachineInput::Mount) + .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Start) + .map_err(|e| eother!(e))?; + + /* + // TODO: support crash recover and hot-upgrade. + // Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper + // finding a victim is not necessary. + if (api_sock.as_ref().is_some() && !upgrade && !is_crashed(&mnt, api_sock.as_ref().unwrap())?) + || api_sock.is_none() + { + if let Some(cmd) = mount_cmd { + daemon.service.mount(cmd)?; + } + daemon + .service + .session + .lock() + .unwrap() + .mount() + .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Mount) + .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Start) + .map_err(|e| eother!(e))?; + daemon + .service + .conn + .store(calc_fuse_conn(mnt)?, Ordering::Relaxed); + } + */ + + Ok(daemon) +} + #[cfg(test)] mod tests { use super::*; diff --git a/service/src/daemon.rs b/service/src/daemon.rs index 26d11bb6338..c4149f2affb 100644 --- a/service/src/daemon.rs +++ b/service/src/daemon.rs @@ -173,7 +173,9 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync { // For backward compatibility. /// Set default filesystem service object. - fn get_default_fs_service(&self) -> Option>; + fn get_default_fs_service(&self) -> Option> { + None + } /// Get the optional `BlobCacheMgr` object. 
     fn get_blob_cache_mgr(&self) -> Option<Arc<BlobCacheMgr>> {
diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs
index 6a8eeab6988..323a2e2b297 100644
--- a/src/bin/nydusd/main.rs
+++ b/src/bin/nydusd/main.rs
@@ -285,6 +285,8 @@ fn prepare_commandline_options() -> Command {
     let cmdline = append_fuse_subcmd_options(cmdline);
     #[cfg(feature = "virtiofs")]
     let cmdline = append_virtiofs_subcmd_options(cmdline);
+    #[cfg(feature = "block-nbd")]
+    let cmdline = self::nbd::append_nbd_subcmd_options(cmdline);
     append_singleton_subcmd_options(cmdline)
 }
 
@@ -590,6 +592,130 @@ fn process_singleton_arguments(
     Ok(())
 }
 
+#[cfg(feature = "block-nbd")]
+mod nbd {
+    use super::*;
+    use nydus_api::BlobCacheEntry;
+    use nydus_service::block_nbd::create_nbd_daemon;
+
+    pub(super) fn append_nbd_subcmd_options(cmd: Command) -> Command {
+        let subcmd = Command::new("nbd")
+            .about("Export a RAFS v6 image as a block device through NBD (Experimental)");
+        let subcmd = subcmd
+            .arg(
+                Arg::new("DEVICE")
+                    .help("NBD device node to attach the block device")
+                    .required(true)
+                    .num_args(1),
+            )
+            .arg(
+                Arg::new("bootstrap")
+                    .long("bootstrap")
+                    .short('B')
+                    .help("Path to the RAFS filesystem metadata file")
+                    .requires("localfs-dir")
+                    .conflicts_with("config"),
+            )
+            .arg(
+                Arg::new("localfs-dir")
+                    .long("localfs-dir")
+                    .requires("bootstrap")
+                    .short('D')
+                    .help(
+                        "Path to the `localfs` working directory, which also enables the `localfs` storage backend"
+                    )
+                    .conflicts_with("config"),
+            )
+            .arg(
+                Arg::new("threads")
+                    .long("threads")
+                    .default_value("4")
+                    .help("Number of worker threads to serve NBD requests")
+                    .value_parser(thread_validator)
+                    .required(false),
+            );
+        cmd.subcommand(subcmd)
+    }
+
+    pub(super) fn process_nbd_service(
+        args: SubCmdArgs,
+        bti: BuildTimeInfo,
+        _apisock: Option<&str>,
+    ) -> Result<()> {
+        let content = if let Some(bootstrap) = args.value_of("bootstrap") {
+            let dir = args.value_of("localfs-dir").ok_or_else(|| {
+                einval!("option `-D/--localfs-dir` is required by `--bootstrap`")
+            })?;
+            let config = r#"
+        {
+            "type": "bootstrap",
+            "id": "disk-default",
+            "domain_id": "block-nbd",
+            "config_v2": {
+                "version": 2,
+                "id": "block-nbd-factory",
+                "backend": {
+                    "type": "localfs",
+                    "localfs": {
+                        "dir": "LOCAL_FS_DIR"
+                    }
+                },
+                "cache": {
+                    "type": "filecache",
+                    "filecache": {
+                        "work_dir": "LOCAL_FS_DIR"
+                    }
+                },
+                "metadata_path": "META_FILE_PATH"
+            }
+        }"#;
+            config
+                .replace("LOCAL_FS_DIR", dir)
+                .replace("META_FILE_PATH", bootstrap)
+        } else if let Some(v) = args.value_of("config") {
+            std::fs::read_to_string(v)?
+        } else {
+            return Err(einval!(
+                "both options `-C/--config` and `-B/--bootstrap` are missing"
+            ));
+        };
+        let mut config: BlobCacheEntry = serde_json::from_str(&content).unwrap();
+        if !config.prepare_configuration_info() {
+            return Err(einval!(format!("invalid configuration {}", content)));
+        }
+
+        // Safe to unwrap because `DEVICE` is a mandatory option.
+ let device = args.value_of("DEVICE").unwrap().to_string(); + let id = args.value_of("id").map(|id| id.to_string()); + let supervisor = args.value_of("supervisor").map(|s| s.to_string()); + let threads: u32 = args + .value_of("threads") + .map(|n| n.parse().unwrap_or(1)) + .unwrap_or(1); + + let daemon = create_nbd_daemon( + device, + threads, + config, + bti, + id, + supervisor, + DAEMON_CONTROLLER.alloc_waker(), + ) + .map(|d| { + info!("NBD daemon started!"); + d + }) + .map_err(|e| { + error!("Failed in starting NBD daemon: {}", e); + e + })?; + DAEMON_CONTROLLER.set_daemon(daemon); + + Ok(()) + } +} + extern "C" fn sig_exit(_sig: std::os::raw::c_int) { DAEMON_CONTROLLER.shutdown(); } @@ -640,6 +766,13 @@ fn main() -> Result<()> { let subargs = SubCmdArgs::new(&args, subargs); process_fs_service(subargs, bti, apisock, false)?; } + #[cfg(feature = "block-nbd")] + Some("nbd") => { + // Safe to unwrap because the subcommand is `nbd`. + let subargs = args.subcommand_matches("nbd").unwrap(); + let subargs = SubCmdArgs::new(&args, subargs); + self::nbd::process_nbd_service(subargs, bti, apisock)?; + } _ => { let subargs = SubCmdArgs::new(&args, &args); process_fs_service(subargs, bti, apisock, true)?; From f9b051ed400be77a396745dea00cca08aa0b5893 Mon Sep 17 00:00:00 2001 From: Jiang Liu Date: Mon, 27 Feb 2023 10:06:24 +0800 Subject: [PATCH 7/7] api: add method to load BlobCacheConfigV2 from file Add method to load BlobCacheConfigV2 from configuration file. Signed-off-by: Jiang Liu --- api/src/config.rs | 145 ++++++++++++++++++++++++++++- api/src/http.rs | 53 +---------- docs/samples/blob_cache_entry.toml | 145 +++++++++++++++++++++++++++++ service/src/blob_cache.rs | 9 +- service/src/singleton.rs | 2 +- src/bin/nydusd/main.rs | 24 +++-- 6 files changed, 313 insertions(+), 65 deletions(-) create mode 100644 docs/samples/blob_cache_entry.toml diff --git a/api/src/config.rs b/api/src/config.rs index 6c65fcf317e..e47591bc192 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -890,11 +890,54 @@ pub struct BlobCacheEntryConfigV2 { /// Configuration information for local cache system. #[serde(default)] pub cache: CacheConfigV2, - /// Optional file path for metadata blobs. + /// Optional file path for metadata blob. #[serde(default)] pub metadata_path: Option, } +impl BlobCacheEntryConfigV2 { + /// Read configuration information from a file. + pub fn from_file>(path: P) -> Result { + let md = fs::metadata(path.as_ref())?; + if md.len() > 0x100000 { + return Err(eother!("configuration file size is too big")); + } + let content = fs::read_to_string(path)?; + Self::from_str(&content) + } + + /// Validate the configuration object. + pub fn validate(&self) -> bool { + if self.version != 2 { + return false; + } + let config: ConfigV2 = self.into(); + config.validate() + } +} + +impl FromStr for BlobCacheEntryConfigV2 { + type Err = Error; + + fn from_str(s: &str) -> Result { + if let Ok(v) = serde_json::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + if let Ok(v) = toml::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + Err(einval!("failed to parse configuration information")) + } +} + impl From<&BlobCacheEntryConfigV2> for ConfigV2 { fn from(c: &BlobCacheEntryConfigV2) -> Self { ConfigV2 { @@ -943,6 +986,106 @@ impl ConfigV2Internal { } } +/// Blob cache object type for nydus/rafs bootstrap blob. 
+pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap"; +/// Blob cache object type for nydus/rafs data blob. +pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob"; + +/// Configuration information for a cached blob. +#[derive(Debug, Deserialize, Serialize)] +pub struct BlobCacheEntry { + /// Type of blob object, bootstrap or data blob. + #[serde(rename = "type")] + pub blob_type: String, + /// Blob id. + #[serde(rename = "id")] + pub blob_id: String, + /// Configuration information to generate blob cache object. + #[serde(default, rename = "config")] + pub(crate) blob_config_legacy: Option, + /// Configuration information to generate blob cache object. + #[serde(default, rename = "config_v2")] + pub blob_config: Option, + /// Domain id for the blob, which is used to group cached blobs into management domains. + #[serde(default)] + pub domain_id: String, +} + +impl BlobCacheEntry { + pub fn prepare_configuration_info(&mut self) -> bool { + if self.blob_config.is_none() { + if let Some(legacy) = self.blob_config_legacy.as_ref() { + match legacy.try_into() { + Err(_) => return false, + Ok(v) => self.blob_config = Some(v), + } + } + } + + match self.blob_config.as_ref() { + None => false, + Some(cfg) => cfg.cache.validate() && cfg.backend.validate(), + } + } +} + +impl BlobCacheEntry { + /// Read configuration information from a file. + pub fn from_file>(path: P) -> Result { + let md = fs::metadata(path.as_ref())?; + if md.len() > 0x100000 { + return Err(eother!("configuration file size is too big")); + } + let content = fs::read_to_string(path)?; + Self::from_str(&content) + } + + /// Validate the configuration object. + pub fn validate(&self) -> bool { + if self.blob_type != BLOB_CACHE_TYPE_META_BLOB + && self.blob_type != BLOB_CACHE_TYPE_DATA_BLOB + { + warn!("invalid blob type {} for blob cache entry", self.blob_type); + return false; + } + if let Some(config) = self.blob_config.as_ref() { + if !config.validate() { + return false; + } + } + true + } +} + +impl FromStr for BlobCacheEntry { + type Err = Error; + + fn from_str(s: &str) -> Result { + if let Ok(v) = serde_json::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + if let Ok(v) = toml::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + Err(einval!("failed to parse configuration information")) + } +} + +/// Configuration information for a list of cached blob objects. +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct BlobCacheList { + /// List of blob configuration information. + pub blobs: Vec, +} + fn default_true() -> bool { true } diff --git a/api/src/http.rs b/api/src/http.rs index ef6ebdbb8a7..92c342ce6e3 100644 --- a/api/src/http.rs +++ b/api/src/http.rs @@ -4,7 +4,6 @@ // // SPDX-License-Identifier: Apache-2.0 -use std::convert::TryInto; use std::io; use std::sync::mpsc::{RecvError, SendError}; @@ -12,7 +11,7 @@ use nydus_error::error::MetricsError; use serde::Deserialize; use serde_json::Error as SerdeError; -use crate::{BlobCacheEntryConfig, BlobCacheEntryConfigV2}; +use crate::BlobCacheEntry; /// Mount a filesystem. #[derive(Clone, Deserialize, Debug)] @@ -43,56 +42,6 @@ pub struct DaemonConf { pub log_level: String, } -/// Blob cache object type for nydus/rafs bootstrap blob. -pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap"; -/// Blob cache object type for nydus/rafs data blob. 
-pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob"; - -/// Configuration information for a cached blob. -#[derive(Debug, Deserialize, Serialize)] -pub struct BlobCacheEntry { - /// Type of blob object, bootstrap or data blob. - #[serde(rename = "type")] - pub blob_type: String, - /// Blob id. - #[serde(rename = "id")] - pub blob_id: String, - /// Configuration information to generate blob cache object. - #[serde(default, rename = "config")] - pub(crate) blob_config_legacy: Option, - /// Configuration information to generate blob cache object. - #[serde(default, rename = "config_v2")] - pub blob_config: Option, - /// Domain id for the blob, which is used to group cached blobs into management domains. - #[serde(default)] - pub domain_id: String, -} - -impl BlobCacheEntry { - pub fn prepare_configuration_info(&mut self) -> bool { - if self.blob_config.is_none() { - if let Some(legacy) = self.blob_config_legacy.as_ref() { - match legacy.try_into() { - Err(_) => return false, - Ok(v) => self.blob_config = Some(v), - } - } - } - - match self.blob_config.as_ref() { - None => false, - Some(cfg) => cfg.cache.validate() && cfg.backend.validate(), - } - } -} - -/// Configuration information for a list of cached blob objects. -#[derive(Debug, Default, Deserialize, Serialize)] -pub struct BlobCacheList { - /// List of blob configuration information. - pub blobs: Vec, -} - /// Identifier for cached blob objects. /// /// Domains are used to control the blob sharing scope. All blobs associated with the same domain diff --git a/docs/samples/blob_cache_entry.toml b/docs/samples/blob_cache_entry.toml new file mode 100644 index 00000000000..fea1c445fa8 --- /dev/null +++ b/docs/samples/blob_cache_entry.toml @@ -0,0 +1,145 @@ +# Configuration file for Nydus Image Service + +type = "bootstrap" +id = "image1" +domain_id = "domain1" + +# Configuration file format version number, must be 2. +[config_v2] +version = 2 +# Identifier for the instance. +id = "my_id" +# Optional file path for metadata blobs, for BlobCacheEntry only. +metadata_path = "/path/to/rafs/meta/data/blob" + +[config_v2.backend] +# Type of storage backend, valid values: "localfs", "oss", "registry" +type = "localfs" + +[config_v2.backend.localfs] +blob_file = "/tmp/nydus.blob.data" +dir = "/tmp" +alt_dirs = ["/var/nydus/cache"] + +[config_v2.backend.oss] +# Oss http scheme, either 'http' or 'https' +scheme = "http" +# Oss endpoint +endpoint = "my_endpoint" +# Oss bucket name +bucket_name = "my_bucket_name" +# Prefix object_prefix to OSS object key, for example the simulation of subdirectory: +object_prefix = "my_object_prefix" +# Oss access key +access_key_id = "my_access_key_id" +# Oss secret +access_key_secret = "my_access_key_secret" +# Skip SSL certificate validation for HTTPS scheme. +skip_verify = true +# Drop the read request once http request timeout, in seconds. +timeout = 10 +# Drop the read request once http connection timeout, in seconds. +connect_timeout = 10 +# Retry count when read request failed. +retry_limit = 5 + +[config_v2.backend.oss.proxy] +# Access remote storage backend via proxy, e.g. Dragonfly dfdaemon server URL. +url = "localhost:6789" +# Proxy health checking endpoint. +ping_url = "localhost:6789/ping" +# Fallback to remote storage backend if proxy ping failed. +fallback = true +# Interval for proxy health checking, in seconds. +check_interval = 5 +# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy. 
+use_http = false + +[[config_v2.backend.oss.mirrors]] +# Mirror server URL, for example http://127.0.0.1:65001. +host = "http://127.0.0.1:65001" +# Ping URL to check mirror server health. +ping_url = "http://127.0.0.1:65001/ping" +# HTTP request headers to be passed to mirror server. +# headers = +# Whether the authorization process is through mirror, default to false. +auth_through = true +# Interval for mirror health checking, in seconds. +health_check_interval = 5 +# Maximum number of failures before marking a mirror as unusable. +failure_limit = 5 + +[config_v2.backend.registry] +# Registry http scheme, either 'http' or 'https' +scheme = "https" +# Registry url host +host = "my.registry.com" +# Registry image name, like 'library/ubuntu' +repo = "nydus" +# Base64_encoded(username:password), the field should be sent to registry auth server to get a bearer token. +auth = "base64_encoded" +# Skip SSL certificate validation for HTTPS scheme. +skip_verify = true +# Drop the read request once http request timeout, in seconds. +timeout = 10 +# Drop the read request once http connection timeout, in seconds. +connect_timeout = 10 +# Retry count when read request failed. +retry_limit = 5 +# The field is a bearer token to be sent to registry to authorize registry requests. +registry_token = "bear_token" +# The http scheme to access blobs. +# It is used to workaround some P2P subsystem that requires a different scheme than the registry. +blob_url_scheme = "https" +# Redirect blob access to a different host regardless of the one specified in 'host'. +blob_redirected_host = "redirect.registry.com" + +[config_v2.backend.registry.proxy] +# Access remote storage backend via proxy, e.g. Dragonfly dfdaemon server URL. +url = "localhost:6789" +# Proxy health checking endpoint. +ping_url = "localhost:6789/ping" +# Fallback to remote storage backend if proxy ping failed. +fallback = true +# Interval for proxy health checking, in seconds. +check_interval = 5 +# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy. +use_http = false + +[[config_v2.backend.registry.mirrors]] +# Mirror server URL, for example http://127.0.0.1:65001. +host = "http://127.0.0.1:65001" +# Ping URL to check mirror server health. +ping_url = "http://127.0.0.1:65001/ping" +# HTTP request headers to be passed to mirror server. +# headers = +# Whether the authorization process is through mirror, default to false. +auth_through = true +# Interval for mirror health checking, in seconds. +health_check_interval = 5 +# Maximum number of failures before marking a mirror as unusable. +failure_limit = 5 + +[config_v2.cache] +# Type of blob cache: "blobcache", "filecache", "fscache", "dummycache" or "" +type = "filecache" +# Whether to cache compressed or uncompressed data. +compressed = true +# Whether to validate data read from the cache. +validate = true + +[config_v2.cache.filecache] +work_dir = "." + +[config_v2.cache.fscache] +work_dir = "." + +[config_v2.cache.prefetch] +# Whether to enable blob data prefetching. +enable = true +# Number of data prefetching working threads, valid values: 1-1024. +threads = 8 +# The batch size to prefetch data from backend, valid values: 0-0x10000000. +batch_size = 1000000 +# Network bandwidth rate limit in unit of Bytes and Zero means no limit. 
+bandwidth_limit = 10000000
diff --git a/service/src/blob_cache.rs b/service/src/blob_cache.rs
index e677a9ee468..ce8040ca93b 100644
--- a/service/src/blob_cache.rs
+++ b/service/src/blob_cache.rs
@@ -328,9 +328,12 @@ impl BlobCacheMgr {
                 "blob_cache: `config.metadata_path` for meta blob is empty"
             ));
         }
-        let path = Path::new(&path)
-            .canonicalize()
-            .map_err(|_e| einval!("blob_cache: `config.metadata_path` for meta blob is invalid"))?;
+        let path = Path::new(&path).canonicalize().map_err(|_e| {
+            einval!(format!(
+                "blob_cache: `config.metadata_path={}` for meta blob is invalid",
+                path
+            ))
+        })?;
         if !path.is_file() {
             return Err(einval!(
                 "blob_cache: `config.metadata_path` for meta blob is not a file"
diff --git a/service/src/singleton.rs b/service/src/singleton.rs
index 171552e1f6a..6cfa9c82112 100644
--- a/service/src/singleton.rs
+++ b/service/src/singleton.rs
@@ -10,7 +10,7 @@ use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::{Arc, Mutex};
 
 use mio::Waker;
-use nydus_api::http::BlobCacheList;
+use nydus_api::config::BlobCacheList;
 use nydus_api::BuildTimeInfo;
 
 use crate::blob_cache::BlobCacheMgr;
diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs
index 323a2e2b297..ea454b187b9 100644
--- a/src/bin/nydusd/main.rs
+++ b/src/bin/nydusd/main.rs
@@ -597,6 +597,7 @@ mod nbd {
     use super::*;
     use nydus_api::BlobCacheEntry;
     use nydus_service::block_nbd::create_nbd_daemon;
+    use std::str::FromStr;
 
     pub(super) fn append_nbd_subcmd_options(cmd: Command) -> Command {
         let subcmd = Command::new("nbd")
@@ -642,7 +643,7 @@ mod nbd {
         bti: BuildTimeInfo,
         _apisock: Option<&str>,
     ) -> Result<()> {
-        let content = if let Some(bootstrap) = args.value_of("bootstrap") {
+        let mut entry = if let Some(bootstrap) = args.value_of("bootstrap") {
             let dir = args.value_of("localfs-dir").ok_or_else(|| {
                 einval!("option `-D/--localfs-dir` is required by `--bootstrap`")
             })?;
@@ -669,19 +670,26 @@ mod nbd {
                 "metadata_path": "META_FILE_PATH"
             }
         }"#;
-            config
+            let config = config
                 .replace("LOCAL_FS_DIR", dir)
-                .replace("META_FILE_PATH", bootstrap)
+                .replace("META_FILE_PATH", bootstrap);
+            BlobCacheEntry::from_str(&config)?
         } else if let Some(v) = args.value_of("config") {
-            std::fs::read_to_string(v)?
+            BlobCacheEntry::from_file(v)?
         } else {
             return Err(einval!(
                 "both options `-C/--config` and `-B/--bootstrap` are missing"
            ));
         };
-        let mut config: BlobCacheEntry = serde_json::from_str(&content).unwrap();
-        if !config.prepare_configuration_info() {
-            return Err(einval!(format!("invalid configuration {}", content)));
+        if !entry.prepare_configuration_info() {
+            return Err(einval!(
+                "invalid blob cache entry configuration information"
+            ));
+        }
+        if !entry.validate() {
+            return Err(einval!(
+                "invalid blob cache entry configuration information"
+            ));
         }
 
         // Safe to unwrap because `DEVICE` is a mandatory option.
@@ -696,7 +704,7 @@ mod nbd {
         let daemon = create_nbd_daemon(
             device,
             threads,
-            config,
+            entry,
             bti,
             id,
             supervisor,