Skip to content

Commit

Permalink
Replace AsyncWrite with Upload trait and rename MultiPartStore to Mul…
Browse files Browse the repository at this point in the history
…tipartStore (#5458) (#5500)

* Replace AsyncWrite with Upload trait (#5458)

* Make BufWriter abortable

* Flesh out cloud implementations

* Review feedback

* Misc tweaks and fixes

* Format

* Replace multi-part with multipart

* More docs

* Clippy

* Rename to MultipartUpload

* Rename ChunkedUpload to WriteMultipart

* Doc tweaks

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Docs

* Format

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
tustvold and alamb authored Mar 19, 2024
1 parent f41c2a4 commit 96c4c0b
Show file tree
Hide file tree
Showing 18 changed files with 691 additions and 883 deletions.
104 changes: 59 additions & 45 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@

//! An object store implementation for S3
//!
//! ## Multi-part uploads
//! ## Multipart uploads
//!
//! Multi-part uploads can be initiated with the [ObjectStore::put_multipart] method.
//! Data passed to the writer is automatically buffered to meet the minimum size
//! requirements for a part. Multiple parts are uploaded concurrently.
//! Multipart uploads can be initiated with the [ObjectStore::put_multipart] method.
//!
//! If the writer fails for any reason, you may have parts uploaded to AWS but not
//! used that you may be charged for. Use the [ObjectStore::abort_multipart] method
//! to abort the upload and drop those unneeded parts. In addition, you may wish to
//! consider implementing [automatic cleanup] of unused parts that are older than one
//! week.
//! used that you will be charged for. [`MultipartUpload::abort`] may be invoked to drop
//! these unneeded parts, however, it is recommended that you consider implementing
//! [automatic cleanup] of unused parts that are older than some threshold.
//!
//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/

Expand All @@ -38,18 +35,17 @@ use futures::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
use tokio::io::AsyncWrite;
use url::Url;

use crate::aws::client::{RequestError, S3Client};
use crate::client::get::GetClientExt;
use crate::client::list::ListClientExt;
use crate::client::CredentialProvider;
use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
use crate::multipart::{MultipartStore, PartId};
use crate::signer::Signer;
use crate::{
Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Path, PutMode,
PutOptions, PutResult, Result,
Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta,
ObjectStore, Path, PutMode, PutOptions, PutResult, Result, UploadPart,
};

static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging");
Expand Down Expand Up @@ -85,6 +81,7 @@ const STORE: &str = "S3";

/// [`CredentialProvider`] for [`AmazonS3`]
pub type AwsCredentialProvider = Arc<dyn CredentialProvider<Credential = AwsCredential>>;
use crate::client::parts::Parts;
pub use credential::{AwsAuthorizer, AwsCredential};

/// Interface for [Amazon S3](https://aws.amazon.com/s3/).
Expand Down Expand Up @@ -211,25 +208,18 @@ impl ObjectStore for AmazonS3 {
}
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let id = self.client.create_multipart(location).await?;

let upload = S3MultiPartUpload {
location: location.clone(),
upload_id: id.clone(),
client: Arc::clone(&self.client),
};

Ok((id, Box::new(WriteMultiPart::new(upload, 8))))
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.client
.delete_request(location, &[("uploadId", multipart_id)])
.await
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
let upload_id = self.client.create_multipart(location).await?;

Ok(Box::new(S3MultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
upload_id: upload_id.clone(),
parts: Default::default(),
}),
}))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -319,30 +309,55 @@ impl ObjectStore for AmazonS3 {
}
}

#[derive(Debug)]
struct S3MultiPartUpload {
part_idx: usize,
state: Arc<UploadState>,
}

#[derive(Debug)]
struct UploadState {
parts: Parts,
location: Path,
upload_id: String,
client: Arc<S3Client>,
}

#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
self.client
.put_part(&self.location, &self.upload_id, part_idx, buf.into())
impl MultipartUpload for S3MultiPartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state
.client
.put_part(&state.location, &state.upload_id, idx, data)
.await?;
state.parts.put(idx, part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;

self.state
.client
.complete_multipart(&self.state.location, &self.state.upload_id, parts)
.await
}

async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
self.client
.complete_multipart(&self.location, &self.upload_id, completed_parts)
.await?;
Ok(())
async fn abort(&mut self) -> Result<()> {
self.state
.client
.delete_request(&self.state.location, &[("uploadId", &self.state.upload_id)])
.await
}
}

