Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel S3 requests when dropped #794

Merged
merged 4 commits into from
Mar 6, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 58 additions & 30 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions};
use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
use mountpoint_s3_crt::s3::client::{
init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequestOptions, MetaRequestResult, MetaRequestType,
RequestType,
init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult,
MetaRequestType, RequestType,
};

use async_trait::async_trait;
use futures::channel::oneshot;
use percent_encoding::{percent_encode, AsciiSet, NON_ALPHANUMERIC};
use pin_project::pin_project;
use pin_project::{pin_project, pinned_drop};
use thiserror::Error;
use tracing::{debug, error, trace, Span};

Expand Down Expand Up @@ -455,7 +455,8 @@ impl S3CrtClientInner {
let _guard = span_telemetry.enter();

let http_status = metrics.status_code();
let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(true);
let request_canceled = metrics.is_canceled();
let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled);
let crt_error = Some(metrics.error()).filter(|e| e.is_err());
let request_type = request_type_to_metrics_string(metrics.request_type());
let request_id = metrics.request_id().unwrap_or_else(|| "<unknown>".into());
Expand All @@ -465,6 +466,8 @@ impl S3CrtClientInner {

let message = if request_failure {
"CRT request failed"
} else if request_canceled {
"CRT request canceled"
} else {
"CRT request finished"
};
Expand All @@ -479,6 +482,8 @@ impl S3CrtClientInner {
metrics::counter!("s3.requests", "op" => op, "type" => request_type).increment(1);
if request_failure {
metrics::counter!("s3.requests.failures", "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string()).increment(1);
} else if request_canceled {
metrics::counter!("s3.requests.canceled", "op" => op, "type" => request_type).increment(1);
}
})
.on_headers(move |headers, response_status| {
Expand Down Expand Up @@ -522,7 +527,12 @@ impl S3CrtClientInner {
metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64);
}

let log_level = status_code_to_log_level(request_result.response_status);
let status_code = request_result.response_status;
let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() {
tracing::Level::DEBUG
} else {
tracing::Level::WARN
};

// The `on_finish` callback has a choice of whether to give us an error or not. If
// not, fall back to generic error parsing (e.g. for permissions errors), or just no
Expand All @@ -535,20 +545,29 @@ impl S3CrtClientInner {
Ok(t)
}
Err(maybe_err) => {
let message = if request_result.is_canceled() {
"meta request canceled"
} else {
"meta request failed"
};
if let Some(error) = &maybe_err {
event!(log_level, ?duration, ?error, "meta request failed");
debug!("failed meta request result: {:?}", request_result);
event!(log_level, ?duration, ?error, message);
debug!("meta request result: {:?}", request_result);
} else {
event!(log_level, ?duration, ?request_result, "meta request failed");
event!(log_level, ?duration, ?request_result, message);
}

// If it's not a real HTTP status, encode the CRT error in the metric instead
let error_status = if request_result.response_status >= 100 {
request_result.response_status
if request_result.is_canceled() {
metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1);
} else {
-request_result.crt_error.raw_error()
};
metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
// If it's not a real HTTP status, encode the CRT error in the metric instead
let error_status = if request_result.response_status >= 100 {
request_result.response_status
} else {
-request_result.crt_error.raw_error()
};
metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1);
}

// Fill in a generic error if we weren't able to parse one
Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result))))
Expand All @@ -558,12 +577,14 @@ impl S3CrtClientInner {
let _ = tx.send(result);
});

// Issue the HTTP request using the CRT's S3 meta request API. We don't need to hold on to
// the resulting meta request, as it's a reference-counted object.
self.s3_client.make_meta_request(options)?;
// Issue the HTTP request using the CRT's S3 meta request API
let meta_request = self.s3_client.make_meta_request(options)?;
Self::poll_client_metrics(&self.s3_client);

Ok(S3HttpRequest { receiver: rx })
Ok(S3HttpRequest {
receiver: rx,
meta_request,
})
}

/// Make an HTTP request using this S3 client that returns the body on success or invokes the
Expand Down Expand Up @@ -729,10 +750,11 @@ impl S3Message {
}

#[derive(Debug)]
#[pin_project]
#[pin_project(PinnedDrop)]
struct S3HttpRequest<T, E> {
#[pin]
receiver: oneshot::Receiver<ObjectClientResult<T, E, S3RequestError>>,
meta_request: MetaRequest,
}

