Skip to content

Commit

Permalink
Detect incomplete writes
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro committed May 10, 2024
1 parent 82eeeb7 commit 162880b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
35 changes: 27 additions & 8 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl S3CrtClient {
start_time: Instant::now(),
total_bytes: 0,
response_headers,
pending_create_mpu: Some(mpu_created),
state: S3PutObjectRequestState::CreatingMPU(mpu_created),
})
}
}
Expand Down Expand Up @@ -154,9 +154,19 @@ pub struct S3PutObjectRequest {
total_bytes: u64,
/// Headers of the CompleteMultipartUpload response, available after the request was finished
response_headers: Arc<Mutex<Option<Headers>>>,
/// Signal indicating that CreateMultipartUpload completed successfully, or that the MPU failed.
/// Set to [None] once awaited on the first write, meaning the MPU was already created or failed.
pending_create_mpu: Option<oneshot::Receiver<Result<(), S3RequestError>>>,
state: S3PutObjectRequestState,
}

/// Internal state for a [S3PutObjectRequest].
#[derive(Debug)]
enum S3PutObjectRequestState {
/// Initial state indicating that CreateMultipartUpload may still be in progress. To be awaited on first write.
/// The signal indicates that CreateMultipartUpload completed successfully, or that the MPU failed.
CreatingMPU(oneshot::Receiver<Result<(), S3RequestError>>),
/// A write operation is in progress or was interrupted before completion.
PendingWrite,
/// Idle state.
Idle,
}

fn try_get_header_value(headers: &Headers, key: &str) -> Option<String> {
Expand All @@ -168,10 +178,14 @@ impl PutObjectRequest for S3PutObjectRequest {
type ClientError = S3RequestError;

async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
// On first write, check the pending CreateMultipartUpload.
if let Some(create_mpu) = self.pending_create_mpu.take() {
// Wait for CreateMultipartUpload to complete successfully, or the MPU to fail.
create_mpu.await.unwrap()?;
match std::mem::replace(&mut self.state, S3PutObjectRequestState::PendingWrite) {
S3PutObjectRequestState::CreatingMPU(create_mpu) => {
// On first write, check the pending CreateMultipartUpload.
// Wait for CreateMultipartUpload to complete successfully, or the MPU to fail.
create_mpu.await.unwrap()?;
}
S3PutObjectRequestState::PendingWrite => return Err(S3RequestError::RequestCanceled.into()),
S3PutObjectRequestState::Idle => {}
}

let meta_request = &mut self.body.meta_request;
Expand All @@ -185,6 +199,7 @@ impl PutObjectRequest for S3PutObjectRequest {
self.total_bytes += slice.len() as u64;
slice = remaining;
}
self.state = S3PutObjectRequestState::Idle;
Ok(())
}

Expand All @@ -196,6 +211,10 @@ impl PutObjectRequest for S3PutObjectRequest {
mut self,
review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
if matches!(self.state, S3PutObjectRequestState::PendingWrite) {
return Err(S3RequestError::RequestCanceled.into());
}

self.review_callback.set(review_callback);

// Write will fail if the request has already finished (because of an error).
Expand Down
10 changes: 5 additions & 5 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use mountpoint_s3_client::types::{
};
use mountpoint_s3_client::{ObjectClient, PutObjectRequest, S3CrtClient, S3RequestError};
use mountpoint_s3_crt::checksums::crc32c;
use mountpoint_s3_crt_sys::aws_common_error;
use rand::Rng;
use test_case::test_case;

Expand Down Expand Up @@ -221,7 +220,7 @@ async fn test_put_object_write_cancelled() {
request.write(&[1, 2, 3, 4]).await.expect("write should succeed");

{
// Write a multiple of `part_size` to ensure the copy is deferred.
// Write a multiple of `part_size` to ensure it will not complete immediately.
let size = client.part_size().unwrap() * 10;
let buffer = vec![0u8; size];
let write = request.write(&buffer);
Expand All @@ -235,9 +234,10 @@ async fn test_put_object_write_cancelled() {
.write(&[1, 2, 3, 4])
.await
.expect_err("further writes should fail");
assert!(
matches!(err, ObjectClientError::ClientError(S3RequestError::CrtError(e)) if e.raw_error() == aws_common_error::AWS_ERROR_INVALID_STATE as i32)
);
assert!(matches!(
err,
ObjectClientError::ClientError(S3RequestError::RequestCanceled)
));
}

#[tokio::test]
Expand Down

0 comments on commit 162880b

Please sign in to comment.