#[async_trait]
impl MultiPartStore for AmazonS3 {
impl MultipartStore for AmazonS3 {
async fn create_multipart(&self, path: &Path) -> Result<MultipartId> {
self.client.create_multipart(path).await
}
Expand Down Expand Up @@ -377,7 +392,6 @@ mod tests {
use crate::{client::get::GetClient, tests::*};
use bytes::Bytes;
use hyper::HeaderMap;
use tokio::io::AsyncWriteExt;

const NON_EXISTENT_NAME: &str = "nonexistentname";

Expand Down Expand Up @@ -542,9 +556,9 @@ mod tests {
store.put(&locations[0], data.clone()).await.unwrap();
store.copy(&locations[0], &locations[1]).await.unwrap();

let (_, mut writer) = store.put_multipart(&locations[2]).await.unwrap();
writer.write_all(&data).await.unwrap();
writer.shutdown().await.unwrap();
let mut upload = store.put_multipart(&locations[2]).await.unwrap();
upload.put_part(data.clone()).await.unwrap();
upload.complete().await.unwrap();

for location in &locations {
let res = store
Expand Down
80 changes: 47 additions & 33 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,15 @@
//!
//! ## Streaming uploads
//!
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those
//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks
//! are uploaded concurrently.
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those blocks.
//!
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
//! a way to drop old blocks. Instead unused blocks are automatically cleaned up
//! after 7 days.
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
multipart::{MultipartStore, PartId},
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
Result,
GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore,
PutOptions, PutResult, Result, UploadPart,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -40,7 +36,6 @@ use reqwest::Method;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncWrite;
use url::Url;

use crate::client::get::GetClientExt;
Expand All @@ -54,6 +49,8 @@ mod credential;

/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = AzureCredential>>;
use crate::azure::client::AzureClient;
use crate::client::parts::Parts;
pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
pub use credential::AzureCredential;

Expand Down Expand Up @@ -94,21 +91,15 @@ impl ObjectStore for MicrosoftAzure {
self.client.put_blob(location, bytes, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let inner = AzureMultiPartUpload {
client: Arc::clone(&self.client),
location: location.to_owned(),
};
Ok((String::new(), Box::new(WriteMultiPart::new(inner, 8))))
}

async fn abort_multipart(&self, _location: &Path, _multipart_id: &MultipartId) -> Result<()> {
// There is no way to drop blocks that have been uploaded. Instead, they simply
// expire in 7 days.
Ok(())
async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
Ok(Box::new(AzureMultiPartUpload {
part_idx: 0,
state: Arc::new(UploadState {
client: Arc::clone(&self.client),
location: location.clone(),
parts: Default::default(),
}),
}))
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down Expand Up @@ -197,26 +188,49 @@ impl Signer for MicrosoftAzure {
/// put_multipart_part -> PUT block
/// complete -> PUT block list
/// abort -> No equivalent; blocks are simply dropped after 7 days
#[derive(Debug, Clone)]
#[derive(Debug)]
struct AzureMultiPartUpload {
client: Arc<client::AzureClient>,
part_idx: usize,
state: Arc<UploadState>,
}

#[derive(Debug)]
struct UploadState {
location: Path,
parts: Parts,
client: Arc<AzureClient>,
}

#[async_trait]
impl PutPart for AzureMultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, idx: usize) -> Result<PartId> {
self.client.put_block(&self.location, idx, buf.into()).await
impl MultipartUpload for AzureMultiPartUpload {
fn put_part(&mut self, data: Bytes) -> UploadPart {
let idx = self.part_idx;
self.part_idx += 1;
let state = Arc::clone(&self.state);
Box::pin(async move {
let part = state.client.put_block(&state.location, idx, data).await?;
state.parts.put(idx, part);
Ok(())
})
}

async fn complete(&mut self) -> Result<PutResult> {
let parts = self.state.parts.finish(self.part_idx)?;

self.state
.client
.put_block_list(&self.state.location, parts)
.await
}

async fn complete(&self, parts: Vec<PartId>) -> Result<()> {
self.client.put_block_list(&self.location, parts).await?;
async fn abort(&mut self) -> Result<()> {
// Nothing to do
Ok(())
}
}

#[async_trait]
impl MultiPartStore for MicrosoftAzure {
impl MultipartStore for MicrosoftAzure {
async fn create_multipart(&self, _: &Path) -> Result<MultipartId> {
Ok(String::new())
}
Expand Down
Loading

0 comments on commit 96c4c0b

Please sign in to comment.