Skip to content

Commit

Permalink
Add support for single PutObject in mountpoint-s3-client (#1046)
Browse files Browse the repository at this point in the history
* Reintroduce the CRT InputStream as an option for the Message body

Signed-off-by: Alessandro Passaro <[email protected]>

* Implement put_object

Signed-off-by: Alessandro Passaro <[email protected]>

* Address PR feedback

Signed-off-by: Alessandro Passaro <[email protected]>

* Tidy up comments and tests on InputStream

Signed-off-by: Alessandro Passaro <[email protected]>

* Add comments to PutObjectTrailingChecksums and S3Operation

Signed-off-by: Alessandro Passaro <[email protected]>

* Introduce separate params type

Signed-off-by: Alessandro Passaro <[email protected]>

---------

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Oct 11, 2024
1 parent 0415b5c commit 9ea9c7e
Show file tree
Hide file tree
Showing 14 changed files with 811 additions and 72 deletions.
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/checksums.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Provides base64 encoding/decoding for CRC32C checksums.
use mountpoint_s3_crt::checksums::crc32c::Crc32c;
pub use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c};

use base64ct::Base64;
use base64ct::Encoding;
Expand Down
16 changes: 13 additions & 3 deletions mountpoint-s3-client/src/failure_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ use pin_project::pin_project;
use crate::object_client::{
DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest,
PutObjectResult, UploadReview,
ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams,
PutObjectRequest, PutObjectResult, PutObjectSingleParams, UploadReview,
};
use crate::ObjectClient;

// Wrapper for injecting failures into a get stream or a put request
pub struct FailureRequestWrapper<Client: ObjectClient, RequestWrapperState> {
Expand Down Expand Up @@ -167,6 +166,17 @@ where
})
}

async fn put_object_single<'a>(
&self,
bucket: &str,
key: &str,
params: &PutObjectSingleParams,
contents: impl AsRef<[u8]> + Send + 'a,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
// TODO failure hook for put_object_single
self.client.put_object_single(bucket, key, params, contents).await
}

async fn get_object_attributes(
&self,
bucket: &str,
Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ pub mod types {
pub use super::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts,
GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult, ObjectAttribute,
ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums,
RestoreStatus, UploadReview, UploadReviewPart,
ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams,
PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
};
}

Expand Down
51 changes: 50 additions & 1 deletion mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::object_client::{
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError,
HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError,
ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
Expand Down Expand Up @@ -341,6 +341,7 @@ pub enum Operation {
GetObjectAttributes,
ListObjectsV2,
PutObject,
PutObjectSingle,
}

/// Counter for a specific client [Operation].
Expand Down Expand Up @@ -714,6 +715,29 @@ impl ObjectClient for MockClient {
Ok(put_request)
}

async fn put_object_single<'a>(
&self,
bucket: &str,
key: &str,
params: &PutObjectSingleParams,
contents: impl AsRef<[u8]> + Send + 'a,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
trace!(bucket, key, "PutObject");
self.inc_op_count(Operation::PutObjectSingle);

if bucket != self.config.bucket {
return Err(ObjectClientError::ServiceError(PutObjectError::NoSuchBucket));
}

let mut object: MockObject = contents.into();
object.set_storage_class(params.storage_class.clone());
add_object(&self.objects, key, object);
Ok(PutObjectResult {
sse_type: None,
sse_kms_key_id: None,
})
}

async fn get_object_attributes(
&self,
bucket: &str,
Expand Down Expand Up @@ -1508,6 +1532,31 @@ mod tests {
}
}

#[tokio::test]
async fn test_put_object_single() {
let client = MockClient::new(MockClientConfig {
bucket: "test_bucket".to_string(),
part_size: 1024,
unordered_list_seed: None,
..Default::default()
});

let content = vec![42u8; 512];
let _put_result = client
.put_object_single("test_bucket", "key1", &Default::default(), &content)
.await
.expect("put_object failed");

let get_request = client
.get_object("test_bucket", "key1", None, None)
.await
.expect("get_object failed");

// Check that the result of get_object is correct.
let actual = get_request.collect().await.expect("failed to collect body");
assert_eq!(&content, &*actual);
}

