Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make s3 client able to report read window offset #971

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## Unreleased

### Breaking changes

* When using GetObject with backpressure enabled, an error will be returned when there is not enough read window instead of blocking. ([#971](https://github.com/awslabs/mountpoint-s3/pull/971))

### Other changes

* Allow querying initial read window size and read window end offset for backpressure GetObject. ([#971](https://github.com/awslabs/mountpoint-s3/pull/971))

## v0.9.0 (June 26, 2024)

* Adds support for `AWS_ENDPOINT_URL` environment variable. ([#895](https://github.com/awslabs/mountpoint-s3/pull/895))
Expand Down
9 changes: 9 additions & 0 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ where
self.client.write_part_size()
}

fn initial_read_window_size(&self) -> Option<usize> {
self.client.initial_read_window_size()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down Expand Up @@ -188,6 +192,11 @@ impl<Client: ObjectClient, FailState: Send> GetObjectRequest for FailureGetReque
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl<Client: ObjectClient, FailState> Stream for FailureGetRequest<Client, FailState> {
Expand Down
128 changes: 64 additions & 64 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub struct MockClientConfig {
/// A seed to randomize the order of ListObjectsV2 results, or None to use ordered list
pub unordered_list_seed: Option<u64>,
/// A flag to enable backpressure read
pub enable_back_pressure: bool,
pub enable_backpressure: bool,
/// Initial backpressure read window size, ignored if enable_backpressure is false
pub initial_read_window_size: usize,
}
Expand Down Expand Up @@ -475,8 +475,8 @@ pub struct MockGetObjectRequest {
next_offset: u64,
length: usize,
part_size: usize,
enable_back_pressure: bool,
current_window_size: usize,
enable_backpressure: bool,
read_window_end_offset: u64,
}

impl MockGetObjectRequest {
Expand All @@ -498,7 +498,11 @@ impl GetObjectRequest for MockGetObjectRequest {
type ClientError = MockClientError;

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.current_window_size += len;
self.read_window_end_offset += len as u64;
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
self.read_window_end_offset
}
}

Expand All @@ -510,15 +514,13 @@ impl Stream for MockGetObjectRequest {
return Poll::Ready(None);
}

let mut next_read_size = self.part_size.min(self.length);
let next_read_size = self.part_size.min(self.length);

// Simulate backpressure mechanism
if self.enable_back_pressure {
if self.current_window_size == 0 {
return Poll::Pending;
}
next_read_size = self.current_window_size.min(next_read_size);
self.current_window_size -= next_read_size;
if self.enable_backpressure && self.next_offset >= self.read_window_end_offset {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"empty read window".into(),
)))));
}
let next_part = self.object.read(self.next_offset, next_read_size);

Expand Down Expand Up @@ -562,6 +564,14 @@ impl ObjectClient for MockClient {
Some(self.config.part_size)
}

fn initial_read_window_size(&self) -> Option<usize> {
if self.config.enable_backpressure {
Some(self.config.initial_read_window_size)
} else {
None
}
}

async fn delete_object(
&self,
bucket: &str,
Expand Down Expand Up @@ -616,8 +626,8 @@ impl ObjectClient for MockClient {
next_offset,
length,
part_size: self.config.part_size,
enable_back_pressure: self.config.enable_back_pressure,
current_window_size: self.config.initial_read_window_size,
enable_backpressure: self.config.enable_backpressure,
read_window_end_offset: next_offset + self.config.initial_read_window_size as u64,
})
} else {
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey))
Expand Down Expand Up @@ -908,18 +918,25 @@ enum MockObjectParts {

#[cfg(test)]
mod tests {
use std::{
sync::mpsc::{self, RecvTimeoutError},
thread,
};

use futures::{pin_mut, StreamExt};
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use test_case::test_case;

use super::*;

macro_rules! assert_client_error {
($e:expr, $err:expr) => {
let err = $e.expect_err("should fail");
match err {
ObjectClientError::ClientError(MockClientError(m)) => {
assert_eq!(&*m, $err);
}
_ => assert!(false, "wrong error type"),
}
};
}

async fn test_get_object(key: &str, size: usize, range: Option<Range<u64>>) {
let mut rng = ChaChaRng::seed_from_u64(0x12345678);

Expand Down Expand Up @@ -971,7 +988,7 @@ mod tests {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: true,
enable_backpressure: true,
initial_read_window_size: backpressure_read_window_size,
});

Expand All @@ -992,9 +1009,12 @@ mod tests {
assert_eq!(offset, next_offset, "wrong body part offset");
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);
get_request
.as_mut()
.increment_read_window(backpressure_read_window_size);

while next_offset >= get_request.as_ref().read_window_end_offset() {
get_request
.as_mut()
.increment_read_window(backpressure_read_window_size);
}
}
let expected_range = range.unwrap_or(0..size as u64);
let expected_range = expected_range.start as usize..expected_range.end as usize;
Expand Down Expand Up @@ -1024,18 +1044,6 @@ mod tests {
rng.fill_bytes(&mut body);
client.add_object("key1", body[..].into());

macro_rules! assert_client_error {
($e:expr, $err:expr) => {
let err = $e.expect_err("should fail");
match err {
ObjectClientError::ClientError(MockClientError(m)) => {
assert_eq!(&*m, $err);
}
_ => assert!(false, "wrong error type"),
}
};
}

assert!(matches!(
client.get_object("wrong_bucket", "key1", None, None).await,
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchBucket))
Expand Down Expand Up @@ -1068,53 +1076,45 @@ mod tests {
);
}

// Verify that the request is blocked when we don't increment read window size
// Verify that an error is returned when we don't increment read window size
#[tokio::test]
async fn verify_backpressure_get_object() {
let key = "key1";
let size = 1000;
let range = 50..1000;
let mut rng = ChaChaRng::seed_from_u64(0x12345678);

let mut rng = ChaChaRng::seed_from_u64(0x12345678);
let client = MockClient::new(MockClientConfig {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
enable_back_pressure: true,
enable_backpressure: true,
initial_read_window_size: 256,
});

let mut body = vec![0u8; size];
rng.fill_bytes(&mut body);
client.add_object(key, MockObject::from_bytes(&body, ETag::for_tests()));
let part_size = client.read_part_size().unwrap();
let size = part_size * 2;
let range = 0..(part_size + 1) as u64;

let mut expected_body = vec![0u8; size];
rng.fill_bytes(&mut expected_body);
client.add_object(key, MockObject::from_bytes(&expected_body, ETag::for_tests()));

let mut get_request = client
.get_object("test_bucket", key, Some(range.clone()), None)
.await
.expect("should not fail");

let mut accum = vec![];
let mut next_offset = range.start;

let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
futures::executor::block_on(async move {
while let Some(r) = get_request.next().await {
let (offset, body) = r.unwrap();
assert_eq!(offset, next_offset, "wrong body part offset");
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);
}
let expected_range = range;
let expected_range = expected_range.start as usize..expected_range.end as usize;
assert_eq!(&accum[..], &body[expected_range], "body does not match");
sender.send(accum).unwrap();
})
});
match receiver.recv_timeout(Duration::from_millis(100)) {
Ok(_) => panic!("request should have been blocked"),
Err(e) => assert_eq!(e, RecvTimeoutError::Timeout),
}
// Verify that we can receive some data since the window size is more than 0
let first_part = get_request.next().await.expect("result should not be empty");
let (offset, body) = first_part.unwrap();
assert_eq!(offset, 0, "wrong body part offset");