impl<T: Send, E: Send> Future for S3HttpRequest<T, E> {
Expand All @@ -750,6 +772,13 @@ impl<T: Send, E: Send> Future for S3HttpRequest<T, E> {
}
}

#[pinned_drop]
impl<T, E> PinnedDrop for S3HttpRequest<T, E> {
    fn drop(self: Pin<&mut Self>) {
        // Cancel the in-flight CRT meta request when this future is dropped, so that
        // abandoning an `S3HttpRequest` also stops the underlying S3 work. Cancellation
        // is asynchronous on the CRT side; this only requests it.
        self.meta_request.cancel();
    }
}

/// Failures to construct a new S3 client
#[derive(Error, Debug)]
#[non_exhaustive]
Expand Down Expand Up @@ -798,6 +827,10 @@ pub enum S3RequestError {
/// No signing credential is set for requests
#[error("No signing credentials found")]
NoSigningCredentials,

/// The request was canceled
#[error("Request canceled")]
RequestCanceled,
}

impl S3RequestError {
Expand All @@ -817,16 +850,6 @@ pub enum ConstructionError {
InvalidEndpoint(#[from] EndpointError),
}

/// Some requests are expected failures, and we want to log those at a different level to unexpected
/// ones.
fn status_code_to_log_level(status_code: i32) -> tracing::Level {
    // Successes, redirects, and plain not-found are routine; anything else is surprising.
    match status_code {
        200..=399 | 404 => tracing::Level::DEBUG,
        _ => tracing::Level::WARN,
    }
}

/// Return a string version of a [RequestType] for use in metrics
///
/// TODO: Replace this method with `aws_s3_request_metrics_get_operation_name`,
Expand Down Expand Up @@ -914,13 +937,18 @@ fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option<S3Reque
}
}

/// Handle canceled requests
fn try_parse_canceled_request(request_result: &MetaRequestResult) -> Option<S3RequestError> {
    // Map a canceled meta request onto the dedicated error variant; otherwise defer
    // to the other parsers.
    if request_result.is_canceled() {
        Some(S3RequestError::RequestCanceled)
    } else {
        None
    }
}

match request_result.response_status {
301 => try_parse_redirect(request_result),
// 400 is overloaded, it can be an access error (invalid token) or (for MRAP) a bucket
// redirect
400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)),
403 => try_parse_forbidden(request_result),
0 => try_parse_no_credentials(request_result),
0 => try_parse_no_credentials(request_result).or_else(|| try_parse_canceled_request(request_result)),
_ => None,
}
}
Expand Down
45 changes: 32 additions & 13 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Instant;
use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult};
use crate::s3_crt_client::{emit_throughput_metric, S3CrtClient, S3RequestError};
use async_trait::async_trait;
use futures::{select_biased, FutureExt as _};
use mountpoint_s3_crt::http::request_response::{Header, Headers};
use mountpoint_s3_crt::io::async_stream::{self, AsyncStreamWriter};
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestType, UploadReview};
Expand Down Expand Up @@ -165,11 +166,19 @@ impl PutObjectRequest for S3PutObjectRequest {
type ClientError = S3RequestError;

async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> {
self.total_bytes += slice.len() as u64;
self.writer
.write(slice)
.await
.map_err(|e| S3RequestError::InternalError(Box::new(e)).into())
// Check if the request has already finished (which can only happen because of an error in
// the request), and fail the write if so. Ordering doesn't matter here as it should be
// impossible for the request to succeed while we still hold `&mut self`, as no one can call
// `complete()`.
select_biased! {
result = self.writer.write(slice).fuse() => {
self.total_bytes += slice.len() as u64;
result.map_err(|e| S3RequestError::InternalError(Box::new(e)).into())
}

// Request can't have succeeded if we still own `&mut self`
result = (&mut self.body).fuse() => Err(result.expect_err("request can't succeed while still writing")),
}
}

async fn complete(self) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
Expand All @@ -184,15 +193,25 @@ impl PutObjectRequest for S3PutObjectRequest {
review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
self.review_callback.set(review_callback);
let body = {
self.writer
.complete()
.await
.map_err(|e| S3RequestError::InternalError(Box::new(e)))?;
self.body
};

let _ = body.await?;
let mut request = self.body.fuse();

// Check if the request has already finished (because of an error), and fail the write if
// so. Ordering here is significant: if both futures are ready (which could happen if we
// were not polled for a long time, and so the request succeeded _because_ of the
// `self.writer.complete()` call), we want to proceed with the complete path.
select_biased! {
result = self.writer.complete().fuse() => {
let _ = result.map_err(|e| S3RequestError::InternalError(Box::new(e)))?;

// Now wait for the request to finish.
let _ = request.await?;
}

// Request can't have succeeded if we still own it and the `complete()` future is still
// incomplete, which we know it is because this is a biased select.
result = request => return Err(result.expect_err("request can't succeed while still completing")),
};

let elapsed = self.start_time.elapsed();
emit_throughput_metric(self.total_bytes, elapsed, "put_object");
Expand Down
45 changes: 45 additions & 0 deletions mountpoint-s3-client/tests/get_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,48 @@ async fn test_get_object_412_if_match() {
Err(ObjectClientError::ServiceError(GetObjectError::PreconditionFailed))
));
}

