Skip to content

Commit

Permalink
Add support for concurrent downloads to client_benchmark example (#1000)
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Sep 6, 2024
1 parent c27abd2 commit 5d15350
Showing 1 changed file with 39 additions and 23 deletions.
62 changes: 39 additions & 23 deletions mountpoint-s3-client/examples/client_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::pin::pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;

use clap::{Parser, Subcommand};
Expand All @@ -27,29 +28,42 @@ fn init_tracing_subscriber() {
subscriber.try_init().expect("unable to install global subscriber");
}

fn run_benchmark(client: impl ObjectClient + Clone, num_iterations: usize, bucket: &str, key: &str) {
fn run_benchmark(
client: impl ObjectClient + Clone + Send,
num_iterations: usize,
num_downloads: usize,
bucket: &str,
key: &str,
) {
for i in 0..num_iterations {
let received_size = Arc::new(AtomicU64::new(0));
let client = client.clone();
let received_size_clone = Arc::clone(&received_size);
let start = Instant::now();
futures::executor::block_on(async move {
let mut request = client
.get_object(bucket, key, None, None)
.await
.expect("couldn't create get request");
let mut request = pin!(request);
loop {
match StreamExt::next(&mut request).await {
Some(Ok((_offset, body))) => {
received_size_clone.fetch_add(body.len() as u64, Ordering::SeqCst);
}
Some(Err(e)) => {
tracing::error!(error = ?e, "request failed");
break;
}
None => break,
}
let received_size = Arc::new(AtomicU64::new(0));

thread::scope(|scope| {
for _ in 0..num_downloads {
let client = client.clone();
let received_size_clone = Arc::clone(&received_size);
scope.spawn(|| {
futures::executor::block_on(async move {
let mut request = client
.get_object(bucket, key, None, None)
.await
.expect("couldn't create get request");
let mut request = pin!(request);
loop {
match request.next().await {
Some(Ok((_offset, body))) => {
received_size_clone.fetch_add(body.len() as u64, Ordering::SeqCst);
}
Some(Err(e)) => {
tracing::error!(error = ?e, "request failed");
break;
}
None => break,
}
}
})
});
}
});

Expand Down Expand Up @@ -93,6 +107,8 @@ struct CliArgs {
part_size: usize,
#[arg(long, help = "Number of benchmark iterations", default_value = "1")]
iterations: usize,
#[arg(long, help = "Number of concurrent downloads", default_value = "1")]
downloads: usize,
}

fn main() {
Expand All @@ -107,7 +123,7 @@ fn main() {
config = config.part_size(args.part_size);
let client = S3CrtClient::new(config).expect("couldn't create client");

run_benchmark(client, args.iterations, &bucket, &key);
run_benchmark(client, args.iterations, args.downloads, &bucket, &key);
}
Client::Mock { object_size } => {
const BUCKET: &str = "bucket";
Expand All @@ -124,7 +140,7 @@ fn main() {

client.add_object(KEY, MockObject::ramp(0xaa, object_size as usize, ETag::for_tests()));

run_benchmark(client, args.iterations, BUCKET, "key");
run_benchmark(client, args.iterations, args.downloads, BUCKET, KEY);
}
}
}

0 comments on commit 5d15350

Please sign in to comment.