Improve get_object interface for backpressure (#1200)
Currently, we support a flow-control window for GetObject requests by
allowing applications to call `GetObjectResponse::increment_read_window`,
but it is tricky to use: the caller must hold onto the stream itself
to control the feedback loop while also consuming data from it.

This change introduces a new trait `ClientBackpressureHandle` for
controlling the read window so that the stream and the flow-control
paths are decoupled.
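
As a rough illustration of why a separate handle decouples the two paths, here is a minimal, standard-library-only sketch. The trait below mirrors the three methods of `ClientBackpressureHandle` from this change, but `BackpressureHandle`, `SharedWindowHandle`, and `extend_from_other_thread` are hypothetical names used only for this example. It also assumes `ensure_read_window` may be implemented with `AtomicU64::fetch_max`, whereas the mock client in this commit computes a saturating difference and increments by it.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Simplified stand-in for the `ClientBackpressureHandle` trait (same three methods).
pub trait BackpressureHandle {
    fn increment_read_window(&mut self, len: usize);
    fn ensure_read_window(&mut self, desired_end_offset: u64);
    fn read_window_end_offset(&self) -> u64;
}

/// Hypothetical handle: every clone shares the same atomic end offset.
#[derive(Clone, Debug)]
pub struct SharedWindowHandle {
    end_offset: Arc<AtomicU64>,
}

impl SharedWindowHandle {
    pub fn new(initial_end_offset: u64) -> Self {
        Self {
            end_offset: Arc::new(AtomicU64::new(initial_end_offset)),
        }
    }
}

impl BackpressureHandle for SharedWindowHandle {
    fn increment_read_window(&mut self, len: usize) {
        self.end_offset.fetch_add(len as u64, Ordering::SeqCst);
    }

    fn ensure_read_window(&mut self, desired_end_offset: u64) {
        // Assumption for this sketch: `fetch_max` only moves the bound forward,
        // so a smaller desired offset leaves the window unchanged.
        self.end_offset.fetch_max(desired_end_offset, Ordering::SeqCst);
    }

    fn read_window_end_offset(&self) -> u64 {
        self.end_offset.load(Ordering::SeqCst)
    }
}

/// Extends the window through a clone on another thread and returns the
/// end offset as observed through the original handle.
pub fn extend_from_other_thread(initial: u64, extra: usize) -> u64 {
    let handle = SharedWindowHandle::new(initial);
    let mut remote = handle.clone();
    std::thread::spawn(move || remote.increment_read_window(extra))
        .join()
        .expect("window-extending thread should not panic");
    handle.read_window_end_offset()
}
```

Because the handle is `Clone` and holds no reference to the stream, the code that extends the window does not need to own or pin the response.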

Applications can now call `GetObjectResponse::backpressure_handle`
to get a backpressure handle from the response and use this handle to
extend the read window.
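
The consumer side might look like the sketch below, which follows the shape of the updated mock client test in this commit. To stay self-contained it makes several simplifying assumptions: `Response` and `WindowHandle` are hypothetical types, the response is a plain `Iterator` rather than an async `futures::Stream`, and errors are plain strings.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical cloneable handle holding the exclusive upper bound of the read window.
#[derive(Clone, Debug)]
pub struct WindowHandle {
    end_offset: Arc<AtomicU64>,
}

impl WindowHandle {
    pub fn increment_read_window(&mut self, len: usize) {
        self.end_offset.fetch_add(len as u64, Ordering::SeqCst);
    }

    pub fn read_window_end_offset(&self) -> u64 {
        self.end_offset.load(Ordering::SeqCst)
    }
}

/// Hypothetical synchronous response that yields the object body in parts.
pub struct Response {
    data: Vec<u8>,
    next_offset: u64,
    part_size: usize,
    handle: Option<WindowHandle>,
}

impl Response {
    /// `initial_window` of `None` models a client with backpressure disabled.
    pub fn new(data: Vec<u8>, part_size: usize, initial_window: Option<u64>) -> Self {
        assert!(part_size > 0, "part_size must be non-zero");
        let handle = initial_window.map(|w| WindowHandle {
            end_offset: Arc::new(AtomicU64::new(w)),
        });
        Self { data, next_offset: 0, part_size, handle }
    }

    pub fn backpressure_handle(&mut self) -> Option<&mut WindowHandle> {
        self.handle.as_mut()
    }
}

impl Iterator for Response {
    type Item = Result<Vec<u8>, String>;

    fn next(&mut self) -> Option<Self::Item> {
        let start = self.next_offset as usize;
        if start >= self.data.len() {
            return None;
        }
        // Like the mock client, report an error when the window is exhausted.
        if let Some(handle) = &self.handle {
            if self.next_offset >= handle.read_window_end_offset() {
                return Some(Err("empty read window".to_string()));
            }
        }
        let end = (start + self.part_size).min(self.data.len());
        self.next_offset = end as u64;
        Some(Ok(self.data[start..end].to_vec()))
    }
}

/// Reads the whole body, extending the window by `window_step` whenever the
/// consumer has caught up with its upper bound.
pub fn read_all(mut response: Response, window_step: usize) -> Result<Vec<u8>, String> {
    assert!(window_step > 0, "window_step must be non-zero");
    // Clone the handle up front so the loop below can consume the response freely.
    let mut handle = response.backpressure_handle().cloned();
    let mut accum = Vec::new();
    let mut next_offset = 0u64;
    for part in response {
        let part = part?;
        next_offset += part.len() as u64;
        accum.extend_from_slice(&part);
        if let Some(handle) = handle.as_mut() {
            while next_offset >= handle.read_window_end_offset() {
                handle.increment_read_window(window_step);
            }
        }
    }
    Ok(accum)
}
```

With backpressure disabled the handle is `None` and the loop simply drains the response, which matches the documented behavior of `backpressure_handle` returning `None` in that case.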

### Does this change impact existing behavior?

Yes, there is a breaking change for `mountpoint-s3-client`.

### Does this change need a changelog entry?

Yes, for `mountpoint-s3-client`.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Monthon Klongklaew <[email protected]>
monthonk authored Dec 17, 2024
1 parent f09ac0c commit d5b36e8
Showing 11 changed files with 208 additions and 127 deletions.
3 changes: 3 additions & 0 deletions mountpoint-s3-client/CHANGELOG.md
@@ -8,6 +8,9 @@
* `get_object` method now requires a `GetObjectParams` parameter.
Two of the existing parameters, `range` and `if_match` have been moved to `GetObjectParams`.
([#1121](https://github.com/awslabs/mountpoint-s3/pull/1121))
* `increment_read_window` and `read_window_end_offset` methods have been removed from `GetObjectResponse`.
`ClientBackpressureHandle` can be used to interact with flow-control window instead, it can be retrieved from `backpressure_handle` method.
([#1200](https://github.com/awslabs/mountpoint-s3/pull/1200))
* `head_object` method now requires a `HeadObjectParams` parameter.
The structure itself is not required to specify anything to achieve the existing behavior.
([#1083](https://github.com/awslabs/mountpoint-s3/pull/1083))
15 changes: 5 additions & 10 deletions mountpoint-s3-client/src/failure_client.rs
@@ -217,25 +217,20 @@ pub struct FailureGetResponse<Client: ObjectClient, GetWrapperState> {
impl<Client: ObjectClient + Send + Sync, FailState: Send + Sync> GetObjectResponse
for FailureGetResponse<Client, FailState>
{
type BackpressureHandle = <<Client as ObjectClient>::GetObjectResponse as GetObjectResponse>::BackpressureHandle;
type ClientError = Client::ClientError;

fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
self.request.backpressure_handle()
}

fn get_object_metadata(&self) -> ObjectMetadata {
self.request.get_object_metadata()
}

fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
self.request.get_object_checksum()
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl<Client: ObjectClient, FailState> Stream for FailureGetResponse<Client, FailState> {
10 changes: 5 additions & 5 deletions mountpoint-s3-client/src/lib.rs
@@ -72,11 +72,11 @@ pub mod config {
/// Types used by all object clients
pub mod types {
pub use super::object_client::{
Checksum, ChecksumAlgorithm, ChecksumMode, CopyObjectParams, CopyObjectResult, DeleteObjectResult, ETag,
GetBodyPart, GetObjectAttributesParts, GetObjectAttributesResult, GetObjectParams, GetObjectResponse,
HeadObjectParams, HeadObjectResult, ListObjectsResult, ObjectAttribute, ObjectClientResult, ObjectInfo,
ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus,
UploadChecksum, UploadReview, UploadReviewPart,
Checksum, ChecksumAlgorithm, ChecksumMode, ClientBackpressureHandle, CopyObjectParams, CopyObjectResult,
DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts, GetObjectAttributesResult, GetObjectParams,
GetObjectResponse, HeadObjectParams, HeadObjectResult, ListObjectsResult, ObjectAttribute, ObjectClientResult,
ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams, PutObjectTrailingChecksums,
RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
};
}

86 changes: 56 additions & 30 deletions mountpoint-s3-client/src/mock_client.rs
@@ -6,6 +6,7 @@ use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Write;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
@@ -26,13 +27,13 @@ use crate::checksums::{
};
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use crate::object_client::{
Checksum, ChecksumAlgorithm, ChecksumMode, CopyObjectError, CopyObjectParams, CopyObjectResult, DeleteObjectError,
DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesParts,
GetObjectAttributesResult, GetObjectError, GetObjectParams, GetObjectResponse, HeadObjectError, HeadObjectParams,
HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectChecksumError, ObjectClient,
ObjectClientError, ObjectClientResult, ObjectInfo, ObjectMetadata, ObjectPart, PutObjectError, PutObjectParams,
PutObjectRequest, PutObjectResult, PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus,
UploadChecksum, UploadReview, UploadReviewPart,
Checksum, ChecksumAlgorithm, ChecksumMode, ClientBackpressureHandle, CopyObjectError, CopyObjectParams,
CopyObjectResult, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectParams, GetObjectResponse,
HeadObjectError, HeadObjectParams, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute,
ObjectChecksumError, ObjectClient, ObjectClientError, ObjectClientResult, ObjectInfo, ObjectMetadata, ObjectPart,
PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
@@ -668,15 +669,33 @@ fn validate_checksum(
}
Ok(provided_checksum)
}
#[derive(Clone, Debug)]
pub struct MockBackpressureHandle {
read_window_end_offset: Arc<AtomicU64>,
}

impl ClientBackpressureHandle for MockBackpressureHandle {
fn increment_read_window(&mut self, len: usize) {
self.read_window_end_offset.fetch_add(len as u64, Ordering::SeqCst);
}

fn ensure_read_window(&mut self, desired_end_offset: u64) {
let diff = desired_end_offset.saturating_sub(self.read_window_end_offset()) as usize;
self.increment_read_window(diff);
}

fn read_window_end_offset(&self) -> u64 {
self.read_window_end_offset.load(Ordering::SeqCst)
}
}

#[derive(Debug)]
pub struct MockGetObjectResponse {
object: MockObject,
next_offset: u64,
length: usize,
part_size: usize,
enable_backpressure: bool,
read_window_end_offset: u64,
backpressure_handle: Option<MockBackpressureHandle>,
}

impl MockGetObjectResponse {
@@ -696,23 +715,20 @@ impl MockGetObjectResponse {

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectResponse for MockGetObjectResponse {
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
self.backpressure_handle.as_mut()
}

fn get_object_metadata(&self) -> ObjectMetadata {
self.object.object_metadata.clone()
}

fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
Ok(self.object.checksum.clone())
}

fn increment_read_window(mut self: Pin<&mut Self>, len: usize) {
self.read_window_end_offset += len as u64;
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
self.read_window_end_offset
}
}

impl Stream for MockGetObjectResponse {
@@ -726,10 +742,12 @@ impl Stream for MockGetObjectResponse {
let next_read_size = self.part_size.min(self.length);

// Simulate backpressure mechanism
if self.enable_backpressure && self.next_offset >= self.read_window_end_offset {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"empty read window".into(),
)))));
if let Some(handle) = &self.backpressure_handle {
if self.next_offset >= handle.read_window_end_offset() {
return Poll::Ready(Some(Err(ObjectClientError::ClientError(MockClientError(
"empty read window".into(),
)))));
}
}
let next_part = self.object.read(self.next_offset, next_read_size);

@@ -855,13 +873,20 @@ impl ObjectClient for MockClient {
(0, object.len())
};

let backpressure_handle = if self.config.enable_backpressure {
let read_window_end_offset = Arc::new(AtomicU64::new(
next_offset + self.config.initial_read_window_size as u64,
));
Some(MockBackpressureHandle { read_window_end_offset })
} else {
None
};
Ok(MockGetObjectResponse {
object: object.clone(),
next_offset,
length,
part_size: self.config.part_size,
enable_backpressure: self.config.enable_backpressure,
read_window_end_offset: next_offset + self.config.initial_read_window_size as u64,
backpressure_handle,
})
} else {
Err(ObjectClientError::ServiceError(GetObjectError::NoSuchKey))
@@ -1199,7 +1224,7 @@ enum MockObjectParts {

#[cfg(test)]
mod tests {
use futures::{pin_mut, StreamExt};
use futures::StreamExt;
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::ops::Range;
@@ -1295,11 +1320,14 @@ mod tests {
rng.fill_bytes(&mut body);
client.add_object(key, MockObject::from_bytes(&body, ETag::for_tests()));

let get_request = client
let mut get_request = client
.get_object("test_bucket", key, &GetObjectParams::new().range(range.clone()))
.await
.expect("should not fail");
pin_mut!(get_request);
let mut backpressure_handle = get_request
.backpressure_handle()
.cloned()
.expect("should be able to get a backpressure handle");

let mut accum = vec![];
let mut next_offset = range.as_ref().map(|r| r.start).unwrap_or(0);
@@ -1309,10 +1337,8 @@ mod tests {
next_offset += body.len() as u64;
accum.extend_from_slice(&body[..]);

while next_offset >= get_request.as_ref().read_window_end_offset() {
get_request
.as_mut()
.increment_read_window(backpressure_read_window_size);
while next_offset >= backpressure_handle.read_window_end_offset() {
backpressure_handle.increment_read_window(backpressure_read_window_size);
}
}
let expected_range = range.unwrap_or(0..size as u64);
27 changes: 12 additions & 15 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
@@ -20,6 +20,8 @@ use crate::object_client::{
PutObjectResult, PutObjectSingleParams,
};

use super::MockBackpressureHandle;

/// A [MockClient] that rate limits overall download throughput to simulate a target network
/// performance without the jitter or service latency of targeting a real service. Note that while
/// the rate limit is shared by all downloading streams, there is no fairness, so some streams can
@@ -60,36 +62,31 @@ impl ThroughputMockClient {
}

#[pin_project]
pub struct ThroughputGetObjectRequest {
pub struct ThroughputGetObjectResponse {
#[pin]
request: MockGetObjectResponse,
rate_limiter: LeakyBucket,
}

#[cfg_attr(not(docsrs), async_trait)]
impl GetObjectResponse for ThroughputGetObjectRequest {
impl GetObjectResponse for ThroughputGetObjectResponse {
type BackpressureHandle = MockBackpressureHandle;
type ClientError = MockClientError;

fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle> {
self.request.backpressure_handle()
}

fn get_object_metadata(&self) -> ObjectMetadata {
self.request.object.object_metadata.clone()
}

fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError> {
Ok(self.request.object.checksum.clone())
}

fn increment_read_window(self: Pin<&mut Self>, len: usize) {
let this = self.project();
this.request.increment_read_window(len);
}

fn read_window_end_offset(self: Pin<&Self>) -> u64 {
let this = self.project_ref();
this.request.read_window_end_offset()
}
}

impl Stream for ThroughputGetObjectRequest {
impl Stream for ThroughputGetObjectResponse {
type Item = ObjectClientResult<GetBodyPart, GetObjectError, MockClientError>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@@ -107,7 +104,7 @@ impl Stream for ThroughputGetObjectRequest {

#[async_trait]
impl ObjectClient for ThroughputMockClient {
type GetObjectResponse = ThroughputGetObjectRequest;
type GetObjectResponse = ThroughputGetObjectResponse;
type PutObjectRequest = MockPutObjectRequest;
type ClientError = MockClientError;

@@ -156,7 +153,7 @@ impl ObjectClient for ThroughputMockClient {
) -> ObjectClientResult<Self::GetObjectResponse, GetObjectError, Self::ClientError> {
let request = self.inner.get_object(bucket, key, params).await?;
let rate_limiter = self.rate_limiter.clone();
Ok(ThroughputGetObjectRequest { request, rate_limiter })
Ok(ThroughputGetObjectResponse { request, rate_limiter })
}

async fn list_objects(
51 changes: 30 additions & 21 deletions mountpoint-s3-client/src/object_client.rs
@@ -1,6 +1,5 @@
use std::fmt::{self, Debug};
use std::ops::Range;
use std::pin::Pin;
use std::time::SystemTime;

use async_trait::async_trait;
@@ -586,6 +585,29 @@ impl UploadChecksum {
}
}

/// A handle for controlling backpressure enabled requests.
///
/// If the client was created with `enable_read_backpressure` set true,
/// each meta request has a flow-control window that shrinks as response
/// body data is downloaded (headers do not affect the size of the window).
/// The client's `initial_read_window` determines the starting size of each meta request's window.
/// If a meta request's flow-control window reaches 0, no further data will be downloaded.
/// If the `initial_read_window` is 0, the request will not start until the window is incremented.
/// Maintain a larger window to keep up a high download throughput,
/// parts cannot download in parallel unless the window is large enough to hold multiple parts.
/// Maintain a smaller window to limit the amount of data buffered in memory.
pub trait ClientBackpressureHandle {
/// Increment the flow-control read window, so that response data continues downloading.
fn increment_read_window(&mut self, len: usize);

/// Move the upper bound of the read window to the given offset if it's not already there.
fn ensure_read_window(&mut self, desired_end_offset: u64);

/// Get the upper bound of the read window. When backpressure is enabled, [GetObjectRequest] can
/// return data up to this offset *exclusively*.
fn read_window_end_offset(&self) -> u64;
}

/// A streaming response to a GetObject request.
///
/// This struct implements [`futures::Stream`], which you can use to read the body of the object.
@@ -595,33 +617,20 @@ impl UploadChecksum {
pub trait GetObjectResponse:
Stream<Item = ObjectClientResult<GetBodyPart, GetObjectError, Self::ClientError>> + Send + Sync
{
type BackpressureHandle: ClientBackpressureHandle + Clone + Send + Sync;
type ClientError: std::error::Error + Send + Sync + 'static;

/// Take the backpressure handle from the response.
///
/// If `enable_read_backpressure` is false this call will return `None`,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn backpressure_handle(&mut self) -> Option<&mut Self::BackpressureHandle>;

/// Get the object's user defined metadata.
fn get_object_metadata(&self) -> ObjectMetadata;

/// Get the object's checksum, if uploaded with one
fn get_object_checksum(&self) -> Result<Checksum, ObjectChecksumError>;

/// Increment the flow-control window, so that response data continues downloading.
///
/// If the client was created with `enable_read_backpressure` set true,
/// each meta request has a flow-control window that shrinks as response
/// body data is downloaded (headers do not affect the size of the window).
/// The client's `initial_read_window` determines the starting size of each meta request's window.
/// If a meta request's flow-control window reaches 0, no further data will be downloaded.
/// If the `initial_read_window` is 0, the request will not start until the window is incremented.
/// Maintain a larger window to keep up a high download throughput,
/// parts cannot download in parallel unless the window is large enough to hold multiple parts.
/// Maintain a smaller window to limit the amount of data buffered in memory.
///
/// If `enable_read_backpressure` is false this call will have no effect,
/// no backpressure is being applied and data is being downloaded as fast as possible.
fn increment_read_window(self: Pin<&mut Self>, len: usize);

/// Get the upper bound of the current read window. When backpressure is enabled, [GetObjectRequest] can
/// return data up to this offset *exclusively*.
fn read_window_end_offset(self: Pin<&Self>) -> u64;
}

/// Failures to return object checksum