Skip to content

Commit

Permalink
crutest use SIGUSR1 to stop tests (#699)
Browse files Browse the repository at this point in the history
crutest handle some signals.

Added the ability to run a test without specifying a number of test
cycles and continue running until a SIGUSR1 signal.

At present only the generic test supports this ability.

Add a verify the region on receipt of SIGUSR2

---------

Co-authored-by: Alan Hanson <[email protected]>
  • Loading branch information
leftwo and Alan Hanson authored May 10, 2023
1 parent 47f3569 commit 92ae012
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crutest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ reedline = "0.19.0"
schemars = { version = "0.8.12", features = [ "uuid1" ] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }
signal-hook = "0.3.15"
statistical = "1.0.0"
tokio = { version = "1.27.0", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"]}
Expand Down
3 changes: 2 additions & 1 deletion crutest/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,8 @@ async fn process_cli_command(
)))
.await
} else {
match generic_workload(guest, count, ri).await {
let mut wtq = WhenToQuit::Count { count };
match generic_workload(guest, &mut wtq, ri).await {
Ok(_) => fw.send(CliMessage::DoneOk).await,
Err(e) => {
let msg = format!("{}", e);
Expand Down
167 changes: 139 additions & 28 deletions crutest/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use anyhow::{anyhow, bail, Result};
use bytes::Bytes;
use clap::Parser;
use csv::WriterBuilder;
use futures::StreamExt;
use indicatif::{ProgressBar, ProgressStyle};
use rand::prelude::*;
use rand_chacha::rand_core::SeedableRng;
use serde::{Deserialize, Serialize};
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use tokio::sync::mpsc;
use tokio::time::{Duration, Instant};
use uuid::Uuid;

Expand Down Expand Up @@ -92,9 +96,13 @@ enum Workload {
#[clap(name = "client", term_width = 80)]
#[clap(about = "A Crucible upstairs test client", long_about = None)]
pub struct Opt {
/// For tests that support it, run until a SIGUSR1 signal is received.
#[clap(long, global = true, action, conflicts_with = "count")]
continuous: bool,

/// For tests that support it, pass this count value for the number
/// of loops the test should do.
#[clap(short, long, global = true, default_value = "0", action)]
#[clap(short, long, global = true, action, default_value = "0")]
count: usize,

#[clap(
Expand Down Expand Up @@ -493,6 +501,43 @@ async fn load_write_log(
Ok(())
}

// How to determine when a test will stop running.
// Either by count, or a message over a channel.
enum WhenToQuit {
Count {
count: usize,
},
Signal {
shutdown_rx: mpsc::Receiver<SignalAction>,
},
}

#[derive(Debug)]
enum SignalAction {
Shutdown,
Verify,
}

// When a signal is received, send a message over a channel.
async fn handle_signals(
mut signals: Signals,
shutdown_tx: mpsc::Sender<SignalAction>,
) {
while let Some(signal) = signals.next().await {
match signal {
SIGUSR1 => {
shutdown_tx.send(SignalAction::Shutdown).await.unwrap();
}
SIGUSR2 => {
shutdown_tx.send(SignalAction::Verify).await.unwrap();
}
x => {
panic!("Received unsupported signal {}", x);
}
}
}
}

/**
* This is an example Crucible client.
* Here we make use of the interfaces that Crucible exposes.
Expand Down Expand Up @@ -649,6 +694,13 @@ async fn main() -> Result<()> {
load_write_log(&guest, &mut region_info, verify_in, verify).await?;
}

let (shutdown_tx, shutdown_rx) = mpsc::channel::<SignalAction>(4);
if opt.continuous {
println!("Setup signal handler");
let signals = Signals::new([SIGUSR1, SIGUSR2])?;
tokio::spawn(handle_signals(signals, shutdown_tx));
}

/*
* Call the function for the workload option passed from the command
* line.
Expand Down Expand Up @@ -747,15 +799,18 @@ async fn main() -> Result<()> {
}

Workload::Generic => {
let count = {
if opt.count == 0 {
500
// Either we have a count, or we run until we get a signal.
let mut wtq = {
if opt.continuous {
WhenToQuit::Signal { shutdown_rx }
} else if opt.count == 0 {
WhenToQuit::Count { count: 500 }
} else {
opt.count
WhenToQuit::Count { count: opt.count }
}
};

generic_workload(&guest, count, &mut region_info).await?;
generic_workload(&guest, &mut wtq, &mut region_info).await?;
}

Workload::One => {
Expand Down Expand Up @@ -1295,27 +1350,39 @@ async fn fill_workload(
*/
async fn generic_workload(
guest: &Arc<Guest>,
count: usize,
wtq: &mut WhenToQuit,
ri: &mut RegionInfo,
) -> Result<()> {
/*
* TODO: Allow the user to specify a seed here.
*/
let mut rng = rand_chacha::ChaCha8Rng::from_entropy();

let count_width = count.to_string().len();
let count_width = match wtq {
WhenToQuit::Count { count } => count.to_string().len(),
_ => 5,
};
let block_width = ri.total_blocks.to_string().len();
let size_width = (10 * ri.block_size).to_string().len();
for c in 1..=count {

let mut c = 1;
loop {
let op = rng.gen_range(0..10);
if op == 0 {
// flush
println!(
"{:>0width$}/{:>0width$} Flush",
c,
count,
width = count_width,
);
match wtq {
WhenToQuit::Count { count } => {
println!(
"{:>0width$}/{:>0width$} Flush",
c,
count,
width = count_width,
);
}
WhenToQuit::Signal { .. } => {
println!("{:>0width$} Flush", c, width = count_width);
}
}
guest.flush(None).await?;
} else {
// Read or Write both need this
Expand Down Expand Up @@ -1343,20 +1410,30 @@ async fn generic_workload(
fill_vec(block_index, size, &ri.write_log, ri.block_size);
let data = Bytes::from(vec);

match wtq {
WhenToQuit::Count { count } => {
print!(
"{:>0width$}/{:>0width$}",
c,
count,
width = count_width,
);
}
WhenToQuit::Signal { .. } => {
print!("{:>0width$}", c, width = count_width);
}
}
print!(
"{:>0width$}/{:>0width$} Write \
block {:>bw$} len {:>sw$} data:",
c,
count,
" Write block {:>bw$} len {:>sw$} data:",
offset.value,
data.len(),
width = count_width,
bw = block_width,
sw = size_width,
);

assert_eq!(data[1], ri.write_log.get_seed(block_index));
for i in 0..size {
print!("{:>3} ", ri.write_log.get_seed(block_index + i));
print!(" {:>3}", ri.write_log.get_seed(block_index + i));
}
println!();
guest.write(offset, data).await?;
Expand All @@ -1365,14 +1442,23 @@ async fn generic_workload(
let length: usize = size * ri.block_size as usize;
let vec: Vec<u8> = vec![255; length];
let data = crucible::Buffer::from_vec(vec);
match wtq {
WhenToQuit::Count { count } => {
print!(
"{:>0width$}/{:>0width$}",
c,
count,
width = count_width,
);
}
WhenToQuit::Signal { .. } => {
print!("{:>0width$}", c, width = count_width);
}
}
println!(
"{:>0width$}/{:>0width$} Read \
block {:>bw$} len {:>sw$}",
c,
count,
" Read block {:>bw$} len {:>sw$}",
offset.value,
data.len(),
width = count_width,
bw = block_width,
sw = size_width,
);
Expand All @@ -1393,6 +1479,29 @@ async fn generic_workload(
}
}
}
c += 1;
match wtq {
WhenToQuit::Count { count } => {
if c > *count {
break;
}
}
WhenToQuit::Signal { shutdown_rx } => {
match shutdown_rx.try_recv() {
Ok(SignalAction::Shutdown) => {
println!("shutting down in response to SIGUSR1");
break;
}
Ok(SignalAction::Verify) => {
println!("Verify Volume");
if let Err(e) = verify_volume(guest, ri, false).await {
bail!("Requested volume verify failed: {:?}", e)
}
}
_ => {} // Ignore everything else
}
}
}
}

Ok(())
Expand Down Expand Up @@ -1843,7 +1952,8 @@ async fn deactivate_workload(
count,
width = count_width
);
generic_workload(guest, 20, ri).await?;
let mut wtq = WhenToQuit::Count { count: 20 };
generic_workload(guest, &mut wtq, ri).await?;
println!(
"{:>0width$}/{:>0width$}, CLIENT: Now disconnect",
c,
Expand Down Expand Up @@ -1891,7 +2001,8 @@ async fn deactivate_workload(
}
}
println!("One final");
generic_workload(guest, 20, ri).await?;
let mut wtq = WhenToQuit::Count { count: 20 };
generic_workload(guest, &mut wtq, ri).await?;

println!("final verify");
if let Err(e) = verify_volume(guest, ri, false).await {
Expand Down

0 comments on commit 92ae012

Please sign in to comment.