diff --git a/mountpoint-s3/src/upload.rs b/mountpoint-s3/src/upload.rs index ce93f08f0..213d1f677 100644 --- a/mountpoint-s3/src/upload.rs +++ b/mountpoint-s3/src/upload.rs @@ -100,7 +100,10 @@ pub struct AppendUploader { mem_limiter: Arc>, buffer_size: usize, server_side_encryption: ServerSideEncryption, - checksum_algorithm: Option, + /// Default checksum algorithm, if any, to be used for new S3 objects created by an [AppendUploader]. + /// + /// For existing objects, Mountpoint will instead append using the existing checksum algorithm on the object. + default_checksum_algorithm: Option, } #[derive(Debug, Error)] @@ -134,7 +137,7 @@ where mem_limiter: Arc>, buffer_size: usize, server_side_encryption: ServerSideEncryption, - checksum_algorithm: Option, + default_checksum_algorithm: Option, ) -> Self { Self { client, @@ -142,7 +145,7 @@ where mem_limiter, buffer_size, server_side_encryption, - checksum_algorithm, + default_checksum_algorithm, } } @@ -160,7 +163,7 @@ where initial_offset, initial_etag, server_side_encryption: self.server_side_encryption.clone(), - checksum_algorithm: self.checksum_algorithm.clone(), + default_checksum_algorithm: self.default_checksum_algorithm.clone(), capacity: MAX_BYTES_IN_QUEUE / self.buffer_size, }; AppendUploadRequest::new( diff --git a/mountpoint-s3/src/upload/incremental.rs b/mountpoint-s3/src/upload/incremental.rs index 632b9beee..20d270ca0 100644 --- a/mountpoint-s3/src/upload/incremental.rs +++ b/mountpoint-s3/src/upload/incremental.rs @@ -20,6 +20,12 @@ use crate::ServerSideEncryption; use super::hasher::ChecksumHasher; use super::{AppendUploadError, BoxRuntime, ChecksumHasherError}; +/// Handle for appending data to an S3 object. +/// +/// This request contains a buffer that can be written to, +/// before being pushed to the [AppendUploadQueue] +/// which may queue some number of buffers while appending data to the S3 object +/// over multiple subsequent S3 PutObject with offset requests. #[derive(Debug)] pub struct AppendUploadRequest { /// The current buffer, initialized lazily on write. @@ -107,19 +113,35 @@ where } } +/// Output from the [AppendUploadQueue], +/// providing information about the last PutObject request +/// or providing the required checksum algorithm when first initialized. #[derive(Debug)] enum Output { + /// Initial output state, dictating which checksum algorithm should be used for all append requests. ChecksumAlgorithm(Option), + /// Result of a PutObject with offset request. Result(PutObjectResult), } +/// Queue for an active 'append' to an S3 object. +/// +/// This struct has message channels whose queues may contain a number of buffered parts to be appended to an object. +/// The struct is responsible for taking requests from the queue and initiating the S3 PutObject with offset requests. +/// +/// Requests should be sent to this struct using [AppendUploadRequest::write]. #[derive(Debug)] struct AppendUploadQueue { + /// Channel handle for sending buffers to be appended to the object. request_sender: Sender>, + /// Channel handle for receiving the result of S3 requests via [Output] messages. output_receiver: Receiver>>, mem_limiter: Arc>, _task_handle: RemoteHandle<()>, /// Algorithm used to compute checksums. Initialized asynchronously in [get_buffer]. + /// + /// Outer [Option] represents if the algorithm configuration is known yet, + /// while the inner [Option] having value [None] indicates that no algorithm should be used. checksum_algorithm: Option>, /// Stores the last successful result to return in [join]. last_known_result: Option, @@ -133,7 +155,10 @@ pub struct AppendUploadQueueParams { pub initial_offset: u64, pub initial_etag: Option, pub server_side_encryption: ServerSideEncryption, - pub checksum_algorithm: Option, + /// Preferred checksum algorithm for new objects. + /// + /// If the object already exists, its current algorithm will be used instead. + pub default_checksum_algorithm: Option, pub capacity: usize, } @@ -157,6 +182,10 @@ where let task_handle = runtime .spawn_with_handle( async move { + /// Send result of S3 requests to the [AppendUploadQueue]. + /// + /// Returns `true` if output was sent successfully. + /// When the output cannot be sent, buffer receiver will be shut down. async fn send_output( sender: &Sender>>, receiver: &Receiver>, @@ -186,7 +215,7 @@ where let first_output = if offset == 0 { // If we are creating a new object or overwriting (truncate), use the default checksum algorithm. - Ok(Output::ChecksumAlgorithm(params.checksum_algorithm)) + Ok(Output::ChecksumAlgorithm(params.default_checksum_algorithm)) } else { // We are appending to an existing object, find out which checksum algorithm it uses. match client @@ -217,6 +246,7 @@ where return; } + // Main loop waiting on new buffers to append to the S3 object. while let Ok(buffer) = request_receiver.recv().await { let buffer_len = buffer.len(); let result = append(&client, &bucket, &key, buffer, offset, etag.take(), sse.clone()) @@ -320,6 +350,9 @@ where )?) } + /// Wait on output, updating the state of the [AppendUploadQueue] when next output arrives. + /// + /// Returns `true` when next output is successfully consumed, or `false` when no more output is available. async fn consume_next_output(&mut self) -> Result> { let Ok(output) = self.output_receiver.recv().await else { return Ok(false);