Skip to content

Commit

Permalink
PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Monthon Klongklaew <[email protected]>
  • Loading branch information
monthonk committed Jun 6, 2024
1 parent f1f394a commit 5851790
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 47 deletions.
3 changes: 1 addition & 2 deletions mountpoint-s3-client/examples/client_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ fn main() {
bucket: BUCKET.to_owned(),
part_size: args.part_size,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
};
let client = ThroughputMockClient::new(config, args.throughput_target_gbps);
let client = Arc::new(client);
Expand Down
3 changes: 1 addition & 2 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,7 @@ mod tests {
bucket: bucket.to_string(),
part_size: 128,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let body = vec![0u8; 50];
Expand Down
8 changes: 4 additions & 4 deletions mountpoint-s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub mod imds_crt_client;
pub mod instance_info;
#[doc(hidden)]
pub mod mock_client;
pub mod object_client;
mod object_client;
mod s3_crt_client;
#[doc(hidden)]
pub mod user_agent;
Expand All @@ -71,9 +71,9 @@ pub mod config {
pub mod types {
pub use super::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts,
GetObjectAttributesResult, HeadObjectResult, ListObjectsResult, ObjectAttribute, ObjectClientResult,
ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, RestoreStatus,
UploadReview, UploadReviewPart,
GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult, ObjectAttribute,
ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums,
RestoreStatus, UploadReview, UploadReviewPart,
};
}

Expand Down
33 changes: 11 additions & 22 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,8 +916,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let mut body = vec![0u8; size];
Expand Down Expand Up @@ -1007,8 +1006,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let mut body = vec![0u8; 2000];
Expand Down Expand Up @@ -1114,8 +1112,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let mut keys = vec![];
Expand Down Expand Up @@ -1204,8 +1201,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let mut keys = vec![];
Expand Down Expand Up @@ -1253,8 +1249,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: Some(1234),
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

for i in 0..20 {
Expand Down Expand Up @@ -1328,8 +1323,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: Some(1234),
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

for i in 0..20 {
Expand Down Expand Up @@ -1397,8 +1391,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: Some(1234),
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

for i in 0..20 {
Expand Down Expand Up @@ -1465,8 +1458,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let mut put_request = client
Expand Down Expand Up @@ -1522,8 +1514,7 @@ mod tests {
bucket: bucket.to_owned(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let key = "key1";
Expand Down Expand Up @@ -1553,8 +1544,7 @@ mod tests {
bucket: bucket.to_owned(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let head_counter_1 = client.new_counter(Operation::HeadObject);
Expand Down Expand Up @@ -1591,8 +1581,7 @@ mod tests {
bucket: bucket.to_owned(),
part_size: PART_SIZE,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let key = "key1";
Expand Down
3 changes: 1 addition & 2 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,7 @@ mod tests {
part_size: 8 * 1024 * 1024,
bucket: "test_bucket".to_owned(),
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
};
let client = ThroughputMockClient::new(config, rate_gbps);

Expand Down
15 changes: 14 additions & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,20 @@ pub trait GetObjectRequest:
{
type ClientError: std::error::Error + Send + Sync + 'static;

/// Increase current read window for backpressure read.
/// Increment the flow-control window, so that response data continues downloading.
///
/// If the client was created with `enable_read_backpressure` set true,
/// each meta request has a flow-control window that shrinks as response
/// body data is downloaded (headers do not affect the size of the window).
/// The client's `initial_read_window` determines the starting size of each meta request's window.
/// If a meta request's flow-control window reaches 0, no further data will be downloaded.
/// If the `initial_read_window` is 0, the request will not start until the window is incremented.
/// Maintain a larger window to keep up a high download throughput,
/// parts cannot download in parallel unless the window is large enough to hold multiple parts.
/// Maintain a smaller window to limit the amount of data buffered in memory.
///
/// If `enable_read_backpressure` is false this call will have no effect,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn increment_read_window(self: Pin<&mut Self>, len: usize);
}

Expand Down
11 changes: 4 additions & 7 deletions mountpoint-s3-client/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aws_smithy_runtime_api::client::orchestrator::HttpResponse;
use bytes::Bytes;
use futures::{pin_mut, Stream, StreamExt};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::object_client::GetObjectRequest;
use mountpoint_s3_client::types::GetObjectRequest;
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
use rand::rngs::OsRng;
Expand Down Expand Up @@ -194,14 +194,13 @@ pub async fn check_get_result<E: std::fmt::Debug>(
}

/// Check the result of a GET against expected bytes.
pub async fn check_back_pressure_get_result(
pub async fn check_backpressure_get_result(
read_window: usize,
result: impl GetObjectRequest,
range: Option<Range<u64>>,
expected: &[u8],
) {
let mut accum_read_window = read_window;
let mut accum_len = 0;
let mut accum = vec![];
let mut next_offset = range.map(|r| r.start).unwrap_or(0);
pin_mut!(result);
Expand All @@ -211,10 +210,9 @@ pub async fn check_back_pressure_get_result(
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);

accum_len += body.len();
// We run out of data to read if read window is smaller than accum length of data,
// so we keeping adding window size, otherwise the request will be blocked.
while accum_read_window <= accum_len {
while accum_read_window <= accum.len() {
result.as_mut().increment_read_window(read_window);
accum_read_window += read_window;
}
Expand Down Expand Up @@ -246,8 +244,7 @@ macro_rules! object_client_test {
bucket: bucket.to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
});

let key = format!("{prefix}hello");
Expand Down
58 changes: 56 additions & 2 deletions mountpoint-s3-client/tests/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ pub mod common;
use std::ops::Range;
use std::option::Option::None;
use std::str::FromStr;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

use aws_sdk_s3::primitives::ByteStream;
use bytes::Bytes;
Expand Down Expand Up @@ -78,15 +81,66 @@ async fn test_get_object_backpressure(size: usize, range: Option<Range<u64>>) {
let initial_window_size = 8 * 1024 * 1024;
let client: S3CrtClient = get_test_backpressure_client(initial_window_size);

let result = client
let request = client
.get_object(&bucket, &key, range.clone(), None)
.await
.expect("get_object should succeed");
let expected = match range {
Some(Range { start, end }) => &body[start as usize..end as usize],
None => &body,
};
check_back_pressure_get_result(initial_window_size, result, range, expected).await;
check_backpressure_get_result(initial_window_size, request, range, expected).await;
}

// Verify that the request is blocked when we don't increment read window size
#[tokio::test]
async fn verify_backpressure_get_object() {
let size = 1000;
let range = 50..1000;
let sdk_client = get_test_sdk_client().await;
let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object");

let key = format!("{prefix}/test");
let body = vec![0x42; size];
sdk_client
.put_object()
.bucket(&bucket)
.key(&key)
.body(ByteStream::from(body.clone()))
.send()
.await
.unwrap();

let initial_window_size = 256;
let client: S3CrtClient = get_test_backpressure_client(initial_window_size);

let mut get_request = client
.get_object("test_bucket", &key, Some(range.clone()), None)
.await
.expect("should not fail");

let mut accum = vec![];
let mut next_offset = range.start;

let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
futures::executor::block_on(async move {
while let Some(r) = get_request.next().await {
let (offset, body) = r.unwrap();
assert_eq!(offset, next_offset, "wrong body part offset");
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);
}
let expected_range = range;
let expected_range = expected_range.start as usize..expected_range.end as usize;
assert_eq!(&accum[..], &body[expected_range], "body does not match");
sender.send(accum).unwrap();
})
});
match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(_) => panic!("request should have been blocked"),
Err(e) => assert_eq!(e, RecvTimeoutError::Timeout),
}
}

#[tokio::test]
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-crt/src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ impl MetaRequest {

/// Increment the flow-control windows size.
pub fn increment_read_window(&mut self, bytes: u64) {
// SAFETY: `self.inner` is a valid `aws_s3_meta_request`.
// SAFETY: `self.inner` is a valid `aws_s3_meta_request` since we hold a ref count to it.
unsafe { aws_s3_meta_request_increment_read_window(self.inner.as_ptr(), bytes) };
}
}
Expand Down
3 changes: 1 addition & 2 deletions mountpoint-s3/src/bin/mock-mount-s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, T
bucket: args.bucket_name.clone(),
part_size: args.part_size as usize,
unordered_list_seed: None,
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
};
let client = ThroughputMockClient::new(config, max_throughput_gbps);

Expand Down
3 changes: 1 addition & 2 deletions mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2309,8 +2309,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024 * 1024,
unordered_list_seed: (!ordered).then_some(123456),
enable_back_pressure: false,
initial_read_window_size: 0,
..Default::default()
};
let client = Arc::new(MockClient::new(client_config));

Expand Down

0 comments on commit 5851790

Please sign in to comment.