// The CRT always returns at least one full part even if the read window is smaller than the part size
let expected_range = range.start as usize..part_size;
assert_eq!(&body[..], &expected_body[expected_range]);

// This await should return an error because current window is not enough to get the next part
let next = get_request.next().await.expect("result should not be empty");
assert_client_error!(next, "empty read window");
}

#[tokio::test]
Expand Down
9 changes: 9 additions & 0 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ impl GetObjectRequest for ThroughputGetObjectRequest {
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl Stream for ThroughputGetObjectRequest {
Expand Down Expand Up @@ -105,6 +110,10 @@ impl ObjectClient for ThroughputMockClient {
self.inner.write_part_size()
}

fn initial_read_window_size(&self) -> Option<usize> {
self.inner.initial_read_window_size()
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
8 changes: 8 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ pub trait ObjectClient {
/// can be `None` if the client does not do multi-part operations.
fn write_part_size(&self) -> Option<usize>;

/// Query the initial read window size this client uses for backpressure GetObject requests.
/// This can be `None` if backpressure is disabled.
fn initial_read_window_size(&self) -> Option<usize>;

/// Delete a single object from the object store.
///
/// DeleteObject will succeed even if the object within the bucket does not exist.
Expand Down Expand Up @@ -378,6 +382,10 @@ pub trait GetObjectRequest:
/// If `enable_read_backpressure` is false this call will have no effect,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn increment_read_window(self: Pin<&mut Self>, len: usize);

/// Get the upper bound of the current read window. When backpressure is enabled, [GetObjectRequest] can
/// return data up to this offset *exclusively*.
fn read_window_end_offset(self: Pin<&Self>) -> u64;
}

/// A streaming put request which allows callers to asynchronously write the body of the request.
Expand Down
22 changes: 19 additions & 3 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ use pin_project::{pin_project, pinned_drop};
use thiserror::Error;
use tracing::{debug, error, trace, Span};

use self::get_object::S3GetObjectRequest;
use self::put_object::S3PutObjectRequest;
use crate::endpoint_config::EndpointError;
use crate::endpoint_config::{self, EndpointConfig};
use crate::object_client::*;
use crate::user_agent::UserAgent;
use crate::{object_client::*, S3GetObjectRequest, S3PutObjectRequest};

macro_rules! request_span {
($self:expr, $method:expr, $($field:tt)*) => {{
Expand Down Expand Up @@ -267,6 +265,8 @@ struct S3CrtClientInner {
request_payer: Option<String>,
read_part_size: usize,
write_part_size: usize,
enable_backpressure: bool,
initial_read_window_size: usize,
bucket_owner: Option<String>,
credentials_provider: Option<CredentialsProvider>,
host_resolver: HostResolver,
Expand Down Expand Up @@ -395,6 +395,8 @@ impl S3CrtClientInner {
request_payer: config.request_payer,
read_part_size: config.read_part_size,
write_part_size: config.write_part_size,
enable_backpressure: config.read_backpressure,
initial_read_window_size: config.initial_read_window,
bucket_owner: config.bucket_owner,
credentials_provider: Some(credentials_provider),
host_resolver,
Expand Down Expand Up @@ -974,6 +976,12 @@ pub enum S3RequestError {
/// The request was throttled by S3
#[error("Request throttled")]
Throttled,

/// Cannot fetch more data because the current read window is exhausted. The read window must
/// be advanced using [GetObjectRequest::increment_read_window] (which takes a `usize` length)
/// to continue fetching new data.
#[error("Polled for data with empty read window")]
EmptyReadWindow,
}

impl S3RequestError {
Expand Down Expand Up @@ -1178,6 +1186,14 @@ impl ObjectClient for S3CrtClient {
Some(self.inner.write_part_size)
}

fn initial_read_window_size(&self) -> Option<usize> {
if self.inner.enable_backpressure {
Some(self.inner.initial_read_window_size)
} else {
None
}
}

async fn delete_object(
&self,
bucket: &str,
Expand Down
Loading
Loading