diff --git a/Cargo.lock b/Cargo.lock index 80709f97e57..6d48739a20f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,6 +352,15 @@ dependencies = [ "syn", ] +[[package]] +name = "dbs-allocator" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "543711b94b4bc1437d2ebb45f856452e96a45a67ab39f8dcf8c887c2a3701004" +dependencies = [ + "thiserror", +] + [[package]] name = "dbs-uhttp" version = "0.3.2" @@ -1254,6 +1263,8 @@ dependencies = [ name = "nydus-service" version = "0.2.0" dependencies = [ + "bytes", + "dbs-allocator", "fuse-backend-rs", "libc", "log", @@ -1270,6 +1281,7 @@ dependencies = [ "thiserror", "time", "tokio", + "tokio-uring", "vhost", "vhost-user-backend", "virtio-bindings", diff --git a/Cargo.toml b/Cargo.toml index a1351089858..96b774ae3b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,9 @@ virtiofs = [ "virtio-queue", "vm-memory", ] +block-nbd = [ + "nydus-service/block-nbd" +] backend-http-proxy = ["nydus-storage/backend-http-proxy"] backend-localdisk = ["nydus-storage/backend-localdisk"] diff --git a/api/src/config.rs b/api/src/config.rs index 6c65fcf317e..e47591bc192 100644 --- a/api/src/config.rs +++ b/api/src/config.rs @@ -890,11 +890,54 @@ pub struct BlobCacheEntryConfigV2 { /// Configuration information for local cache system. #[serde(default)] pub cache: CacheConfigV2, - /// Optional file path for metadata blobs. + /// Optional file path for metadata blob. #[serde(default)] pub metadata_path: Option, } +impl BlobCacheEntryConfigV2 { + /// Read configuration information from a file. + pub fn from_file>(path: P) -> Result { + let md = fs::metadata(path.as_ref())?; + if md.len() > 0x100000 { + return Err(eother!("configuration file size is too big")); + } + let content = fs::read_to_string(path)?; + Self::from_str(&content) + } + + /// Validate the configuration object. + pub fn validate(&self) -> bool { + if self.version != 2 { + return false; + } + let config: ConfigV2 = self.into(); + config.validate() + } +} + +impl FromStr for BlobCacheEntryConfigV2 { + type Err = Error; + + fn from_str(s: &str) -> Result { + if let Ok(v) = serde_json::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + if let Ok(v) = toml::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + Err(einval!("failed to parse configuration information")) + } +} + impl From<&BlobCacheEntryConfigV2> for ConfigV2 { fn from(c: &BlobCacheEntryConfigV2) -> Self { ConfigV2 { @@ -943,6 +986,106 @@ impl ConfigV2Internal { } } +/// Blob cache object type for nydus/rafs bootstrap blob. +pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap"; +/// Blob cache object type for nydus/rafs data blob. +pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob"; + +/// Configuration information for a cached blob. +#[derive(Debug, Deserialize, Serialize)] +pub struct BlobCacheEntry { + /// Type of blob object, bootstrap or data blob. + #[serde(rename = "type")] + pub blob_type: String, + /// Blob id. + #[serde(rename = "id")] + pub blob_id: String, + /// Configuration information to generate blob cache object. + #[serde(default, rename = "config")] + pub(crate) blob_config_legacy: Option, + /// Configuration information to generate blob cache object. + #[serde(default, rename = "config_v2")] + pub blob_config: Option, + /// Domain id for the blob, which is used to group cached blobs into management domains. 
+ #[serde(default)] + pub domain_id: String, +} + +impl BlobCacheEntry { + pub fn prepare_configuration_info(&mut self) -> bool { + if self.blob_config.is_none() { + if let Some(legacy) = self.blob_config_legacy.as_ref() { + match legacy.try_into() { + Err(_) => return false, + Ok(v) => self.blob_config = Some(v), + } + } + } + + match self.blob_config.as_ref() { + None => false, + Some(cfg) => cfg.cache.validate() && cfg.backend.validate(), + } + } +} + +impl BlobCacheEntry { + /// Read configuration information from a file. + pub fn from_file>(path: P) -> Result { + let md = fs::metadata(path.as_ref())?; + if md.len() > 0x100000 { + return Err(eother!("configuration file size is too big")); + } + let content = fs::read_to_string(path)?; + Self::from_str(&content) + } + + /// Validate the configuration object. + pub fn validate(&self) -> bool { + if self.blob_type != BLOB_CACHE_TYPE_META_BLOB + && self.blob_type != BLOB_CACHE_TYPE_DATA_BLOB + { + warn!("invalid blob type {} for blob cache entry", self.blob_type); + return false; + } + if let Some(config) = self.blob_config.as_ref() { + if !config.validate() { + return false; + } + } + true + } +} + +impl FromStr for BlobCacheEntry { + type Err = Error; + + fn from_str(s: &str) -> Result { + if let Ok(v) = serde_json::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + if let Ok(v) = toml::from_str::(s) { + return if v.validate() { + Ok(v) + } else { + Err(einval!("invalid configuration")) + }; + } + Err(einval!("failed to parse configuration information")) + } +} + +/// Configuration information for a list of cached blob objects. +#[derive(Debug, Default, Deserialize, Serialize)] +pub struct BlobCacheList { + /// List of blob configuration information. + pub blobs: Vec, +} + fn default_true() -> bool { true } diff --git a/api/src/http.rs b/api/src/http.rs index ef6ebdbb8a7..92c342ce6e3 100644 --- a/api/src/http.rs +++ b/api/src/http.rs @@ -4,7 +4,6 @@ // // SPDX-License-Identifier: Apache-2.0 -use std::convert::TryInto; use std::io; use std::sync::mpsc::{RecvError, SendError}; @@ -12,7 +11,7 @@ use nydus_error::error::MetricsError; use serde::Deserialize; use serde_json::Error as SerdeError; -use crate::{BlobCacheEntryConfig, BlobCacheEntryConfigV2}; +use crate::BlobCacheEntry; /// Mount a filesystem. #[derive(Clone, Deserialize, Debug)] @@ -43,56 +42,6 @@ pub struct DaemonConf { pub log_level: String, } -/// Blob cache object type for nydus/rafs bootstrap blob. -pub const BLOB_CACHE_TYPE_META_BLOB: &str = "bootstrap"; -/// Blob cache object type for nydus/rafs data blob. -pub const BLOB_CACHE_TYPE_DATA_BLOB: &str = "datablob"; - -/// Configuration information for a cached blob. -#[derive(Debug, Deserialize, Serialize)] -pub struct BlobCacheEntry { - /// Type of blob object, bootstrap or data blob. - #[serde(rename = "type")] - pub blob_type: String, - /// Blob id. - #[serde(rename = "id")] - pub blob_id: String, - /// Configuration information to generate blob cache object. - #[serde(default, rename = "config")] - pub(crate) blob_config_legacy: Option, - /// Configuration information to generate blob cache object. - #[serde(default, rename = "config_v2")] - pub blob_config: Option, - /// Domain id for the blob, which is used to group cached blobs into management domains. 
- #[serde(default)] - pub domain_id: String, -} - -impl BlobCacheEntry { - pub fn prepare_configuration_info(&mut self) -> bool { - if self.blob_config.is_none() { - if let Some(legacy) = self.blob_config_legacy.as_ref() { - match legacy.try_into() { - Err(_) => return false, - Ok(v) => self.blob_config = Some(v), - } - } - } - - match self.blob_config.as_ref() { - None => false, - Some(cfg) => cfg.cache.validate() && cfg.backend.validate(), - } - } -} - -/// Configuration information for a list of cached blob objects. -#[derive(Debug, Default, Deserialize, Serialize)] -pub struct BlobCacheList { - /// List of blob configuration information. - pub blobs: Vec, -} - /// Identifier for cached blob objects. /// /// Domains are used to control the blob sharing scope. All blobs associated with the same domain diff --git a/docs/samples/blob_cache_entry.toml b/docs/samples/blob_cache_entry.toml new file mode 100644 index 00000000000..fea1c445fa8 --- /dev/null +++ b/docs/samples/blob_cache_entry.toml @@ -0,0 +1,145 @@ +# Configuration file for Nydus Image Service + +type = "bootstrap" +id = "image1" +domain_id = "domain1" + +# Configuration file format version number, must be 2. +[config_v2] +version = 2 +# Identifier for the instance. +id = "my_id" +# Optional file path for metadata blobs, for BlobCacheEntry only. +metadata_path = "/path/to/rafs/meta/data/blob" + +[config_v2.backend] +# Type of storage backend, valid values: "localfs", "oss", "registry" +type = "localfs" + +[config_v2.backend.localfs] +blob_file = "/tmp/nydus.blob.data" +dir = "/tmp" +alt_dirs = ["/var/nydus/cache"] + +[config_v2.backend.oss] +# Oss http scheme, either 'http' or 'https' +scheme = "http" +# Oss endpoint +endpoint = "my_endpoint" +# Oss bucket name +bucket_name = "my_bucket_name" +# Prefix object_prefix to OSS object key, for example the simulation of subdirectory: +object_prefix = "my_object_prefix" +# Oss access key +access_key_id = "my_access_key_id" +# Oss secret +access_key_secret = "my_access_key_secret" +# Skip SSL certificate validation for HTTPS scheme. +skip_verify = true +# Drop the read request once http request timeout, in seconds. +timeout = 10 +# Drop the read request once http connection timeout, in seconds. +connect_timeout = 10 +# Retry count when read request failed. +retry_limit = 5 + +[config_v2.backend.oss.proxy] +# Access remote storage backend via proxy, e.g. Dragonfly dfdaemon server URL. +url = "localhost:6789" +# Proxy health checking endpoint. +ping_url = "localhost:6789/ping" +# Fallback to remote storage backend if proxy ping failed. +fallback = true +# Interval for proxy health checking, in seconds. +check_interval = 5 +# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy. +use_http = false + +[[config_v2.backend.oss.mirrors]] +# Mirror server URL, for example http://127.0.0.1:65001. +host = "http://127.0.0.1:65001" +# Ping URL to check mirror server health. +ping_url = "http://127.0.0.1:65001/ping" +# HTTP request headers to be passed to mirror server. +# headers = +# Whether the authorization process is through mirror, default to false. +auth_through = true +# Interval for mirror health checking, in seconds. +health_check_interval = 5 +# Maximum number of failures before marking a mirror as unusable. 
+failure_limit = 5 + +[config_v2.backend.registry] +# Registry http scheme, either 'http' or 'https' +scheme = "https" +# Registry url host +host = "my.registry.com" +# Registry image name, like 'library/ubuntu' +repo = "nydus" +# Base64_encoded(username:password), the field should be sent to registry auth server to get a bearer token. +auth = "base64_encoded" +# Skip SSL certificate validation for HTTPS scheme. +skip_verify = true +# Drop the read request once http request timeout, in seconds. +timeout = 10 +# Drop the read request once http connection timeout, in seconds. +connect_timeout = 10 +# Retry count when read request failed. +retry_limit = 5 +# The field is a bearer token to be sent to registry to authorize registry requests. +registry_token = "bear_token" +# The http scheme to access blobs. +# It is used to workaround some P2P subsystem that requires a different scheme than the registry. +blob_url_scheme = "https" +# Redirect blob access to a different host regardless of the one specified in 'host'. +blob_redirected_host = "redirect.registry.com" + +[config_v2.backend.registry.proxy] +# Access remote storage backend via proxy, e.g. Dragonfly dfdaemon server URL. +url = "localhost:6789" +# Proxy health checking endpoint. +ping_url = "localhost:6789/ping" +# Fallback to remote storage backend if proxy ping failed. +fallback = true +# Interval for proxy health checking, in seconds. +check_interval = 5 +# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy. +use_http = false + +[[config_v2.backend.registry.mirrors]] +# Mirror server URL, for example http://127.0.0.1:65001. +host = "http://127.0.0.1:65001" +# Ping URL to check mirror server health. +ping_url = "http://127.0.0.1:65001/ping" +# HTTP request headers to be passed to mirror server. +# headers = +# Whether the authorization process is through mirror, default to false. +auth_through = true +# Interval for mirror health checking, in seconds. +health_check_interval = 5 +# Maximum number of failures before marking a mirror as unusable. +failure_limit = 5 + +[config_v2.cache] +# Type of blob cache: "blobcache", "filecache", "fscache", "dummycache" or "" +type = "filecache" +# Whether to cache compressed or uncompressed data. +compressed = true +# Whether to validate data read from the cache. +validate = true + +[config_v2.cache.filecache] +work_dir = "." + +[config_v2.cache.fscache] +work_dir = "." + +[config_v2.cache.prefetch] +# Whether to enable blob data prefetching. +enable = true +# Number of data prefetching working threads, valid values: 1-1024. +threads = 8 +# The batch size to prefetch data from backend, valid values: 0-0x10000000. +batch_size = 1000000 +# Network bandwidth rate limit in unit of Bytes and Zero means no limit. 
+bandwidth_limit = 10000000 diff --git a/rafs/src/metadata/direct_v6.rs b/rafs/src/metadata/direct_v6.rs index 2725c515419..8c71777e03e 100644 --- a/rafs/src/metadata/direct_v6.rs +++ b/rafs/src/metadata/direct_v6.rs @@ -38,15 +38,16 @@ use nydus_utils::{digest::RafsDigest, div_round_up, round_up}; use crate::metadata::layout::v5::RafsV5ChunkInfo; use crate::metadata::layout::v6::{ - recover_namespace, RafsV6BlobTable, RafsV6Dirent, RafsV6InodeChunkAddr, RafsV6InodeCompact, - RafsV6InodeExtended, RafsV6OndiskInode, RafsV6XattrEntry, RafsV6XattrIbodyHeader, - EROFS_BLOCK_SIZE, EROFS_INODE_CHUNK_BASED, EROFS_INODE_FLAT_INLINE, EROFS_INODE_FLAT_PLAIN, - EROFS_INODE_SLOT_SIZE, EROFS_I_DATALAYOUT_BITS, EROFS_I_VERSION_BIT, EROFS_I_VERSION_BITS, + rafsv6_load_blob_extra_info, recover_namespace, RafsV6BlobTable, RafsV6Dirent, + RafsV6InodeChunkAddr, RafsV6InodeCompact, RafsV6InodeExtended, RafsV6OndiskInode, + RafsV6XattrEntry, RafsV6XattrIbodyHeader, EROFS_BLOCK_SIZE, EROFS_INODE_CHUNK_BASED, + EROFS_INODE_FLAT_INLINE, EROFS_INODE_FLAT_PLAIN, EROFS_INODE_SLOT_SIZE, + EROFS_I_DATALAYOUT_BITS, EROFS_I_VERSION_BIT, EROFS_I_VERSION_BITS, }; use crate::metadata::layout::{bytes_to_os_str, MetaRange, XattrName, XattrValue}; use crate::metadata::{ - Attr, Entry, Inode, RafsInode, RafsInodeWalkAction, RafsInodeWalkHandler, RafsSuperBlock, - RafsSuperInodes, RafsSuperMeta, RAFS_ATTR_BLOCK_SIZE, RAFS_MAX_NAME, + Attr, Entry, Inode, RafsBlobExtraInfo, RafsInode, RafsInodeWalkAction, RafsInodeWalkHandler, + RafsSuperBlock, RafsSuperInodes, RafsSuperMeta, RAFS_ATTR_BLOCK_SIZE, RAFS_MAX_NAME, }; use crate::{MetaType, RafsError, RafsInodeExt, RafsIoReader, RafsResult}; @@ -63,6 +64,7 @@ fn err_invalidate_data(rafs_err: RafsError) -> std::io::Error { struct DirectMappingState { meta: Arc, blob_table: RafsV6BlobTable, + blob_extra_infos: HashMap, map: FileMapState, } @@ -71,6 +73,7 @@ impl DirectMappingState { DirectMappingState { meta: Arc::new(*meta), blob_table: RafsV6BlobTable::default(), + blob_extra_infos: HashMap::new(), map: FileMapState::default(), } } @@ -187,11 +190,13 @@ impl DirectSuperBlockV6 { let meta = &old_state.meta; r.seek(SeekFrom::Start(meta.blob_table_offset))?; blob_table.load(r, meta.blob_table_size, meta.chunk_size, meta.flags)?; + let blob_extra_infos = rafsv6_load_blob_extra_info(meta, r)?; let file_map = FileMapState::new(file, 0, len as usize, false)?; let state = DirectMappingState { meta: old_state.meta.clone(), blob_table, + blob_extra_infos, map: file_map, }; @@ -283,6 +288,10 @@ impl RafsSuperBlock for DirectSuperBlockV6 { self.state.load().blob_table.get_all() } + fn get_blob_extra_infos(&self) -> Result> { + Ok(self.state.load().blob_extra_infos.clone()) + } + fn root_ino(&self) -> u64 { self.info.root_ino } diff --git a/rafs/src/metadata/layout/v6.rs b/rafs/src/metadata/layout/v6.rs index 6cf13afd01b..a7e88dd2714 100644 --- a/rafs/src/metadata/layout/v6.rs +++ b/rafs/src/metadata/layout/v6.rs @@ -3,6 +3,7 @@ // // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; use std::convert::TryFrom; use std::ffi::{OsStr, OsString}; use std::fmt::Debug; @@ -21,48 +22,53 @@ use nydus_storage::{RAFS_MAX_CHUNKS_PER_BLOB, RAFS_MAX_CHUNK_SIZE}; use nydus_utils::{compress, digest, round_up, ByteSize}; use crate::metadata::layout::v5::RafsV5ChunkInfo; -use crate::metadata::layout::MetaRange; -use crate::metadata::{layout::RafsXAttrs, RafsStore, RafsSuperFlags}; +use crate::metadata::layout::{MetaRange, RafsXAttrs}; +use crate::metadata::{RafsBlobExtraInfo, RafsStore, 
RafsSuperFlags, RafsSuperMeta}; use crate::{impl_bootstrap_converter, impl_pub_getter_setter, RafsIoReader, RafsIoWrite}; /// EROFS metadata slot size. pub const EROFS_INODE_SLOT_SIZE: usize = 1 << EROFS_INODE_SLOT_BITS; +/// Bits of EROFS logical block size. +pub const EROFS_BLOCK_BITS: u8 = 12; /// EROFS logical block size. pub const EROFS_BLOCK_SIZE: u64 = 1u64 << EROFS_BLOCK_BITS; -/// EROFS plain inode. -pub const EROFS_INODE_FLAT_PLAIN: u16 = 0; -/// EROFS inline inode. -pub const EROFS_INODE_FLAT_INLINE: u16 = 2; -/// EROFS chunked inode. -pub const EROFS_INODE_CHUNK_BASED: u16 = 4; + +/// Offset of EROFS super block. +pub const EROFS_SUPER_OFFSET: u16 = 1024; +/// Size of EROFS super block. +pub const EROFS_SUPER_BLOCK_SIZE: u16 = 128; +/// Size of extended super block, used for rafs v6 specific fields +pub const EROFS_EXT_SUPER_BLOCK_SIZE: u16 = 256; /// EROFS device table offset. pub const EROFS_DEVTABLE_OFFSET: u16 = EROFS_SUPER_OFFSET + EROFS_SUPER_BLOCK_SIZE + EROFS_EXT_SUPER_BLOCK_SIZE; +/// Offseet for inode format flags: compact or extended. pub const EROFS_I_VERSION_BIT: u16 = 0; +/// Number of bits for inode format flags. pub const EROFS_I_VERSION_BITS: u16 = 1; +/// 32-byte on-disk inode +pub const EROFS_INODE_LAYOUT_COMPACT: u16 = 0; +/// 64-byte on-disk inode +pub const EROFS_INODE_LAYOUT_EXTENDED: u16 = 1; +/// Number of bits for inode data layout. pub const EROFS_I_DATALAYOUT_BITS: u16 = 3; +/// EROFS plain inode. +pub const EROFS_INODE_FLAT_PLAIN: u16 = 0; +/// EROFS inline inode. +pub const EROFS_INODE_FLAT_INLINE: u16 = 2; +/// EROFS chunked inode. +pub const EROFS_INODE_CHUNK_BASED: u16 = 4; -// Offset of EROFS super block. -pub const EROFS_SUPER_OFFSET: u16 = 1024; -// Size of EROFS super block. -pub const EROFS_SUPER_BLOCK_SIZE: u16 = 128; -// Size of extended super block, used for rafs v6 specific fields -const EROFS_EXT_SUPER_BLOCK_SIZE: u16 = 256; // Magic number for EROFS super block. const EROFS_SUPER_MAGIC_V1: u32 = 0xE0F5_E1E2; -// Bits of EROFS logical block size. -const EROFS_BLOCK_BITS: u8 = 12; // Bits of EROFS metadata slot size. const EROFS_INODE_SLOT_BITS: u8 = 5; -// 32-byte on-disk inode -const EROFS_INODE_LAYOUT_COMPACT: u16 = 0; -// 64-byte on-disk inode -const EROFS_INODE_LAYOUT_EXTENDED: u16 = 1; // Bit flag indicating whether the inode is chunked or not. const EROFS_CHUNK_FORMAT_INDEXES_FLAG: u16 = 0x0020; // Encoded chunk size (log2(chunk_size) - EROFS_BLOCK_BITS). const EROFS_CHUNK_FORMAT_SIZE_MASK: u16 = 0x001F; + /// Checksum of superblock, compatible with EROFS versions prior to Linux kernel 5.5. #[allow(dead_code)] const EROFS_FEATURE_COMPAT_SB_CHKSUM: u32 = 0x0000_0001; @@ -72,6 +78,7 @@ const EROFS_FEATURE_COMPAT_RAFS_V6: u32 = 0x4000_0000; const EROFS_FEATURE_INCOMPAT_CHUNKED_FILE: u32 = 0x0000_0004; /// Multi-devices, incompatible with EROFS versions prior to Linux kernel 5.16. const EROFS_FEATURE_INCOMPAT_DEVICE_TABLE: u32 = 0x0000_0008; + /// Size of SHA256 digest string. const BLOB_SHA256_LEN: usize = 64; const BLOB_MAX_SIZE_UNCOMPRESSED: u64 = 1u64 << 44; @@ -97,7 +104,7 @@ pub struct RafsV6SuperBlock { s_extslots: u8, /// Nid of the root directory. /// `root inode offset = s_meta_blkaddr * 4096 + s_root_nid * 32`. - pub s_root_nid: u16, + s_root_nid: u16, /// Total valid ino # s_inos: u64, /// Timestamp of filesystem creation. @@ -107,7 +114,7 @@ pub struct RafsV6SuperBlock { /// Total size of file system in blocks, used for statfs s_blocks: u32, /// Start block address of the metadata area. 
- pub s_meta_blkaddr: u32, + s_meta_blkaddr: u32, /// Start block address of the shared xattr area. s_xattr_blkaddr: u32, /// 128-bit uuid for volume @@ -150,13 +157,19 @@ impl RafsV6SuperBlock { meta_size ))); } - if meta_size & (EROFS_BLOCK_SIZE - 1) != 0 { return Err(einval!(format!( "invalid Rafs v6 metadata size: bootstrap size {} is not aligned", meta_size ))); } + let meta_addr = u32::from_le(self.s_meta_blkaddr) as u64 * EROFS_BLOCK_SIZE; + if meta_addr > meta_size { + return Err(einval!(format!( + "invalid Rafs v6 meta block address 0x{:x}, meta file size 0x{:x}", + meta_addr, meta_size + ))); + } if u32::from_le(self.s_magic) != EROFS_SUPER_MAGIC_V1 { return Err(einval!(format!( @@ -179,14 +192,14 @@ impl RafsV6SuperBlock { ))); } - if self.s_inos == 0 { - return Err(einval!("invalid inode number in Rafsv6 superblock")); - } - if self.s_extslots != 0 { return Err(einval!("invalid extended slots in Rafsv6 superblock")); } + if self.s_inos == 0 { + return Err(einval!("invalid inode number in Rafsv6 superblock")); + } + if self.s_u != 0 { return Err(einval!("invalid union field in Rafsv6 superblock")); } @@ -206,14 +219,21 @@ impl RafsV6SuperBlock { return Err(einval!("invalid extra device count in Rafsv6 superblock")); } - if u16::from_le(self.s_devt_slotoff) - != (EROFS_DEVTABLE_OFFSET / size_of::() as u16) - { + let devtable_off = + u16::from_le(self.s_devt_slotoff) as u64 * size_of::() as u64; + if devtable_off != EROFS_DEVTABLE_OFFSET as u64 { return Err(einval!(format!( "invalid device table slot offset {} in Rafsv6 superblock", u16::from_le(self.s_devt_slotoff) ))); } + let devtable_end = devtable_off + u16::from_le(self.s_extra_devices) as u64; + if devtable_end > meta_size { + return Err(einval!(format!( + "invalid device table slot count {} in Rafsv6 superblock", + u16::from_le(self.s_extra_devices) + ))); + } // s_build_time may be used as compact_inode's timestamp in the future. // if u64::from_le(self.s_build_time) != 0 || u32::from_le(self.s_build_time_nsec) != 0 { @@ -259,23 +279,34 @@ impl RafsV6SuperBlock { self.s_blocks = blocks.to_le(); } + /// Get root nid. + pub fn root_nid(&self) -> u16 { + u16::from_le(self.s_root_nid) + } + /// Set EROFS root nid. pub fn set_root_nid(&mut self, nid: u16) { self.s_root_nid = nid.to_le(); } + /// Get meta block address. + pub fn meta_addr(&self) -> u32 { + u32::from_le(self.s_meta_blkaddr) + } + /// Set EROFS meta block address. pub fn set_meta_addr(&mut self, meta_addr: u64) { assert!((meta_addr / EROFS_BLOCK_SIZE) <= u32::MAX as u64); self.s_meta_blkaddr = u32::to_le((meta_addr / EROFS_BLOCK_SIZE) as u32); } - /// Set number of extra devices. - pub fn set_extra_devices(&mut self, count: u16) { - self.s_extra_devices = count.to_le(); + /// Get device table offset. + pub fn device_table_offset(&self) -> u64 { + u16::from_le(self.s_devt_slotoff) as u64 * size_of::() as u64 } impl_pub_getter_setter!(magic, set_magic, s_magic, u32); + impl_pub_getter_setter!(extra_devices, set_extra_devices, s_extra_devices, u16); } impl RafsStore for RafsV6SuperBlock { @@ -365,7 +396,7 @@ impl RafsV6SuperBlockExt { } /// Validate the Rafs v6 super block. 
- pub fn validate(&self, meta_size: u64) -> Result<()> { + pub fn validate(&self, meta_size: u64, meta: &RafsSuperMeta) -> Result<()> { let mut flags = self.flags(); flags &= RafsSuperFlags::COMPRESSION_NONE.bits() | RafsSuperFlags::COMPRESSION_LZ4.bits() @@ -394,10 +425,13 @@ impl RafsV6SuperBlockExt { return Err(einval!("invalid chunk size in Rafs v6 extended superblock")); } + let devslot_end = meta.blob_device_table_offset + meta.blob_table_size as u64; + let blob_offset = self.blob_table_offset(); let blob_size = self.blob_table_size() as u64; if blob_offset & (EROFS_BLOCK_SIZE - 1) != 0 || blob_offset < EROFS_BLOCK_SIZE + || blob_offset < devslot_end || blob_size % size_of::() as u64 != 0 || blob_offset.checked_add(blob_size).is_none() || blob_offset + blob_size > meta_size @@ -409,11 +443,13 @@ impl RafsV6SuperBlockExt { } let blob_range = MetaRange::new(blob_offset, blob_size, true)?; + let mut chunk_info_tbl_range = None; if self.chunk_table_size() > 0 { let chunk_tbl_offset = self.chunk_table_offset(); let chunk_tbl_size = self.chunk_table_size(); if chunk_tbl_offset < EROFS_BLOCK_SIZE || chunk_tbl_offset % EROFS_BLOCK_SIZE != 0 + || chunk_tbl_offset < devslot_end || chunk_tbl_size % size_of::() as u64 != 0 || chunk_tbl_offset.checked_add(chunk_tbl_size).is_none() || chunk_tbl_offset + chunk_tbl_size > meta_size @@ -429,6 +465,7 @@ impl RafsV6SuperBlockExt { "blob table intersects with chunk table in Rafs v6 extended superblock", ))); } + chunk_info_tbl_range = Some(chunk_range); } // Legacy RAFS may have zero prefetch table offset but non-zero prefetch table size for @@ -438,6 +475,7 @@ impl RafsV6SuperBlockExt { let tbl_size = self.prefetch_table_size() as u64; if tbl_offset < EROFS_BLOCK_SIZE || tbl_size % size_of::() as u64 != 0 + || tbl_offset < devslot_end || tbl_offset.checked_add(tbl_size).is_none() || tbl_offset + tbl_size > meta_size { @@ -452,6 +490,13 @@ impl RafsV6SuperBlockExt { "blob table intersects with prefetch table in Rafs v6 extended superblock", ))); } + if let Some(chunk_range) = chunk_info_tbl_range.as_ref() { + if chunk_range.intersect_with(&prefetch_range) { + return Err(einval!(format!( + "chunk information table intersects with prefetch table in Rafs v6 extended superblock", + ))); + } + } } Ok(()) @@ -1188,21 +1233,6 @@ impl RafsV6Device { self.blob_id.copy_from_slice(id); } - /// Get number of blocks. - pub fn blocks(&self) -> u32 { - u32::from_le(self.blocks) - } - - /// Set number of blocks. - pub fn set_blocks(&mut self, blocks: u32) { - self.blocks = blocks.to_le(); - } - - /// Set mapped block address. - pub fn set_mapped_blkaddr(&mut self, addr: u32) { - self.mapped_blkaddr = addr.to_le(); - } - /// Load a `RafsV6Device` from a reader. 
pub fn load(&mut self, r: &mut RafsIoReader) -> Result<()> { r.read_exact(self.as_mut()) @@ -1213,23 +1243,25 @@ impl RafsV6Device { match String::from_utf8(self.blob_id.to_vec()) { Ok(v) => { if v.len() != BLOB_SHA256_LEN { - return Err(einval!(format!("v.len {} is invalid", v.len()))); + return Err(einval!(format!( + "Length of blob_id {} in RAFS v6 device entry is invalid", + v.len() + ))); } } - Err(_) => return Err(einval!("blob_id from_utf8 is invalid")), + Err(_) => return Err(einval!("blob_id in RAFS v6 device entry is invalid")), } if self.blocks() == 0 { - let msg = format!("invalid blocks {} in Rafs v6 Device", self.blocks()); + let msg = format!("invalid blocks {} in Rafs v6 device entry", self.blocks()); return Err(einval!(msg)); } - if u32::from_le(self.mapped_blkaddr) != 0 { - return Err(einval!("invalid mapped_addr in Rafs v6 Device")); - } - Ok(()) } + + impl_pub_getter_setter!(blocks, set_blocks, blocks, u32); + impl_pub_getter_setter!(mapped_blkaddr, set_mapped_blkaddr, mapped_blkaddr, u32); } impl_bootstrap_converter!(RafsV6Device); @@ -1242,6 +1274,34 @@ impl RafsStore for RafsV6Device { } } +/// Load blob information table from a reader. +pub fn rafsv6_load_blob_extra_info( + meta: &RafsSuperMeta, + r: &mut RafsIoReader, +) -> Result> { + let mut infos = HashMap::new(); + if meta.blob_device_table_count == 0 { + return Ok(infos); + } + r.seek_to_offset(meta.blob_device_table_offset)?; + for _idx in 0..meta.blob_device_table_count { + let mut devslot = RafsV6Device::new(); + r.read_exact(devslot.as_mut())?; + devslot.validate()?; + let id = String::from_utf8(devslot.blob_id.to_vec()) + .map_err(|e| einval!(format!("invalid blob id, {}", e)))?; + let info = RafsBlobExtraInfo { + mapped_blkaddr: devslot.mapped_blkaddr(), + }; + if infos.contains_key(&id) { + return Err(einval!("duplicated blob id in RAFS v6 device table")); + } + infos.insert(id, info); + } + + Ok(infos) +} + #[inline] pub fn align_offset(offset: u64, aligned_size: u64) -> u64 { round_up(offset, aligned_size) diff --git a/rafs/src/metadata/md_v6.rs b/rafs/src/metadata/md_v6.rs index 5d2abb829ae..a6f9e95d941 100644 --- a/rafs/src/metadata/md_v6.rs +++ b/rafs/src/metadata/md_v6.rs @@ -31,12 +31,14 @@ impl RafsSuper { sb.validate(end)?; self.meta.version = RAFS_SUPER_VERSION_V6; self.meta.magic = sb.magic(); - self.meta.meta_blkaddr = sb.s_meta_blkaddr; - self.meta.root_nid = sb.s_root_nid; + self.meta.meta_blkaddr = sb.meta_addr(); + self.meta.root_nid = sb.root_nid(); + self.meta.blob_device_table_count = sb.extra_devices() as u32; + self.meta.blob_device_table_offset = sb.device_table_offset(); let mut ext_sb = RafsV6SuperBlockExt::new(); ext_sb.load(r)?; - ext_sb.validate(end)?; + ext_sb.validate(end, &self.meta)?; self.meta.chunk_size = ext_sb.chunk_size(); self.meta.blob_table_offset = ext_sb.blob_table_offset(); self.meta.blob_table_size = ext_sb.blob_table_size(); diff --git a/rafs/src/metadata/mod.rs b/rafs/src/metadata/mod.rs index af1775e8dd0..7949c26c9e3 100644 --- a/rafs/src/metadata/mod.rs +++ b/rafs/src/metadata/mod.rs @@ -6,7 +6,7 @@ //! Enums, Structs and Traits to access and manage Rafs filesystem metadata. use std::any::Any; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::convert::{TryFrom, TryInto}; use std::ffi::{OsStr, OsString}; use std::fmt::{Debug, Display, Formatter, Result as FmtResult}; @@ -68,6 +68,14 @@ pub const DOTDOT: &str = ".."; /// Type for RAFS filesystem inode number. 
pub type Inode = u64; +#[derive(Debug, Clone)] +pub struct RafsBlobExtraInfo { + /// Mapped block address from RAFS v6 devslot table. + /// + /// It's the offset of the uncompressed blob used to convert an image into a disk. + pub mapped_blkaddr: u32, +} + /// Trait to access filesystem inodes managed by a RAFS filesystem. pub trait RafsSuperInodes { /// Get the maximum inode number managed by the RAFS filesystem. @@ -95,6 +103,11 @@ pub trait RafsSuperBlock: RafsSuperInodes + Send + Sync { /// Get all blob objects referenced by the RAFS filesystem. fn get_blob_infos(&self) -> Vec>; + /// Get extra information associated with blob objects. + fn get_blob_extra_infos(&self) -> Result> { + Ok(HashMap::new()) + } + /// Get the inode number of the RAFS filesystem root. fn root_ino(&self) -> u64; @@ -423,6 +436,10 @@ pub struct RafsSuperMeta { pub extended_blob_table_offset: u64, /// Offset of the extended blob information table into the metadata blob. pub extended_blob_table_entries: u32, + /// Number of RAFS v6 blob device entries in the devslot table. + pub blob_device_table_count: u32, + /// Offset of the RAFS v6 devslot table. + pub blob_device_table_offset: u64, /// Offset of the inode prefetch table into the metadata blob. pub prefetch_table_offset: u64, /// Size of the inode prefetch table. @@ -520,6 +537,8 @@ impl Default for RafsSuperMeta { blob_table_offset: 0, extended_blob_table_offset: 0, extended_blob_table_entries: 0, + blob_device_table_count: 0, + blob_device_table_offset: 0, prefetch_table_offset: 0, prefetch_table_entries: 0, attr_timeout: Duration::from_secs(RAFS_DEFAULT_ATTR_TIMEOUT), diff --git a/service/Cargo.toml b/service/Cargo.toml index 9499988badc..22bff2906c1 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -10,6 +10,8 @@ edition = "2018" resolver = "2" [dependencies] +bytes = { version = "1", optional = true } +dbs-allocator = { version = "0.1.1", optional = true } fuse-backend-rs = "0.10.1" libc = "0.2" log = "0.4.8" @@ -21,6 +23,7 @@ serde_json = "1.0.51" thiserror = "1.0" time = { version = "0.3.14", features = ["serde-human-readable"] } tokio = { version = "1.24", features = ["macros"] } +tokio-uring = "0.4" nydus-api = { version = "0.2.2", path = "../api", features = ["handler"] } nydus-error = { version = "0.2.3", path = "../error" } @@ -48,3 +51,5 @@ virtiofs = [ "virtio-bindings", ] +block-device = [ "dbs-allocator", "tokio/fs"] +block-nbd = ["block-device", "bytes"] diff --git a/service/src/blob_cache.rs b/service/src/blob_cache.rs index e9547c7dbcd..ce8040ca93b 100644 --- a/service/src/blob_cache.rs +++ b/service/src/blob_cache.rs @@ -5,7 +5,9 @@ //! Blob cache manager to cache RAFS meta/data blob objects. 
use std::collections::HashMap; +use std::fs::OpenOptions; use std::io::{Error, ErrorKind, Result}; +use std::os::fd::FromRawFd; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex, MutexGuard}; @@ -14,8 +16,13 @@ use nydus_api::{ BlobCacheEntry, BlobCacheList, BlobCacheObjectId, ConfigV2, BLOB_CACHE_TYPE_DATA_BLOB, BLOB_CACHE_TYPE_META_BLOB, }; -use nydus_rafs::metadata::RafsSuper; +use nydus_rafs::metadata::layout::v6::{EROFS_BLOCK_BITS, EROFS_BLOCK_SIZE}; +use nydus_rafs::metadata::{RafsBlobExtraInfo, RafsSuper}; +use nydus_storage::cache::BlobCache; use nydus_storage::device::BlobInfo; +use nydus_storage::factory::BLOB_FACTORY; +use tokio_uring::buf::IoBufMut; +use tokio_uring::fs::File; const ID_SPLITTER: &str = "/"; @@ -28,16 +35,22 @@ pub fn generate_blob_key(domain_id: &str, blob_id: &str) -> String { } } -/// Configuration information for cached meta blob objects. -pub struct BlobCacheConfigMetaBlob { - _blob_id: String, +/// Configuration information for a cached metadata blob. +pub struct MetaBlobConfig { + blob_id: String, scoped_blob_id: String, path: PathBuf, config: Arc, - data_blobs: Mutex>>, + blobs: Mutex>>, + blob_extra_infos: HashMap, } -impl BlobCacheConfigMetaBlob { +impl MetaBlobConfig { + /// Get blob id. + pub fn blob_id(&self) -> &str { + &self.blob_id + } + /// Get file path to access the meta blob. pub fn path(&self) -> &Path { &self.path @@ -48,20 +61,29 @@ impl BlobCacheConfigMetaBlob { &self.config } - fn add_data_blob(&self, blob: Arc) { - self.data_blobs.lock().unwrap().push(blob); + pub fn get_blobs(&self) -> Vec> { + self.blobs.lock().unwrap().clone() + } + + /// Get optional extra information associated with a blob object. + pub fn get_blob_extra_info(&self, blob_id: &str) -> Option<&RafsBlobExtraInfo> { + self.blob_extra_infos.get(blob_id) + } + + fn add_data_blob(&self, blob: Arc) { + self.blobs.lock().unwrap().push(blob); } } -/// Configuration information for cached data blob objects. -pub struct BlobCacheConfigDataBlob { - blob_info: Arc, +/// Configuration information for a cached data blob. +pub struct DataBlobConfig { scoped_blob_id: String, + blob_info: Arc, config: Arc, ref_count: AtomicU32, } -impl BlobCacheConfigDataBlob { +impl DataBlobConfig { /// Get the [`BlobInfo`](https://docs.rs/nydus-storage/latest/nydus_storage/device/struct.BlobInfo.html) object associated with the cached data blob. pub fn blob_info(&self) -> &Arc { &self.blob_info @@ -73,28 +95,28 @@ impl BlobCacheConfigDataBlob { } } -/// Configuration information for cached blob objects. +/// Configuration information for a cached metadata/data blob. #[derive(Clone)] -pub enum BlobCacheObjectConfig { +pub enum BlobConfig { /// Configuration information for cached meta blob objects. - MetaBlob(Arc), + MetaBlob(Arc), /// Configuration information for cached data blob objects. - DataBlob(Arc), + DataBlob(Arc), } -impl BlobCacheObjectConfig { +impl BlobConfig { /// Get the ['ConfigV2'] object associated with the cached data blob. 
pub fn config_v2(&self) -> &Arc { match self { - BlobCacheObjectConfig::MetaBlob(v) => v.config_v2(), - BlobCacheObjectConfig::DataBlob(v) => v.config_v2(), + BlobConfig::MetaBlob(v) => v.config_v2(), + BlobConfig::DataBlob(v) => v.config_v2(), } } fn new_data_blob(domain_id: String, blob_info: Arc, config: Arc) -> Self { let scoped_blob_id = generate_blob_key(&domain_id, &blob_info.blob_id()); - BlobCacheObjectConfig::DataBlob(Arc::new(BlobCacheConfigDataBlob { + BlobConfig::DataBlob(Arc::new(DataBlobConfig { blob_info, scoped_blob_id, config, @@ -107,36 +129,38 @@ impl BlobCacheObjectConfig { blob_id: String, path: PathBuf, config: Arc, + blob_extra_infos: HashMap, ) -> Self { let scoped_blob_id = generate_blob_key(&domain_id, &blob_id); - BlobCacheObjectConfig::MetaBlob(Arc::new(BlobCacheConfigMetaBlob { - _blob_id: blob_id, + BlobConfig::MetaBlob(Arc::new(MetaBlobConfig { + blob_id, scoped_blob_id, path, config, - data_blobs: Mutex::new(Vec::new()), + blobs: Mutex::new(Vec::new()), + blob_extra_infos, })) } fn key(&self) -> &str { match self { - BlobCacheObjectConfig::MetaBlob(o) => &o.scoped_blob_id, - BlobCacheObjectConfig::DataBlob(o) => &o.scoped_blob_id, + BlobConfig::MetaBlob(o) => &o.scoped_blob_id, + BlobConfig::DataBlob(o) => &o.scoped_blob_id, } } - fn meta_config(&self) -> Option> { + fn meta_config(&self) -> Option> { match self { - BlobCacheObjectConfig::MetaBlob(o) => Some(o.clone()), - BlobCacheObjectConfig::DataBlob(_o) => None, + BlobConfig::MetaBlob(o) => Some(o.clone()), + BlobConfig::DataBlob(_o) => None, } } } #[derive(Default)] struct BlobCacheState { - id_to_config_map: HashMap, + id_to_config_map: HashMap, } impl BlobCacheState { @@ -146,19 +170,19 @@ impl BlobCacheState { } } - fn try_add(&mut self, config: BlobCacheObjectConfig) -> Result<()> { + fn try_add(&mut self, config: BlobConfig) -> Result<()> { let key = config.key(); if let Some(entry) = self.id_to_config_map.get(key) { match entry { - BlobCacheObjectConfig::MetaBlob(_o) => { + BlobConfig::MetaBlob(_o) => { // Meta blob must be unique. return Err(Error::new( ErrorKind::AlreadyExists, "blob_cache: bootstrap blob already exists", )); } - BlobCacheObjectConfig::DataBlob(o) => { + BlobConfig::DataBlob(o) => { // Data blob is reference counted. o.ref_count.fetch_add(1, Ordering::AcqRel); } @@ -175,12 +199,8 @@ impl BlobCacheState { // Remove all blobs associated with the domain. 
let scoped_blob_prefix = format!("{}{}", param.domain_id, ID_SPLITTER); self.id_to_config_map.retain(|_k, v| match v { - BlobCacheObjectConfig::MetaBlob(o) => { - !o.scoped_blob_id.starts_with(&scoped_blob_prefix) - } - BlobCacheObjectConfig::DataBlob(o) => { - !o.scoped_blob_id.starts_with(&scoped_blob_prefix) - } + BlobConfig::MetaBlob(o) => !o.scoped_blob_id.starts_with(&scoped_blob_prefix), + BlobConfig::DataBlob(o) => !o.scoped_blob_id.starts_with(&scoped_blob_prefix), }); } else { let mut data_blobs = Vec::new(); @@ -189,11 +209,11 @@ impl BlobCacheState { match self.id_to_config_map.get(&scoped_blob_prefix) { None => return Err(enoent!("blob_cache: cache entry not found")), - Some(BlobCacheObjectConfig::MetaBlob(o)) => { + Some(BlobConfig::MetaBlob(o)) => { is_meta = true; - data_blobs = o.data_blobs.lock().unwrap().clone(); + data_blobs = o.blobs.lock().unwrap().clone(); } - Some(BlobCacheObjectConfig::DataBlob(o)) => { + Some(BlobConfig::DataBlob(o)) => { data_blobs.push(o.clone()); } } @@ -212,7 +232,7 @@ impl BlobCacheState { Ok(()) } - fn get(&self, key: &str) -> Option { + fn get(&self, key: &str) -> Option { self.id_to_config_map.get(key).cloned() } } @@ -253,18 +273,20 @@ impl BlobCacheMgr { e }) } - BLOB_CACHE_TYPE_DATA_BLOB => { - warn!("blob_cache: invalid data blob cache entry: {:?}", entry); - Err(einval!("blob_cache: invalid data blob cache entry")) - } - _ => { - warn!("blob_cache: invalid blob cache entry: {:?}", entry); - Err(einval!("blob_cache: invalid blob cache entry")) - } + BLOB_CACHE_TYPE_DATA_BLOB => Err(einval!(format!( + "blob_cache: invalid data blob cache entry: {:?}", + entry + ))), + _ => Err(einval!(format!( + "blob_cache: invalid blob cache entry, {:?}", + entry + ))), } } /// Add a list of meta/data blobs to be cached by the cache manager. + /// + /// If failed to add some blob, the blobs already added won't be rolled back. pub fn add_blob_list(&self, blobs: &BlobCacheList) -> Result<()> { for entry in blobs.blobs.iter() { self.add_blob_entry(entry)?; @@ -279,7 +301,7 @@ impl BlobCacheMgr { } /// Get configuration information of the cached blob with specified `key`. - pub fn get_config(&self, key: &str) -> Option { + pub fn get_config(&self, key: &str) -> Option { self.get_state().get(key) } @@ -289,17 +311,10 @@ impl BlobCacheMgr { } fn get_meta_info(&self, entry: &BlobCacheEntry) -> Result<(PathBuf, Arc)> { - // Validate type of backend and cache. 
let config = entry .blob_config .as_ref() .ok_or_else(|| einval!("missing blob cache configuration information"))?; - if config.cache.cache_type != "fscache" { - return Err(einval!( - "blob_cache: `config.cache_type` for meta blob is invalid" - )); - } - let cache_config = config.cache.get_fscache_config()?; if entry.blob_id.contains(ID_SPLITTER) { return Err(einval!("blob_cache: `blob_id` for meta blob is invalid")); @@ -313,24 +328,45 @@ impl BlobCacheMgr { "blob_cache: `config.metadata_path` for meta blob is empty" )); } - let path = Path::new(&path) - .canonicalize() - .map_err(|_e| einval!("blob_cache: `config.metadata_path` for meta blob is invalid"))?; + let path = Path::new(&path).canonicalize().map_err(|_e| { + einval!(format!( + "blob_cache: `config.metadata_path={}` for meta blob is invalid", + path + )) + })?; if !path.is_file() { return Err(einval!( "blob_cache: `config.metadata_path` for meta blob is not a file" )); } - // Validate the working directory for fscache - let path2 = Path::new(&cache_config.work_dir); - let path2 = path2 - .canonicalize() - .map_err(|_e| eio!("blob_cache: `config.cache_config.work_dir` is invalid"))?; - if !path2.is_dir() { - return Err(einval!( - "blob_cache: `config.cache_config.work_dir` is not a directory" - )); + // Validate type of backend and cache. + if config.cache.is_fscache() { + // Validate the working directory for fscache + let cache_config = config.cache.get_fscache_config()?; + let path2 = Path::new(&cache_config.work_dir); + let path2 = path2 + .canonicalize() + .map_err(|_e| eio!("blob_cache: `config.cache_config.work_dir` is invalid"))?; + if !path2.is_dir() { + return Err(einval!( + "blob_cache: `config.cache_config.work_dir` is not a directory" + )); + } + } else if config.cache.is_filecache() { + // Validate the working directory for filecache + let cache_config = config.cache.get_filecache_config()?; + let path2 = Path::new(&cache_config.work_dir); + let path2 = path2 + .canonicalize() + .map_err(|_e| eio!("blob_cache: `config.cache_config.work_dir` is invalid"))?; + if !path2.is_dir() { + return Err(einval!( + "blob_cache: `config.cache_config.work_dir` is not a directory" + )); + } + } else { + return Err(einval!("blob_cache: unknown cache type")); } let config: Arc = Arc::new(config.into()); @@ -347,12 +383,19 @@ impl BlobCacheMgr { config: Arc, ) -> Result<()> { let (rs, _) = RafsSuper::load_from_file(&path, config.clone(), true, false)?; - let meta = BlobCacheObjectConfig::new_meta_blob( + if rs.meta.is_v5() { + return Err(einval!("blob_cache: RAFSv5 image is not supported")); + } + + let blob_extra_infos = rs.superblock.get_blob_extra_infos()?; + let meta = BlobConfig::new_meta_blob( domain_id.to_string(), id.to_string(), path, config, + blob_extra_infos, ); + // Safe to unwrap because it's a meta blob object. let meta_obj = meta.meta_config().unwrap(); let mut state = self.get_state(); state.try_add(meta)?; @@ -364,13 +407,10 @@ impl BlobCacheMgr { &bi.blob_id(), domain_id ); - let data_blob = BlobCacheObjectConfig::new_data_blob( - domain_id.to_string(), - bi, - meta_obj.config.clone(), - ); + let data_blob = + BlobConfig::new_data_blob(domain_id.to_string(), bi, meta_obj.config.clone()); let data_blob_config = match &data_blob { - BlobCacheObjectConfig::DataBlob(entry) => entry.clone(), + BlobConfig::DataBlob(entry) => entry.clone(), _ => panic!("blob_cache: internal error"), }; @@ -392,6 +432,115 @@ impl BlobCacheMgr { } } +/// Structure representing a cached metadata blob. 
+pub struct MetaBlob { + file: File, + size: u64, +} + +impl MetaBlob { + /// Create a new [MetaBlob] object from + pub fn new>(path: P) -> Result { + let file = OpenOptions::new() + .read(true) + .write(false) + .open(path.as_ref()) + .map_err(|e| { + warn!( + "blob_cache: failed to open metadata blob {}", + path.as_ref().display() + ); + e + })?; + let md = file.metadata().map_err(|e| { + warn!( + "blob_cache: failed to get metadata about metadata blob {}", + path.as_ref().display() + ); + e + })?; + let size = md.len(); + if size % EROFS_BLOCK_SIZE != 0 || (size >> EROFS_BLOCK_BITS) > u32::MAX as u64 { + return Err(einval!(format!( + "blob_cache: metadata blob size (0x{:x}) is invalid", + size + ))); + } + + Ok(MetaBlob { + file: File::from_std(file), + size, + }) + } + + /// Get number of blocks in unit of EROFS_BLOCK_SIZE. + pub fn blocks(&self) -> u32 { + (self.size >> EROFS_BLOCK_BITS) as u32 + } + + /// Read data from the cached metadata blob in asynchronous mode. + pub async fn async_read(&self, pos: u64, buf: T) -> (Result, T) { + self.file.read_at(buf, pos).await + } +} + +/// Structure representing a cached data blob. +pub struct DataBlob { + blob_id: String, + blob: Arc, + file: File, +} + +impl DataBlob { + /// Create a new instance of [DataBlob]. + pub fn new(config: &Arc) -> Result { + let blob_id = config.blob_info().blob_id(); + let blob = BLOB_FACTORY + .new_blob_cache(config.config_v2(), &config.blob_info) + .map_err(|e| { + warn!( + "blob_cache: failed to create cache object for blob {}", + blob_id + ); + e + })?; + + match blob.get_blob_object() { + Some(obj) => { + let fd = nix::unistd::dup(obj.as_raw_fd())?; + // Safe because the `fd` is valid. + let file = unsafe { File::from_raw_fd(fd) }; + Ok(DataBlob { + blob_id, + blob, + file, + }) + } + None => Err(eio!(format!( + "blob_cache: failed to get BlobObject for blob {}", + blob_id + ))), + } + } + + /// Read data from the cached data blob in asynchronous mode. + pub async fn async_read(&self, pos: u64, buf: T) -> (Result, T) { + match self.blob.get_blob_object() { + Some(obj) => match obj.fetch_range_uncompressed(pos, buf.bytes_total() as u64) { + Ok(_) => self.file.read_at(buf, pos).await, + Err(e) => (Err(e), buf), + }, + None => ( + Err(eio!(format!( + "blob_cache: failed to get BlobObject for blob {}", + self.blob_id + ))), + buf, + ), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -456,12 +605,13 @@ mod tests { assert_eq!(&backend_cfg.backend_type, "localfs"); assert_eq!(&cache_cfg.cache_type, "fscache"); - let blob = BlobCacheConfigMetaBlob { - _blob_id: "123456789-123".to_string(), + let blob = MetaBlobConfig { + blob_id: "123456789-123".to_string(), scoped_blob_id: "domain1".to_string(), path: path.clone(), config, - data_blobs: Mutex::new(Vec::new()), + blobs: Mutex::new(Vec::new()), + blob_extra_infos: HashMap::new(), }; assert_eq!(blob.path(), &path); } @@ -540,12 +690,12 @@ mod tests { let tmpdir = TempDir::new().unwrap(); let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR"); let mut source_path = PathBuf::from(root_dir); - source_path.push("../tests/texture/bootstrap/rafs-v5.boot"); + source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot"); let config = r#" { "type": "bootstrap", - "id": "rafs-v5", + "id": "rafs-v6", "domain_id": "domain2", "config_v2": { "version": 2, @@ -579,35 +729,56 @@ mod tests { // Check existence of data blob referenced by the bootstrap. 
let key = generate_blob_key( &entry.domain_id, - "7fe907a0c9c7f35538f23f40baae5f2e8d148a3a6186f0f443f62d04b5e2d731", + "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef", ); assert!(mgr.get_config(&key).is_some()); - assert_eq!(mgr.get_state().id_to_config_map.len(), 19); + assert_eq!(mgr.get_state().id_to_config_map.len(), 2); - entry.blob_id = "rafs-v5-cloned".to_string(); + entry.blob_id = "rafs-v6-cloned".to_string(); let blob_id_cloned = generate_blob_key(&entry.domain_id, &entry.blob_id); mgr.add_blob_entry(&entry).unwrap(); - assert_eq!(mgr.get_state().id_to_config_map.len(), 20); + assert_eq!(mgr.get_state().id_to_config_map.len(), 3); assert!(mgr.get_config(&blob_id).is_some()); assert!(mgr.get_config(&blob_id_cloned).is_some()); mgr.remove_blob_entry(&BlobCacheObjectId { domain_id: entry.domain_id.clone(), - blob_id: "rafs-v5".to_string(), + blob_id: "rafs-v6".to_string(), }) .unwrap(); - assert_eq!(mgr.get_state().id_to_config_map.len(), 19); + assert_eq!(mgr.get_state().id_to_config_map.len(), 2); assert!(mgr.get_config(&blob_id).is_none()); assert!(mgr.get_config(&blob_id_cloned).is_some()); mgr.remove_blob_entry(&BlobCacheObjectId { domain_id: entry.domain_id, - blob_id: "rafs-v5-cloned".to_string(), + blob_id: "rafs-v6-cloned".to_string(), }) .unwrap(); assert_eq!(mgr.get_state().id_to_config_map.len(), 0); assert!(mgr.get_config(&blob_id).is_none()); assert!(mgr.get_config(&blob_id_cloned).is_none()); } + + #[test] + fn test_meta_blob() { + let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR"); + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot"); + + tokio_uring::start(async move { + let meta_blob = MetaBlob::new(&source_path).unwrap(); + assert_eq!(meta_blob.blocks(), 5); + let buf = vec![0u8; 4096]; + let (res, buf) = meta_blob.async_read(0, buf).await; + assert_eq!(res.unwrap(), 4096); + assert_eq!(buf[0], 0); + assert_eq!(buf[1023], 0); + assert_eq!(buf[1024], 0xe2); + assert_eq!(buf[1027], 0xe0); + let (res, _buf) = meta_blob.async_read(0x6000, buf).await; + assert_eq!(res.unwrap(), 0); + }); + } } diff --git a/service/src/block_device.rs b/service/src/block_device.rs new file mode 100644 index 00000000000..ae099f5c465 --- /dev/null +++ b/service/src/block_device.rs @@ -0,0 +1,338 @@ +// Copyright (C) 2023 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: (Apache-2.0) + +//! Represent a RAFSv6 image as a block device. +//! +//! Metadata of RAFSv6 image has two address encoding schemes: +//! - blob address: data is located by (blob_index, chunk_index) +//! - block address: data is located by (block_addr) +//! +//! Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block +//! device, so it can be directly mounted by Linux EROFS fs driver. + +use std::cmp::min; +use std::io::Result; +use std::sync::Arc; + +use dbs_allocator::{Constraint, IntervalTree, NodeState, Range}; +use nydus_rafs::metadata::layout::v6::EROFS_BLOCK_BITS; +use tokio_uring::buf::IoBufMut; + +use crate::blob_cache::{BlobCacheMgr, BlobConfig, DataBlob, MetaBlob}; + +enum BlockRange { + Hole, + MetaBlob(Arc), + DataBlob(Arc), +} + +/// A block device composed up from a RAFSv6 image. 
+/// +/// RAFSv6 metadata has two encoding schemes: +/// - blob address: data is located by (blob_index, chunk_index) +/// - block address: data is located by (block_addr) +/// +/// Based on the block address scheme, an RAFSv6 image can be converted into/represented as a block +/// device, so it can be directly mounted by Linux EROFS fs driver. +pub struct BlockDevice { + blocks: u32, + blob_id: String, + cache_mgr: Arc, + ranges: IntervalTree, +} + +impl BlockDevice { + /// Create a new instance of [BlockDevice]. + pub fn new(blob_id: String, cache_mgr: Arc) -> Result { + let mut ranges = IntervalTree::new(); + ranges.insert(Range::new(0, u32::MAX), None); + + let meta_blob_config = match cache_mgr.get_config(&blob_id) { + None => { + return Err(enoent!(format!( + "block_device: can not find blob {} in blob cache manager", + blob_id + ))) + } + Some(BlobConfig::DataBlob(_v)) => { + return Err(einval!(format!( + "block_device: blob {} is not a metadata blob", + blob_id + ))) + } + Some(BlobConfig::MetaBlob(v)) => v, + }; + let meta_blob = MetaBlob::new(meta_blob_config.path())?; + let meta_blob = Arc::new(meta_blob); + let constraint = Constraint::new(meta_blob.blocks()) + .min(0u32) + .max(meta_blob.blocks() - 1); + let range = ranges.allocate(&constraint).ok_or_else(|| { + enoent!(format!( + "block_device: failed to allocate address range for meta blob {}", + meta_blob_config.blob_id() + )) + })?; + ranges.update(&range, BlockRange::MetaBlob(meta_blob.clone())); + + let mut pos = meta_blob.blocks(); + let data_blobs = meta_blob_config.get_blobs(); + for blob in data_blobs.iter() { + let blob_info = blob.blob_info(); + let blob_id = blob_info.blob_id(); + let extra_info = meta_blob_config + .get_blob_extra_info(&blob_id) + .ok_or_else(|| { + let msg = format!( + "block_device: can not get extra information for blob {}", + blob_id + ); + enoent!(msg) + })?; + if extra_info.mapped_blkaddr == 0 { + let msg = format!( + "block_device: mapped block address for blob {} is zero", + blob_id + ); + return Err(einval!(msg)); + } + + if pos < extra_info.mapped_blkaddr { + let constraint = Constraint::new(extra_info.mapped_blkaddr - pos) + .min(pos) + .max(extra_info.mapped_blkaddr - 1); + let range = ranges.allocate(&constraint).ok_or_else(|| { + enoent!("block_device: failed to allocate address range for hole between blobs") + })?; + ranges.update(&range, BlockRange::Hole); + } + + let blocks = blob_info.uncompressed_size() >> EROFS_BLOCK_BITS; + if blocks > u32::MAX as u64 + || blocks + extra_info.mapped_blkaddr as u64 > u32::MAX as u64 + { + return Err(einval!(format!( + "block_device: uncompressed size 0x{:x} of blob {} is invalid", + blob_info.uncompressed_size(), + blob_info.blob_id() + ))); + } + let data_blob = DataBlob::new(blob)?; + let constraint = Constraint::new(blocks as u32) + .min(extra_info.mapped_blkaddr) + .max(extra_info.mapped_blkaddr + blocks as u32 - 1); + let range = ranges.allocate(&constraint).ok_or_else(|| { + enoent!(format!( + "block_device: can not allocate address range for blob {}", + blob_info.blob_id() + )) + })?; + ranges.update(&range, BlockRange::DataBlob(Arc::new(data_blob))); + pos = extra_info.mapped_blkaddr + blocks as u32; + } + + Ok(BlockDevice { + blocks: pos, + blob_id, + cache_mgr, + ranges, + }) + } + + /// Get blob id of the metadata blob. + pub fn meta_blob_id(&self) -> &str { + &self.blob_id + } + + /// Get the [BlobCacheMgr](../blob_cache/struct.BlobCacheMgr.html) associated with the block device. 
+ pub fn cache_mgr(&self) -> Arc { + self.cache_mgr.clone() + } + + /// Get number of blocks of the block device. + pub fn blocks(&self) -> u32 { + self.blocks + } + + /// Read block range [start, start + blocks) from the block device. + pub async fn async_read( + &self, + mut start: u32, + mut blocks: u32, + mut buf: T, + ) -> (Result, T) { + if start.checked_add(blocks).is_none() + || (blocks as u64) << EROFS_BLOCK_BITS > buf.bytes_total() as u64 + { + return ( + Err(einval!("block_device: invalid parameters to read()")), + buf, + ); + } + + let total_size = (blocks as usize) << EROFS_BLOCK_BITS; + let mut pos = 0; + while blocks > 0 { + let (range, node) = match self.ranges.get_superset(&Range::new_point(start)) { + Some(v) => v, + None => { + return ( + Err(eio!(format!( + "block_device: can not locate block 0x{:x} for meta blob {}", + start, self.blob_id + ))), + buf, + ); + } + }; + + if let NodeState::Valued(r) = node { + let count = min(range.max as u32 - start + 1, blocks); + let sz = (count as usize) << EROFS_BLOCK_BITS as usize; + let mut s = buf.slice(pos..pos + sz); + let (res, s) = match r { + BlockRange::Hole => { + s.fill(0); + (Ok(sz), s) + } + BlockRange::MetaBlob(m) => { + m.async_read((start as u64) << EROFS_BLOCK_BITS, s).await + } + BlockRange::DataBlob(d) => { + let offset = start as u64 - range.min; + d.async_read(offset << EROFS_BLOCK_BITS, s).await + } + }; + + buf = s.into_inner(); + if res.is_err() { + return (res, buf); + } + start += count; + blocks -= count; + pos += sz; + } else { + return ( + Err(eio!(format!( + "block_device: block range 0x{:x}/0x{:x} of meta blob {} is unhandled", + start, blocks, self.blob_id, + ))), + buf, + ); + } + } + + (Ok(total_size), buf) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::blob_cache::generate_blob_key; + use nydus_api::BlobCacheEntry; + use std::fs; + use std::path::PathBuf; + use vmm_sys_util::tempdir::TempDir; + + #[test] + fn test_block_device() { + let tmpdir = TempDir::new().unwrap(); + let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR"); + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + let mut dest_path = tmpdir.as_path().to_path_buf(); + dest_path.push("be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + fs::copy(&source_path, &dest_path).unwrap(); + + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot"); + let config = r#" + { + "type": "bootstrap", + "id": "rafs-v6", + "domain_id": "domain2", + "config_v2": { + "version": 2, + "id": "factory1", + "backend": { + "type": "localfs", + "localfs": { + "dir": "/tmp/nydus" + } + }, + "cache": { + "type": "filecache", + "filecache": { + "work_dir": "/tmp/nydus" + } + }, + "metadata_path": "RAFS_V5" + } + }"#; + let content = config + .replace("/tmp/nydus", tmpdir.as_path().to_str().unwrap()) + .replace("RAFS_V5", &source_path.display().to_string()); + let mut entry: BlobCacheEntry = serde_json::from_str(&content).unwrap(); + assert!(entry.prepare_configuration_info()); + + let mgr = BlobCacheMgr::new(); + mgr.add_blob_entry(&entry).unwrap(); + let blob_id = generate_blob_key(&entry.domain_id, &entry.blob_id); + assert!(mgr.get_config(&blob_id).is_some()); + + // Check existence of data blob referenced by the bootstrap. 
+ let key = generate_blob_key( + &entry.domain_id, + "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef", + ); + assert!(mgr.get_config(&key).is_some()); + + let mgr = Arc::new(mgr); + let device = BlockDevice::new(blob_id.clone(), mgr).unwrap(); + assert_eq!(device.blocks(), 0x209); + + tokio_uring::start(async move { + let buf = vec![0u8; 8192]; + let (res, buf) = device.async_read(u32::MAX, u32::MAX, buf).await; + assert!(res.is_err()); + assert_eq!(buf.len(), 8192); + let (res, _buf) = device.async_read(0, 1, vec![0u8]).await; + assert!(res.is_err()); + + let (res, buf) = device.async_read(0, 1, buf).await; + assert_eq!(buf.len(), 8192); + assert_eq!(res.unwrap(), 4096); + assert_eq!(buf[0], 0); + assert_eq!(buf[1023], 0); + assert_eq!(buf[1024], 0xe2); + assert_eq!(buf[1027], 0xe0); + + let (res, buf) = device.async_read(4, 2, buf).await; + assert_eq!(res.unwrap(), 8192); + assert_eq!(buf[4096], 0); + assert_eq!(buf[5119], 0); + assert_eq!(buf[5120], 0); + assert_eq!(buf[5123], 0); + assert_eq!(buf[5372], 0); + assert_eq!(buf[8191], 0); + + let (res, buf) = device.async_read(0x200, 2, buf).await; + assert_eq!(buf.len(), 8192); + assert_eq!(res.unwrap(), 8192); + + let (res, buf) = device.async_read(0x208, 2, buf).await; + assert_eq!(buf.len(), 8192); + assert!(res.is_err()); + + let (res, buf) = device.async_read(0x208, 1, buf).await; + assert_eq!(buf.len(), 8192); + assert_eq!(res.unwrap(), 4096); + + let (res, buf) = device.async_read(0x209, 1, buf).await; + assert_eq!(buf.len(), 8192); + assert!(res.is_err()); + }); + } +} diff --git a/service/src/block_nbd.rs b/service/src/block_nbd.rs new file mode 100644 index 00000000000..63ca6f7709a --- /dev/null +++ b/service/src/block_nbd.rs @@ -0,0 +1,623 @@ +// Copyright (C) 2023 Alibaba Cloud. All rights reserved. +// +// SPDX-License-Identifier: (Apache-2.0) + +//! Export a RAFSv6 image as a block device through NBD(Network Block Device) protocol. +//! +//! The [Network Block Device](https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md) +//! is a Linux-originated lightweight block access protocol that allows one to export a block device +//! to a client. RAFSv6 images have an block address based encoding, so an RAFSv6 image can be +//! exposed as a block device. The [NbdService] exposes a RAFSv6 image as a block device based on +//! the Linux Network Block Device driver. 
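Before the implementation details, a rough usage sketch of how the pieces in this module fit together, mirroring the test at the bottom of this file. It is not part of the patch: /dev/nbd0 is an illustration value (the nbd kernel module must be loaded and the device node free), and `device` is assumed to be an Arc<BlockDevice> built from a blob cache entry as shown in block_device.rs.

use std::sync::Arc;
use std::thread;

use crate::block_device::BlockDevice;
use crate::block_nbd::NbdService;

fn serve_image_over_nbd(device: Arc<BlockDevice>) -> std::io::Result<()> {
    // Open /dev/nbd0 and program block size, block count and read-only flags.
    let nbd = Arc::new(NbdService::new(device, "/dev/nbd0".to_string())?);

    // Each worker sets up a kernel/user socketpair and serves NBD requests
    // from its user end on a tokio-uring runtime.
    let worker = nbd.create_worker()?;
    thread::spawn(move || tokio_uring::start(async move { worker.run().await }));

    // NBD_DO_IT blocks the calling thread until the session is torn down,
    // for example by NbdService::stop() from another thread.
    nbd.run()
}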
+ +use std::any::Any; +use std::fs::{self, OpenOptions}; +use std::io::{Error, Result}; +use std::os::fd::{AsRawFd, FromRawFd, RawFd}; +use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread::JoinHandle; + +use bytes::{Buf, BufMut}; +use mio::Waker; +use nydus_api::{BlobCacheEntry, BuildTimeInfo}; +use nydus_rafs::metadata::layout::v6::{EROFS_BLOCK_BITS, EROFS_BLOCK_SIZE}; +use nydus_storage::utils::alloc_buf; +use tokio::sync::broadcast::{channel, Sender}; +use tokio_uring::buf::IoBuf; +use tokio_uring::net::UnixStream; + +use crate::blob_cache::{generate_blob_key, BlobCacheMgr}; +use crate::block_device::BlockDevice; +use crate::daemon::{ + DaemonState, DaemonStateMachineContext, DaemonStateMachineInput, DaemonStateMachineSubscriber, + NydusDaemon, +}; +use crate::{Error as NydusError, Result as NydusResult}; + +const NBD_SET_SOCK: u32 = 0; +const NBD_SET_BLOCK_SIZE: u32 = 1; +const NBD_DO_IT: u32 = 3; +const NBD_CLEAR_SOCK: u32 = 4; +const NBD_SET_BLOCKS: u32 = 7; +//const NBD_DISCONNECT: u32 = 8; +const NBD_SET_TIMEOUT: u32 = 9; +const NBD_SET_FLAGS: u32 = 10; +const NBD_FLAG_HAS_FLAGS: u32 = 0x1; +const NBD_FLAG_READ_ONLY: u32 = 0x2; +const NBD_FLAG_CAN_MULTI_CONN: u32 = 0x100; +const NBD_CMD_READ: u32 = 0; +const NBD_CMD_DISC: u32 = 2; +const NBD_REQUEST_HEADER_SIZE: usize = 28; +const NBD_REQUEST_MAGIC: u32 = 0x25609513; +const NBD_REPLY_MAGIC: u32 = 0x67446698; +const NBD_OK: u32 = 0; +const NBD_EIO: u32 = 5; +const NBD_EINVAL: u32 = 22; + +fn nbd_ioctl(fd: RawFd, cmd: u32, arg: u64) -> nix::Result { + let code = nix::request_code_none!(0xab, cmd); + unsafe { nix::convert_ioctl_res!(libc::ioctl(fd, code, arg)) } +} + +/// Network Block Device server to expose RAFSv6 images as block devices. +pub struct NbdService { + active: Arc, + blob_id: String, + cache_mgr: Arc, + nbd_dev: fs::File, + sender: Arc>, +} + +impl NbdService { + /// Create a new instance of [NbdService] to expose a RAFSv6 image as a block device. + /// + /// It opens the NBD device at `nbd_path` and initialize it according to information from + /// the block device composed from a RAFSv6 image. The caller needs to ensure that the NBD + /// device is available. + pub fn new(device: Arc, nbd_path: String) -> Result { + // Initialize the NBD device: set block size, block count and flags. + let nbd_dev = OpenOptions::new() + .read(true) + .write(true) + .open(&nbd_path) + .map_err(|e| { + error!("block_nbd: failed to open NBD device {}", nbd_path); + e + })?; + nbd_ioctl(nbd_dev.as_raw_fd(), NBD_SET_BLOCK_SIZE, EROFS_BLOCK_SIZE)?; + nbd_ioctl(nbd_dev.as_raw_fd(), NBD_SET_BLOCKS, device.blocks() as u64)?; + nbd_ioctl(nbd_dev.as_raw_fd(), NBD_SET_TIMEOUT, 60)?; + nbd_ioctl(nbd_dev.as_raw_fd(), NBD_CLEAR_SOCK, 0)?; + nbd_ioctl( + nbd_dev.as_raw_fd(), + NBD_SET_FLAGS, + (NBD_FLAG_HAS_FLAGS | NBD_FLAG_READ_ONLY | NBD_FLAG_CAN_MULTI_CONN) as u64, + )?; + + let (sender, _receiver) = channel(4); + + Ok(NbdService { + active: Arc::new(AtomicBool::new(true)), + blob_id: device.meta_blob_id().to_string(), + cache_mgr: device.cache_mgr().clone(), + nbd_dev, + sender: Arc::new(sender), + }) + } + + /// Create a [NbdWoker] to run the event loop to handle NBD requests from kernel. + pub fn create_worker(&self) -> Result { + // Let the NBD driver go. 
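// The socketpair below is the NBD transport: the kernel end is registered with
// the nbd driver through the NBD_SET_SOCK ioctl and kept alive in `_sock_kern`,
// while the user end (`sock_user`) is driven by the worker's tokio-uring loop
// to receive requests and send replies.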
+ let (sock1, sock2) = std::os::unix::net::UnixStream::pair()?; + nbd_ioctl( + self.nbd_dev.as_raw_fd(), + NBD_SET_SOCK, + sock1.as_raw_fd() as u64, + )?; + + Ok(NbdWorker { + active: self.active.clone(), + blob_id: self.blob_id.clone(), + cache_mgr: self.cache_mgr.clone(), + _sock_kern: sock1, + sock_user: sock2, + sender: self.sender.clone(), + }) + } + + /// Run the event loop to handle incoming NBD requests. + /// + /// The caller will get blocked until the NBD device get destroyed or `NbdService::stop()` get + /// called. + pub fn run(&self) -> Result<()> { + let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_DO_IT, 0); + self.active.store(false, Ordering::Release); + let _ = self.sender.send(1); + let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_CLEAR_SOCK, 0); + + Ok(()) + } + + /// Shutdown the NBD session and send exit notification to workers. + pub fn stop(&self) { + self.active.store(false, Ordering::Release); + let _ = self.sender.send(0); + //let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_DISCONNECT, 0); + let _ = nbd_ioctl(self.nbd_dev.as_raw_fd(), NBD_CLEAR_SOCK, 0); + } +} + +/// A worker to handle NBD requests in asynchronous mode. +pub struct NbdWorker { + active: Arc, + blob_id: String, + cache_mgr: Arc, + _sock_kern: std::os::unix::net::UnixStream, + sock_user: std::os::unix::net::UnixStream, + sender: Arc>, +} + +impl NbdWorker { + /// Run the event loop to handle NBD requests from kernel in asynchronous mode. + pub async fn run(self) { + let device = match BlockDevice::new(self.blob_id.clone(), self.cache_mgr.clone()) { + Ok(v) => v, + Err(e) => { + error!( + "block_nbd: failed to create block device for {}, {}", + self.blob_id, e + ); + return; + } + }; + + // Safe because the RawFd is valid during the lifetime of run(). + let mut sock = unsafe { UnixStream::from_raw_fd(self.sock_user.as_raw_fd()) }; + let mut receiver = self.sender.subscribe(); + let mut buf = vec![0u8; NBD_REQUEST_HEADER_SIZE]; + let mut pos = 0; + + while self.active.load(Ordering::Acquire) { + tokio::select! 
{ + (res, s) = sock.read(buf.slice(pos..)) => { + match res { + Err(e) => { + warn!("block_nbd: failed to get request from kernel for {}, {}", self.blob_id, e); + break; + } + Ok(sz) => { + buf = s.into_inner(); + pos += sz; + if pos == NBD_REQUEST_HEADER_SIZE { + match self.handle_request(&buf, &mut sock, &device).await { + Ok(true) => {} + Ok(false) => break, + Err(e) => { + warn!("block_nbd: failed to handle request for {}, {}", self.blob_id, e); + break; + } + } + pos = 0; + } + } + } + } + _ = receiver.recv() => { + break; + } + } + } + } + + async fn handle_request( + &self, + mut request: &[u8], + sock: &mut UnixStream, + device: &BlockDevice, + ) -> Result { + let magic = request.get_u32(); + let ty = request.get_u32(); + let handle = request.get_u64(); + let pos = request.get_u64(); + let len = request.get_u32(); + + let mut code = NBD_OK; + let mut data_buf = alloc_buf(len as usize); + if magic != NBD_REQUEST_MAGIC + || pos % EROFS_BLOCK_SIZE != 0 + || len as u64 % EROFS_BLOCK_SIZE != 0 + { + warn!( + "block_nbd: invalid request magic 0x{:x}, type {}, pos 0x{:x}, len 0x{:x}", + magic, ty, pos, len + ); + code = NBD_EINVAL; + } else if ty == NBD_CMD_READ { + let start = (pos >> EROFS_BLOCK_BITS) as u32; + let count = len >> EROFS_BLOCK_BITS; + let (res, buf) = device.async_read(start, count, data_buf).await; + data_buf = buf; + match res { + Ok(sz) => { + if sz != len as usize { + warn!("block_nbd: got 0x{:x} bytes, expect 0x{:x}", sz, len); + code = NBD_EIO; + } + } + Err(e) => { + warn!("block_nbd: failed to read data from block device, {}", e); + code = NBD_EIO; + } + } + } else if ty == NBD_CMD_DISC { + return Ok(false); + } + + let mut reply = Vec::with_capacity(16); + reply.put_u32(NBD_REPLY_MAGIC); + reply.put_u32(code); + reply.put_u64(handle); + assert_eq!(reply.len(), 16); + assert_eq!(data_buf.len(), len as usize); + sock.write_all(reply).await.0?; + if code == NBD_OK { + sock.write_all(data_buf).await.0?; + } + + Ok(true) + } +} + +/// A [NydusDaemon] implementation to expose RAFS v6 images as block devices through NBD. 
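handle_request() above implements the NBD transmission phase with simple replies: each request is a 28-byte big-endian header (magic, command type, handle, offset, length) and each reply is a 16-byte header (magic, error code, handle) followed by payload data for successful reads. A standalone sketch of that framing with the bytes crate; it is not part of the patch, repeats the relevant constants, and uses made-up handle and offset values.

use bytes::{Buf, BufMut};

const NBD_REQUEST_MAGIC: u32 = 0x25609513;
const NBD_REPLY_MAGIC: u32 = 0x67446698;
const NBD_CMD_READ: u32 = 0;

// Encode a 28-byte NBD read request header as the kernel driver would send it.
fn encode_read_request(handle: u64, offset: u64, len: u32) -> Vec<u8> {
    let mut req = Vec::with_capacity(28);
    req.put_u32(NBD_REQUEST_MAGIC);
    req.put_u32(NBD_CMD_READ);
    req.put_u64(handle);
    req.put_u64(offset);
    req.put_u32(len);
    req
}

// Decode a 16-byte simple reply header into (magic, error code, handle).
fn decode_reply(mut reply: &[u8]) -> (u32, u32, u64) {
    (reply.get_u32(), reply.get_u32(), reply.get_u64())
}

fn framing_roundtrip() {
    assert_eq!(encode_read_request(1, 0, 4096).len(), 28);
    let reply = [
        0x67u8, 0x44, 0x66, 0x98, // NBD_REPLY_MAGIC
        0, 0, 0, 0, // error code NBD_OK
        0, 0, 0, 0, 0, 0, 0, 1, // handle
    ];
    let (magic, code, handle) = decode_reply(&reply);
    assert_eq!(magic, NBD_REPLY_MAGIC);
    assert_eq!(code, 0);
    assert_eq!(handle, 1);
}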
+pub struct NbdDaemon { + cache_mgr: Arc, + service: Arc, + + bti: BuildTimeInfo, + id: Option, + supervisor: Option, + + nbd_threads: u32, + nbd_control_thread: Mutex>>, + nbd_service_threads: Mutex>>>, + request_sender: Arc>>, + result_receiver: Mutex>>, + state: AtomicI32, + state_machine_thread: Mutex>>>, + waker: Arc, +} + +impl NbdDaemon { + fn new( + nbd_path: String, + threads: u32, + blob_entry: BlobCacheEntry, + trigger: std::sync::mpsc::Sender, + receiver: std::sync::mpsc::Receiver>, + waker: Arc, + bti: BuildTimeInfo, + id: Option, + supervisor: Option, + ) -> Result { + let blob_id = generate_blob_key(&blob_entry.domain_id, &blob_entry.blob_id); + let cache_mgr = Arc::new(BlobCacheMgr::new()); + cache_mgr.add_blob_entry(&blob_entry)?; + let block_device = BlockDevice::new(blob_id.clone(), cache_mgr.clone())?; + let nbd_service = NbdService::new(Arc::new(block_device), nbd_path)?; + + Ok(NbdDaemon { + cache_mgr, + service: Arc::new(nbd_service), + + bti, + id, + supervisor, + + nbd_threads: threads, + nbd_control_thread: Mutex::new(None), + nbd_service_threads: Mutex::new(Vec::new()), + state: AtomicI32::new(DaemonState::INIT as i32), + request_sender: Arc::new(Mutex::new(trigger)), + result_receiver: Mutex::new(receiver), + state_machine_thread: Mutex::new(None), + waker, + }) + } +} + +impl DaemonStateMachineSubscriber for NbdDaemon { + fn on_event(&self, event: DaemonStateMachineInput) -> NydusResult<()> { + self.request_sender + .lock() + .expect("block_nbd: failed to lock request sender!") + .send(event) + .map_err(NydusError::ChannelSend)?; + + self.result_receiver + .lock() + .expect("block_nbd: failed to lock result receiver!") + .recv() + .map_err(NydusError::ChannelReceive)? + } +} + +impl NydusDaemon for NbdDaemon { + fn as_any(&self) -> &dyn Any { + self + } + + fn id(&self) -> Option { + self.id.clone() + } + + fn version(&self) -> BuildTimeInfo { + self.bti.clone() + } + + fn get_state(&self) -> DaemonState { + self.state.load(Ordering::Relaxed).into() + } + + fn set_state(&self, state: DaemonState) { + self.state.store(state as i32, Ordering::Relaxed); + } + + fn start(&self) -> NydusResult<()> { + info!("start NBD service with {} worker threads", self.nbd_threads); + for _ in 0..self.nbd_threads { + let waker = self.waker.clone(); + let worker = self + .service + .create_worker() + .map_err(|e| NydusError::StartService(format!("{}", e)))?; + let thread = std::thread::Builder::new() + .name("nbd_worker".to_string()) + .spawn(move || { + tokio_uring::start(async move { + worker.run().await; + // Notify the daemon controller that one working thread has exited. 
+ if let Err(err) = waker.wake() { + error!("block_nbd: fail to exit daemon, error: {:?}", err); + } + }); + Ok(()) + }) + .map_err(NydusError::ThreadSpawn)?; + self.nbd_service_threads.lock().unwrap().push(thread); + } + + let nbd = self.service.clone(); + let thread = std::thread::spawn(move || { + if let Err(e) = nbd.run() { + error!("block_nbd: failed to run NBD control loop, {e}"); + } + }); + *self.nbd_control_thread.lock().unwrap() = Some(thread); + + Ok(()) + } + + fn umount(&self) -> NydusResult<()> { + Ok(()) + } + + fn stop(&self) { + self.service.stop(); + } + + fn wait(&self) -> NydusResult<()> { + self.wait_state_machine()?; + self.wait_service() + } + + fn wait_service(&self) -> NydusResult<()> { + loop { + let handle = self.nbd_service_threads.lock().unwrap().pop(); + if let Some(handle) = handle { + handle + .join() + .map_err(|e| { + let e = *e + .downcast::() + .unwrap_or_else(|e| Box::new(eother!(e))); + NydusError::WaitDaemon(e) + })? + .map_err(NydusError::WaitDaemon)?; + } else { + // No more handles to wait + break; + } + } + + Ok(()) + } + + fn wait_state_machine(&self) -> NydusResult<()> { + let mut guard = self.state_machine_thread.lock().unwrap(); + if let Some(handler) = guard.take() { + let result = handler.join().map_err(|e| { + let e = *e + .downcast::() + .unwrap_or_else(|e| Box::new(eother!(e))); + NydusError::WaitDaemon(e) + })?; + result.map_err(NydusError::WaitDaemon) + } else { + Ok(()) + } + } + + fn supervisor(&self) -> Option { + self.supervisor.clone() + } + + fn save(&self) -> NydusResult<()> { + unimplemented!() + } + + fn restore(&self) -> NydusResult<()> { + unimplemented!() + } + + fn get_blob_cache_mgr(&self) -> Option> { + Some(self.cache_mgr.clone()) + } +} + +/// Create and start a [NbdDaemon] instance to expose a RAFS v6 image as a block device through NBD. +#[allow(clippy::too_many_arguments)] +pub fn create_nbd_daemon( + device: String, + threads: u32, + blob_entry: BlobCacheEntry, + bti: BuildTimeInfo, + id: Option, + supervisor: Option, + waker: Arc, +) -> Result> { + let (trigger, events_rx) = std::sync::mpsc::channel::(); + let (result_sender, result_receiver) = std::sync::mpsc::channel::>(); + let daemon = NbdDaemon::new( + device, + threads, + blob_entry, + trigger, + result_receiver, + waker, + bti, + id, + supervisor, + )?; + let daemon = Arc::new(daemon); + let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender); + let machine_thread = machine.kick_state_machine()?; + *daemon.state_machine_thread.lock().unwrap() = Some(machine_thread); + daemon + .on_event(DaemonStateMachineInput::Mount) + .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Start) + .map_err(|e| eother!(e))?; + + /* + // TODO: support crash recover and hot-upgrade. + // Without api socket, nydusd can't do neither live-upgrade nor failover, so the helper + // finding a victim is not necessary. + if (api_sock.as_ref().is_some() && !upgrade && !is_crashed(&mnt, api_sock.as_ref().unwrap())?) 
+ || api_sock.is_none() + { + if let Some(cmd) = mount_cmd { + daemon.service.mount(cmd)?; + } + daemon + .service + .session + .lock() + .unwrap() + .mount() + .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Mount) + .map_err(|e| eother!(e))?; + daemon + .on_event(DaemonStateMachineInput::Start) + .map_err(|e| eother!(e))?; + daemon + .service + .conn + .store(calc_fuse_conn(mnt)?, Ordering::Relaxed); + } + */ + + Ok(daemon) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::blob_cache::{generate_blob_key, BlobCacheMgr}; + use nydus_api::BlobCacheEntry; + use std::path::PathBuf; + use std::time::Duration; + use vmm_sys_util::tempdir::TempDir; + + fn create_block_device(tmpdir: PathBuf) -> Result> { + let root_dir = &std::env::var("CARGO_MANIFEST_DIR").expect("$CARGO_MANIFEST_DIR"); + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + let mut dest_path = tmpdir.clone(); + dest_path.push("be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef"); + fs::copy(&source_path, &dest_path).unwrap(); + + let mut source_path = PathBuf::from(root_dir); + source_path.push("../tests/texture/bootstrap/rafs-v6-2.2.boot"); + let config = r#" + { + "type": "bootstrap", + "id": "rafs-v6", + "domain_id": "domain2", + "config_v2": { + "version": 2, + "id": "factory1", + "backend": { + "type": "localfs", + "localfs": { + "dir": "/tmp/nydus" + } + }, + "cache": { + "type": "filecache", + "filecache": { + "work_dir": "/tmp/nydus" + } + }, + "metadata_path": "RAFS_V5" + } + }"#; + let content = config + .replace("/tmp/nydus", tmpdir.as_path().to_str().unwrap()) + .replace("RAFS_V5", &source_path.display().to_string()); + let mut entry: BlobCacheEntry = serde_json::from_str(&content).unwrap(); + assert!(entry.prepare_configuration_info()); + + let mgr = BlobCacheMgr::new(); + mgr.add_blob_entry(&entry).unwrap(); + let blob_id = generate_blob_key(&entry.domain_id, &entry.blob_id); + assert!(mgr.get_config(&blob_id).is_some()); + + // Check existence of data blob referenced by the bootstrap. + let key = generate_blob_key( + &entry.domain_id, + "be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef", + ); + assert!(mgr.get_config(&key).is_some()); + + let mgr = Arc::new(mgr); + let device = BlockDevice::new(blob_id.clone(), mgr).unwrap(); + + Ok(Arc::new(device)) + } + + #[ignore] + #[test] + fn test_nbd_device() { + tokio_uring::start(async { + let tmpdir = TempDir::new().unwrap(); + let device = create_block_device(tmpdir.as_path().to_path_buf()).unwrap(); + let nbd = NbdService::new(device, "/dev/nbd15".to_string()).unwrap(); + let nbd = Arc::new(nbd); + let nbd2 = nbd.clone(); + let worker1 = nbd.create_worker().unwrap(); + let worker2 = nbd.create_worker().unwrap(); + + tokio_uring::spawn(async move { worker1.run().await }); + tokio_uring::spawn(async move { worker2.run().await }); + std::thread::spawn(move || { + nbd2.run().unwrap(); + }); + tokio::time::sleep(Duration::from_micros(100000)).await; + nbd.stop(); + }) + } +} diff --git a/service/src/daemon.rs b/service/src/daemon.rs index 26d11bb6338..c4149f2affb 100644 --- a/service/src/daemon.rs +++ b/service/src/daemon.rs @@ -173,7 +173,9 @@ pub trait NydusDaemon: DaemonStateMachineSubscriber + Send + Sync { // For backward compatibility. /// Set default filesystem service object. 
- fn get_default_fs_service(&self) -> Option>; + fn get_default_fs_service(&self) -> Option> { + None + } /// Get the optional `BlobCacheMgr` object. fn get_blob_cache_mgr(&self) -> Option> { diff --git a/service/src/fs_cache.rs b/service/src/fs_cache.rs index b70f9acaa57..a98fb1c5cf3 100644 --- a/service/src/fs_cache.rs +++ b/service/src/fs_cache.rs @@ -2,7 +2,15 @@ // // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause) -//! FsCache manager to cooperate with the Linux fscache subsystem to support EROFS. +//! Handler to expose RAFSv6 image through EROFS/fscache. +//! +//! The [`FsCacheHandler`] is the inter-connection between in kernel EROFS/fscache drivers +//! and the user space [BlobCacheMgr](https://docs.rs/nydus-service/latest/nydus_service/blob_cache/struct.BlobCacheMgr.html). +//! The workflow is as below: +//! - EROFS presents a filesystem structure by parsing a RAFS image metadata blob. +//! - EROFS sends requests to the fscache subsystem when user reads data from files. +//! - Fscache subsystem send requests to [FsCacheHandler] if the requested data has been cached yet. +//! - [FsCacheHandler] reads blob data from the [BlobCacheMgr] and sends back reply messages. use std::collections::hash_map::Entry::Vacant; use std::collections::HashMap; @@ -25,8 +33,7 @@ use nydus_storage::device::BlobPrefetchRequest; use nydus_storage::factory::{ASYNC_RUNTIME, BLOB_FACTORY}; use crate::blob_cache::{ - generate_blob_key, BlobCacheConfigDataBlob, BlobCacheConfigMetaBlob, BlobCacheMgr, - BlobCacheObjectConfig, + generate_blob_key, BlobCacheMgr, BlobConfig, DataBlobConfig, MetaBlobConfig, }; nix::ioctl_write_int!(fscache_cread, 0x98, 1); @@ -208,7 +215,7 @@ struct FsCacheBootstrap { struct FsCacheBlobCache { cache: Option>, - config: Arc, + config: Arc, file: Arc, } @@ -232,7 +239,7 @@ enum FsCacheObject { #[derive(Default)] struct FsCacheState { id_to_object_map: HashMap, - id_to_config_map: HashMap>, + id_to_config_map: HashMap>, blob_cache_mgr: Arc, } @@ -434,11 +441,11 @@ impl FsCacheHandler { self.reply(&format!("copen {},{}", hdr.msg_id, -libc::ENOENT)); } Some(cfg) => match cfg { - BlobCacheObjectConfig::DataBlob(config) => { + BlobConfig::DataBlob(config) => { let reply = self.handle_open_data_blob(hdr, msg, config); self.reply(&reply); } - BlobCacheObjectConfig::MetaBlob(config) => { + BlobConfig::MetaBlob(config) => { self.handle_open_bootstrap(hdr, msg, config); } }, @@ -449,7 +456,7 @@ impl FsCacheHandler { &self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgOpen, - config: Arc, + config: Arc, ) -> String { let mut state = self.state.lock().unwrap(); if let Vacant(e) = state.id_to_object_map.entry(hdr.object_id) { @@ -501,7 +508,7 @@ impl FsCacheHandler { }); } - fn do_prefetch(cfg: &BlobCacheConfigDataBlob, blob: Arc) -> Result<()> { + fn do_prefetch(cfg: &DataBlobConfig, blob: Arc) -> Result<()> { let blob_info = cfg.blob_info().deref(); let cache_cfg = cfg.config_v2().get_cache_config()?; if !cache_cfg.prefetch.enable { @@ -546,7 +553,7 @@ impl FsCacheHandler { /// the chunk map file will be managed by the userspace daemon. We need to figure out the /// way to share blob/chunkamp files with filecache manager. 
fn create_data_blob_object( - config: &BlobCacheConfigDataBlob, + config: &DataBlobConfig, file: Arc, ) -> Result> { let mut blob_info = config.blob_info().deref().clone(); @@ -572,7 +579,7 @@ impl FsCacheHandler { &self, hdr: &FsCacheMsgHeader, msg: &FsCacheMsgOpen, - config: Arc, + config: Arc, ) { let path = config.path().display(); let condvar = Arc::new((Mutex::new(false), Condvar::new())); @@ -928,7 +935,7 @@ impl FsCacheHandler { } #[inline] - fn get_config(&self, key: &str) -> Option { + fn get_config(&self, key: &str) -> Option { self.get_state().blob_cache_mgr.get_config(key) } } diff --git a/service/src/lib.rs b/service/src/lib.rs index 4b44d19a391..ceed2de57b5 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -31,6 +31,10 @@ use serde::{Deserialize, Serialize}; use serde_json::Error as SerdeError; pub mod blob_cache; +#[cfg(feature = "block-device")] +pub mod block_device; +#[cfg(feature = "block-nbd")] +pub mod block_nbd; pub mod daemon; #[cfg(target_os = "linux")] mod fs_cache; diff --git a/service/src/singleton.rs b/service/src/singleton.rs index 171552e1f6a..6cfa9c82112 100644 --- a/service/src/singleton.rs +++ b/service/src/singleton.rs @@ -10,7 +10,7 @@ use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Mutex}; use mio::Waker; -use nydus_api::http::BlobCacheList; +use nydus_api::config::BlobCacheList; use nydus_api::BuildTimeInfo; use crate::blob_cache::BlobCacheMgr; diff --git a/src/bin/nydusd/main.rs b/src/bin/nydusd/main.rs index 6a8eeab6988..ea454b187b9 100644 --- a/src/bin/nydusd/main.rs +++ b/src/bin/nydusd/main.rs @@ -285,6 +285,8 @@ fn prepare_commandline_options() -> Command { let cmdline = append_fuse_subcmd_options(cmdline); #[cfg(feature = "virtiofs")] let cmdline = append_virtiofs_subcmd_options(cmdline); + #[cfg(feature = "block-nbd")] + let cmdline = self::nbd::append_nbd_subcmd_options(cmdline); append_singleton_subcmd_options(cmdline) } @@ -590,6 +592,138 @@ fn process_singleton_arguments( Ok(()) } +#[cfg(feature = "block-nbd")] +mod nbd { + use super::*; + use nydus_api::BlobCacheEntry; + use nydus_service::block_nbd::create_nbd_daemon; + use std::str::FromStr; + + pub(super) fn append_nbd_subcmd_options(cmd: Command) -> Command { + let subcmd = Command::new("nbd") + .about("Export a RAFS v6 image as a block device through NBD (Experiment)"); + let subcmd = subcmd + .arg( + Arg::new("DEVICE") + .help("NBD device node to attach the block device") + .required(true) + .num_args(1), + ) + .arg( + Arg::new("bootstrap") + .long("bootstrap") + .short('B') + .help("Path to the RAFS filesystem metadata file") + .requires("localfs-dir") + .conflicts_with("config"), + ) + .arg( + Arg::new("localfs-dir") + .long("localfs-dir") + .requires("bootstrap") + .short('D') + .help( + "Path to the `localfs` working directory, which also enables the `localfs` storage backend" + ) + .conflicts_with("config"), + ) + .arg( + Arg::new("threads") + .long("threads") + .default_value("4") + .help("Number of worker threads to serve NBD requests") + .value_parser(thread_validator) + .required(false), + ); + cmd.subcommand(subcmd) + } + + pub(super) fn process_nbd_service( + args: SubCmdArgs, + bti: BuildTimeInfo, + _apisock: Option<&str>, + ) -> Result<()> { + let mut entry = if let Some(bootstrap) = args.value_of("bootstrap") { + let dir = args.value_of("localfs-dir").ok_or_else(|| { + einval!("option `-D/--localfs-dir` is required by `--boootstrap`") + })?; + let config = r#" + { + "type": "bootstrap", + "id": "disk-default", + "domain_id": "block-nbd", 
+ "config_v2": { + "version": 2, + "id": "block-nbd-factory", + "backend": { + "type": "localfs", + "localfs": { + "dir": "LOCAL_FS_DIR" + } + }, + "cache": { + "type": "filecache", + "filecache": { + "work_dir": "LOCAL_FS_DIR" + } + }, + "metadata_path": "META_FILE_PATH" + } + }"#; + let config = config + .replace("LOCAL_FS_DIR", dir) + .replace("META_FILE_PATH", bootstrap); + BlobCacheEntry::from_str(&config)? + } else if let Some(v) = args.value_of("config") { + BlobCacheEntry::from_file(v)? + } else { + return Err(einval!( + "both option `-C/--config` and `-B/--bootstrap` are missing" + )); + }; + if !entry.prepare_configuration_info() { + return Err(einval!( + "invalid blob cache entry configuration information" + )); + } + if entry.validate() == false { + return Err(einval!( + "invalid blob cache entry configuration information" + )); + } + + // Safe to unwrap because `DEVICE` is mandatory option. + let device = args.value_of("DEVICE").unwrap().to_string(); + let id = args.value_of("id").map(|id| id.to_string()); + let supervisor = args.value_of("supervisor").map(|s| s.to_string()); + let threads: u32 = args + .value_of("threads") + .map(|n| n.parse().unwrap_or(1)) + .unwrap_or(1); + + let daemon = create_nbd_daemon( + device, + threads, + entry, + bti, + id, + supervisor, + DAEMON_CONTROLLER.alloc_waker(), + ) + .map(|d| { + info!("NBD daemon started!"); + d + }) + .map_err(|e| { + error!("Failed in starting NBD daemon: {}", e); + e + })?; + DAEMON_CONTROLLER.set_daemon(daemon); + + Ok(()) + } +} + extern "C" fn sig_exit(_sig: std::os::raw::c_int) { DAEMON_CONTROLLER.shutdown(); } @@ -640,6 +774,13 @@ fn main() -> Result<()> { let subargs = SubCmdArgs::new(&args, subargs); process_fs_service(subargs, bti, apisock, false)?; } + #[cfg(feature = "block-nbd")] + Some("nbd") => { + // Safe to unwrap because the subcommand is `nbd`. + let subargs = args.subcommand_matches("nbd").unwrap(); + let subargs = SubCmdArgs::new(&args, subargs); + self::nbd::process_nbd_service(subargs, bti, apisock)?; + } _ => { let subargs = SubCmdArgs::new(&args, &args); process_fs_service(subargs, bti, apisock, true)?; diff --git a/tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef b/tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef new file mode 100644 index 00000000000..35509f7c751 Binary files /dev/null and b/tests/texture/blobs/be7d77eeb719f70884758d1aa800ed0fb09d701aaec469964e9d54325f0d5fef differ diff --git a/tests/texture/bootstrap/rafs-v6-2.2.boot b/tests/texture/bootstrap/rafs-v6-2.2.boot new file mode 100644 index 00000000000..71932987aa8 Binary files /dev/null and b/tests/texture/bootstrap/rafs-v6-2.2.boot differ