Skip to content

Commit

Permalink
Replace AsyncWrite with Upload trait (apache#5458)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Mar 13, 2024
1 parent ad3b4c9 commit 521e918
Show file tree
Hide file tree
Showing 15 changed files with 539 additions and 755 deletions.
10 changes: 3 additions & 7 deletions object_store/src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@
//!
//! ## Streaming uploads
//!
//! [ObjectStore::put_multipart] will upload data in blocks and write a blob from those
//! blocks. Data is buffered internally to make blocks of at least 5MB and blocks
//! are uploaded concurrently.
//! [ObjectStore::upload] will upload data in blocks and write a blob from those blocks.
//!
//! [ObjectStore::abort_multipart] is a no-op, since Azure Blob Store doesn't provide
//! a way to drop old blocks. Instead unused blocks are automatically cleaned up
//! after 7 days.
//! Unused blocks will automatically be dropped after 7 days.
use crate::{
multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart},
multipart::{MultiPartStore, PartId},
path::Path,
signer::Signer,
GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutOptions, PutResult,
Expand Down
70 changes: 32 additions & 38 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Utilities for performing tokio-style buffered IO
use crate::path::Path;
use crate::{MultipartId, ObjectMeta, ObjectStore};
use crate::{ChunkedUpload, ObjectMeta, ObjectStore};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
Expand Down Expand Up @@ -213,30 +213,26 @@ impl AsyncBufRead for BufReader {
pub struct BufWriter {
capacity: usize,
state: BufWriterState,
multipart_id: Option<MultipartId>,
store: Arc<dyn ObjectStore>,
}

impl std::fmt::Debug for BufWriter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BufWriter")
.field("capacity", &self.capacity)
.field("multipart_id", &self.multipart_id)
.finish()
}
}

type MultipartResult = (MultipartId, Box<dyn AsyncWrite + Send + Unpin>);

