From 9ea9c7ed421b4fa0878b9f680da5d2b5b96c77eb Mon Sep 17 00:00:00 2001 From: Alessandro Passaro Date: Fri, 11 Oct 2024 22:40:18 +0100 Subject: [PATCH] Add support for single PutObject in mountpoint-s3-client (#1046) * Reintroduce the CRT InputStream as an option for the Message body Signed-off-by: Alessandro Passaro * Implement put_object Signed-off-by: Alessandro Passaro * Address PR feedback Signed-off-by: Alessandro Passaro * Tidy up comments and tests on InputStream Signed-off-by: Alessandro Passaro * Add comments to PutObjectTrailingChecksums and S3Operation Signed-off-by: Alessandro Passaro * Introduce separate params type Signed-off-by: Alessandro Passaro --------- Signed-off-by: Alessandro Passaro --- mountpoint-s3-client/src/checksums.rs | 2 +- mountpoint-s3-client/src/failure_client.rs | 16 +- mountpoint-s3-client/src/lib.rs | 4 +- mountpoint-s3-client/src/mock_client.rs | 51 +++- .../src/mock_client/throughput_client.rs | 22 +- mountpoint-s3-client/src/object_client.rs | 71 ++++++ mountpoint-s3-client/src/s3_crt_client.rs | 59 ++++- .../src/s3_crt_client/put_object.rs | 157 +++++++++--- mountpoint-s3-client/tests/common/mod.rs | 5 +- .../tests/put_object_single.rs | 238 ++++++++++++++++++ .../src/http/request_response.rs | 37 ++- mountpoint-s3-crt/src/io.rs | 1 + mountpoint-s3-crt/src/io/stream.rs | 201 +++++++++++++++ mountpoint-s3-crt/src/s3/client.rs | 19 +- 14 files changed, 811 insertions(+), 72 deletions(-) create mode 100644 mountpoint-s3-client/tests/put_object_single.rs create mode 100644 mountpoint-s3-crt/src/io/stream.rs diff --git a/mountpoint-s3-client/src/checksums.rs b/mountpoint-s3-client/src/checksums.rs index 2c526d680..f033686fb 100644 --- a/mountpoint-s3-client/src/checksums.rs +++ b/mountpoint-s3-client/src/checksums.rs @@ -1,5 +1,5 @@ //! Provides base64 encoding/decoding for CRC32C checksums. -use mountpoint_s3_crt::checksums::crc32c::Crc32c; +pub use mountpoint_s3_crt::checksums::crc32c::{self, Crc32c}; use base64ct::Base64; use base64ct::Encoding; diff --git a/mountpoint-s3-client/src/failure_client.rs b/mountpoint-s3-client/src/failure_client.rs index bd1345a7f..339e946c0 100644 --- a/mountpoint-s3-client/src/failure_client.rs +++ b/mountpoint-s3-client/src/failure_client.rs @@ -17,10 +17,9 @@ use pin_project::pin_project; use crate::object_client::{ DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult, - ObjectAttribute, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, - PutObjectResult, UploadReview, + ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, PutObjectError, PutObjectParams, + PutObjectRequest, PutObjectResult, PutObjectSingleParams, UploadReview, }; -use crate::ObjectClient; // Wrapper for injecting failures into a get stream or a put request pub struct FailureRequestWrapper { @@ -167,6 +166,17 @@ where }) } + async fn put_object_single<'a>( + &self, + bucket: &str, + key: &str, + params: &PutObjectSingleParams, + contents: impl AsRef<[u8]> + Send + 'a, + ) -> ObjectClientResult { + // TODO failure hook for put_object_single + self.client.put_object_single(bucket, key, params, contents).await + } + async fn get_object_attributes( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/lib.rs b/mountpoint-s3-client/src/lib.rs index 7b34ac612..ab53cde8c 100644 --- a/mountpoint-s3-client/src/lib.rs +++ b/mountpoint-s3-client/src/lib.rs @@ -74,8 +74,8 @@ pub mod types { pub use super::object_client::{ Checksum, ChecksumAlgorithm, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts, GetObjectAttributesResult, GetObjectRequest, HeadObjectResult, ListObjectsResult, ObjectAttribute, - ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, - RestoreStatus, UploadReview, UploadReviewPart, + ObjectClientResult, ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectSingleParams, + PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart, }; } diff --git a/mountpoint-s3-client/src/mock_client.rs b/mountpoint-s3-client/src/mock_client.rs index 08e7c8be2..db4ee2784 100644 --- a/mountpoint-s3-client/src/mock_client.rs +++ b/mountpoint-s3-client/src/mock_client.rs @@ -29,7 +29,7 @@ use crate::object_client::{ GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult, ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, - PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart, + PutObjectSingleParams, PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart, }; mod leaky_bucket; @@ -341,6 +341,7 @@ pub enum Operation { GetObjectAttributes, ListObjectsV2, PutObject, + PutObjectSingle, } /// Counter for a specific client [Operation]. @@ -714,6 +715,29 @@ impl ObjectClient for MockClient { Ok(put_request) } + async fn put_object_single<'a>( + &self, + bucket: &str, + key: &str, + params: &PutObjectSingleParams, + contents: impl AsRef<[u8]> + Send + 'a, + ) -> ObjectClientResult { + trace!(bucket, key, "PutObject"); + self.inc_op_count(Operation::PutObjectSingle); + + if bucket != self.config.bucket { + return Err(ObjectClientError::ServiceError(PutObjectError::NoSuchBucket)); + } + + let mut object: MockObject = contents.into(); + object.set_storage_class(params.storage_class.clone()); + add_object(&self.objects, key, object); + Ok(PutObjectResult { + sse_type: None, + sse_kms_key_id: None, + }) + } + async fn get_object_attributes( &self, bucket: &str, @@ -1508,6 +1532,31 @@ mod tests { } } + #[tokio::test] + async fn test_put_object_single() { + let client = MockClient::new(MockClientConfig { + bucket: "test_bucket".to_string(), + part_size: 1024, + unordered_list_seed: None, + ..Default::default() + }); + + let content = vec![42u8; 512]; + let _put_result = client + .put_object_single("test_bucket", "key1", &Default::default(), &content) + .await + .expect("put_object failed"); + + let get_request = client + .get_object("test_bucket", "key1", None, None) + .await + .expect("get_object failed"); + + // Check that the result of get_object is correct. + let actual = get_request.collect().await.expect("failed to collect body"); + assert_eq!(&content, &*actual); + } + proptest::proptest! { #[test] fn test_ramp(size in 1..2*RAMP_BUFFER_SIZE, read_size in 1..2*RAMP_BUFFER_SIZE, offset in 0..RAMP_BUFFER_SIZE) { diff --git a/mountpoint-s3-client/src/mock_client/throughput_client.rs b/mountpoint-s3-client/src/mock_client/throughput_client.rs index fc2c74831..1634eb4b1 100644 --- a/mountpoint-s3-client/src/mock_client/throughput_client.rs +++ b/mountpoint-s3-client/src/mock_client/throughput_client.rs @@ -10,15 +10,15 @@ use mountpoint_s3_crt::s3::client::BufferPoolUsageStats; use pin_project::pin_project; use crate::mock_client::leaky_bucket::LeakyBucket; -use crate::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, MockPutObjectRequest}; +use crate::mock_client::{ + MockClient, MockClientConfig, MockClientError, MockGetObjectRequest, MockObject, MockPutObjectRequest, +}; use crate::object_client::{ - DeleteObjectError, DeleteObjectResult, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, + DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError, GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectResult, ListObjectsError, ListObjectsResult, - ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams, + ObjectAttribute, ObjectClient, ObjectClientResult, PutObjectError, PutObjectParams, PutObjectResult, + PutObjectSingleParams, }; -use crate::types::ETag; - -use super::MockGetObjectRequest; /// A [MockClient] that rate limits overall download throughput to simulate a target network /// performance without the jitter or service latency of targeting a real service. Note that while @@ -168,6 +168,16 @@ impl ObjectClient for ThroughputMockClient { self.inner.put_object(bucket, key, params).await } + async fn put_object_single<'a>( + &self, + bucket: &str, + key: &str, + params: &PutObjectSingleParams, + contents: impl AsRef<[u8]> + Send + 'a, + ) -> ObjectClientResult { + self.inner.put_object_single(bucket, key, params, contents).await + } + async fn get_object_attributes( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/object_client.rs b/mountpoint-s3-client/src/object_client.rs index 1b34bbdae..cf50317df 100644 --- a/mountpoint-s3-client/src/object_client.rs +++ b/mountpoint-s3-client/src/object_client.rs @@ -139,6 +139,15 @@ pub trait ObjectClient { params: &PutObjectParams, ) -> ObjectClientResult; + /// Put an object into the object store. + async fn put_object_single<'a>( + &self, + bucket: &str, + key: &str, + params: &PutObjectSingleParams, + contents: impl AsRef<[u8]> + Send + 'a, + ) -> ObjectClientResult; + /// Retrieves all the metadata from an object without returning the object contents. async fn get_object_attributes( &self, @@ -361,6 +370,68 @@ pub type UploadReviewPart = mountpoint_s3_crt::s3::client::UploadReviewPart; /// A checksum algorithm used by the object client for integrity checks on uploads and downloads. pub type ChecksumAlgorithm = mountpoint_s3_crt::s3::client::ChecksumAlgorithm; +/// Parameters to a [`put_object_single`](ObjectClient::put_object_single) request +#[derive(Debug, Default, Clone)] +#[non_exhaustive] +pub struct PutObjectSingleParams { + /// User-provided checksum of the data to upload. + pub checksum: Option, + /// Storage class to be used when creating new S3 object + pub storage_class: Option, + /// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse) + pub server_side_encryption: Option, + /// If `server_side_encryption` has a valid value of aws:kms or aws:kms:dsse, this value may be used to specify AWS KMS key ID to be used + /// when creating new S3 object + pub ssekms_key_id: Option, +} + +impl PutObjectSingleParams { + /// Create a default [PutObjectSingleParams]. + pub fn new() -> Self { + Self::default() + } + + /// Set checksum. + pub fn checksum(mut self, value: Option) -> Self { + self.checksum = value; + self + } + + /// Set the storage class. + pub fn storage_class(mut self, value: String) -> Self { + self.storage_class = Some(value); + self + } + + /// Set server-side encryption type. + pub fn server_side_encryption(mut self, value: Option) -> Self { + self.server_side_encryption = value; + self + } + + /// Set KMS key ID to be used for server-side encryption. + pub fn ssekms_key_id(mut self, value: Option) -> Self { + self.ssekms_key_id = value; + self + } +} + +/// A checksum used by the object client for integrity checks on uploads. +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum UploadChecksum { + Crc32c(crate::checksums::Crc32c), +} + +impl UploadChecksum { + /// The checksum algorithm used to compute this checksum. + pub fn checksum_algorithm(&self) -> ChecksumAlgorithm { + match self { + UploadChecksum::Crc32c(_) => ChecksumAlgorithm::Crc32c, + } + } +} + /// A streaming response to a GetObject request. /// /// This struct implements [`futures::Stream`], which you can use to read the body of the object. diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index d38742edf..33f2804fb 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -11,7 +11,6 @@ use std::sync::{Arc, Mutex}; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata}; use mountpoint_s3_crt::auth::credentials::{ CredentialsProvider, CredentialsProviderChainDefaultOptions, CredentialsProviderProfileOptions, }; @@ -24,6 +23,7 @@ use mountpoint_s3_crt::io::channel_bootstrap::{ClientBootstrap, ClientBootstrapO use mountpoint_s3_crt::io::event_loop::EventLoopGroup; use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions}; use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions}; +use mountpoint_s3_crt::io::stream::InputStream; use mountpoint_s3_crt::s3::client::{ init_signing_config, BufferPoolUsageStats, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult, MetaRequestType, RequestMetrics, RequestType, @@ -36,10 +36,12 @@ use pin_project::{pin_project, pinned_drop}; use thiserror::Error; use tracing::{debug, error, trace, Span}; +use crate::checksums::crc32c_to_base64; use crate::endpoint_config::EndpointError; use crate::endpoint_config::{self, EndpointConfig}; +use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata}; +use crate::object_client::*; use crate::user_agent::UserAgent; -use crate::{object_client::*, S3GetObjectRequest, S3PutObjectRequest}; macro_rules! request_span { ($self:expr, $method:expr, $($field:tt)*) => {{ @@ -56,10 +58,15 @@ macro_rules! request_span { pub(crate) mod delete_object; pub(crate) mod get_object; + +pub(crate) use get_object::S3GetObjectRequest; pub(crate) mod get_object_attributes; + pub(crate) mod head_object; pub(crate) mod list_objects; + pub(crate) mod put_object; +pub(crate) use put_object::S3PutObjectRequest; pub(crate) mod head_bucket; pub use head_bucket::HeadBucketError; @@ -796,6 +803,7 @@ enum S3Operation { HeadObject, ListObjects, PutObject, + PutObjectSingle, } impl S3Operation { @@ -808,7 +816,8 @@ impl S3Operation { } } - /// The operation name to set when configuring a request, if required. + /// The operation name to set when configuring a request. Required for operations that + /// have MetaRequestType::Default (see [meta_request_type]). `None` otherwise. fn operation_name(&self) -> Option<&'static str> { match self { S3Operation::DeleteObject => Some("DeleteObject"), @@ -818,6 +827,7 @@ impl S3Operation { S3Operation::HeadObject => Some("HeadObject"), S3Operation::ListObjects => Some("ListObjectsV2"), S3Operation::PutObject => None, + S3Operation::PutObjectSingle => Some("PutObject"), } } } @@ -827,15 +837,15 @@ impl S3Operation { /// virtual-hosted-style addresses. The `path_prefix` is appended to the front of all paths, and /// need not be terminated with a `/`. #[derive(Debug)] -struct S3Message { - inner: Message, +struct S3Message<'a> { + inner: Message<'a>, uri: Uri, path_prefix: String, checksum_config: Option, signing_config: Option, } -impl S3Message { +impl<'a> S3Message<'a> { /// Add a header to this message. The header is added if necessary and any existing values for /// this header are removed. fn set_header( @@ -904,6 +914,32 @@ impl S3Message { fn set_checksum_config(&mut self, checksum_config: Option) { self.checksum_config = checksum_config; } + + /// Sets the body input stream for this message, and returns any previously set input stream. + /// If input_stream is None, unsets the body. + fn set_body_stream(&mut self, input_stream: Option>) -> Option> { + self.inner.set_body_stream(input_stream) + } + + /// Set the content length header. + fn set_content_length_header( + &mut self, + content_length: usize, + ) -> Result<(), mountpoint_s3_crt::common::error::Error> { + self.inner + .set_header(&Header::new("Content-Length", content_length.to_string())) + } + + /// Set the checksum header. + fn set_checksum_header( + &mut self, + checksum: &UploadChecksum, + ) -> Result<(), mountpoint_s3_crt::common::error::Error> { + let header = match checksum { + UploadChecksum::Crc32c(crc32c) => Header::new("x-amz-checksum-crc32c", crc32c_to_base64(crc32c)), + }; + self.inner.set_header(&header) + } } #[derive(Debug)] @@ -1056,6 +1092,7 @@ fn request_type_to_metrics_string(request_type: RequestType) -> &'static str { RequestType::AbortMultipartUpload => "AbortMultipartUpload", RequestType::CompleteMultipartUpload => "CompleteMultipartUpload", RequestType::UploadPartCopy => "UploadPartCopy", + RequestType::PutObject => "PutObject", } } @@ -1266,6 +1303,16 @@ impl ObjectClient for S3CrtClient { self.put_object(bucket, key, params).await } + async fn put_object_single<'a>( + &self, + bucket: &str, + key: &str, + params: &PutObjectSingleParams, + contents: impl AsRef<[u8]> + Send + 'a, + ) -> ObjectClientResult { + self.put_object_single(bucket, key, params, contents).await + } + async fn get_object_attributes( &self, bucket: &str, diff --git a/mountpoint-s3-client/src/s3_crt_client/put_object.rs b/mountpoint-s3-client/src/s3_crt_client/put_object.rs index cf0538042..ec0d410bf 100644 --- a/mountpoint-s3-client/src/s3_crt_client/put_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/put_object.rs @@ -1,17 +1,21 @@ use std::sync::{Arc, Mutex}; use std::time::Instant; -use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult}; -use crate::s3_crt_client::{ - emit_throughput_metric, PutObjectTrailingChecksums, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Operation, - S3RequestError, +use crate::object_client::{ + ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams, }; use async_trait::async_trait; use futures::channel::oneshot; use mountpoint_s3_crt::http::request_response::{Header, Headers}; +use mountpoint_s3_crt::io::stream::InputStream; use mountpoint_s3_crt::s3::client::{ChecksumConfig, RequestType, UploadReview}; use tracing::error; +use super::{ + emit_throughput_metric, PutObjectTrailingChecksums, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Message, + S3Operation, S3RequestError, +}; + const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption"; const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id"; @@ -23,15 +27,13 @@ impl S3CrtClient { params: &PutObjectParams, ) -> ObjectClientResult { let span = request_span!(self.inner, "put_object", bucket, key); - let mut message = self - .inner - .new_request_template("PUT", bucket) - .map_err(S3RequestError::construction_failure)?; - - let key = format!("/{}", key); - message - .set_request_path(&key) - .map_err(S3RequestError::construction_failure)?; + let mut message = self.new_put_request( + bucket, + key, + params.storage_class.as_deref(), + params.server_side_encryption.as_deref(), + params.ssekms_key_id.as_deref(), + )?; let checksum_config = match params.trailing_checksums { PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()), @@ -43,21 +45,6 @@ impl S3CrtClient { let review_callback = ReviewCallbackBox::default(); let callback = review_callback.clone(); - if let Some(storage_class) = params.storage_class.to_owned() { - message - .set_header(&Header::new("x-amz-storage-class", storage_class)) - .map_err(S3RequestError::construction_failure)?; - } - if let Some(sse) = params.server_side_encryption.as_ref() { - message - .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse)) - .map_err(S3RequestError::construction_failure)?; - } - if let Some(key_id) = params.ssekms_key_id.as_ref() { - message - .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id)) - .map_err(S3RequestError::construction_failure)?; - } // Variable `response_headers` will be accessed from different threads: from CRT thread which executes `on_headers` callback // and from our thread which executes `review_and_complete`. Callback `on_headers` is guaranteed to finish before this // variable is accessed in `review_and_complete` (see `S3HttpRequest::poll` implementation). @@ -109,6 +96,97 @@ impl S3CrtClient { state: S3PutObjectRequestState::CreatingMPU(mpu_created), }) } + + pub(super) async fn put_object_single<'a>( + &self, + bucket: &str, + key: &str, + params: &PutObjectSingleParams, + contents: impl AsRef<[u8]> + Send + 'a, + ) -> ObjectClientResult { + let span = request_span!(self.inner, "put_object_single", bucket, key); + let start_time = Instant::now(); + + // `response_headers` will be populated in the `on_headers` callback (on CRT event loop) and accessed in `extract_result` executing + // on a different thread after request completion. + let response_headers: Arc>> = Default::default(); + let slice = contents.as_ref(); + let content_length = slice.len(); + let body = { + let mut message = self.new_put_request( + bucket, + key, + params.storage_class.as_deref(), + params.server_side_encryption.as_deref(), + params.ssekms_key_id.as_deref(), + )?; + message + .set_content_length_header(content_length) + .map_err(S3RequestError::construction_failure)?; + if let Some(checksum) = ¶ms.checksum { + message + .set_checksum_header(checksum) + .map_err(S3RequestError::construction_failure)?; + } + + let body_input_stream = + InputStream::new_from_slice(&self.inner.allocator, slice).map_err(S3RequestError::CrtError)?; + message.set_body_stream(Some(body_input_stream)); + + let options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObjectSingle); + let response_headers_writer = response_headers.clone(); + let on_headers = move |headers: &Headers, _: i32| { + *response_headers_writer.lock().unwrap() = Some(headers.clone()); + }; + self.inner + .make_simple_http_request_from_options(options, span, |_| {}, |_| None, on_headers)? + }; + + body.await?; + + let elapsed = start_time.elapsed(); + emit_throughput_metric(content_length as u64, elapsed, "put_object_single"); + + Ok(extract_result(&response_headers)) + } + + fn new_put_request( + &self, + bucket: &str, + key: &str, + storage_class: Option<&str>, + server_side_encryption: Option<&str>, + ssekms_key_id: Option<&str>, + ) -> Result, S3RequestError> { + let mut message = self + .inner + .new_request_template("PUT", bucket) + .map_err(S3RequestError::construction_failure)?; + + let key = format!("/{key}"); + message + .set_request_path(&key) + .map_err(S3RequestError::construction_failure)?; + + if let Some(storage_class) = storage_class { + message + .set_header(&Header::new("x-amz-storage-class", storage_class)) + .map_err(S3RequestError::construction_failure)?; + } + + if let Some(sse) = server_side_encryption { + message + .set_header(&Header::new(SSE_TYPE_HEADER_NAME, sse)) + .map_err(S3RequestError::construction_failure)?; + } + if let Some(key_id) = ssekms_key_id { + message + .set_header(&Header::new(SSE_KEY_ID_HEADER_NAME, key_id)) + .map_err(S3RequestError::construction_failure)?; + } + + Ok(message) + } } type ReviewCallback = dyn FnOnce(UploadReview) -> bool + Send; @@ -176,6 +254,18 @@ fn try_get_header_value(headers: &Headers, key: &str) -> Option { headers.get(key).ok()?.value().clone().into_string().ok() } +fn extract_result(response_headers: &Mutex>) -> PutObjectResult { + let response_headers = response_headers + .lock() + .expect("must be able to acquire headers lock") + .take() + .expect("PUT response headers must be available at this point"); + PutObjectResult { + sse_type: try_get_header_value(&response_headers, SSE_TYPE_HEADER_NAME), + sse_kms_key_id: try_get_header_value(&response_headers, SSE_KEY_ID_HEADER_NAME), + } +} + #[cfg_attr(not(docsrs), async_trait)] impl PutObjectRequest for S3PutObjectRequest { type ClientError = S3RequestError; @@ -242,16 +332,7 @@ impl PutObjectRequest for S3PutObjectRequest { let elapsed = self.start_time.elapsed(); emit_throughput_metric(self.total_bytes, elapsed, "put_object"); - let response_headers = self - .response_headers - .lock() - .expect("must be able to acquire headers lock") - .take() - .expect("PUT response headers must be available at this point"); - Ok(PutObjectResult { - sse_type: try_get_header_value(&response_headers, SSE_TYPE_HEADER_NAME), - sse_kms_key_id: try_get_header_value(&response_headers, SSE_KEY_ID_HEADER_NAME), - }) + Ok(extract_result(&self.response_headers)) } } diff --git a/mountpoint-s3-client/tests/common/mod.rs b/mountpoint-s3-client/tests/common/mod.rs index 2c84cbccc..d5aaaa87b 100644 --- a/mountpoint-s3-client/tests/common/mod.rs +++ b/mountpoint-s3-client/tests/common/mod.rs @@ -235,7 +235,6 @@ macro_rules! object_client_test { mod $test_fn_identifier { use super::$test_fn_identifier; use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig}; - use mountpoint_s3_client::types::PutObjectParams; use $crate::{get_test_bucket_and_prefix, get_test_client}; #[tokio::test] @@ -250,7 +249,7 @@ macro_rules! object_client_test { }); let key = format!("{prefix}hello"); - $test_fn_identifier(&client, &bucket, &key, PutObjectParams::new()).await; + $test_fn_identifier(&client, &bucket, &key, Default::default()).await; } #[tokio::test] @@ -260,7 +259,7 @@ macro_rules! object_client_test { let client = get_test_client(); let key = format!("{prefix}hello"); - $test_fn_identifier(&client, &bucket, &key, PutObjectParams::new()).await; + $test_fn_identifier(&client, &bucket, &key, Default::default()).await; } } }; diff --git a/mountpoint-s3-client/tests/put_object_single.rs b/mountpoint-s3-client/tests/put_object_single.rs new file mode 100644 index 000000000..fc74dd8a9 --- /dev/null +++ b/mountpoint-s3-client/tests/put_object_single.rs @@ -0,0 +1,238 @@ +#![cfg(feature = "s3_tests")] + +pub mod common; + +use common::*; +use mountpoint_s3_client::checksums::{crc32c, crc32c_to_base64}; +use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig}; +use mountpoint_s3_client::types::{ChecksumAlgorithm, PutObjectResult, PutObjectSingleParams, UploadChecksum}; +use mountpoint_s3_client::{ObjectClient, S3CrtClient}; +use rand::Rng; +use test_case::test_case; + +// Simple test for PUT object. Puts a single, small object as a single part and checks that the +// contents are correct with a GET. +async fn test_put_object_single( + client: &(impl ObjectClient + Sync), + bucket: &str, + key: &str, + request_params: PutObjectSingleParams, +) -> PutObjectResult { + let mut rng = rand::thread_rng(); + + let mut contents = vec![0u8; 32]; + rng.fill(&mut contents[..]); + + let put_object_result = client + .put_object_single(bucket, key, &request_params, &contents) + .await + .expect("put_object should succeed"); + + let result = client + .get_object(bucket, key, None, None) + .await + .expect("get_object should succeed"); + check_get_result(result, None, &contents[..]).await; + + put_object_result +} + +object_client_test!(test_put_object_single); + +// Simple test for PUT object. Puts a single, empty object and checks that the (empty) +// contents are correct with a GET. +async fn test_put_object_single_empty( + client: &(impl ObjectClient + Sync), + bucket: &str, + key: &str, + request_params: PutObjectSingleParams, +) -> PutObjectResult { + let put_object_result = client + .put_object_single(bucket, key, &request_params, []) + .await + .expect("put_object should succeed"); + + let result = client + .get_object(bucket, key, None, None) + .await + .expect("get_object should succeed"); + check_get_result(result, None, &[]).await; + + put_object_result +} + +object_client_test!(test_put_object_single_empty); + +#[test_case(None; "no checksum")] +#[test_case(Some(ChecksumAlgorithm::Crc32c); "crc32c")] +#[tokio::test] +async fn test_put_checksums(checksum_algorithm: Option) { + const PART_SIZE: usize = 5 * 1024 * 1024; + let (bucket, prefix) = get_test_bucket_and_prefix("test_put_checksums"); + let client_config = S3ClientConfig::new() + .part_size(PART_SIZE) + .endpoint_config(EndpointConfig::new(&get_test_region())); + let client = S3CrtClient::new(client_config).expect("could not create test client"); + let key = format!("{prefix}hello"); + + let mut rng = rand::thread_rng(); + let mut contents = vec![0u8; PART_SIZE * 2]; + rng.fill(&mut contents[..]); + + let checksum = match checksum_algorithm { + Some(ChecksumAlgorithm::Crc32c) => Some(UploadChecksum::Crc32c(crc32c::checksum(&contents))), + Some(_) => unimplemented!("checksum algorithm not supported"), + None => None, + }; + + let params = PutObjectSingleParams::new().checksum(checksum.clone()); + client + .put_object_single(&bucket, &key, ¶ms, &contents) + .await + .expect("put_object should succeed"); + + let sdk_client = get_test_sdk_client().await; + let output = sdk_client + .head_object() + .bucket(&bucket) + .key(key) + .checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled) + .send() + .await + .unwrap(); + + match checksum { + Some(UploadChecksum::Crc32c(upload_checksum)) => { + let checksum = output.checksum_crc32_c().unwrap(); + let encoded = crc32c_to_base64(&upload_checksum); + assert_eq!(checksum, encoded); + } + Some(_) => unreachable!("unexpected checksum type"), + None => { + assert!( + output.checksum_crc32_c().is_none(), + "crc32c should not be present when upload checksums are disabled" + ); + } + } +} + +#[test_case("INTELLIGENT_TIERING")] +#[test_case("GLACIER")] +#[tokio::test] +// S3 Express One Zone is a distinct storage class and can't be overridden +#[cfg(not(feature = "s3express_tests"))] +async fn test_put_object_storage_class(storage_class: &str) { + let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_abort"); + let client = get_test_client(); + let key = format!("{prefix}hello"); + + let mut rng = rand::thread_rng(); + let mut contents = vec![0u8; 32]; + rng.fill(&mut contents[..]); + + let params = PutObjectSingleParams::new().storage_class(storage_class.to_owned()); + client + .put_object_single(&bucket, &key, ¶ms, &contents) + .await + .expect("put_object should succeed"); + + let sdk_client = get_test_sdk_client().await; + let attributes = sdk_client + .get_object_attributes() + .bucket(bucket) + .key(key) + .object_attributes(aws_sdk_s3::types::ObjectAttributes::StorageClass) + .send() + .await + .unwrap(); + + assert_eq!(storage_class, attributes.storage_class.unwrap().as_str()); +} + +#[cfg(not(feature = "s3express_tests"))] +async fn check_sse( + bucket: &String, + key: &String, + expected_sse: Option<&str>, + expected_key: &Option, + put_object_result: PutObjectResult, +) { + let sdk_client = get_test_sdk_client().await; + let head_object_resp = sdk_client + .head_object() + .bucket(bucket) + .key(key) + .send() + .await + .expect("head object should succeed"); + let (expected_sse, expected_sdk_sse) = match expected_sse { + None => (Some("AES256"), aws_sdk_s3::types::ServerSideEncryption::Aes256), + Some("AES256") => (Some("AES256"), aws_sdk_s3::types::ServerSideEncryption::Aes256), + Some("aws:kms") => (Some("aws:kms"), aws_sdk_s3::types::ServerSideEncryption::AwsKms), + Some("aws:kms:dsse") => ( + Some("aws:kms:dsse"), + aws_sdk_s3::types::ServerSideEncryption::AwsKmsDsse, + ), + _ => panic!("unexpected sse type was used in a test"), + }; + let actual_sse = head_object_resp + .server_side_encryption + .expect("SSE field should always have a value for this test"); + assert_eq!( + actual_sse, expected_sdk_sse, + "unexpected sse type in HEAD_OBJECT response" + ); + assert_eq!( + put_object_result.sse_type.as_deref(), + expected_sse, + "unexpected sse type in PutObjectResult" + ); + if !matches!(expected_sdk_sse, aws_sdk_s3::types::ServerSideEncryption::Aes256) { + assert!( + head_object_resp.ssekms_key_id.is_some(), + "must have a key for non-default encryption methods", + ); + } + if expected_key.is_some() { + // do not check the value of AWS managed key + assert_eq!( + &head_object_resp.ssekms_key_id, expected_key, + "unexpected sse key in HEAD_OBJECT response" + ); + assert_eq!( + &put_object_result.sse_kms_key_id, expected_key, + "unexpected sse key in PutObjectResult" + ); + } +} + +// Test that SSE settings, which were used to create a new object, are reflected in: +// 1. HEAD_OBJECT response queried via AWS SDK; +// 2. returned `PutObjectResult`. +#[test_case(Some("aws:kms"), Some(get_test_kms_key_id()))] +#[test_case(Some("aws:kms"), None)] +#[test_case(Some("aws:kms:dsse"), Some(get_test_kms_key_id()))] +#[test_case(Some("aws:kms:dsse"), None)] +#[test_case(None, None)] +#[test_case(Some("AES256"), None)] +#[tokio::test] +#[cfg(not(feature = "s3express_tests"))] +async fn test_put_object_sse(sse_type: Option<&str>, kms_key_id: Option) { + let bucket = get_test_bucket(); + let client_config = S3ClientConfig::new().endpoint_config(EndpointConfig::new(&get_test_region())); + let client = S3CrtClient::new(client_config).expect("could not create test client"); + let request_params = PutObjectSingleParams::new() + .server_side_encryption(sse_type.map(|value| value.to_owned())) + .ssekms_key_id(kms_key_id.to_owned()); + + let prefix = get_unique_test_prefix("test_put_object_sse"); + let key = format!("{prefix}hello"); + let put_object_result = test_put_object_single(&client, &bucket, &key, request_params.clone()).await; + check_sse(&bucket, &key, sse_type, &kms_key_id, put_object_result).await; + + let prefix = get_unique_test_prefix("test_put_object_sse"); + let key = format!("{prefix}hello"); + let put_object_result = test_put_object_single(&client, &bucket, &key, request_params.clone()).await; + check_sse(&bucket, &key, sse_type, &kms_key_id, put_object_result).await; +} diff --git a/mountpoint-s3-crt/src/http/request_response.rs b/mountpoint-s3-crt/src/http/request_response.rs index c51b5161c..e3402066e 100644 --- a/mountpoint-s3-crt/src/http/request_response.rs +++ b/mountpoint-s3-crt/src/http/request_response.rs @@ -11,6 +11,7 @@ use thiserror::Error; use crate::common::allocator::Allocator; use crate::common::error::Error; use crate::http::http_library_init; +use crate::io::stream::InputStream; use crate::{aws_byte_cursor_as_slice, CrtError, ToAwsByteCursor}; /// An HTTP header. @@ -250,12 +251,15 @@ impl<'a> Iterator for HeadersIterator<'a> { /// A single HTTP message, initialized to be empty (i.e., no headers, no body). #[derive(Debug)] -pub struct Message { +pub struct Message<'a> { /// The pointer to the inner `aws_http_message`. pub(crate) inner: NonNull, + + /// Input stream for the body of the http message, if present. + body_input_stream: Option>, } -impl Message { +impl<'a> Message<'a> { /// Creates a new HTTP/1.1 request message. pub fn new_request(allocator: &Allocator) -> Result { // TODO: figure out a better place to call this @@ -264,7 +268,10 @@ impl Message { // SAFETY: `allocator.inner` is a valid `aws_allocator`. let inner = unsafe { aws_http_message_new_request(allocator.inner.as_ptr()).ok_or_last_error()? }; - Ok(Self { inner }) + Ok(Self { + inner, + body_input_stream: None, + }) } /// Add a header to this message. If the header already exists in the message, this will add a @@ -315,9 +322,31 @@ impl Message { let headers = unsafe { Headers::from_crt(header_ptr) }; Ok(headers) } + + /// Sets the body input stream for this message, and returns any previously set input stream. + /// If input_stream is None, unsets the body. + pub fn set_body_stream(&mut self, input_stream: Option>) -> Option> { + let old_input_stream = std::mem::replace(&mut self.body_input_stream, input_stream); + + let new_input_stream_ptr = self + .body_input_stream + .as_ref() + .map(|s| s.inner.as_ptr()) + .unwrap_or(std::ptr::null_mut()); + + // SAFETY: `aws_http_message_set_request_method` does _not_ take ownership of the underlying + // input stream. We take ownership of the input stream to make sure it doesn't get dropped + // while the CRT has a pointer to it. We also use lifetime parameters to enforce that this + // message does not outlive any data borrowed by the input stream. + unsafe { + aws_http_message_set_body_stream(self.inner.as_ptr(), new_input_stream_ptr); + } + + old_input_stream + } } -impl Drop for Message { +impl Drop for Message<'_> { fn drop(&mut self) { // SAFETY: `self.inner` is a valid `aws_http_message`, and on Drop it's safe to decrement // the reference count since we won't use it again through `self.` diff --git a/mountpoint-s3-crt/src/io.rs b/mountpoint-s3-crt/src/io.rs index 07f222fb3..4efb54411 100644 --- a/mountpoint-s3-crt/src/io.rs +++ b/mountpoint-s3-crt/src/io.rs @@ -11,6 +11,7 @@ pub mod event_loop; pub mod futures; pub mod host_resolver; pub mod retry_strategy; +pub mod stream; static IO_LIBRARY_INIT: Once = Once::new(); diff --git a/mountpoint-s3-crt/src/io/stream.rs b/mountpoint-s3-crt/src/io/stream.rs new file mode 100644 index 000000000..4d6a84eb1 --- /dev/null +++ b/mountpoint-s3-crt/src/io/stream.rs @@ -0,0 +1,201 @@ +//! AWS input streams. + +use crate::common::{allocator::Allocator, error::Error}; +use crate::CrtError; +use mountpoint_s3_crt_sys::*; +use std::marker::PhantomData; +use std::ptr::NonNull; + +/// Wrapper for [aws_input_stream]. +#[derive(Debug)] +pub struct InputStream<'a> { + /// The inner `aws_input_stream`. Consumers should always hold the containing `InputStream<'a>` to ensure + /// `inner` points to a valid buffer. + pub(crate) inner: NonNull, + + /// Phantom data to keep the lifetimes correct, for example, if this stream is created from an + /// aws_byte_cursor that has some lifetime. + _phantom: PhantomData<&'a [u8]>, +} + +impl<'a> InputStream<'a> { + /// Create a new [InputStream] from a slice. The slice is not copied, and so the resulting + /// [InputStream] cannot outlive the slice (enforced by a lifetime restriction on the [InputStream]). + pub fn new_from_slice(allocator: &Allocator, buffer: &'a [u8]) -> Result { + let cursor = aws_byte_cursor { + len: buffer.len(), + ptr: buffer.as_ptr() as *mut u8, + }; + + // SAFETY: allocator is a valid aws_allocator. `Self` has a lifetime of 'a, so Rust + // will ensure that the return value from this function doesn't out live the buffer. + let inner = unsafe { aws_input_stream_new_from_cursor(allocator.inner.as_ptr(), &cursor).ok_or_last_error()? }; + + Ok(Self { + inner, + _phantom: Default::default(), + }) + } +} + +impl<'a> Drop for InputStream<'a> { + fn drop(&mut self) { + // SAFETY: self.inner is a valid `aws_input_stream`. + unsafe { + aws_input_stream_release(self.inner.as_ptr()); + } + } +} + +/// Status of an [InputStream]. +#[derive(Debug)] +pub struct StreamStatus { + is_valid: bool, + is_end_of_stream: bool, +} + +impl From for StreamStatus { + fn from(status: aws_stream_status) -> Self { + Self { + is_valid: status.is_valid, + is_end_of_stream: status.is_end_of_stream, + } + } +} + +impl From for aws_stream_status { + fn from(status: StreamStatus) -> Self { + Self { + is_valid: status.is_valid, + is_end_of_stream: status.is_end_of_stream, + } + } +} + +/// Specifies where to seek from in an [InputStream]. +#[derive(Debug)] +pub enum SeekBasis { + /// Seek from the beginning of the stream. + Begin, + /// Seek from the end of the stream. + End, +} + +impl From for SeekBasis { + fn from(value: aws_stream_seek_basis) -> Self { + match value { + aws_stream_seek_basis::AWS_SSB_BEGIN => Self::Begin, + aws_stream_seek_basis::AWS_SSB_END => Self::End, + _ => panic!("invalid stream seek basis: {value:?}"), + } + } +} + +impl From for aws_stream_seek_basis { + fn from(value: SeekBasis) -> Self { + match value { + SeekBasis::Begin => aws_stream_seek_basis::AWS_SSB_BEGIN, + SeekBasis::End => aws_stream_seek_basis::AWS_SSB_END, + } + } +} + +impl<'a> InputStream<'a> { + /// Seek to the given offset. Basis is either BEGIN or END, and describes where to seek from. + pub fn seek(&self, offset: i64, basis: SeekBasis) -> Result<(), Error> { + // SAFETY: self.inner is a valid input stream. + unsafe { aws_input_stream_seek(self.inner.as_ptr(), offset, basis.into()).ok_or_last_error() } + } + + /// Read some data into `buffer`, and return how many bytes were read. + pub fn read(&self, buffer: &mut [u8]) -> Result { + let mut byte_buf = aws_byte_buf { + len: 0, + buffer: buffer.as_mut_ptr(), + capacity: buffer.len(), + allocator: std::ptr::null_mut(), + }; + + // SAFETY: we know that the aws_byte_buf we just made points to a valid buffer, and trust + // the CRT function not to write outside that buffer's capacity. Also, self.inner is a + // valid input stream. + unsafe { + aws_input_stream_read(self.inner.as_ptr(), &mut byte_buf).ok_or_last_error()?; + }; + + assert_eq!(byte_buf.capacity, buffer.len(), "capacity should not change"); + + assert!( + byte_buf.len <= buffer.len(), + "should not have written more than available" + ); + Ok(byte_buf.len) + } + + /// Get the status of this stream. Can be used to indicate if stream is at the EOF. + pub fn get_status(&self) -> Result { + let mut status: aws_stream_status = Default::default(); + + // SAFETY: self.inner is a valid input stream and status is a local variable. + unsafe { + aws_input_stream_get_status(self.inner.as_ptr(), &mut status).ok_or_last_error()?; + } + + Ok(status.into()) + } + + /// Get the length of this input stream, in bytes. If a length cannot be determined, return Err. + pub fn get_length(&self) -> Result { + let mut out_length: i64 = 0; + + // SAFETY: self.inner is a valid input stream and out_length is a pointer to a local variable. + unsafe { + aws_input_stream_get_length(self.inner.as_ptr(), &mut out_length).ok_or_last_error()?; + } + + Ok(out_length.try_into().expect("failed to convert i64 to usize")) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::common::allocator::Allocator; + + #[test] + fn test_slice_cursor() { + let allocator = Allocator::default(); + + let bytes = b"Hello world!".to_vec(); + + // Create a new CRT input stream from this slice. + let stream = + InputStream::new_from_slice(&allocator, &bytes[..]).expect("failed to make input stream from slice"); + + let mut buffer = vec![0u8; 40]; + + let nread = stream.read(&mut buffer).expect("read failed"); + assert_eq!(nread, bytes.len()); + assert_eq!(&buffer[..nread], &bytes[..]); + + let status = stream.get_status().expect("get_status failed"); + assert!(status.is_end_of_stream); + + let length = stream.get_length().expect("get_length failed"); + assert_eq!(length, bytes.len()); + + // Partial reads + let mut small_buffer = vec![0u8; 5]; + stream.seek(0, SeekBasis::Begin).expect("seek to the start failed"); + + let nread = stream.read(&mut small_buffer).expect("read prefix failed"); + assert_eq!(nread, 5); + assert_eq!(&small_buffer[..], &bytes[..5]); + + stream.seek(-5, SeekBasis::End).expect("seek -5 from the end failed"); + + let nread = stream.read(&mut small_buffer).expect("read suffix failed"); + assert_eq!(nread, 5); + assert_eq!(&small_buffer[..], &bytes[(length - 5)..]); + } +} diff --git a/mountpoint-s3-crt/src/s3/client.rs b/mountpoint-s3-crt/src/s3/client.rs index bb421bd32..bb33a0a25 100644 --- a/mountpoint-s3-crt/src/s3/client.rs +++ b/mountpoint-s3-crt/src/s3/client.rs @@ -225,12 +225,12 @@ type FinishCallback = Box; /// Options for meta requests to S3. This is not a public interface, since clients should always /// be using the [MetaRequestOptions] wrapper, which pins this struct behind a pointer. -struct MetaRequestOptionsInner { +struct MetaRequestOptionsInner<'a> { /// Inner struct to pass to CRT functions. inner: aws_s3_meta_request_options, /// Owned copy of the message, if provided. - message: Option, + message: Option>, /// Owned copy of the endpoint URI, if provided endpoint: Option, @@ -260,7 +260,7 @@ struct MetaRequestOptionsInner { _pinned: PhantomPinned, } -impl<'a> MetaRequestOptionsInner { +impl<'a> MetaRequestOptionsInner<'_> { /// Convert from user_data in a callback to a reference to this struct. /// /// ## Safety @@ -283,7 +283,7 @@ impl<'a> MetaRequestOptionsInner { } } -impl Debug for MetaRequestOptionsInner { +impl Debug for MetaRequestOptionsInner<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MetaRequestOptionsInner") .field("inner", &self.inner) @@ -296,9 +296,9 @@ impl Debug for MetaRequestOptionsInner { /// Options for a meta request to S3. // Implementation details: this wraps the inner struct in a pinned box to enforce we don't move out of it. #[derive(Debug)] -pub struct MetaRequestOptions(Pin>); +pub struct MetaRequestOptions<'a>(Pin>>); -impl MetaRequestOptions { +impl<'a> MetaRequestOptions<'a> { /// Create a new default options struct. It follows the builder pattern so clients can use /// methods to set various options. pub fn new() -> Self { @@ -352,7 +352,7 @@ impl MetaRequestOptions { } /// Set the message of the request. - pub fn message(&mut self, message: Message) -> &mut Self { + pub fn message(&mut self, message: Message<'a>) -> &mut Self { // SAFETY: we aren't moving out of the struct. let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) }; options.message = Some(message); @@ -464,7 +464,7 @@ impl MetaRequestOptions { } } -impl Default for MetaRequestOptions { +impl Default for MetaRequestOptions<'_> { fn default() -> Self { Self::new() } @@ -1359,6 +1359,8 @@ pub enum RequestType { CompleteMultipartUpload, /// UploadPartCopy: https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html UploadPartCopy, + /// PutObject: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html + PutObject, } impl From for RequestType { @@ -1373,6 +1375,7 @@ impl From for RequestType { aws_s3_request_type::AWS_S3_REQUEST_TYPE_ABORT_MULTIPART_UPLOAD => RequestType::AbortMultipartUpload, aws_s3_request_type::AWS_S3_REQUEST_TYPE_COMPLETE_MULTIPART_UPLOAD => RequestType::CompleteMultipartUpload, aws_s3_request_type::AWS_S3_REQUEST_TYPE_UPLOAD_PART_COPY => RequestType::UploadPartCopy, + aws_s3_request_type::AWS_S3_REQUEST_TYPE_PUT_OBJECT => RequestType::PutObject, _ => panic!("unknown request type {:?}", value), } }