diff --git a/.gitmodules b/.gitmodules
index 89eccf618..5060ce20c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,6 +1,6 @@
 [submodule "spdk-rs"]
     path = spdk-rs
-    url = https://github.com/openebs/spdk-rs
+    url = ../spdk-rs.git
     branch = release/2.7
 [submodule "utils/dependencies"]
     path = utils/dependencies
diff --git a/Cargo.lock b/Cargo.lock
index ce860f7a1..1bdaedd25 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1592,6 +1592,7 @@ dependencies = [
  "crossbeam",
  "derive_builder",
  "devinfo",
+ "either",
  "env_logger",
  "etcd-client",
  "event-publisher",
diff --git a/io-engine-tests/src/lib.rs b/io-engine-tests/src/lib.rs
index f3163c71d..d4498b3b0 100644
--- a/io-engine-tests/src/lib.rs
+++ b/io-engine-tests/src/lib.rs
@@ -188,6 +188,28 @@ pub fn truncate_file_bytes(path: &str, size: u64) {
     assert!(output.status.success());
 }
 
+/// Automatically assign a loop device to the given file path.
+pub fn setup_loopdev_file(path: &str, sector_size: Option<u64>) -> String {
+    let log_sec = sector_size.unwrap_or(512);
+
+    let output = Command::new("losetup")
+        .args(["-f", "--show", "-b", &format!("{log_sec}"), path])
+        .output()
+        .expect("failed exec losetup");
+    assert!(output.status.success());
+    // Return the assigned loop device.
+    String::from_utf8(output.stdout).unwrap().trim().to_string()
+}
+
+/// Detach the provided loop device.
+pub fn detach_loopdev(dev: &str) {
+    let output = Command::new("losetup")
+        .args(["-d", dev])
+        .output()
+        .expect("failed exec losetup");
+    assert!(output.status.success());
+}
+
 pub fn fscheck(device: &str) {
     let output = Command::new("fsck")
         .args([device, "-n"])
diff --git a/io-engine/Cargo.toml b/io-engine/Cargo.toml
index e4e1df05a..9b7148928 100644
--- a/io-engine/Cargo.toml
+++ b/io-engine/Cargo.toml
@@ -70,7 +70,7 @@ libc = "0.2.149"
 log = "0.4.20"
 md5 = "0.7.0"
 merge = "0.1.0"
-nix = { version = "0.27.1", default-features = false, features = [ "hostname", "net", "socket", "ioctl" ] }
+nix = { version = "0.27.1", default-features = false, features = ["hostname", "net", "socket", "ioctl"] }
 once_cell = "1.18.0"
 parking_lot = "0.12.1"
 pin-utils = "0.1.0"
@@ -98,9 +98,10 @@ async-process = { version = "1.8.1" }
 rstack = { version = "0.3.3" }
 tokio-stream = "0.1.14"
 rustls = "0.21.12"
+either = "1.9.0"
 
 devinfo = { path = "../utils/dependencies/devinfo" }
-jsonrpc = { path = "../jsonrpc"}
+jsonrpc = { path = "../jsonrpc" }
 io-engine-api = { path = "../utils/dependencies/apis/io-engine" }
 spdk-rs = { path = "../spdk-rs" }
 sysfs = { path = "../sysfs" }
diff --git a/io-engine/src/bdev/aio.rs b/io-engine/src/bdev/aio.rs
index 9a3700edb..99fc5cbf2 100644
--- a/io-engine/src/bdev/aio.rs
+++ b/io-engine/src/bdev/aio.rs
@@ -3,6 +3,7 @@ use std::{
     convert::TryFrom,
     ffi::CString,
     fmt::{Debug, Formatter},
+    os::unix::fs::FileTypeExt,
 };
 
 use async_trait::async_trait;
@@ -29,7 +30,7 @@ pub(super) struct Aio {
 
 impl Debug for Aio {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "Aio '{}'", self.name)
+        write!(f, "Aio '{}', 'blk_size: {}'", self.name, self.blk_size)
     }
 }
 
@@ -47,6 +48,10 @@ impl TryFrom<&Url> for Aio {
             });
         }
 
+        let path_is_blockdev = std::fs::metadata(url.path())
+            .ok()
+            .map_or(false, |meta| meta.file_type().is_block_device());
+
         let mut parameters: HashMap<String, String> =
             url.query_pairs().into_owned().collect();
 
@@ -58,9 +63,14 @@ impl TryFrom<&Url> for Aio {
                     value: value.clone(),
                 })?
             }
-            None => 512,
+            None => {
+                if path_is_blockdev {
+                    0
+                } else {
+                    512
+                }
+            }
         };
-
         let uuid = uri::uuid(parameters.remove("uuid")).context(
             bdev_api::UuidParamParseFailed {
                 uri: url.to_string(),
diff --git a/io-engine/src/bdev/nexus/nexus_bdev_children.rs b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
index d14c9dd05..a6bab4cfd 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev_children.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev_children.rs
@@ -1152,10 +1152,11 @@ impl<'n> Nexus<'n> {
         // Cancel rebuild job for this child, if any.
         if let Some(job) = child.rebuild_job() {
             debug!("{self:?}: retire: stopping rebuild job...");
-            let terminated = job.force_fail();
-            Reactors::master().send_future(async move {
-                terminated.await.ok();
-            });
+            if let either::Either::Left(terminated) = job.force_fail() {
+                Reactors::master().send_future(async move {
+                    terminated.await.ok();
+                });
+            }
         }
 
         debug!("{child:?}: retire: enqueuing device '{dev}' to retire");
diff --git a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
index cecae8f33..7dcde5a51 100644
--- a/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
+++ b/io-engine/src/bdev/nexus/nexus_bdev_rebuild.rs
@@ -247,15 +247,18 @@ impl<'n> Nexus<'n> {
     async fn terminate_rebuild(&self, child_uri: &str) {
         // If a rebuild job is not found that's ok
         // as we were just going to remove it anyway.
-        if let Ok(rj) = self.rebuild_job_mut(child_uri) {
-            let ch = rj.force_stop();
-            if let Err(e) = ch.await {
-                error!(
-                    "Failed to wait on rebuild job for child {child_uri} \
+        let Ok(rj) = self.rebuild_job_mut(child_uri) else {
+            return;
+        };
+        let either::Either::Left(ch) = rj.force_stop() else {
+            return;
+        };
+        if let Err(e) = ch.await {
+            error!(
+                "Failed to wait on rebuild job for child {child_uri} \
                 to terminate with error {}",
-                    e.verbose()
-                );
-            }
+                e.verbose()
+            );
         }
     }
 
@@ -355,6 +358,9 @@ impl<'n> Nexus<'n> {
 
         // wait for the jobs to complete terminating
         for job in terminated_jobs {
+            let either::Either::Left(job) = job else {
+                continue;
+            };
             if let Err(e) = job.await {
                 error!(
                     "{:?}: error when waiting for the rebuild job \
diff --git a/io-engine/src/bdev/nvmx/utils.rs b/io-engine/src/bdev/nvmx/utils.rs
index 5bafb5b1d..8c7bd163c 100644
--- a/io-engine/src/bdev/nvmx/utils.rs
+++ b/io-engine/src/bdev/nvmx/utils.rs
@@ -40,8 +40,9 @@ pub enum NvmeAerInfoNvmCommandSet {
 
 /// Check if the Completion Queue Entry indicates abnormal termination of
 /// request due to any of the following conditions:
-/// - Any media specific errors that occur in the NVM or data integrity type
-///   errors.
+/// - A Status Code Type (SCT) of media specific errors that occur in the NVM
+///   or data integrity type errors, AND a Status Code (SC) value pertaining
+///   to one of the below:
 /// - The command was aborted due to an end-to-end guard check failure.
 /// - The command was aborted due to an end-to-end application tag check
 ///   failure.
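
Note on the predicate change in the next hunk: the updated comment above states that a protection-information error requires both a media-error Status Code Type and one of the three end-to-end check Status Codes, which is what the `||` to `&&` fix below implements. A minimal, self-contained sketch of that logic follows, using stand-in constants rather than the real SPDK enums (the values mirror the NVMe spec; the real code reads `sct`/`sc` from an `spdk_nvme_cpl` completion entry):

// Stand-in constants; NvmeStatusCodeType / NvmeMediaErrorStatusCode are the
// real enums used in utils.rs.
const SCT_MEDIA_ERROR: u16 = 0x2; // media and data-integrity errors
const SC_GUARD: u16 = 0x82; // end-to-end guard check failure
const SC_APP_TAG: u16 = 0x83; // end-to-end application tag check failure
const SC_REF_TAG: u16 = 0x84; // end-to-end reference tag check failure

fn is_pi_error(sct: u16, sc: u16) -> bool {
    // Both conditions must hold; the old `||` version also matched any media
    // error and any status code that happened to collide numerically.
    sct == SCT_MEDIA_ERROR
        && (sc == SC_GUARD || sc == SC_APP_TAG || sc == SC_REF_TAG)
}

fn main() {
    // A generic media error (e.g. unrecovered read error, SC 0x81) is not a
    // protection-information error.
    assert!(!is_pi_error(SCT_MEDIA_ERROR, 0x81));
    // A guard check failure under the media-error SCT is a PI error.
    assert!(is_pi_error(SCT_MEDIA_ERROR, SC_GUARD));
    // The same SC under a different SCT (0x0, generic) must not match.
    assert!(!is_pi_error(0x0, SC_GUARD));
}
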
@@ -59,9 +60,9 @@ pub(crate) fn nvme_cpl_is_pi_error(cpl: *const spdk_nvme_cpl) -> bool {
     }
 
     sct == NvmeStatusCodeType::MediaError as u16
-        || sc == NvmeMediaErrorStatusCode::Guard as u16
-        || sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
-        || sc == NvmeMediaErrorStatusCode::ReferenceTag as u16
+        && (sc == NvmeMediaErrorStatusCode::Guard as u16
+            || sc == NvmeMediaErrorStatusCode::ApplicationTag as u16
+            || sc == NvmeMediaErrorStatusCode::ReferenceTag as u16)
 }
 
 #[inline]
diff --git a/io-engine/src/bdev/uring.rs b/io-engine/src/bdev/uring.rs
index f8ff01489..b8d8ba69e 100644
--- a/io-engine/src/bdev/uring.rs
+++ b/io-engine/src/bdev/uring.rs
@@ -1,4 +1,9 @@
-use std::{collections::HashMap, convert::TryFrom, ffi::CString};
+use std::{
+    collections::HashMap,
+    convert::TryFrom,
+    ffi::CString,
+    os::unix::fs::FileTypeExt,
+};
 
 use async_trait::async_trait;
 use futures::channel::oneshot;
@@ -36,6 +41,10 @@ impl TryFrom<&Url> for Uring {
             });
         }
 
+        let path_is_blockdev = std::fs::metadata(url.path())
+            .ok()
+            .map_or(false, |meta| meta.file_type().is_block_device());
+
         let mut parameters: HashMap<String, String> =
             url.query_pairs().into_owned().collect();
 
@@ -47,7 +56,13 @@ impl TryFrom<&Url> for Uring {
                     value: value.clone(),
                 })?
             }
-            None => 512,
+            None => {
+                if path_is_blockdev {
+                    0
+                } else {
+                    512
+                }
+            }
         };
 
         let uuid = uri::uuid(parameters.remove("uuid")).context(
diff --git a/io-engine/src/grpc/v1/snapshot_rebuild.rs b/io-engine/src/grpc/v1/snapshot_rebuild.rs
index 00723841b..85ad114d9 100644
--- a/io-engine/src/grpc/v1/snapshot_rebuild.rs
+++ b/io-engine/src/grpc/v1/snapshot_rebuild.rs
@@ -110,7 +110,10 @@ impl SnapshotRebuildRpc for SnapshotRebuildService {
             let Ok(job) = SnapshotRebuildJob::lookup(&args.uuid) else {
                 return Err(tonic::Status::not_found(""));
             };
-            let rx = job.force_stop().await.ok();
+            let rx = match job.force_stop() {
+                either::Either::Left(chan) => chan.await,
+                either::Either::Right(stopped) => Ok(stopped),
+            };
             info!("Snapshot Rebuild stopped: {rx:?}");
             job.destroy();
             Ok(())
diff --git a/io-engine/src/rebuild/mod.rs b/io-engine/src/rebuild/mod.rs
index 66598618d..263df5f9a 100644
--- a/io-engine/src/rebuild/mod.rs
+++ b/io-engine/src/rebuild/mod.rs
@@ -57,7 +57,10 @@ impl WithinRange<u64> for std::ops::Range<u64> {
 /// Shutdown all pending snapshot rebuilds.
 pub(crate) async fn shutdown_snapshot_rebuilds() {
     let jobs = SnapshotRebuildJob::list().into_iter();
-    for recv in jobs.map(|job| job.force_stop()).collect::<Vec<_>>() {
+    for recv in jobs
+        .flat_map(|job| job.force_stop().left())
+        .collect::<Vec<_>>()
+    {
         recv.await.ok();
     }
 }
diff --git a/io-engine/src/rebuild/rebuild_descriptor.rs b/io-engine/src/rebuild/rebuild_descriptor.rs
index 439a2fcc8..e1b973335 100644
--- a/io-engine/src/rebuild/rebuild_descriptor.rs
+++ b/io-engine/src/rebuild/rebuild_descriptor.rs
@@ -48,9 +48,11 @@ pub(super) struct RebuildDescriptor {
     /// Pre-opened descriptor for the source block device.
     #[allow(clippy::non_send_fields_in_send_ty)]
     pub(super) src_descriptor: Box<dyn BlockDeviceDescriptor>,
+    pub(super) src_handle: Box<dyn BlockDeviceHandle>,
     /// Pre-opened descriptor for destination block device.
     #[allow(clippy::non_send_fields_in_send_ty)]
    pub(super) dst_descriptor: Box<dyn BlockDeviceDescriptor>,
+    pub(super) dst_handle: Box<dyn BlockDeviceHandle>,
     /// Start time of this rebuild.
     pub(super) start_time: DateTime<Utc>,
 }
 
@@ -90,9 +92,8 @@ impl RebuildDescriptor {
             });
         }
 
-        let source_hdl = RebuildDescriptor::io_handle(&*src_descriptor).await?;
-        let destination_hdl =
-            RebuildDescriptor::io_handle(&*dst_descriptor).await?;
+        let src_handle = RebuildDescriptor::io_handle(&*src_descriptor).await?;
+        let dst_handle = RebuildDescriptor::io_handle(&*dst_descriptor).await?;
 
         let range = match range {
             None => {
@@ -105,8 +106,8 @@ impl RebuildDescriptor {
         };
 
         if !Self::validate(
-            source_hdl.get_device(),
-            destination_hdl.get_device(),
+            src_handle.get_device(),
+            dst_handle.get_device(),
             &range,
         ) {
             return Err(RebuildError::InvalidSrcDstRange {});
@@ -123,7 +124,9 @@ impl RebuildDescriptor {
             block_size,
             segment_size_blks,
             src_descriptor,
+            src_handle,
             dst_descriptor,
+            dst_handle,
             start_time: Utc::now(),
         })
     }
@@ -173,18 +176,14 @@ impl RebuildDescriptor {
 
     /// Get a `BlockDeviceHandle` for the source.
     #[inline(always)]
-    pub(super) async fn src_io_handle(
-        &self,
-    ) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
-        Self::io_handle(&*self.src_descriptor).await
+    pub(super) fn src_io_handle(&self) -> &dyn BlockDeviceHandle {
+        self.src_handle.as_ref()
     }
 
     /// Get a `BlockDeviceHandle` for the destination.
     #[inline(always)]
-    pub(super) async fn dst_io_handle(
-        &self,
-    ) -> Result<Box<dyn BlockDeviceHandle>, RebuildError> {
-        Self::io_handle(&*self.dst_descriptor).await
+    pub(super) fn dst_io_handle(&self) -> &dyn BlockDeviceHandle {
+        self.dst_handle.as_ref()
     }
 
     /// Get a `BlockDeviceHandle` for the given block device descriptor.
@@ -231,7 +230,6 @@ impl RebuildDescriptor {
     ) -> Result<bool, RebuildError> {
         match self
             .src_io_handle()
-            .await?
             .readv_blocks_async(
                 iovs,
                 offset_blk,
@@ -269,7 +267,6 @@ impl RebuildDescriptor {
         iovs: &[IoVec],
     ) -> Result<(), RebuildError> {
         self.dst_io_handle()
-            .await?
             .writev_blocks_async(
                 iovs,
                 offset_blk,
@@ -291,7 +288,6 @@ impl RebuildDescriptor {
     ) -> Result<(), RebuildError> {
         // Read the source again.
         self.src_io_handle()
-            .await?
             .readv_blocks_async(
                 iovs,
                 offset_blk,
@@ -306,7 +302,6 @@ impl RebuildDescriptor {
 
         match self
             .dst_io_handle()
-            .await?
             .comparev_blocks_async(
                 iovs,
                 offset_blk,
diff --git a/io-engine/src/rebuild/rebuild_job.rs b/io-engine/src/rebuild/rebuild_job.rs
index c068ed0b7..2b594b1f6 100644
--- a/io-engine/src/rebuild/rebuild_job.rs
+++ b/io-engine/src/rebuild/rebuild_job.rs
@@ -163,13 +163,17 @@ impl RebuildJob {
 
     /// Forcefully stops the job, overriding any pending client operation
     /// returns an async channel which can be used to await for termination.
-    pub(crate) fn force_stop(&self) -> oneshot::Receiver<RebuildState> {
+    pub(crate) fn force_stop(
+        &self,
+    ) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
         self.force_terminate(RebuildOperation::Stop)
     }
 
     /// Forcefully fails the job, overriding any pending client operation
     /// returns an async channel which can be used to await for termination.
-    pub(crate) fn force_fail(&self) -> oneshot::Receiver<RebuildState> {
+    pub(crate) fn force_fail(
+        &self,
+    ) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
         self.force_terminate(RebuildOperation::Fail)
     }
 
@@ -179,10 +183,13 @@
     fn force_terminate(
         &self,
         op: RebuildOperation,
-    ) -> oneshot::Receiver<RebuildState> {
+    ) -> either::Either<oneshot::Receiver<RebuildState>, RebuildState> {
         self.exec_internal_op(op).ok();
-        self.add_completion_listener()
-            .unwrap_or_else(|_| oneshot::channel().1)
+
+        match self.add_completion_listener() {
+            Ok(chan) => either::Either::Left(chan),
+            Err(_) => either::Either::Right(self.state()),
+        }
     }
 
     /// Get the rebuild stats.
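
Note on the new `force_stop()`/`force_fail()` signature above: callers now receive `either::Either<oneshot::Receiver<RebuildState>, RebuildState>`. `Left` carries a receiver to await when a completion listener could still be registered; `Right` carries the state directly when it could not (the job has already wound down), so there is nothing to await. A minimal sketch of the calling pattern, with a hypothetical helper name; it assumes the `RebuildJob`/`RebuildState` types from this crate, the `state()` accessor used by `force_terminate()` above, and the `either` dependency added in Cargo.toml. The real call sites are shown in nexus_bdev_children.rs, nexus_bdev_rebuild.rs and snapshot_rebuild.rs.

// Hypothetical wrapper around the API introduced in rebuild_job.rs.
async fn stop_and_wait(job: &RebuildJob) -> RebuildState {
    match job.force_stop() {
        // A completion listener was registered: wait for the job to terminate.
        // If the sender side is dropped, fall back to the job's current state.
        either::Either::Left(receiver) => {
            receiver.await.unwrap_or_else(|_| job.state())
        }
        // No listener could be added; the terminal state is returned directly.
        either::Either::Right(state) => state,
    }
}
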
diff --git a/io-engine/tests/lvs_pool.rs b/io-engine/tests/lvs_pool.rs
index e02ff3670..8335499c3 100644
--- a/io-engine/tests/lvs_pool.rs
+++ b/io-engine/tests/lvs_pool.rs
@@ -16,14 +16,33 @@ use std::pin::Pin;
 
 pub mod common;
 
-static DISKNAME1: &str = "/tmp/disk1.img";
-static DISKNAME2: &str = "/tmp/disk2.img";
+static TESTDIR: &str = "/tmp/io-engine-tests";
+static DISKNAME1: &str = "/tmp/io-engine-tests/disk1.img";
+static DISKNAME2: &str = "/tmp/io-engine-tests/disk2.img";
+static DISKNAME3: &str = "/tmp/io-engine-tests/disk3.img";
 
 #[tokio::test]
 async fn lvs_pool_test() {
-    common::delete_file(&[DISKNAME1.into(), DISKNAME2.into()]);
+    // Create directory for placing test disk files.
+    // todo: Create this from some common place and use for all other tests too.
+    let _ = std::process::Command::new("mkdir")
+        .args(["-p"])
+        .args([TESTDIR])
+        .output()
+        .expect("failed to execute mkdir");
+
+    common::delete_file(&[
+        DISKNAME1.into(),
+        DISKNAME2.into(),
+        DISKNAME3.into(),
+    ]);
     common::truncate_file(DISKNAME1, 128 * 1024);
     common::truncate_file(DISKNAME2, 128 * 1024);
+    common::truncate_file(DISKNAME3, 128 * 1024);
+
+    // Set up disk3 via a loop device using a sector size of 4096.
+    let ldev = common::setup_loopdev_file(DISKNAME3, Some(4096));
+
     let args = MayastorCliArgs {
         reactor_mask: "0x3".into(),
         ..Default::default()
@@ -32,7 +51,9 @@ async fn lvs_pool_test() {
 
     // should fail to import a pool that does not exist on disk
     ms.spawn(async {
-        assert!(Lvs::import("tpool", "aio:///tmp/disk1.img").await.is_err())
+        assert!(Lvs::import("tpool", format!("aio://{DISKNAME1}").as_str())
+            .await
+            .is_err())
     })
     .await;
 
@@ -40,7 +61,7 @@ async fn lvs_pool_test() {
     ms.spawn(async {
         Lvs::create_or_import(PoolArgs {
             name: "tpool".into(),
-            disks: vec!["aio:///tmp/disk1.img".into()],
+            disks: vec![format!("aio://{DISKNAME1}")],
             uuid: None,
             cluster_size: None,
             backend: PoolBackend::Lvs,
@@ -55,16 +76,23 @@ async fn lvs_pool_test() {
     // have an idempotent snafu, we dont crash and
     // burn
     ms.spawn(async {
-        assert!(Lvs::create("tpool", "aio:///tmp/disk1.img", None, None)
-            .await
-            .is_err())
+        assert!(Lvs::create(
+            "tpool",
+            format!("aio://{DISKNAME1}").as_str(),
+            None,
+            None
+        )
+        .await
+        .is_err())
     })
     .await;
 
     // should fail to import the pool that is already imported
     // similar to above, we use the import directly
     ms.spawn(async {
-        assert!(Lvs::import("tpool", "aio:///tmp/disk1.img").await.is_err())
+        assert!(Lvs::import("tpool", format!("aio://{DISKNAME1}").as_str())
+            .await
+            .is_err())
     })
     .await;
 
@@ -75,7 +103,7 @@ async fn lvs_pool_test() {
         assert_eq!(pool.name(), "tpool");
         assert_eq!(pool.used(), 0);
         dbg!(pool.uuid());
-        assert_eq!(pool.base_bdev().name(), "/tmp/disk1.img");
+        assert_eq!(pool.base_bdev().name(), DISKNAME1);
     })
     .await;
 
@@ -90,9 +118,13 @@ async fn lvs_pool_test() {
         // import and export implicitly destroy the base_bdev, for
        // testing import and create we
         // sometimes create the base_bdev manually
-        bdev_create("aio:///tmp/disk1.img").await.unwrap();
+        bdev_create(format!("aio://{DISKNAME1}").as_str())
+            .await
+            .unwrap();
 
-        assert!(Lvs::import("tpool", "aio:///tmp/disk1.img").await.is_ok());
+        assert!(Lvs::import("tpool", format!("aio://{DISKNAME1}").as_str())
+            .await
+            .is_ok());
 
         let pool = Lvs::lookup("tpool").unwrap();
         assert_eq!(pool.uuid(), uuid);
@@ -107,13 +139,22 @@ async fn lvs_pool_test() {
         let uuid = pool.uuid();
         pool.destroy().await.unwrap();
 
-        bdev_create("aio:///tmp/disk1.img").await.unwrap();
-        assert!(Lvs::import("tpool", "aio:///tmp/disk1.img").await.is_err());
+        bdev_create(format!("aio://{DISKNAME1}").as_str())
+            .await
+            .unwrap();
+        assert!(Lvs::import("tpool", format!("aio://{DISKNAME1}").as_str())
+            .await
+            .is_err());
 
         assert_eq!(Lvs::iter().count(), 0);
-        assert!(Lvs::create("tpool", "aio:///tmp/disk1.img", None, None)
-            .await
-            .is_ok());
+        assert!(Lvs::create(
+            "tpool",
+            format!("aio://{DISKNAME1}").as_str(),
+            None,
+            None
+        )
+        .await
+        .is_ok());
 
         let pool = Lvs::lookup("tpool").unwrap();
         assert_ne!(uuid, pool.uuid());
@@ -181,7 +222,7 @@ async fn lvs_pool_test() {
         pool.export().await.unwrap();
         let pool = Lvs::create_or_import(PoolArgs {
             name: "tpool".to_string(),
-            disks: vec!["aio:///tmp/disk1.img".to_string()],
+            disks: vec![format!("aio://{DISKNAME1}")],
             uuid: None,
             cluster_size: None,
             backend: PoolBackend::Lvs,
@@ -297,8 +338,12 @@ async fn lvs_pool_test() {
     // import the pool all shares should be there, but also validate
     // the share that not shared to be -- not shared
     ms.spawn(async {
-        bdev_create("aio:///tmp/disk1.img").await.unwrap();
-        let pool = Lvs::import("tpool", "aio:///tmp/disk1.img").await.unwrap();
+        bdev_create(format!("aio://{DISKNAME1}").as_str())
+            .await
+            .unwrap();
+        let pool = Lvs::import("tpool", format!("aio://{DISKNAME1}").as_str())
+            .await
+            .unwrap();
 
         for l in pool.lvols().unwrap() {
             if l.name() == "notshared" {
@@ -321,7 +366,7 @@ async fn lvs_pool_test() {
 
         let pool = Lvs::create_or_import(PoolArgs {
             name: "tpool".into(),
-            disks: vec!["aio:///tmp/disk1.img".into()],
+            disks: vec![format!("aio://{DISKNAME1}")],
             uuid: None,
             cluster_size: None,
             backend: PoolBackend::Lvs,
@@ -336,6 +381,60 @@ async fn lvs_pool_test() {
     })
     .await;
 
+    let pool_dev_aio = ldev.clone();
+    // should succeed to create an aio bdev pool on a loop blockdev of 4096
+    // bytes sector size.
+    ms.spawn(async move {
+        Lvs::create_or_import(PoolArgs {
+            name: "tpool_4k_aio".into(),
+            disks: vec![format!("aio://{pool_dev_aio}")],
+            uuid: None,
+            cluster_size: None,
+            backend: PoolBackend::Lvs,
+        })
+        .await
+        .unwrap();
+    })
+    .await;
+
+    // should be able to find our new LVS created on loopdev, and subsequently
+    // destroy it.
+    ms.spawn(async {
+        let pool = Lvs::lookup("tpool_4k_aio").unwrap();
+        assert_eq!(pool.name(), "tpool_4k_aio");
+        assert_eq!(pool.used(), 0);
+        dbg!(pool.uuid());
+        pool.destroy().await.unwrap();
+    })
+    .await;
+
+    let pool_dev_uring = ldev.clone();
+    // should succeed to create an uring pool on a loop blockdev of 4096 bytes
+    // sector size.
+    ms.spawn(async move {
+        Lvs::create_or_import(PoolArgs {
+            name: "tpool_4k_uring".into(),
+            disks: vec![format!("uring://{pool_dev_uring}")],
+            uuid: None,
+            cluster_size: None,
+            backend: PoolBackend::Lvs,
+        })
+        .await
+        .unwrap();
+    })
+    .await;
+
+    // should be able to find our new LVS created on loopdev, and subsequently
+    // destroy it.
+    ms.spawn(async {
+        let pool = Lvs::lookup("tpool_4k_uring").unwrap();
+        assert_eq!(pool.name(), "tpool_4k_uring");
+        assert_eq!(pool.used(), 0);
+        dbg!(pool.uuid());
+        pool.destroy().await.unwrap();
+    })
+    .await;
+
     // validate the expected state of mayastor
     ms.spawn(async {
         // no shares left except for the discovery controller
@@ -352,7 +451,7 @@ async fn lvs_pool_test() {
         // importing a pool with the wrong name should fail
         Lvs::create_or_import(PoolArgs {
             name: "jpool".into(),
-            disks: vec!["aio:///tmp/disk1.img".into()],
+            disks: vec![format!("aio://{DISKNAME1}")],
             uuid: None,
             cluster_size: None,
             backend: PoolBackend::Lvs,
@@ -369,7 +468,7 @@ async fn lvs_pool_test() {
     ms.spawn(async {
         let pool = Lvs::create_or_import(PoolArgs {
             name: "tpool2".into(),
-            disks: vec!["/tmp/disk2.img".into()],
+            disks: vec![format!("aio://{DISKNAME2}")],
             uuid: None,
             cluster_size: None,
             backend: PoolBackend::Lvs,
@@ -381,4 +480,6 @@ async fn lvs_pool_test() {
     .await;
 
     common::delete_file(&[DISKNAME2.into()]);
+    common::detach_loopdev(ldev.as_str());
+    common::delete_file(&[DISKNAME3.into()]);
 }
diff --git a/scripts/clean-cargo-tests.sh b/scripts/clean-cargo-tests.sh
index 4234c41b3..c9575aae6 100755
--- a/scripts/clean-cargo-tests.sh
+++ b/scripts/clean-cargo-tests.sh
@@ -1,8 +1,27 @@
+#!/usr/bin/env bash
+
 SCRIPT_DIR="$(dirname "$0")"
 ROOT_DIR=$(realpath "$SCRIPT_DIR/..")
 
 sudo nvme disconnect-all
 
+# Detach any loop devices created for test purposes
+for back_file in "/tmp/io-engine-tests"/*; do
+  # Find loop devices associated with the disk image
+  devices=$(losetup -j "$back_file" -O NAME --noheadings)
+
+  # Detach each loop device found
+  while IFS= read -r device; do
+    if [ -n "$device" ]; then
+      echo "Detaching loop device: $device"
+      losetup -d "$device"
+    fi
+  done <<< "$devices"
+done
+# Delete the directory too
+rmdir --ignore-fail-on-non-empty "/tmp/io-engine-tests"
+
+
 for c in $(docker ps -a --filter "label=io.composer.test.name" --format '{{.ID}}') ; do
   docker kill "$c"
   docker rm "$c"
diff --git a/scripts/pytest-tests.sh b/scripts/pytest-tests.sh
index d0b5164ba..efd5081f9 100755
--- a/scripts/pytest-tests.sh
+++ b/scripts/pytest-tests.sh
@@ -36,6 +36,15 @@ function clean_all()
   echo "Done"
 }
 
+function is_test()
+{
+  file="${1:-}"
+  if [ -d "$file" ] || [ -f "$file" ] || [ -f "${file%::*}" ]; then
+    return 0
+  fi
+  return 1
+}
+
 function run_tests()
 {
   while read name extra
@@ -95,10 +104,13 @@ while [ "$#" -gt 0 ]; do
     *)
       set +e
       real_1="$(realpath $1 2>/dev/null)"
+      real_2="$(realpath $SRCDIR/test/python/$1 2>/dev/null)"
       set -e
       param="$1"
-      if [ -d "$real_1" ] || [ -f "$real_1" ] || [ -f "${real_1%::*}" ]; then
+      if is_test "$real_1"; then
        param="$real_1"
+      elif is_test "$real_2"; then
+        param="$real_2"
       else
         TEST_ARGS="${TEST_ARGS:-}$1"
       fi
diff --git a/test/python/common/nvme.py b/test/python/common/nvme.py
index d26455fa9..e0e85e752 100644
--- a/test/python/common/nvme.py
+++ b/test/python/common/nvme.py
@@ -68,6 +68,7 @@ def nvme_connect(uri, delay=10, tmo=600):
     command = (
         f"nix-sudo nvme connect -t tcp -s {port} -a {host} -n {nqn} -c {delay} -l {tmo}"
     )
+    print(command)
     subprocess.run(command, check=True, shell=True, capture_output=False)
     time.sleep(1)
     command = "nix-sudo nvme list -v -o json"
@@ -97,6 +98,43 @@ def nvme_id_ctrl(device):
     return id_ctrl
 
 
+def match_host_port(addr, host, port):
+    traddr = f"traddr={host}"
+    trsvcid = f"trsvcid={port}"
+
+    return traddr in addr and trsvcid in addr
+
+
+def nvme_find_ctrl(uri):
+    """Find controller from the device uri."""
+    u = urlparse(uri)
+    port = u.port
+    host = u.hostname
+    nqn = u.path[1:]
+
+    command = "nix-sudo nvme list -v -o json"
+    discover = json.loads(
+        subprocess.run(
+            command, shell=True, check=True, text=True, capture_output=True
+        ).stdout
+    )
+
+    # Find the correct device
+    devs = list(filter(lambda d: nqn in d.get("SubsystemNQN"), discover.get("Devices")))
+    assert len(devs) == 1, "Multiple devices with the same subnqn"
+
+    # Find the correct controller
+    ctrls = list(
+        filter(
+            lambda d: match_host_port(d.get("Address"), host, port),
+            devs[0].get("Controllers"),
+        )
+    )
+    assert len(ctrls) == 1, "Multiple controllers with the same address"
+
+    return ctrls[0].get("Controller")
+
+
 def nvme_resv_report(device):
     """Reservation report."""
     command = "nix-sudo nvme resv-report {0} -c 1 -o json".format(device)
@@ -129,18 +167,21 @@ def nvme_disconnect(uri):
     nqn = u.path[1:]
 
     command = "nix-sudo nvme disconnect -n {0}".format(nqn)
+    print(command)
     subprocess.run(command, check=True, shell=True, capture_output=True)
 
 
 def nvme_disconnect_controller(name):
     """Disconnect the given NVMe controller on this host."""
     command = "nix-sudo nvme disconnect -d {0}".format(name)
+    print(command)
     subprocess.run(command, check=True, shell=True, capture_output=True)
 
 
 def nvme_disconnect_all():
     """Disconnect from all connected nvme subsystems"""
     command = "nix-sudo nvme disconnect-all"
+    print(command)
     subprocess.run(command, check=True, shell=True, capture_output=True)
diff --git a/test/python/tests/nexus_fault/test_nexus_fault.py b/test/python/tests/nexus_fault/test_nexus_fault.py
index d3fd31534..5c5468274 100644
--- a/test/python/tests/nexus_fault/test_nexus_fault.py
+++ b/test/python/tests/nexus_fault/test_nexus_fault.py
@@ -16,7 +16,12 @@ from common.command import run_cmd
 from common.fio import Fio
 from common.mayastor import container_mod, mayastor_mod
-from common.nvme import nvme_connect, nvme_disconnect, nvme_disconnect_controller
+from common.nvme import (
+    nvme_connect,
+    nvme_disconnect,
+    nvme_disconnect_controller,
+    nvme_find_ctrl,
+)
 
 import nexus_pb2 as pb
 
@@ -119,13 +124,12 @@ def _(recreate_pool, republish_nexus_ana):
 
 
 @when("the initiator swaps the nexuses")
-def _(recreate_pool, republish_nexus_ana):
+def _(publish_nexus, recreate_pool, republish_nexus_ana):
     """the initiator swaps the nexuses."""
     print(republish_nexus_ana)
-    device = nvme_connect(republish_nexus_ana)
-    # disconnect previous nexus: /dev/nvme*n*
-    split_dev = device.split("n")
-    nvme_disconnect_controller(f"{split_dev[0]}n{split_dev[1]}")
+    prev_ctrl = nvme_find_ctrl(publish_nexus)
+    nvme_connect(republish_nexus_ana)
+    nvme_disconnect_controller(f"/dev/{prev_ctrl}")
 
 
 @then("the fio workload should complete gracefully")
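
Closing note on the loop-device helpers added in io-engine-tests/src/lib.rs: lvs_pool.rs is currently their only consumer, so a compact usage sketch may help when wiring them into other tests. The backing-file path and the pool wiring are illustrative only; the sketch assumes the helpers are reachable through the tests' `common` module, as in lvs_pool.rs, and that the caller can run `losetup` (root):

// Hypothetical test skeleton exercising setup_loopdev_file()/detach_loopdev().
pub mod common;

fn loopdev_round_trip() {
    // Backing file for the loop device (path is illustrative).
    let backing = "/tmp/io-engine-tests/example.img";
    common::truncate_file(backing, 128 * 1024);

    // Attach a loop device with a 4096-byte logical sector size so that a
    // pool created on top of it sees a 4K block device instead of the 512-byte
    // default used for plain file-backed bdevs.
    let ldev = common::setup_loopdev_file(backing, Some(4096));

    // ... create and exercise an aio://{ldev} or uring://{ldev} pool here ...

    // Tear down in reverse order: detach the loop device, then remove the file.
    common::detach_loopdev(ldev.as_str());
    common::delete_file(&[backing.into()]);
}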