From e6cb8b34e30bc7470905eb45890b6f2eb282ef21 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Fri, 1 Mar 2024 23:51:44 +0000 Subject: [PATCH 1/4] Cancel S3 requests when dropped Today we don't cancel S3 requests when dropped. For our prefetcher that means we keep streaming (up to) 2GB of data that will never be used. This change cancels in-flight requests when dropped, so that the CRT will stop streaming them. Some bytes might still be in flight or delivered, which is fine. Canceling requests is a no-op if they've already completed. The tricky case for this change is PutObject. Our current implementation of `PutObjectRequest::write` blocks until the bytes it provides are consumed by the client. But sometimes the client might stop reading from the stream because the request has failed. That case happens to work today because we don't retain a reference to the meta request ourselves, and so the failed request's destructors run immediately after the failure, which unblocks the writer and returns it an error. But now we do hold onto a reference, and the destructors can't run until the last reference is released, so the writer is never unblocked. To fix this, we make the `write` and `complete` methods of the `PutObjectRequest` poll _both_ the write stream and the request itself in parallel. If the request completes, this gives us a chance to bail out of the write rather than blocking forever. Signed-off-by: James Bornholt --- mountpoint-s3-client/src/s3_crt_client.rs | 26 +++++++--- .../src/s3_crt_client/put_object.rs | 45 ++++++++++++----- mountpoint-s3-client/tests/get_object.rs | 45 +++++++++++++++++ mountpoint-s3-client/tests/put_object.rs | 50 +++++++++++++++++-- mountpoint-s3-crt/src/s3/client.rs | 19 +++++++ 5 files changed, 161 insertions(+), 24 deletions(-) diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index b89ff5da8..c93027aa2 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -23,14 +23,14 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup; use mountpoint_s3_crt::io::host_resolver::{AddressKinds, HostResolver, HostResolverDefaultOptions}; use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions}; use mountpoint_s3_crt::s3::client::{ - init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequestOptions, MetaRequestResult, MetaRequestType, - RequestType, + init_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequest, MetaRequestOptions, MetaRequestResult, + MetaRequestType, RequestType, }; use async_trait::async_trait; use futures::channel::oneshot; use percent_encoding::{percent_encode, AsciiSet, NON_ALPHANUMERIC}; -use pin_project::pin_project; +use pin_project::{pin_project, pinned_drop}; use thiserror::Error; use tracing::{debug, error, trace, Span}; @@ -558,12 +558,14 @@ impl S3CrtClientInner { let _ = tx.send(result); }); - // Issue the HTTP request using the CRT's S3 meta request API. We don't need to hold on to - // the resulting meta request, as it's a reference-counted object. 
- self.s3_client.make_meta_request(options)?; + // Issue the HTTP request using the CRT's S3 meta request API + let meta_request = self.s3_client.make_meta_request(options)?; Self::poll_client_metrics(&self.s3_client); - Ok(S3HttpRequest { receiver: rx }) + Ok(S3HttpRequest { + receiver: rx, + meta_request, + }) } /// Make an HTTP request using this S3 client that returns the body on success or invokes the @@ -729,10 +731,11 @@ impl S3Message { } #[derive(Debug)] -#[pin_project] +#[pin_project(PinnedDrop)] struct S3HttpRequest { #[pin] receiver: oneshot::Receiver>, + meta_request: MetaRequest, } impl Future for S3HttpRequest { @@ -750,6 +753,13 @@ impl Future for S3HttpRequest { } } +#[pinned_drop] +impl PinnedDrop for S3HttpRequest { + fn drop(self: Pin<&mut Self>) { + self.meta_request.cancel(); + } +} + /// Failures to construct a new S3 client #[derive(Error, Debug)] #[non_exhaustive] diff --git a/mountpoint-s3-client/src/s3_crt_client/put_object.rs b/mountpoint-s3-client/src/s3_crt_client/put_object.rs index 0421732df..423acdb6f 100644 --- a/mountpoint-s3-client/src/s3_crt_client/put_object.rs +++ b/mountpoint-s3-client/src/s3_crt_client/put_object.rs @@ -4,6 +4,7 @@ use std::time::Instant; use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult}; use crate::s3_crt_client::{emit_throughput_metric, S3CrtClient, S3RequestError}; use async_trait::async_trait; +use futures::{select_biased, FutureExt as _}; use mountpoint_s3_crt::http::request_response::{Header, Headers}; use mountpoint_s3_crt::io::async_stream::{self, AsyncStreamWriter}; use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestType, UploadReview}; @@ -165,11 +166,19 @@ impl PutObjectRequest for S3PutObjectRequest { type ClientError = S3RequestError; async fn write(&mut self, slice: &[u8]) -> ObjectClientResult<(), PutObjectError, Self::ClientError> { - self.total_bytes += slice.len() as u64; - self.writer - .write(slice) - .await - .map_err(|e| S3RequestError::InternalError(Box::new(e)).into()) + // Check if the request has already finished (which can only happen because of an error in + // the request), and fail the write if so. Ordering doesn't matter here as it should be + // impossible for the request to succeed while we still hold `&mut self`, as no one can call + // `complete()`. + select_biased! { + result = self.writer.write(slice).fuse() => { + self.total_bytes += slice.len() as u64; + result.map_err(|e| S3RequestError::InternalError(Box::new(e)).into()) + } + + // Request can't have succeeded if we still own `&mut self` + result = (&mut self.body).fuse() => Err(result.expect_err("request can't succeed while still writing")), + } } async fn complete(self) -> ObjectClientResult { @@ -184,15 +193,25 @@ impl PutObjectRequest for S3PutObjectRequest { review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static, ) -> ObjectClientResult { self.review_callback.set(review_callback); - let body = { - self.writer - .complete() - .await - .map_err(|e| S3RequestError::InternalError(Box::new(e)))?; - self.body - }; - let _ = body.await?; + let mut request = self.body.fuse(); + + // Check if the request has already finished (because of an error), and fail the write if + // so. Ordering here is significant: if both futures are ready (which could happen if we + // were not polled for a long time, and so the request succeeded _because_ of the + // `self.writer.complete()` call), we want to proceed with the complete path. + select_biased! 
{ + result = self.writer.complete().fuse() => { + let _ = result.map_err(|e| S3RequestError::InternalError(Box::new(e)))?; + + // Now wait for the request to finish. + let _ = request.await?; + } + + // Request can't have succeeded if we still own it and the `complete()` future is still + // incomplete, which we know it is because this is a biased select. + result = request => return Err(result.expect_err("request can't succeed while still completing")), + }; let elapsed = self.start_time.elapsed(); emit_throughput_metric(self.total_bytes, elapsed, "put_object"); diff --git a/mountpoint-s3-client/tests/get_object.rs b/mountpoint-s3-client/tests/get_object.rs index 472facb44..0ae08dc5b 100644 --- a/mountpoint-s3-client/tests/get_object.rs +++ b/mountpoint-s3-client/tests/get_object.rs @@ -152,3 +152,48 @@ async fn test_get_object_412_if_match() { Err(ObjectClientError::ServiceError(GetObjectError::PreconditionFailed)) )); } + +#[test_case(false; "early")] +#[test_case(true; "after read")] +#[tokio::test] +async fn test_get_object_cancel(read: bool) { + const OBJECT_SIZE: usize = 30_000_000; + + let sdk_client = get_test_sdk_client().await; + let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_cancel"); + + // Create one large object named "hello" + let key = format!("{prefix}/hello"); + let body = vec![0x42; OBJECT_SIZE]; + sdk_client + .put_object() + .bucket(&bucket) + .key(&key) + .body(ByteStream::from(body)) + .send() + .await + .unwrap(); + + let client: S3CrtClient = get_test_client(); + + let mut request = client + .get_object(&bucket, &key, None, None) + .await + .expect("get_object should succeed"); + + if read { + let mut bytes = 0; + while let Some(next) = request.next().await { + let (_offset, body) = next.expect("part download should succeed"); + bytes += body.len(); + } + assert_eq!(bytes, OBJECT_SIZE); + } else { + // Wait a bit for the request to make some progress. This is very racy, but should be short + // enough that there are still requests outstanding. + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + + // Explicitly cancel the request. + drop(request); +} diff --git a/mountpoint-s3-client/tests/put_object.rs b/mountpoint-s3-client/tests/put_object.rs index e1e4899ba..7de74f4c1 100644 --- a/mountpoint-s3-client/tests/put_object.rs +++ b/mountpoint-s3-client/tests/put_object.rs @@ -142,14 +142,16 @@ async fn test_put_object_dropped(client: &impl ObjectClient, bucket: &str, key: object_client_test!(test_put_object_dropped); // Test for abort PUT object. +#[test_case(30; "small")] +#[test_case(30_000_000; "large")] #[tokio::test] -async fn test_put_object_abort() { +async fn test_put_object_abort(size: usize) { let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_abort"); let client = get_test_client(); let key = format!("{prefix}hello"); let mut rng = rand::thread_rng(); - let mut contents = vec![0u8; 32]; + let mut contents = vec![0u8; size]; rng.fill(&mut contents[..]); let mut request = client @@ -167,9 +169,51 @@ async fn test_put_object_abort() { drop(request); // Drop without calling complete(). - // Allow for the AbortMultipartUpload to complete. + // Try to wait a while for the async abort to complete. For the larger object, this might be + // quite slow, especially in CI. 
+ for _ in 0..20 { + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key) + .await + .unwrap(); + if uploads_in_progress == 0 { + return; + } + } + panic!("upload did not get cleaned up"); +} + +#[tokio::test] +async fn test_put_object_initiate_failure() { + let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_initiate_failure"); + let client = get_test_client(); + let key = format!("{prefix}hello"); + + let params = PutObjectParams::new().storage_class("INVALID_STORAGE_CLASS".into()); + + let mut request = client + .put_object(&bucket, &key, ¶ms) + .await + .expect("put_object should succeed"); + + // The MPU initiation should fail, so we should get an error when we try to write. + let _err = request.write(&[1, 2, 3, 4]).await.expect_err("write should fail"); + + // Try again just to make sure the failure is fused correctly and doesn't block forever if + // someone (incorrectly) tries to write again after a failure. + let _err = request + .write(&[1, 2, 3, 4]) + .await + .expect_err("second write should fail"); + + // Abort the request (which should already have been canceled) + drop(request); + + // Wait a bit to let any requests settle. tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + let sdk_client = get_test_sdk_client().await; let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key) .await .unwrap(); diff --git a/mountpoint-s3-crt/src/s3/client.rs b/mountpoint-s3-crt/src/s3/client.rs index 6054dfbae..6a713216a 100644 --- a/mountpoint-s3-crt/src/s3/client.rs +++ b/mountpoint-s3-crt/src/s3/client.rs @@ -521,6 +521,20 @@ pub struct MetaRequest { inner: NonNull, } +impl MetaRequest { + /// Cancel the meta request. Does nothing (but does not fail/panic) if the request has already + /// completed. If the request has not already completed, parts may still be delivered to the + /// `body_callback` after this method completes, and the `finish_callback` will still be + /// invoked, but with the `crt_error` field set to `AWS_ERROR_S3_CANCELED`. + pub fn cancel(&self) { + // SAFETY: `self.inner` is a valid `aws_s3_meta_request`, even if the request has otherwise + // finished, since we hold a ref count to it + unsafe { + aws_s3_meta_request_cancel(self.inner.as_ptr()); + } + } +} + impl Drop for MetaRequest { fn drop(&mut self) { // SAFETY: we will no longer use the pointer after this MetaRequest is dropped, so it's safe @@ -531,6 +545,11 @@ impl Drop for MetaRequest { } } +// SAFETY: `aws_s3_meta_request` is thread-safe +unsafe impl Send for MetaRequest {} +// SAFETY: `aws_s3_meta_request` is thread safe +unsafe impl Sync for MetaRequest {} + /// Client metrics which represent current workload of a client. /// Overall, num_requests_tracked_requests shows total number of requests being processed by the client at a time. /// It can be broken down into these numbers by states of the client. 
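
For illustration only, not part of the patch: a minimal, self-contained sketch of the "poll both the write stream and the request" idea described in the commit message above. `write_or_bail` and `request_failed` are toy stand-ins (not types from this patch) for `AsyncStreamWriter::write` and the `S3HttpRequest` future.

use std::future::Future;

use futures::channel::oneshot;
use futures::{select_biased, FutureExt as _};

/// Race a write against the request itself: if the request fails first, the writer is
/// unblocked with an error instead of waiting forever on a stream nobody will read.
/// `write` and `request_failed` are illustrative stand-ins, not part of this patch.
async fn write_or_bail(
    write: impl Future<Output = Result<(), String>>,
    mut request_failed: oneshot::Receiver<String>,
) -> Result<(), String> {
    select_biased! {
        // Prefer the write when both futures are ready.
        result = write.fuse() => result,
        // The request finishing while the writer is still in use can only mean failure.
        err = request_failed => Err(err.unwrap_or_else(|_| "request dropped".to_string())),
    }
}

As in the real `write` and `complete` above, the biased ordering makes the write/complete arm win whenever both futures are ready at the same time.
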
From 129aa18314a59f51945d12bbc3b6c02e818a36a9 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Sat, 2 Mar 2024 01:58:58 +0000 Subject: [PATCH 2/4] Adjust client metrics to account for canceled requests Signed-off-by: James Bornholt --- mountpoint-s3-client/src/s3_crt_client.rs | 62 +++++++++++++++-------- mountpoint-s3-crt/src/s3/client.rs | 10 ++++ 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/mountpoint-s3-client/src/s3_crt_client.rs b/mountpoint-s3-client/src/s3_crt_client.rs index c93027aa2..9aa715cd3 100644 --- a/mountpoint-s3-client/src/s3_crt_client.rs +++ b/mountpoint-s3-client/src/s3_crt_client.rs @@ -455,7 +455,8 @@ impl S3CrtClientInner { let _guard = span_telemetry.enter(); let http_status = metrics.status_code(); - let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(true); + let request_canceled = metrics.is_canceled(); + let request_failure = http_status.map(|status| !(200..299).contains(&status)).unwrap_or(!request_canceled); let crt_error = Some(metrics.error()).filter(|e| e.is_err()); let request_type = request_type_to_metrics_string(metrics.request_type()); let request_id = metrics.request_id().unwrap_or_else(|| "".into()); @@ -465,6 +466,8 @@ impl S3CrtClientInner { let message = if request_failure { "CRT request failed" + } else if request_canceled { + "CRT request canceled" } else { "CRT request finished" }; @@ -479,6 +482,8 @@ impl S3CrtClientInner { metrics::counter!("s3.requests", "op" => op, "type" => request_type).increment(1); if request_failure { metrics::counter!("s3.requests.failures", "op" => op, "type" => request_type, "status" => http_status.unwrap_or(-1).to_string()).increment(1); + } else if request_canceled { + metrics::counter!("s3.requests.canceled", "op" => op, "type" => request_type).increment(1); } }) .on_headers(move |headers, response_status| { @@ -522,7 +527,12 @@ impl S3CrtClientInner { metrics::gauge!("s3.client.host_count", "host" => hostname).set(host_count as f64); } - let log_level = status_code_to_log_level(request_result.response_status); + let status_code = request_result.response_status; + let log_level = if (200..=399).contains(&status_code) || status_code == 404 || request_result.is_canceled() { + tracing::Level::DEBUG + } else { + tracing::Level::WARN + }; // The `on_finish` callback has a choice of whether to give us an error or not. If // not, fall back to generic error parsing (e.g. 
for permissions errors), or just no @@ -535,20 +545,29 @@ impl S3CrtClientInner { Ok(t) } Err(maybe_err) => { + let message = if request_result.is_canceled() { + "meta request canceled" + } else { + "meta request failed" + }; if let Some(error) = &maybe_err { - event!(log_level, ?duration, ?error, "meta request failed"); - debug!("failed meta request result: {:?}", request_result); + event!(log_level, ?duration, ?error, message); + debug!("meta request result: {:?}", request_result); } else { - event!(log_level, ?duration, ?request_result, "meta request failed"); + event!(log_level, ?duration, ?request_result, message); } - // If it's not a real HTTP status, encode the CRT error in the metric instead - let error_status = if request_result.response_status >= 100 { - request_result.response_status + if request_result.is_canceled() { + metrics::counter!("s3.meta_requests.canceled", "op" => op).increment(1); } else { - -request_result.crt_error.raw_error() - }; - metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1); + // If it's not a real HTTP status, encode the CRT error in the metric instead + let error_status = if request_result.response_status >= 100 { + request_result.response_status + } else { + -request_result.crt_error.raw_error() + }; + metrics::counter!("s3.meta_requests.failures", "op" => op, "status" => format!("{error_status}")).increment(1); + } // Fill in a generic error if we weren't able to parse one Err(maybe_err.unwrap_or_else(|| ObjectClientError::ClientError(S3RequestError::ResponseError(request_result)))) @@ -808,6 +827,10 @@ pub enum S3RequestError { /// No signing credential is set for requests #[error("No signing credentials found")] NoSigningCredentials, + + /// The request was canceled + #[error("Request canceled")] + RequestCanceled, } impl S3RequestError { @@ -827,16 +850,6 @@ pub enum ConstructionError { InvalidEndpoint(#[from] EndpointError), } -/// Some requests are expected failures, and we want to log those at a different level to unexpected -/// ones. -fn status_code_to_log_level(status_code: i32) -> tracing::Level { - if (200..=399).contains(&status_code) || status_code == 404 { - tracing::Level::DEBUG - } else { - tracing::Level::WARN - } -} - /// Return a string version of a [RequestType] for use in metrics /// /// TODO: Replace this method with `aws_s3_request_metrics_get_operation_name`, @@ -924,13 +937,18 @@ fn try_parse_generic_error(request_result: &MetaRequestResult) -> Option Option { + request_result.is_canceled().then_some(S3RequestError::RequestCanceled) + } + match request_result.response_status { 301 => try_parse_redirect(request_result), // 400 is overloaded, it can be an access error (invalid token) or (for MRAP) a bucket // redirect 400 => try_parse_forbidden(request_result).or_else(|| try_parse_redirect(request_result)), 403 => try_parse_forbidden(request_result), - 0 => try_parse_no_credentials(request_result), + 0 => try_parse_no_credentials(request_result).or_else(|| try_parse_canceled_request(request_result)), _ => None, } } diff --git a/mountpoint-s3-crt/src/s3/client.rs b/mountpoint-s3-crt/src/s3/client.rs index 6a713216a..8df63879b 100644 --- a/mountpoint-s3-crt/src/s3/client.rs +++ b/mountpoint-s3-crt/src/s3/client.rs @@ -748,6 +748,11 @@ impl MetaRequestResult { self.crt_error.is_err() } + /// Return whether this request was canceled according to its error code. 
+ pub fn is_canceled(&self) -> bool { + self.crt_error.raw_error() == mountpoint_s3_crt_sys::aws_s3_errors::AWS_ERROR_S3_CANCELED as i32 + } + /// Convert the CRT's meta request result struct into a safe, owned result. /// SAFETY: This copies from the raw pointer inside of the request result, so only call on /// results given to us from the CRT. @@ -1028,6 +1033,11 @@ impl RequestMetrics { }; Some(Duration::from_nanos(receive_start.saturating_sub(send_end))) } + + /// Return whether the request was canceled according to its error code + pub fn is_canceled(&self) -> bool { + self.error().raw_error() == mountpoint_s3_crt_sys::aws_s3_errors::AWS_ERROR_S3_CANCELED as i32 + } } impl Debug for RequestMetrics { From c43e7d533d54e19d16879c13d99dc79e7e9d8fd9 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Tue, 5 Mar 2024 22:23:08 +0000 Subject: [PATCH 3/4] Disable large object PUT abort test The CRT abort is best-effort -- part uploads can succeed after the Abort succeeds, which effectively recreates the MPU. This is mentioned in the AbortMultipartUpload documentation. Signed-off-by: James Bornholt --- mountpoint-s3-client/tests/put_object.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/mountpoint-s3-client/tests/put_object.rs b/mountpoint-s3-client/tests/put_object.rs index 7de74f4c1..e41d32028 100644 --- a/mountpoint-s3-client/tests/put_object.rs +++ b/mountpoint-s3-client/tests/put_object.rs @@ -142,8 +142,9 @@ async fn test_put_object_dropped(client: &impl ObjectClient, bucket: &str, key: object_client_test!(test_put_object_dropped); // Test for abort PUT object. + #[test_case(30; "small")] -#[test_case(30_000_000; "large")] +// #[test_case(30_000_000; "large")] // The Abort and in-flight parts can race and cause some parts to be left behind, recreating the MPU #[tokio::test] async fn test_put_object_abort(size: usize) { let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_abort"); @@ -169,19 +170,13 @@ async fn test_put_object_abort(size: usize) { drop(request); // Drop without calling complete(). - // Try to wait a while for the async abort to complete. For the larger object, this might be - // quite slow, especially in CI. - for _ in 0..20 { - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + // Allow for the AbortMultipartUpload to complete. + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; - let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key) - .await - .unwrap(); - if uploads_in_progress == 0 { - return; - } - } - panic!("upload did not get cleaned up"); + let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &prefix, &key) + .await + .unwrap(); + assert_eq!(uploads_in_progress, 0); } #[tokio::test] From fa6122d970f5c4bbee04c5014f359a02c47caca0 Mon Sep 17 00:00:00 2001 From: James Bornholt Date: Tue, 5 Mar 2024 22:55:52 +0000 Subject: [PATCH 4/4] Expand a comment Signed-off-by: James Bornholt --- mountpoint-s3-client/tests/get_object.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mountpoint-s3-client/tests/get_object.rs b/mountpoint-s3-client/tests/get_object.rs index 0ae08dc5b..172e2f26b 100644 --- a/mountpoint-s3-client/tests/get_object.rs +++ b/mountpoint-s3-client/tests/get_object.rs @@ -194,6 +194,7 @@ async fn test_get_object_cancel(read: bool) { tokio::time::sleep(std::time::Duration::from_millis(500)).await; } - // Explicitly cancel the request. + // Explicitly cancel the request. 
We don't have a good way to test that any in-flight requests
+    // were actually canceled, but we can at least check that the drop doesn't panic or deadlock.
     drop(request);
 }
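
For illustration only, not part of the patch series: a sketch of a mid-stream variant of the cancellation test above, reusing the same helpers (`get_test_sdk_client`, `get_test_client`, `get_test_bucket_and_prefix`). It exercises the case the first commit message describes for the prefetcher: read part of a large object, then drop the request so the remaining bytes are not streamed.

#[tokio::test]
async fn test_get_object_cancel_mid_stream() {
    const OBJECT_SIZE: usize = 30_000_000;

    let sdk_client = get_test_sdk_client().await;
    let (bucket, prefix) = get_test_bucket_and_prefix("test_get_object_cancel_mid_stream");

    // Create one large object so several parts are in flight at once.
    let key = format!("{prefix}/hello");
    let body = vec![0x42; OBJECT_SIZE];
    sdk_client
        .put_object()
        .bucket(&bucket)
        .key(&key)
        .body(ByteStream::from(body))
        .send()
        .await
        .unwrap();

    let client: S3CrtClient = get_test_client();

    let mut request = client
        .get_object(&bucket, &key, None, None)
        .await
        .expect("get_object should succeed");

    // Consume only the first part, then abandon the stream.
    let first = request.next().await.expect("stream should yield at least one part");
    let (_offset, body) = first.expect("part download should succeed");
    assert!(!body.is_empty());

    // Dropping the request cancels the in-flight meta request; parts already queued may still
    // be delivered internally, but nothing further is streamed.
    drop(request);
}

Like the tests above, this doesn't assert on server-side state; it mainly checks that dropping a partially-consumed request neither panics nor deadlocks.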