Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add worker tasks to the agent. #1393

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 176 additions & 13 deletions agent-antagonist/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rand::random;
use signal_hook::consts::signal::*;
use signal_hook_tokio::Signals;
use slog::Logger;
use slog::{info, warn};
use slog::{debug, info, warn};
use std::net::SocketAddr;
use std::process::Command;
use std::sync::{
Expand All @@ -27,8 +27,11 @@ use crucible_agent_client::{
about = "Stress tester for the crucible agent"
)]
enum Args {
/// Run a number of antagonist loop tasks that will do the following:
/// Run a number of antagonist loop tasks.
/// There are two options for what the test will do.
/// All tests operate on a 1GB region.
///
/// The default test loop will do this loop:
/// - Create a 1GB region
///
/// - Randomly:
Expand All @@ -40,6 +43,14 @@ enum Args {
///
/// - Delete the region
///
/// The clone stress test will do this:
/// - Create a 1GB region
/// - Create a snapshot of that region.
/// - Loop on:
/// - Clone the snapshot to a new region.
/// - Delete the region.
///
///
/// Additionally, one task is spawned that will:
///
/// - Get a list of regions
Expand All @@ -51,6 +62,10 @@ enum Args {
#[clap(short, long)]
agent: Option<SocketAddr>,

/// Stress test the downstairs clone of a snapshot operation.
#[clap(long, action)]
clone_stress: bool,

/// Dataset for the crucible agent - leave blank to autodetect if in the
/// crucible zone
#[clap(short, long)]
Expand All @@ -69,7 +84,7 @@ fn command(log: &Logger, bin: &'static str, args: &[&str]) -> Result<String> {
info!(log, "{} {:?} took {:?}", bin, args, elapsed);

if !cmd.status.success() {
bail!("zfs list failed!");
bail!("command: {} {:?} failed {:?}", bin, args, cmd);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where's the dunce emoji

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be clear I meant myself! :o

}

Ok(String::from_utf8(cmd.stdout)?.trim_end().to_string())
Expand Down Expand Up @@ -174,19 +189,109 @@ async fn main_thread(
Ok(())
}

async fn main_clone_thread(
log: Logger,
agent: SocketAddr,
dataset: String,
stop_flag: Arc<AtomicBool>,
) -> Result<()> {
// Create a 1 GB region
let region_id = Uuid::new_v4();

let region_request = CreateRegion {
block_size: 512,
extent_count: 16,
extent_size: 131072,
id: RegionId(region_id.to_string()),
encrypted: true,
cert_pem: None,
key_pem: None,
root_pem: None,
source: None,
};

if let Err(e) = create_a_region(agent, &log, region_request.clone()).await {
bail!("Region create {region_id} failed: {e}");
}

let snapshot_id = Uuid::new_v4();
info!(log, "Create snapshot {snapshot_id}");

if let Err(e) =
create_a_snapshot(agent, &log, &dataset, region_id, snapshot_id).await
{
bail!("Snapshot create returned {e}");
}

let mut count = 1;
loop {
if stop_flag.load(Ordering::SeqCst) {
break;
}
let clone_region_id = Uuid::new_v4();
info!(
log,
"From {region_id}--{snapshot_id} clone:{clone_region_id} at \
count:{count}---"
);
if let Err(e) = clone_a_snapshot(
agent,
&log,
region_id,
region_request.clone(),
snapshot_id,
clone_region_id,
)
.await
{
bail!("Snapshot clone returned {e}");
}

info!(log, "Delete clone:{clone_region_id} at count:{count}");

if let Err(e) = delete_a_region(agent, &log, clone_region_id).await {
bail!("Region clone delete {clone_region_id} failed: {e}");
}
count += 1;
info!(
log,
"Completed {:5} clones from {region_id}--{snapshot_id}", count
);

// If we don't add a little disturbance, all the threads end up
// cloning at the same time. This little variation here will ensure
// that the tasks are not always synced up to each other.
if random() && random() {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
}

if let Err(e) = delete_a_snapshot(agent, &log, region_id, snapshot_id).await
{
bail!("Snapshot delete returned {e}");
}

// Delete region
if let Err(e) = delete_a_region(agent, &log, region_id).await {
bail!("Region delete {region_id} failed: {e}");
}
Ok(())
}

// Create a region.
// Loop till it is ready.
async fn create_a_region(
agent: SocketAddr,
log: &Logger,
region_request: CreateRegion,
) -> Result<()> {
info!(log, "creating region {:?}", region_request.id);
let mut retry = 1;
loop {
info!(log, "creating region {:?}", region_request.id);
let client = get_client(&agent);
let region = match client.region_create(&region_request).await {
Ok(region) => {
info!(log, "creating region {:?} ok", region_request.id,);
debug!(log, "creating region {:?} ok", region_request.id,);
region
}

Expand All @@ -203,9 +308,10 @@ async fn create_a_region(
RegionState::Requested => {
info!(
log,
"waiting for region {:?}: state {:?}",
"waiting for region {:?}: state {:?} try:{}",
region_request.id,
region.state,
retry,
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand All @@ -226,6 +332,7 @@ async fn create_a_region(
);
}
}
retry += 1;
}
Ok(())
}
Expand Down Expand Up @@ -567,6 +674,44 @@ async fn clone_a_snapshot(
"Use {:?} for source clone {:?}", source_addr, region_request
);

// We just created a region, then took a snapshot. Next we will try to
// clone that snapshot. It's possible we arrive here before the downstairs
// (that we are cloning from) has come all the way online. We loop a few
// times and hit a known endpoint on the expected downstairs repair port to
// verify that things have come online before trying to clone. This avoids
// us trying to clone too soon.
let url = format!("http://{}/region-info", source_addr).to_string();

let mut retry = 0;
loop {
let res = reqwest::get(url.clone()).await;
match res {
Ok(resp) => {
if resp.status().is_success() {
info!(log, "http to clone {} was successful.", url);
break;
} else {
warn!(
log,
"Request {retry} to {} failed with status: {}",
url,
resp.status()
);
}
}
Err(e) => {
warn!(log, "Request {retry} to {} failed: {}", url, e);
}
}
retry += 1;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if retry > 20 {
bail!("Failed check to clone endpoint {}", url);
} else {
warn!(log, "http to clone {} failed, try:{}", url, retry);
}
}

if let Err(e) = create_a_region(agent, log, region_request.clone()).await {
bail!("Region clone create failed, returned {e}");
}
Expand All @@ -582,14 +727,15 @@ async fn delete_a_region(
log: &Logger,
region_id: Uuid,
) -> Result<()> {
info!(log, "tombstoning region {:?}", region_id);
let mut retry = 1;
loop {
info!(log, "tombstoning region {:?}", region_id);
let client = get_client(&agent);
let r = client.region_delete(&RegionId(region_id.to_string())).await;
drop(client);
match r {
Ok(_) => {
info!(log, "tombstoning region {:?} ok", region_id);
debug!(log, "tombstoning region {:?} ok", region_id);
}

Err(e) => {
Expand All @@ -602,7 +748,7 @@ async fn delete_a_region(
drop(client);
let region = match r {
Ok(region) => {
info!(log, "get region {:?} ok", region_id);
debug!(log, "get region {:?} ok", region_id);
region
}

Expand All @@ -615,9 +761,10 @@ async fn delete_a_region(
RegionState::Tombstoned => {
info!(
log,
"waiting for region {:?}: state {:?}",
"waiting for region {:?}: state {:?} try:{}",
region_id,
region.state,
retry,
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Expand All @@ -635,6 +782,7 @@ async fn delete_a_region(
);
}
}
retry += 1;
}
Ok(())
}
Expand Down Expand Up @@ -724,6 +872,7 @@ async fn main() -> Result<()> {
match args {
Args::Run {
agent,
clone_stress,
dataset,
tasks,
} => {
Expand Down Expand Up @@ -788,14 +937,28 @@ async fn main() -> Result<()> {
let dataset = dataset.clone();
let stop_flag_clone = stop_flag.clone();

tokio::spawn(async move {
main_thread(log, agent, dataset, stop_flag_clone).await
})
if clone_stress {
tokio::spawn(async move {
main_clone_thread(
log,
agent,
dataset,
stop_flag_clone,
)
.await
})
} else {
tokio::spawn(async move {
main_thread(log, agent, dataset, stop_flag_clone)
.await
})
}
})
.collect();

// Add another task that grabs all regions, and queries all
// snapshots for those regions

let stop_flag_clone = stop_flag.clone();
let qlog = log.new(slog::o!("query" => 0));
jhs.push(tokio::spawn(async move {
Expand Down
Loading