diff --git a/mountpoint-s3-client/examples/client_benchmark.rs b/mountpoint-s3-client/examples/client_benchmark.rs index 8f846b604..0a402b05d 100644 --- a/mountpoint-s3-client/examples/client_benchmark.rs +++ b/mountpoint-s3-client/examples/client_benchmark.rs @@ -117,8 +117,7 @@ fn main() { bucket: BUCKET.to_owned(), part_size: args.part_size, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }; let client = ThroughputMockClient::new(config, args.throughput_target_gbps); let client = Arc::new(client); diff --git a/mountpoint-s3-client/src/failure_client.rs b/mountpoint-s3-client/src/failure_client.rs index 8b3372bf1..9954d6267 100644 --- a/mountpoint-s3-client/src/failure_client.rs +++ b/mountpoint-s3-client/src/failure_client.rs @@ -376,8 +376,7 @@ mod tests { bucket: bucket.to_string(), part_size: 128, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let body = vec![0u8; 50]; diff --git a/mountpoint-s3-client/src/lib.rs b/mountpoint-s3-client/src/lib.rs index 5fdfccd83..96443e6db 100644 --- a/mountpoint-s3-client/src/lib.rs +++ b/mountpoint-s3-client/src/lib.rs @@ -52,7 +52,7 @@ pub mod imds_crt_client; pub mod instance_info; #[doc(hidden)] pub mod mock_client; -pub mod object_client; +mod object_client; mod s3_crt_client; #[doc(hidden)] pub mod user_agent; @@ -71,9 +71,9 @@ pub mod config { pub mod types { pub use super::object_client::{ Checksum, ChecksumAlgorithm, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts, - GetObjectAttributesResult, HeadObjectResult, ListObjectsResult, ObjectAttribute, ObjectClientResult, - ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, RestoreStatus, - UploadReview, UploadReviewPart, + GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult, ObjectAttribute, + ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, + RestoreStatus, UploadReview, UploadReviewPart, }; } diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index f185c13ac..dd3c68c4a 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -916,8 +916,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let mut body = vec![0u8; size]; @@ -1007,8 +1006,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let mut body = vec![0u8; 2000]; @@ -1114,8 +1112,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let mut keys = vec![]; @@ -1204,8 +1201,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let mut keys = vec![]; @@ -1253,8 +1249,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: Some(1234), - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); for i in 0..20 { @@ -1328,8 +1323,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: Some(1234), - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); for i in 0..20 { @@ -1397,8 +1391,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: Some(1234), - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); for i in 0..20 { @@ -1465,8 +1458,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let mut put_request = client @@ -1522,8 +1514,7 @@ mod tests { bucket: bucket.to_owned(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let key = "key1"; @@ -1553,8 +1544,7 @@ mod tests { bucket: bucket.to_owned(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let head_counter_1 = client.new_counter(Operation::HeadObject); @@ -1591,8 +1581,7 @@ mod tests { bucket: bucket.to_owned(), part_size: PART_SIZE, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let key = "key1"; diff --git a/mountpoint-s3-client/src/mock_client/throughput_client.rs b/mountpoint-s3-client/src/mock_client/throughput_client.rs index a747ae4cc..932829a4f 100644 --- a/mountpoint-s3-client/src/mock_client/throughput_client.rs +++ b/mountpoint-s3-client/src/mock_client/throughput_client.rs @@ -187,8 +187,7 @@ mod tests { part_size: 8 * 1024 * 1024, bucket: "test_bucket".to_owned(), unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }; let client = ThroughputMockClient::new(config, rate_gbps); diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs index ee23ff5a0..dbaea661a 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -346,7 +346,20 @@ pub trait GetObjectRequest: { type ClientError: std::error::Error + Send + Sync + 'static; - /// Increase current read window for backpressure read. + /// Increment the flow-control window, so that response data continues downloading. + /// + /// If the client was created with `enable_read_backpressure` set true, + /// each meta request has a flow-control window that shrinks as response + /// body data is downloaded (headers do not affect the size of the window). + /// The client's `initial_read_window` determines the starting size of each meta request's window. + /// If a meta request's flow-control window reaches 0, no further data will be downloaded. + /// If the `initial_read_window` is 0, the request will not start until the window is incremented. + /// Maintain a larger window to keep up a high download throughput, + /// parts cannot download in parallel unless the window is large enough to hold multiple parts. + /// Maintain a smaller window to limit the amount of data buffered in memory. + /// + /// If `enable_read_backpressure` is false this call will have no effect, + /// no backpressure is being applied and data is being downloaded as fast as possible. fn increment_read_window(self: Pin<&mut Self>, len: usize); } diff --git a/mountpoint-s3-client/tests/common/mod.rs b/mountpoint-s3-client/tests/common/mod.rs index 7cb096685..9ae04eec9 100644 --- a/mountpoint-s3-client/tests/common/mod.rs +++ b/mountpoint-s3-client/tests/common/mod.rs @@ -10,7 +10,7 @@ use aws_smithy_runtime_api::client::orchestrator::HttpResponse; use bytes::Bytes; use futures::{pin_mut, Stream, StreamExt}; use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; -use mountpoint_s3_client::object_client::GetObjectRequest; +use mountpoint_s3_client::types::GetObjectRequest; use mountpoint_s3_client::S3CrtClient; use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter; use rand::rngs::OsRng; @@ -194,14 +194,13 @@ pub async fn check_get_result( } /// Check the result of a GET against expected bytes. -pub async fn check_back_pressure_get_result( +pub async fn check_backpressure_get_result( read_window: usize, result: impl GetObjectRequest, range: Option>, expected: &[u8], ) { let mut accum_read_window = read_window; - let mut accum_len = 0; let mut accum = vec![]; let mut next_offset = range.map(|r| r.start).unwrap_or(0); pin_mut!(result); @@ -211,10 +210,9 @@ pub async fn check_back_pressure_get_result( next_offset += body.len() as u64; accum.extend_from_slice(&body[..]); - accum_len += body.len(); // We run out of data to read if read window is smaller than accum length of data, // so we keeping adding window size, otherwise the request will be blocked. - while accum_read_window <= accum_len { + while accum_read_window <= accum.len() { result.as_mut().increment_read_window(read_window); accum_read_window += read_window; } @@ -246,8 +244,7 @@ macro_rules! object_client_test { bucket: bucket.to_string(), part_size: 1024, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }); let key = format!("{prefix}hello"); diff --git a/mountpoint-s3-client/tests/get_object.rs b/mountpoint-s3-client/tests/get_object.rs index 120eec304..d56e79b15 100644 --- a/mountpoint-s3-client/tests/get_object.rs +++ b/mountpoint-s3-client/tests/get_object.rs @@ -5,6 +5,9 @@ pub mod common; use std::ops::Range; use std::option::Option::None; use std::str::FromStr; +use std::sync::mpsc::{self, RecvTimeoutError}; +use std::thread; +use std::time::Duration; use aws_sdk_s3::primitives::ByteStream; use bytes::Bytes; @@ -78,7 +81,7 @@ async fn test_get_object_backpressure(size: usize, range: Option>) { let initial_window_size = 8 * 1024 * 1024; let client: S3CrtClient = get_test_backpressure_client(initial_window_size); - let result = client + let request = client .get_object(&bucket, &key, range.clone(), None) .await .expect("get_object should succeed"); @@ -86,7 +89,58 @@ async fn test_get_object_backpressure(size: usize, range: Option>) { Some(Range { start, end }) => &body[start as usize..end as usize], None => &body, }; - check_back_pressure_get_result(initial_window_size, result, range, expected).await; + check_backpressure_get_result(initial_window_size, request, range, expected).await; +} + +// Verify that the request is blocked when we don't increment read window size +#[tokio::test] +async fn verify_backpressure_get_object() { + let size = 1000; + let range = 50..1000; + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object"); + + let key = format!("{prefix}/test"); + let body = vec![0x42; size]; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(body.clone())) + .send() + .await + .unwrap(); + + let initial_window_size = 256; + let client: S3CrtClient = get_test_backpressure_client(initial_window_size); + + let mut get_request = client + .get_object("test_bucket", &key, Some(range.clone()), None) + .await + .expect("should not fail"); + + let mut accum = vec![]; + let mut next_offset = range.start; + + let (sender, receiver) = mpsc::channel(); + thread::spawn(move || { + futures::executor::block_on(async move { + while let Some(r) = get_request.next().await { + let (offset, body) = r.unwrap(); + assert_eq!(offset, next_offset, "wrong body part offset"); + next_offset += body.len() as u64; + accum.extend_from_slice(&body[..]); + } + let expected_range = range; + let expected_range = expected_range.start as usize..expected_range.end as usize; + assert_eq!(&accum[..], &body[expected_range], "body does not match"); + sender.send(accum).unwrap(); + }) + }); + match receiver.recv_timeout(Duration::from_millis(100)) { + Ok(_) => panic!("request should have been blocked"), + Err(e) => assert_eq!(e, RecvTimeoutError::Timeout), + } } #[tokio::test] diff --git a/mountpoint-s3-crt/src/s3/client.rs b/mountpoint-s3-crt/src/s3/client.rs index cc3e2910f..ceee178b8 100644 --- a/mountpoint-s3-crt/src/s3/client.rs +++ b/mountpoint-s3-crt/src/s3/client.rs @@ -565,7 +565,7 @@ impl MetaRequest { /// Increment the flow-control windows size. pub fn increment_read_window(&mut self, bytes: u64) { - // SAFETY: `self.inner` is a valid `aws_s3_meta_request`. + // SAFETY: `self.inner` is a valid `aws_s3_meta_request` since we hold a ref count to it. unsafe { aws_s3_meta_request_increment_read_window(self.inner.as_ptr(), bytes) }; } } diff --git a/mountpoint-s3/src/bin/mock-mount-s3.rs b/mountpoint-s3/src/bin/mock-mount-s3.rs index 35c3cba1b..2ed54debf 100644 --- a/mountpoint-s3/src/bin/mock-mount-s3.rs +++ b/mountpoint-s3/src/bin/mock-mount-s3.rs @@ -39,8 +39,7 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, T bucket: args.bucket_name.clone(), part_size: args.part_size as usize, unordered_list_seed: None, - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }; let client = ThroughputMockClient::new(config, max_throughput_gbps); diff --git a/mountpoint-s3/src/inode.rs b/mountpoint-s3/src/inode.rs index 1783dc348..87d790849 100644 --- a/mountpoint-s3/src/inode.rs +++ b/mountpoint-s3/src/inode.rs @@ -2309,8 +2309,7 @@ mod tests { bucket: "test_bucket".to_string(), part_size: 1024 * 1024, unordered_list_seed: (!ordered).then_some(123456), - enable_back_pressure: false, - initial_read_window_size: 0, + ..Default::default() }; let client = Arc::new(MockClient::new(client_config));