-
Notifications
You must be signed in to change notification settings - Fork 181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PoC to buffer write requests #854
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,12 @@ use std::time::Instant; | |
|
||
use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult}; | ||
use crate::s3_crt_client::{emit_throughput_metric, S3CrtClient, S3RequestError}; | ||
use async_channel::{unbounded, Receiver, Sender}; | ||
use async_trait::async_trait; | ||
use futures::channel::oneshot; | ||
use futures::task::SpawnExt; | ||
use mountpoint_s3_crt::http::request_response::{Header, Headers}; | ||
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestType, RequestType, UploadReview}; | ||
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequest, MetaRequestType, RequestType, UploadReview}; | ||
use tracing::error; | ||
|
||
use super::{S3CrtClientInner, S3HttpRequest}; | ||
|
@@ -96,13 +98,34 @@ impl S3CrtClient { | |
on_headers, | ||
)?; | ||
|
||
let (write_completed_sender, write_completed) = oneshot::channel(); | ||
let (part_sender, part_receiver) = unbounded::<(Vec<u8>, bool)>(); | ||
let meta_request = body.meta_request.clone(); | ||
_ = self.event_loop_group().spawn(async move { | ||
async fn process( | ||
mut meta_request: MetaRequest, | ||
part_receiver: Receiver<(Vec<u8>, bool)>, | ||
) -> Result<(), S3RequestError> { | ||
while let Ok((part, eof)) = part_receiver.recv().await { | ||
// Write will fail if the request has already finished (because of an error). | ||
meta_request.write(&part, eof).await.map_err(S3RequestError::CrtError)?; | ||
} | ||
Ok(()) | ||
} | ||
|
||
_ = write_completed_sender.send(process(meta_request, part_receiver).await); | ||
}); | ||
|
||
Ok(S3PutObjectRequest { | ||
body, | ||
review_callback, | ||
start_time: Instant::now(), | ||
total_bytes: 0, | ||
response_headers, | ||
pending_create_mpu: Some(mpu_created), | ||
part_sender, | ||
buffer: Vec::with_capacity(self.inner.part_size), | ||
write_completed, | ||
}) | ||
} | ||
} | ||
|
@@ -155,6 +178,9 @@ pub struct S3PutObjectRequest { | |
/// Signal indicating that CreateMultipartUpload completed successfully, or that the MPU failed. | ||
/// Set to [None] once awaited on the first write, meaning the MPU was already created or failed. | ||
pending_create_mpu: Option<oneshot::Receiver<Result<(), S3RequestError>>>, | ||
part_sender: Sender<(Vec<u8>, bool)>, | ||
buffer: Vec<u8>, | ||
write_completed: oneshot::Receiver<Result<(), S3RequestError>>, | ||
} | ||
|
||
fn try_get_header_value(headers: &Headers, key: &str) -> Option<String> { | ||
|
@@ -172,12 +198,30 @@ impl PutObjectRequest for S3PutObjectRequest { | |
create_mpu.await.unwrap()?; | ||
} | ||
|
||
// Write will fail if the request has already finished (because of an error). | ||
self.body | ||
.meta_request | ||
.write(slice, false) | ||
.await | ||
.map_err(S3RequestError::CrtError)?; | ||
// Send data to the write queue. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the backpressure is not implemented in any form with this change, right? was it the same before switching to async write API? By not having backpressure I mean that |
||
// TODO: review/extract into fn | ||
{ | ||
let mut slice = slice; | ||
while !slice.is_empty() { | ||
let split = slice | ||
.len() | ||
.min(self.buffer.capacity().saturating_sub(self.buffer.len())); | ||
self.buffer.extend_from_slice(&slice[..split]); | ||
slice = &slice[split..]; | ||
|
||
if self.buffer.len() == self.buffer.capacity() { | ||
let mut buffer = Vec::with_capacity(self.buffer.capacity()); | ||
std::mem::swap(&mut self.buffer, &mut buffer); | ||
_ = self.part_sender.send((buffer, false)).await; | ||
} | ||
} | ||
|
||
// Check whether any write so far reported an error. | ||
if let Ok(Some(Err(error))) = self.write_completed.try_recv() { | ||
return Err(error.into()); | ||
} | ||
} | ||
|
||
self.total_bytes += slice.len() as u64; | ||
Ok(()) | ||
} | ||
|
@@ -192,12 +236,15 @@ impl PutObjectRequest for S3PutObjectRequest { | |
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> { | ||
self.review_callback.set(review_callback); | ||
|
||
// Write will fail if the request has already finished (because of an error). | ||
self.body | ||
.meta_request | ||
.write(&[], true) | ||
.await | ||
.map_err(S3RequestError::CrtError)?; | ||
// Complete the write. | ||
// TODO: review/extract into fn | ||
_ = self.part_sender.send((self.buffer, true)).await; | ||
drop(self.part_sender); | ||
|
||
// Check whether any write so far reported an error. | ||
if let Ok(result) = self.write_completed.await { | ||
result?; | ||
} | ||
|
||
// Now wait for the request to finish. | ||
let _ = self.body.await?; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the channel provided by tokio (which we already have a dep on) useful here?
https://docs.rs/tokio/latest/tokio/sync/mpsc/index.html