enum BufWriterState {
/// Buffer up to capacity bytes
Buffer(Path, Vec<u8>),
/// [`ObjectStore::put_multipart`]
Prepare(BoxFuture<'static, std::io::Result<MultipartResult>>),
Prepare(BoxFuture<'static, std::io::Result<ChunkedUpload>>),
/// Write to a multipart upload
Write(Box<dyn AsyncWrite + Send + Unpin>),
Write(Option<ChunkedUpload>),
/// [`ObjectStore::put`]
Put(BoxFuture<'static, std::io::Result<()>>),
Flush(BoxFuture<'static, std::io::Result<()>>),
}

impl BufWriter {
Expand All @@ -251,14 +247,8 @@ impl BufWriter {
capacity,
store,
state: BufWriterState::Buffer(path, Vec::new()),
multipart_id: None,
}
}

/// Returns the [`MultipartId`] if multipart upload
pub fn multipart_id(&self) -> Option<&MultipartId> {
self.multipart_id.as_ref()
}
}

impl AsyncWrite for BufWriter {
Expand All @@ -270,12 +260,15 @@ impl AsyncWrite for BufWriter {
let cap = self.capacity;
loop {
return match &mut self.state {
BufWriterState::Write(write) => Pin::new(write).poll_write(cx, buf),
BufWriterState::Put(_) => panic!("Already shut down"),
BufWriterState::Write(Some(write)) => {
write.write(buf);
Poll::Ready(Ok(buf.len()))
}
BufWriterState::Write(None) | BufWriterState::Flush(_) => {
panic!("Already shut down")
}
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
continue;
}
BufWriterState::Buffer(path, b) => {
Expand All @@ -284,9 +277,10 @@ impl AsyncWrite for BufWriter {
let path = std::mem::take(path);
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
let (id, mut writer) = store.put_multipart(&path).await?;
writer.write_all(&buffer).await?;
Ok((id, writer))
let upload = store.upload(&path).await?;
let mut chunked = ChunkedUpload::new(upload);
chunked.write(&buffer);
Ok(chunked)
}));
continue;
}
Expand All @@ -300,13 +294,10 @@ impl AsyncWrite for BufWriter {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
loop {
return match &mut self.state {
BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
BufWriterState::Write(write) => Pin::new(write).poll_flush(cx),
BufWriterState::Put(_) => panic!("Already shut down"),
BufWriterState::Write(_) | BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
BufWriterState::Flush(_) => panic!("Already shut down"),
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
continue;
}
};
Expand All @@ -317,21 +308,28 @@ impl AsyncWrite for BufWriter {
loop {
match &mut self.state {
BufWriterState::Prepare(f) => {
let (id, w) = ready!(f.poll_unpin(cx)?);
self.state = BufWriterState::Write(w);
self.multipart_id = Some(id);
self.state = BufWriterState::Write(ready!(f.poll_unpin(cx)?).into());
}
BufWriterState::Buffer(p, b) => {
let buf = std::mem::take(b);
let path = std::mem::take(p);
let store = Arc::clone(&self.store);
self.state = BufWriterState::Put(Box::pin(async move {
self.state = BufWriterState::Flush(Box::pin(async move {
store.put(&path, buf.into()).await?;
Ok(())
}));
}
BufWriterState::Put(f) => return f.poll_unpin(cx),
BufWriterState::Write(w) => return Pin::new(w).poll_shutdown(cx),
BufWriterState::Flush(f) => return f.poll_unpin(cx),
BufWriterState::Write(x) => {
let upload = x.take().unwrap();
self.state = BufWriterState::Flush(
async move {
upload.finish().await?;
Ok(())
}
.boxed(),
)
}
}
}
}
Expand Down Expand Up @@ -443,19 +441,15 @@ mod tests {
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 5]).await.unwrap();
assert!(writer.multipart_id().is_none());
writer.shutdown().await.unwrap();
assert!(writer.multipart_id().is_none());
assert_eq!(store.head(&path).await.unwrap().size, 25);

// Test multipart
let mut writer = BufWriter::with_capacity(Arc::clone(&store), path.clone(), 30);
writer.write_all(&[0; 20]).await.unwrap();
writer.flush().await.unwrap();
writer.write_all(&[0; 20]).await.unwrap();
assert!(writer.multipart_id().is_some());
writer.shutdown().await.unwrap();
assert!(writer.multipart_id().is_some());

assert_eq!(store.head(&path).await.unwrap().size, 40);
}
Expand Down
16 changes: 4 additions & 12 deletions object_store/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::io::AsyncWrite;

use crate::path::Path;
use crate::Result;
use crate::{
GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, PutOptions,
PutResult,
PutResult, Upload,
};
use crate::{MultipartId, Result};

/// Wraps a [`ObjectStore`] and makes its get response return chunks
/// in a controllable manner.
Expand Down Expand Up @@ -67,15 +66,8 @@ impl ObjectStore for ChunkedStore {
self.inner.put_opts(location, bytes, opts).await
}

async fn put_multipart(
&self,
location: &Path,
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
self.inner.put_multipart(location).await
}

async fn abort_multipart(&self, location: &Path, multipart_id: &MultipartId) -> Result<()> {
self.inner.abort_multipart(location, multipart_id).await
async fn upload(&self, location: &Path) -> Result<Box<dyn Upload>> {
self.inner.upload(location).await
}

async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
Expand Down
3 changes: 3 additions & 0 deletions object_store/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ pub mod header;
#[cfg(any(feature = "aws", feature = "gcp"))]
pub mod s3;

#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
pub mod parts;

use async_trait::async_trait;
use std::collections::HashMap;
use std::str::FromStr;
Expand Down
48 changes: 48 additions & 0 deletions object_store/src/client/parts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use parking_lot::Mutex;
use crate::multipart::PartId;

/// An interior mutable collection of upload parts and their corresponding part index
#[derive(Debug, Default)]
pub(crate) struct Parts(Mutex<Vec<(usize, PartId)>>);

impl Parts {
/// Record the [`PartId`] for a given index
///
/// Note: calling this method multiple times with the same `part_idx`
/// will result in multiple [`PartId`] in the final output
pub(crate) fn put(&self, part_idx: usize, id: PartId) {
self.0.lock().push((part_idx, id))
}

/// Produce the final list of [`PartId`] ordered by `part_idx`
///
/// `expected` is the number of parts expected in the final result
pub(crate) fn finish(&self, expected: usize) -> crate::Result<Vec<PartId>> {
let mut parts = self.0.lock();
if parts.len() != expected {
return Err(crate::Error::Generic {
store: "Parts",
source: "Missing part".to_string().into(),
});
}
parts.sort_unstable_by_key(|(idx, _)| *idx);
Ok(parts.drain(..).map(|(_, v)| v).collect())
}
}
Loading

0 comments on commit 521e918

Please sign in to comment.