#[test_case(false; "early")]
#[test_case(true; "after read")]
#[tokio::test]
async fn test_get_object_cancel(read: bool) {
    const OBJECT_SIZE: usize = 30_000_000;

    let sdk_client = get_test_sdk_client().await;
    let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_cancel");

    // Upload a single large object under the test prefix.
    let key = format!("{prefix}/hello");
    let contents = vec![0x42; OBJECT_SIZE];
    sdk_client
        .put_object()
        .bucket(&bucket)
        .key(&key)
        .body(ByteStream::from(contents))
        .send()
        .await
        .unwrap();

    let client: S3CrtClient = get_test_client();

    let mut request = client
        .get_object(&bucket, &key, None, None)
        .await
        .expect("get_object should succeed");

    if read {
        // Drain the whole stream before dropping, verifying the full body arrived.
        let mut received = 0usize;
        while let Some(part) = request.next().await {
            let (_offset, body) = part.expect("part download should succeed");
            received += body.len();
        }
        assert_eq!(received, OBJECT_SIZE);
    } else {
        // Wait a bit for the request to make some progress. This is very racy, but should be short
        // enough that there are still requests outstanding.
        tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    }

    // Dropping the in-flight request should cancel it.
    drop(request);
}
50 changes: 47 additions & 3 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,16 @@ async fn test_put_object_dropped(client: &impl ObjectClient, bucket: &str, key:
object_client_test!(test_put_object_dropped);

// Test for abort PUT object.
#[test_case(30; "small")]
#[test_case(30_000_000; "large")]
#[tokio::test]
async fn test_put_object_abort() {
async fn test_put_object_abort(size: usize) {
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_abort");
let client = get_test_client();
let key = format!("{prefix}hello");

let mut rng = rand::thread_rng();
let mut contents = vec![0u8; 32];
let mut contents = vec![0u8; size];
rng.fill(&mut contents[..]);

let mut request = client
Expand All @@ -167,9 +169,51 @@ async fn test_put_object_abort() {

drop(request); // Drop without calling complete().

// Allow for the AbortMultipartUpload to complete.
// Try to wait a while for the async abort to complete. For the larger object, this might be
// quite slow, especially in CI.
for _ in 0..20 {
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key)
.await
.unwrap();
if uploads_in_progress == 0 {
return;
}
}
panic!("upload did not get cleaned up");
}

#[tokio::test]
async fn test_put_object_initiate_failure() {
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_initiate_failure");
let client = get_test_client();
let key = format!("{prefix}hello");

let params = PutObjectParams::new().storage_class("INVALID_STORAGE_CLASS".into());

let mut request = client
.put_object(&bucket, &key, &params)
.await
.expect("put_object should succeed");

// The MPU initiation should fail, so we should get an error when we try to write.
let _err = request.write(&[1, 2, 3, 4]).await.expect_err("write should fail");

// Try again just to make sure the failure is fused correctly and doesn't block forever if
// someone (incorrectly) tries to write again after a failure.
let _err = request
.write(&[1, 2, 3, 4])
.await
.expect_err("second write should fail");

// Abort the request (which should already have been canceled)
drop(request);

// Wait a bit to let any requests settle.
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let sdk_client = get_test_sdk_client().await;
let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key)
.await
.unwrap();
Expand Down
Loading
Loading