Skip to content

Commit

Permalink
Adopt polling API for uploading data
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
Alessandro Passaro authored and passaro committed May 10, 2024
1 parent a2d0af6 commit 82eeeb7
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 233 deletions.
21 changes: 13 additions & 8 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,17 @@ impl PutObjectRequest for S3PutObjectRequest {
create_mpu.await.unwrap()?;
}

// Write will fail if the request has already finished (because of an error).
self.body
.meta_request
.write(slice, false)
.await
.map_err(S3RequestError::CrtError)?;
self.total_bytes += slice.len() as u64;
let meta_request = &mut self.body.meta_request;
let mut slice = slice;
while !slice.is_empty() {
// Write will fail if the request has already finished (because of an error).
let remaining = meta_request
.write(slice, false)
.await
.map_err(S3RequestError::CrtError)?;
self.total_bytes += slice.len() as u64;
slice = remaining;
}
Ok(())
}

Expand All @@ -195,7 +199,8 @@ impl PutObjectRequest for S3PutObjectRequest {
self.review_callback.set(review_callback);

// Write will fail if the request has already finished (because of an error).
self.body
_ = self
.body
.meta_request
.write(&[], true)
.await
Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mountpoint_s3_client::types::{
};
use mountpoint_s3_client::{ObjectClient, PutObjectRequest, S3CrtClient, S3RequestError};
use mountpoint_s3_crt::checksums::crc32c;
use mountpoint_s3_crt_sys::aws_s3_errors;
use mountpoint_s3_crt_sys::aws_common_error;
use rand::Rng;
use test_case::test_case;

Expand Down Expand Up @@ -236,7 +236,7 @@ async fn test_put_object_write_cancelled() {
.await
.expect_err("further writes should fail");
assert!(
matches!(err, ObjectClientError::ClientError(S3RequestError::CrtError(e)) if e.raw_error() == aws_s3_errors::AWS_ERROR_S3_REQUEST_HAS_COMPLETED as i32)
matches!(err, ObjectClientError::ClientError(S3RequestError::CrtError(e)) if e.raw_error() == aws_common_error::AWS_ERROR_INVALID_STATE as i32)
);
}

Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-crt-sys/crt/aws-c-s3
Submodule aws-c-s3 updated 46 files
+0 −66 benchmarks/README.md
+0 −15 benchmarks/benchmark-config.json
+0 −13 benchmarks/benchmarks-stack/benchmarks-stack/.gitignore
+0 −6 benchmarks/benchmarks-stack/benchmarks-stack/.npmignore
+0 −14 benchmarks/benchmarks-stack/benchmarks-stack/bin/benchmarks-stack.ts
+0 −15 benchmarks/benchmarks-stack/benchmarks-stack/buildspec.yml
+0 −16 benchmarks/benchmarks-stack/benchmarks-stack/cdk.json
+0 −50 benchmarks/benchmarks-stack/benchmarks-stack/deploy/benchmarks_deploy.js
+0 −7 benchmarks/benchmarks-stack/benchmarks-stack/jest.config.js
+0 −163 benchmarks/benchmarks-stack/benchmarks-stack/lib/benchmarks-stack.ts
+0 −83 benchmarks/benchmarks-stack/benchmarks-stack/lib/canary-policy-doc.json
+0 −29 benchmarks/benchmarks-stack/benchmarks-stack/lib/canary.sh
+0 −42 benchmarks/benchmarks-stack/benchmarks-stack/lib/get_p90.py
+0 −151 benchmarks/benchmarks-stack/benchmarks-stack/lib/init_instance.sh
+0 −65 benchmarks/benchmarks-stack/benchmarks-stack/lib/project_scripts/run_aws_c_s3.sh
+0 −46 benchmarks/benchmarks-stack/benchmarks-stack/lib/project_scripts/run_java_crt.sh
+0 −18 benchmarks/benchmarks-stack/benchmarks-stack/lib/run_project_template.sh
+0 −59 benchmarks/benchmarks-stack/benchmarks-stack/lib/show_instance_dashboard.sh
+0 −33 benchmarks/benchmarks-stack/benchmarks-stack/package.json
+0 −0 benchmarks/benchmarks-stack/benchmarks-stack/response.json
+0 −40 benchmarks/benchmarks-stack/benchmarks-stack/test/benchmarks.test.ts
+0 −23 benchmarks/benchmarks-stack/benchmarks-stack/tsconfig.json
+0 −14 benchmarks/dashboard-stack/.gitignore
+0 −6 benchmarks/dashboard-stack/.npmignore
+0 −15 benchmarks/dashboard-stack/bin/benchmarks.ts
+0 −12 benchmarks/dashboard-stack/cdk.json
+0 −7 benchmarks/dashboard-stack/jest.config.js
+0 −68 benchmarks/dashboard-stack/lambda/benchmarkManager.py
+0 −367 benchmarks/dashboard-stack/lib/dashboard-stack.ts
+0 −10 benchmarks/dashboard-stack/lib/policy-doc/admin-policy-doc.json
+0 −39 benchmarks/dashboard-stack/package.json
+0 −0 benchmarks/dashboard-stack/response.json
+0 −23 benchmarks/dashboard-stack/tsconfig.json
+6 −0 docs/memory_aware_request_execution.md
+27 −8 include/aws/s3/private/s3_buffer_pool.h
+11 −19 include/aws/s3/private/s3_meta_request_impl.h
+71 −4 include/aws/s3/s3_client.h
+16 −3 source/s3_auto_ranged_put.c
+91 −15 source/s3_buffer_pool.c
+182 −116 source/s3_meta_request.c
+6 −1 source/s3_request_messages.c
+4 −2 tests/CMakeLists.txt
+13 −87 tests/s3_asyncwrite_tests.c
+142 −0 tests/s3_buffer_pool_tests.c
+1 −1 tests/s3_data_plane_tests.c
+6 −2 tests/s3_request_messages_tests.c
189 changes: 1 addition & 188 deletions mountpoint-s3-crt/src/io/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,13 @@
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::task::{Context, Poll};

use futures::channel::oneshot;
use futures::future::BoxFuture;
use futures::task::ArcWake;
use futures::{FutureExt, TryFutureExt};
use mountpoint_s3_crt_sys::{
aws_future_void, aws_future_void_get_error, aws_future_void_is_done, aws_future_void_register_callback,
aws_future_void_release,
};
use thiserror::Error;

use crate::common::allocator::Allocator;
Expand Down Expand Up @@ -226,125 +220,17 @@ pub enum JoinError {
InternalError(#[from] crate::common::error::Error),
}

/// Wraps a [aws_future_void].
#[derive(Debug)]
pub struct FutureVoid {
inner: NonNull<aws_future_void>,
waker: Arc<Mutex<Option<Waker>>>,
}

// SAFETY: `aws_future_void` is thread-safe
unsafe impl Send for FutureVoid {}

impl Drop for FutureVoid {
fn drop(&mut self) {
// SAFETY: `self.inner` contains a valid `aws_future_void`.
unsafe {
aws_future_void_release(self.inner.as_ptr());
}
}
}

impl FutureVoid {
/// Return whether the future is done
pub fn is_done(&self) -> bool {
// SAFETY: `self.inner` contains a valid `aws_future_void`.
unsafe { aws_future_void_is_done(self.inner.as_ptr()) }
}

/// Create a [FutureVoid] from a [aws_future_void].
///
/// ## Safety
///
/// `inner` must be a valid [aws_future_void] with no registered callbacks.
pub unsafe fn from_crt(inner: NonNull<aws_future_void>) -> Self {
Self {
inner,
waker: Arc::new(Mutex::new(None)),
}
}

/// Get the result of this future if completed.
fn try_get_result(&self) -> Option<Result<(), crate::common::error::Error>> {
if !self.is_done() {
return None;
}

let result = {
// SAFETY: `self.inner` has completed.
let future_result = unsafe { aws_future_void_get_error(self.inner.as_ptr()) };
let error_result: crate::common::error::Error = future_result.into();
if error_result.is_err() {
Err(error_result)
} else {
Ok(())
}
};
Some(result)
}
}

impl Future for FutureVoid {
type Output = Result<(), crate::common::error::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut waker = self.waker.lock().unwrap();
if let Some(result) = self.try_get_result() {
// The future has completed. Remove the waker, if any, and return the result.
_ = waker.take();
Poll::Ready(result)
} else {
// The future has not completed yet. Do we need to register the callback?
match *waker {
Some(ref mut waker) => {
// The callback has already been registered, just replace the waker.
waker.clone_from(cx.waker());
}
None => {
// Store the waker. Drop the lock in case the callback runs synchronously during registration.
*waker = Some(cx.waker().clone());
drop(waker);

// `user_data` will be cleaned up in `future_void_callback`.
let user_data = Arc::into_raw(self.waker.clone()) as *mut ::libc::c_void;

// SAFETY: `self.inner.as_ptr()` is a valid `aws_future_void` and this is the only callback we are registering.
unsafe {
aws_future_void_register_callback(self.inner.as_ptr(), Some(future_void_callback), user_data);
}
}
}
Poll::Pending
}
}
}

/// Safety: Don't call this function directly, only called by the CRT as a callback.
unsafe extern "C" fn future_void_callback(user_data: *mut ::libc::c_void) {
// Take ownership of the `Arc` in `user_data`.
let waker = Arc::from_raw(user_data as *mut Mutex<Option<Waker>>);
let Some(waker) = waker.lock().unwrap().take() else {
// Waker removed on `poll` finding that the future had already completed.
// Nothing to do here.
return;
};
// Notify the waker that the future has completed.
waker.wake();
}

#[cfg(test)]
mod test {
use futures::executor::block_on;
use futures::future::join_all;
use mountpoint_s3_crt_sys::{aws_future_void_new, aws_future_void_set_error, aws_future_void_set_result};
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::time::Duration;

use super::*;
use crate::common::allocator::Allocator;
use crate::io::event_loop::{EventLoopGroup, EventLoopTimer};
use std::sync::atomic::Ordering;
use test_case::test_case;

/// Test that running a small future on an event loop works correctly.
#[test]
Expand Down Expand Up @@ -451,77 +337,4 @@ mod test {
"flag should still be false after cancellation"
);
}

#[test_case(Ok(()))]
#[test_case(Err(42))]
fn test_future_void_already_done(value: Result<(), i32>) {
let allocator = Allocator::default();
let aws_future = new_aws_future_void(&allocator);
set_aws_future_void_value(aws_future, value);

// SAFETY: `aws_future` is a valid `aws_future_void`.
let future_void = unsafe { FutureVoid::from_crt(aws_future) };

// Verify that the wrapper has completed and contains the set value.
assert!(future_void.is_done());
let Some(result) = future_void.try_get_result() else {
panic!("result should be available when the future is done");
};
assert_eq!(result.map_err(|e| e.raw_error()), value);

// Verify that the wrapper returns the set value when awaited.
let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
let future_handle = el_group.spawn_future(future_void);
let result = future_handle.wait().unwrap();
assert_eq!(result.map_err(|e| e.raw_error()), value);
}

#[test_case(Ok(()))]
#[test_case(Err(42))]
fn test_future_void_wake_up(value: Result<(), i32>) {
let allocator = Allocator::default();

let aws_future = new_aws_future_void(&allocator);
// SAFETY: `aws_future` is a valid `aws_future_void`.
let future_void = unsafe { FutureVoid::from_crt(aws_future) };

// Set up a flag that will set to true after awaiting future_void.
let flag = Arc::new(AtomicBool::new(false));

let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
let future_handle = {
let flag = flag.clone();
el_group.spawn_future(async move {
let result = future_void.await;
flag.store(true, Ordering::SeqCst);
result
})
};
assert!(
!flag.load(Ordering::SeqCst),
"the spawned future should not have completed and set the flag"
);
set_aws_future_void_value(aws_future, value);
let result = future_handle.wait().unwrap();
assert!(
flag.load(Ordering::SeqCst),
"the spawned future should have set the flag"
);
assert_eq!(result.map_err(|e| e.raw_error()), value);
}

fn new_aws_future_void(allocator: &Allocator) -> NonNull<aws_future_void> {
// SAFETY: `allocator` is a valid `aws_allocator` and `aws_future_void_new` returns a
// pointer to a valid `aws_future_void`.
unsafe { NonNull::new_unchecked(aws_future_void_new(allocator.inner.as_ptr())) }
}

fn set_aws_future_void_value(aws_future: NonNull<aws_future_void>, value: Result<(), i32>) {
match value {
// SAFETY: `aws_future` is a valid `aws_future_void`.
Ok(()) => unsafe { aws_future_void_set_result(aws_future.as_ptr()) },
// SAFETY: `aws_future` is a valid `aws_future_void`.
Err(code) => unsafe { aws_future_void_set_error(aws_future.as_ptr(), code) },
}
}
}
Loading

0 comments on commit 82eeeb7

Please sign in to comment.