From 92ae01268b0d1a7f52d66eab1be55ab79a84e598 Mon Sep 17 00:00:00 2001 From: Alan Hanson Date: Wed, 10 May 2023 07:25:50 -0700 Subject: [PATCH] crutest use SIGUSR1 to stop tests (#699) crutest handle some signals. Added the ability to run a test without specifying a number of test cycles and continue running until a SIGUSR1 signal. At present only the generic test supports this ability. Add a verify the region on receipt of SIGUSR2 --------- Co-authored-by: Alan Hanson --- Cargo.lock | 2 + crutest/Cargo.toml | 2 + crutest/src/cli.rs | 3 +- crutest/src/main.rs | 167 ++++++++++++++++++++++++++++++++++++-------- 4 files changed, 145 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d45f2ea35..6cf736367 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -958,6 +958,8 @@ dependencies = [ "schemars", "serde", "serde_json", + "signal-hook", + "signal-hook-tokio", "statistical", "tokio", "tokio-util 0.7.3", diff --git a/crutest/Cargo.toml b/crutest/Cargo.toml index 0cdcd068a..25c757b57 100644 --- a/crutest/Cargo.toml +++ b/crutest/Cargo.toml @@ -30,6 +30,8 @@ reedline = "0.19.0" schemars = { version = "0.8.12", features = [ "uuid1" ] } serde = { version = "1", features = ["derive"] } serde_json = "1" +signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } +signal-hook = "0.3.15" statistical = "1.0.0" tokio = { version = "1.27.0", features = ["full"] } tokio-util = { version = "0.7", features = ["codec"]} diff --git a/crutest/src/cli.rs b/crutest/src/cli.rs index 77d02a8de..7a72935ce 100644 --- a/crutest/src/cli.rs +++ b/crutest/src/cli.rs @@ -799,7 +799,8 @@ async fn process_cli_command( ))) .await } else { - match generic_workload(guest, count, ri).await { + let mut wtq = WhenToQuit::Count { count }; + match generic_workload(guest, &mut wtq, ri).await { Ok(_) => fw.send(CliMessage::DoneOk).await, Err(e) => { let msg = format!("{}", e); diff --git a/crutest/src/main.rs b/crutest/src/main.rs index 312f768ab..fabe2d49e 100644 --- 
a/crutest/src/main.rs +++ b/crutest/src/main.rs @@ -8,10 +8,14 @@ use anyhow::{anyhow, bail, Result}; use bytes::Bytes; use clap::Parser; use csv::WriterBuilder; +use futures::StreamExt; use indicatif::{ProgressBar, ProgressStyle}; use rand::prelude::*; use rand_chacha::rand_core::SeedableRng; use serde::{Deserialize, Serialize}; +use signal_hook::consts::signal::*; +use signal_hook_tokio::Signals; +use tokio::sync::mpsc; use tokio::time::{Duration, Instant}; use uuid::Uuid; @@ -92,9 +96,13 @@ enum Workload { #[clap(name = "client", term_width = 80)] #[clap(about = "A Crucible upstairs test client", long_about = None)] pub struct Opt { + /// For tests that support it, run until a SIGUSR1 signal is received. + #[clap(long, global = true, action, conflicts_with = "count")] + continuous: bool, + /// For tests that support it, pass this count value for the number /// of loops the test should do. - #[clap(short, long, global = true, default_value = "0", action)] + #[clap(short, long, global = true, action, default_value = "0")] count: usize, #[clap( @@ -493,6 +501,43 @@ async fn load_write_log( Ok(()) } +// How to determine when a test will stop running. +// Either by count, or a message over a channel. +enum WhenToQuit { + Count { + count: usize, + }, + Signal { + shutdown_rx: mpsc::Receiver<SignalAction>, + }, +} + +#[derive(Debug)] +enum SignalAction { + Shutdown, + Verify, +} + +// When a signal is received, send a message over a channel. +async fn handle_signals( + mut signals: Signals, + shutdown_tx: mpsc::Sender<SignalAction>, +) { + while let Some(signal) = signals.next().await { + match signal { + SIGUSR1 => { + shutdown_tx.send(SignalAction::Shutdown).await.unwrap(); + } + SIGUSR2 => { + shutdown_tx.send(SignalAction::Verify).await.unwrap(); + } + x => { + panic!("Received unsupported signal {}", x); + } + } + } +} + /** * This is an example Crucible client. * Here we make use of the interfaces that Crucible exposes. 
@@ -649,6 +694,13 @@ async fn main() -> Result<()> { load_write_log(&guest, &mut region_info, verify_in, verify).await?; } + let (shutdown_tx, shutdown_rx) = mpsc::channel::<SignalAction>(4); + if opt.continuous { + println!("Setup signal handler"); + let signals = Signals::new([SIGUSR1, SIGUSR2])?; + tokio::spawn(handle_signals(signals, shutdown_tx)); + } + /* * Call the function for the workload option passed from the command * line. @@ -747,15 +799,18 @@ async fn main() -> Result<()> { } Workload::Generic => { - let count = { - if opt.count == 0 { - 500 + // Either we have a count, or we run until we get a signal. + let mut wtq = { + if opt.continuous { + WhenToQuit::Signal { shutdown_rx } + } else if opt.count == 0 { + WhenToQuit::Count { count: 500 } } else { - opt.count + WhenToQuit::Count { count: opt.count } } }; - generic_workload(&guest, count, &mut region_info).await?; + generic_workload(&guest, &mut wtq, &mut region_info).await?; } Workload::One => { @@ -1295,7 +1350,7 @@ async fn fill_workload( */ async fn generic_workload( guest: &Arc, - count: usize, + wtq: &mut WhenToQuit, ri: &mut RegionInfo, ) -> Result<()> { /* @@ -1303,19 +1358,31 @@ async fn generic_workload( */ let mut rng = rand_chacha::ChaCha8Rng::from_entropy(); - let count_width = count.to_string().len(); + let count_width = match wtq { + WhenToQuit::Count { count } => count.to_string().len(), + _ => 5, + }; let block_width = ri.total_blocks.to_string().len(); let size_width = (10 * ri.block_size).to_string().len(); - for c in 1..=count { + + let mut c = 1; + loop { let op = rng.gen_range(0..10); if op == 0 { // flush - println!( - "{:>0width$}/{:>0width$} Flush", - c, - count, - width = count_width, - ); + match wtq { + WhenToQuit::Count { count } => { + println!( + "{:>0width$}/{:>0width$} Flush", + c, + count, + width = count_width, + ); + } + WhenToQuit::Signal { .. 
} => { + println!("{:>0width$} Flush", c, width = count_width); + } + } guest.flush(None).await?; } else { // Read or Write both need this @@ -1343,20 +1410,30 @@ async fn generic_workload( fill_vec(block_index, size, &ri.write_log, ri.block_size); let data = Bytes::from(vec); + match wtq { + WhenToQuit::Count { count } => { + print!( + "{:>0width$}/{:>0width$}", + c, + count, + width = count_width, + ); + } + WhenToQuit::Signal { .. } => { + print!("{:>0width$}", c, width = count_width); + } + } print!( - "{:>0width$}/{:>0width$} Write \ - block {:>bw$} len {:>sw$} data:", - c, - count, + " Write block {:>bw$} len {:>sw$} data:", offset.value, data.len(), - width = count_width, bw = block_width, sw = size_width, ); + assert_eq!(data[1], ri.write_log.get_seed(block_index)); for i in 0..size { - print!("{:>3} ", ri.write_log.get_seed(block_index + i)); + print!(" {:>3}", ri.write_log.get_seed(block_index + i)); } println!(); guest.write(offset, data).await?; @@ -1365,14 +1442,23 @@ async fn generic_workload( let length: usize = size * ri.block_size as usize; let vec: Vec<u8> = vec![255; length]; let data = crucible::Buffer::from_vec(vec); + match wtq { + WhenToQuit::Count { count } => { + print!( + "{:>0width$}/{:>0width$}", + c, + count, + width = count_width, + ); + } + WhenToQuit::Signal { .. 
} => { + print!("{:>0width$}", c, width = count_width); + } + } println!( - "{:>0width$}/{:>0width$} Read \ - block {:>bw$} len {:>sw$}", - c, - count, + " Read block {:>bw$} len {:>sw$}", offset.value, data.len(), - width = count_width, bw = block_width, sw = size_width, ); @@ -1393,6 +1479,29 @@ async fn generic_workload( } } } + c += 1; + match wtq { + WhenToQuit::Count { count } => { + if c > *count { + break; + } + } + WhenToQuit::Signal { shutdown_rx } => { + match shutdown_rx.try_recv() { + Ok(SignalAction::Shutdown) => { + println!("shutting down in response to SIGUSR1"); + break; + } + Ok(SignalAction::Verify) => { + println!("Verify Volume"); + if let Err(e) = verify_volume(guest, ri, false).await { + bail!("Requested volume verify failed: {:?}", e) + } + } + _ => {} // Ignore everything else + } + } + } } Ok(()) @@ -1843,7 +1952,8 @@ async fn deactivate_workload( count, width = count_width ); - generic_workload(guest, 20, ri).await?; + let mut wtq = WhenToQuit::Count { count: 20 }; + generic_workload(guest, &mut wtq, ri).await?; println!( "{:>0width$}/{:>0width$}, CLIENT: Now disconnect", c, @@ -1891,7 +2001,8 @@ async fn deactivate_workload( } } println!("One final"); - generic_workload(guest, 20, ri).await?; + let mut wtq = WhenToQuit::Count { count: 20 }; + generic_workload(guest, &mut wtq, ri).await?; println!("final verify"); if let Err(e) = verify_volume(guest, ri, false).await {