Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional Rustdoc to incremental upload module #1169

Merged
merged 1 commit into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ pub struct AppendUploader<Client: ObjectClient> {
mem_limiter: Arc<MemoryLimiter<Client>>,
buffer_size: usize,
server_side_encryption: ServerSideEncryption,
checksum_algorithm: Option<ChecksumAlgorithm>,
/// Default checksum algorithm, if any, to be used for new S3 objects created by an [AppendUploader].
///
/// For existing objects, Mountpoint will instead append using the existing checksum algorithm on the object.
default_checksum_algorithm: Option<ChecksumAlgorithm>,
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -134,15 +137,15 @@ where
mem_limiter: Arc<MemoryLimiter<Client>>,
buffer_size: usize,
server_side_encryption: ServerSideEncryption,
checksum_algorithm: Option<ChecksumAlgorithm>,
default_checksum_algorithm: Option<ChecksumAlgorithm>,
) -> Self {
Self {
client,
runtime: runtime.into(),
mem_limiter,
buffer_size,
server_side_encryption,
checksum_algorithm,
default_checksum_algorithm,
}
}

Expand All @@ -160,7 +163,7 @@ where
initial_offset,
initial_etag,
server_side_encryption: self.server_side_encryption.clone(),
checksum_algorithm: self.checksum_algorithm.clone(),
default_checksum_algorithm: self.default_checksum_algorithm.clone(),
capacity: MAX_BYTES_IN_QUEUE / self.buffer_size,
};
AppendUploadRequest::new(
Expand Down
37 changes: 35 additions & 2 deletions mountpoint-s3/src/upload/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ use crate::ServerSideEncryption;
use super::hasher::ChecksumHasher;
use super::{AppendUploadError, BoxRuntime, ChecksumHasherError};

/// Handle for appending data to an S3 object.
///
/// This request contains a buffer that can be written to,
/// before being pushed to the [AppendUploadQueue]
/// which may queue some number of buffers while appending data to the S3 object
/// over multiple subsequent S3 PutObject with offset requests.
#[derive(Debug)]
pub struct AppendUploadRequest<Client: ObjectClient> {
/// The current buffer, initialized lazily on write.
Expand Down Expand Up @@ -107,19 +113,35 @@ where
}
}

/// Output from the [AppendUploadQueue],
/// providing information about the last PutObject request
/// or providing the required checksum algorithm when first initialized.
#[derive(Debug)]
enum Output {
/// Initial output state, dictating which checksum algorithm should be used for all append requests.
ChecksumAlgorithm(Option<ChecksumAlgorithm>),
/// Result of a PutObject with offset request.
Result(PutObjectResult),
}

/// Queue for an active 'append' to an S3 object.
///
/// This struct has message channels whose queues may contain a number of buffered parts to be appended to an object.
/// The struct is responsible for taking requests from the queue and initiating the S3 PutObject with offset requests.
///
/// Requests should be sent to this struct using [AppendUploadRequest::write].
#[derive(Debug)]
struct AppendUploadQueue<Client: ObjectClient> {
/// Channel handle for sending buffers to be appended to the object.
request_sender: Sender<UploadBuffer<Client>>,
/// Channel handle for receiving the result of S3 requests via [Output] messages.
output_receiver: Receiver<Result<Output, AppendUploadError<Client::ClientError>>>,
mem_limiter: Arc<MemoryLimiter<Client>>,
_task_handle: RemoteHandle<()>,
/// Algorithm used to compute checksums. Initialized asynchronously in [get_buffer].
///
/// Outer [Option] represents if the algorithm configuration is known yet,
/// while the inner [Option] having value [None] indicates that no algorithm should be used.
checksum_algorithm: Option<Option<ChecksumAlgorithm>>,
/// Stores the last successful result to return in [join].
last_known_result: Option<PutObjectResult>,
Expand All @@ -133,7 +155,10 @@ pub struct AppendUploadQueueParams {
pub initial_offset: u64,
pub initial_etag: Option<ETag>,
pub server_side_encryption: ServerSideEncryption,
pub checksum_algorithm: Option<ChecksumAlgorithm>,
/// Preferred checksum algorithm for new objects.
///
/// If the object already exists, its current algorithm will be used instead.
pub default_checksum_algorithm: Option<ChecksumAlgorithm>,
pub capacity: usize,
}

Expand All @@ -157,6 +182,10 @@ where
let task_handle = runtime
.spawn_with_handle(
async move {
/// Send result of S3 requests to the [AppendUploadQueue].
///
/// Returns `true` if output was sent successfully.
/// When the output cannot be sent, buffer receiver will be shut down.
async fn send_output<Client: ObjectClient>(
sender: &Sender<Result<Output, AppendUploadError<Client::ClientError>>>,
receiver: &Receiver<UploadBuffer<Client>>,
Expand Down Expand Up @@ -186,7 +215,7 @@ where

let first_output = if offset == 0 {
// If we are creating a new object or overwriting (truncate), use the default checksum algorithm.
Ok(Output::ChecksumAlgorithm(params.checksum_algorithm))
Ok(Output::ChecksumAlgorithm(params.default_checksum_algorithm))
} else {
// We are appending to an existing object, find out which checksum algorithm it uses.
match client
Expand Down Expand Up @@ -217,6 +246,7 @@ where
return;
}

// Main loop waiting on new buffers to append to the S3 object.
while let Ok(buffer) = request_receiver.recv().await {
let buffer_len = buffer.len();
let result = append(&client, &bucket, &key, buffer, offset, etag.take(), sse.clone())
Expand Down Expand Up @@ -320,6 +350,9 @@ where
)?)
}

/// Wait on output, updating the state of the [AppendUploadQueue] when next output arrives.
///
/// Returns `true` when next output is successfully consumed, or `false` when no more output is available.
async fn consume_next_output(&mut self) -> Result<bool, AppendUploadError<Client::ClientError>> {
let Ok(output) = self.output_receiver.recv().await else {
return Ok(false);
Expand Down
Loading