Skip to content

Commit

Permalink
Refactor incremental upload queue (#1181)
Browse files Browse the repository at this point in the history
Internal refactor of the append queue for incremental uploads. Splits up
the initial `HeadObject` request and return the checksum algorithm of
the existing object separately from the `PutObject` responses.

### Does this change impact existing behavior?

No, internal change only.

### Does this change need a changelog entry?

No.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Dec 5, 2024
1 parent 489f2e5 commit 30297eb
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 209 deletions.
70 changes: 70 additions & 0 deletions mountpoint-s3/src/async_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;

use futures::task::{Spawn, SpawnError};

/// Type-erasure for a [Spawn] implementation.
pub struct BoxRuntime(Box<dyn Spawn + Send + Sync>);

impl Spawn for BoxRuntime {
fn spawn_obj(&self, future: futures::task::FutureObj<'static, ()>) -> Result<(), SpawnError> {
self.0.spawn_obj(future)
}
}

impl Debug for BoxRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("BoxRuntime").field(&"dyn").finish()
}
}

impl BoxRuntime {
pub fn new(runtime: impl Spawn + Sync + Send + 'static) -> Self {
BoxRuntime(Box::new(runtime))
}
}

/// Holds a value lazily initialized when awaiting a future.
pub struct Lazy<T, E> {
future: Option<PinFuture<T, E>>,
value: Option<T>,
}

type PinFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + Send>>;

impl<T, E> Lazy<T, E> {
pub fn new(f: impl Future<Output = Result<T, E>> + Send + 'static) -> Self {
Self {
future: Some(Box::pin(f)),
value: None,
}
}

async fn force(&mut self) -> Result<(), E> {
if let Some(f) = self.future.take() {
self.value = Some(f.await?);
}
Ok(())
}

pub async fn get_mut(&mut self) -> Result<&mut T, E> {
self.force().await?;
Ok(self.value.as_mut().unwrap())
}
}

impl<T, E> Debug for Lazy<T, E>
where
T: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut s = f.debug_struct("Lazy");
if let Some(value) = &self.value {
s.field("value", value);
} else {
s.field("future", &"<pending>");
}
s.finish()
}
}
1 change: 1 addition & 0 deletions mountpoint-s3/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod async_util;
pub mod autoconfigure;
mod build_info;
mod checksums;
Expand Down
33 changes: 3 additions & 30 deletions mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::fmt::Debug;
use std::future::Future;

use futures::future::RemoteHandle;
use futures::task::{Spawn, SpawnError, SpawnExt};
use futures::task::Spawn;

use mountpoint_s3_client::error::{HeadObjectError, ObjectClientError, PutObjectError};
use mountpoint_s3_client::types::{ChecksumAlgorithm, ETag};
Expand All @@ -11,6 +9,7 @@ use mountpoint_s3_client::ObjectClient;
use thiserror::Error;
use tracing::error;

use crate::async_util::BoxRuntime;
use crate::fs::{ServerSideEncryption, SseCorruptedError};
use crate::mem_limiter::MemoryLimiter;
use crate::sync::Arc;
Expand Down Expand Up @@ -81,7 +80,7 @@ where
) -> Self {
Self {
client,
runtime: runtime.into(),
runtime: BoxRuntime::new(runtime),
mem_limiter,
storage_class,
server_side_encryption,
Expand Down Expand Up @@ -137,29 +136,3 @@ where
/// The limit may slow down writes eventually, but the overall upload throughput
/// is already capped by a single PutObject request.
const MAX_BYTES_IN_QUEUE: usize = 2 * 1024 * 1024 * 1024;

struct BoxRuntime(Box<dyn Spawn + Send + Sync>);
impl BoxRuntime {
fn spawn_with_handle<Fut>(&self, future: Fut) -> Result<RemoteHandle<Fut::Output>, SpawnError>
where
Fut: Future + Send + 'static,
Fut::Output: Send,
{
self.0.spawn_with_handle(future)
}
}

impl Debug for BoxRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("BoxRuntime").field(&"dyn").finish()
}
}

impl<Runtime> From<Runtime> for BoxRuntime
where
Runtime: Spawn + Sync + Send + 'static,
{
fn from(value: Runtime) -> Self {
BoxRuntime(Box::new(value))
}
}
Loading

0 comments on commit 30297eb

Please sign in to comment.