proptest::proptest! {
#[test]
fn test_ramp(size in 1..2*RAMP_BUFFER_SIZE, read_size in 1..2*RAMP_BUFFER_SIZE, offset in 0..RAMP_BUFFER_SIZE) {
Expand Down
22 changes: 16 additions & 6 deletions mountpoint-s3-client/src/mock_client/throughput_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use pin_project::pin_project;

use crate::mock_client::leaky_bucket::LeakyBucket;
use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, MockPutObjectRequest};
use crate::mock_client::{
MockClient, MockClientConfig, MockClientError, MockGetObjectRequest, MockObject, MockPutObjectRequest,
};
use crate::object_client::{
DeleteObjectError, DeleteObjectResult, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult,
GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult,
ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams,
ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult,
PutObjectSingleParams,
};
use crate::types::ETag;

use super::MockGetObjectRequest;

/// A [MockClient] that rate limits overall download throughput to simulate a target network
/// performance without the jitter or service latency of targeting a real service. Note that while
Expand Down Expand Up @@ -168,6 +168,16 @@ impl ObjectClient for ThroughputMockClient {
self.inner.put_object(bucket, key, params).await
}

async fn put_object_single<'a>(
&self,
bucket: &str,
key: &str,
params: &PutObjectSingleParams,
contents: impl AsRef<[u8]> + Send + 'a,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
self.inner.put_object_single(bucket, key, params, contents).await
}

async fn get_object_attributes(
&self,
bucket: &str,
Expand Down
71 changes: 71 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ pub trait ObjectClient {
params: &PutObjectParams,
) -> ObjectClientResult<Self::PutObjectRequest, PutObjectError, Self::ClientError>;

/// Put an object into the object store.
async fn put_object_single<'a>(
&self,
bucket: &str,
key: &str,
params: &PutObjectSingleParams,
contents: impl AsRef<[u8]> + Send + 'a,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError>;

/// Retrieves all the metadata from an object without returning the object contents.
async fn get_object_attributes(
&self,
Expand Down Expand Up @@ -361,6 +370,68 @@ pub type UploadReviewPart = mountpoint_s3_crt::s3::client::UploadReviewPart;
/// A checksum algorithm used by the object client for integrity checks on uploads and downloads.
pub type ChecksumAlgorithm = mountpoint_s3_crt::s3::client::ChecksumAlgorithm;

/// Parameters to a [`put_object_single`](ObjectClient::put_object_single) request
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct PutObjectSingleParams {
/// User-provided checksum of the data to upload.
pub checksum: Option<UploadChecksum>,
/// Storage class to be used when creating new S3 object
pub storage_class: Option<String>,
/// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse)
pub server_side_encryption: Option<String>,
/// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be used to specify AWS KMS key ID to be used
/// when creating new S3 object
pub ssekms_key_id: Option<String>,
}

impl PutObjectSingleParams {
/// Create a default [PutObjectSingleParams].
pub fn new() -> Self {
Self::default()
}

/// Set checksum.
pub fn checksum(mut self, value: Option<UploadChecksum>) -> Self {
self.checksum = value;
self
}

/// Set the storage class.
pub fn storage_class(mut self, value: String) -> Self {
self.storage_class = Some(value);
self
}

/// Set server-side encryption type.
pub fn server_side_encryption(mut self, value: Option<String>) -> Self {
self.server_side_encryption = value;
self
}

/// Set KMS key ID to be used for server-side encryption.
pub fn ssekms_key_id(mut self, value: Option<String>) -> Self {
self.ssekms_key_id = value;
self
}
}

/// A checksum used by the object client for integrity checks on uploads.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum UploadChecksum {
Crc32c(crate::checksums::Crc32c),
}

impl UploadChecksum {
/// The checksum algorithm used to compute this checksum.
pub fn checksum_algorithm(&self) -> ChecksumAlgorithm {
match self {
UploadChecksum::Crc32c(_) => ChecksumAlgorithm::Crc32c,
}
}
}

/// A streaming response to a GetObject request.
///
/// This struct implements [`futures::Stream`], which you can use to read the body of the object.
Expand Down
Loading

4 comments on commit 9ea9c7e

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 9ea9c7e Previous: 0415b5c Ratio
seq_read_4t_direct 30896.421875 MiB 4402.78125 MiB 7.02
seq_read_skip_17m 11689.83984375 MiB 3317.32421875 MiB 3.52

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 9ea9c7e Previous: 0415b5c Ratio
seq_read_4t_direct 29927.77734375 MiB 4402.78125 MiB 6.80

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 9ea9c7e Previous: 0415b5c Ratio
seq_read_4t_direct 30067.43359375 MiB 4402.78125 MiB 6.83

This comment was automatically generated by workflow using github-action-benchmark.

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 9ea9c7e Previous: 0415b5c Ratio
seq_read_4t_direct 24480.8671875 MiB 4402.78125 MiB 5.56

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.