Implement MultiPartStore for InMemory #5495

Merged · 1 commit · Mar 13, 2024
92 changes: 91 additions & 1 deletion object_store/src/memory.rs
@@ -16,6 +16,7 @@
// under the License.

//! An in-memory object store implementation
use crate::multipart::{MultiPartStore, PartId};
use crate::util::InvalidGetRange;
use crate::{
path::Path, GetRange, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore,
@@ -28,8 +29,8 @@ use chrono::{DateTime, Utc};
use futures::{stream::BoxStream, StreamExt};
use parking_lot::RwLock;
use snafu::{OptionExt, ResultExt, Snafu};
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::collections::{BTreeMap, HashMap};
use std::io;
use std::ops::Range;
use std::pin::Pin;
@@ -52,6 +53,12 @@ enum Error {

#[snafu(display("ETag required for conditional update"))]
MissingETag,

#[snafu(display("MultipartUpload not found: {id}"))]
UploadNotFound { id: String },

#[snafu(display("Missing part at index: {part}"))]
MissingPart { part: usize },
}

impl From<Error> for super::Error {
@@ -101,6 +108,12 @@ impl Entry {
struct Storage {
next_etag: usize,
map: BTreeMap<Path, Entry>,
uploads: HashMap<usize, PartStorage>,
}

#[derive(Debug, Default, Clone)]
struct PartStorage {
parts: Vec<Option<Bytes>>,
Contributor: is it worth noting in a doc comment somewhere that unfinished multipart uploads are never cleaned up?

Contributor (Author): I think this is implicit in the contract of MultipartStore

Contributor: ok
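For illustration only, not part of this diff: a minimal caller-side sketch of the cleanup point discussed above, assuming the crate's public paths `object_store::memory::InMemory`, `object_store::multipart::MultiPartStore`, and `object_store::path::Path`; the helper name `upload_or_abort` is hypothetical. Parts of an unfinished upload stay buffered in the store until `complete_multipart` or `abort_multipart` removes them, so a caller that gives up should abort explicitly.

```rust
use bytes::Bytes;
use object_store::memory::InMemory;
use object_store::multipart::MultiPartStore;
use object_store::path::Path;

/// Hypothetical helper: upload `chunks` as a multipart upload, aborting the
/// upload (and thereby dropping the buffered parts) if any part fails.
async fn upload_or_abort(store: &InMemory, path: &Path, chunks: Vec<Bytes>) -> object_store::Result<()> {
    let id = store.create_multipart(path).await?;
    let mut parts = Vec::with_capacity(chunks.len());
    for (idx, data) in chunks.into_iter().enumerate() {
        match store.put_part(path, &id, idx, data).await {
            Ok(part) => parts.push(part),
            Err(e) => {
                // Release the parts held in memory instead of leaking them.
                store.abort_multipart(path, &id).await?;
                return Err(e);
            }
        }
    }
    store.complete_multipart(path, &id, parts).await?;
    Ok(())
}
```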

}

type SharedStorage = Arc<RwLock<Storage>>;
@@ -154,6 +167,24 @@ impl Storage {
}
}
}

fn upload_mut(&mut self, id: &MultipartId) -> Result<&mut PartStorage> {
let parts = id
.parse()
.ok()
.and_then(|x| self.uploads.get_mut(&x))
.context(UploadNotFoundSnafu { id })?;
Ok(parts)
}

fn remove_upload(&mut self, id: &MultipartId) -> Result<PartStorage> {
let parts = id
.parse()
.ok()
.and_then(|x| self.uploads.remove(&x))
.context(UploadNotFoundSnafu { id })?;
Ok(parts)
}
}

impl std::fmt::Display for InMemory {
@@ -359,6 +390,64 @@ impl ObjectStore for InMemory {
}
}

#[async_trait]
impl MultiPartStore for InMemory {
async fn create_multipart(&self, _path: &Path) -> Result<MultipartId> {
let mut storage = self.storage.write();
let etag = storage.next_etag;
storage.next_etag += 1;
storage.uploads.insert(etag, Default::default());
Ok(etag.to_string())
}

async fn put_part(
&self,
_path: &Path,
id: &MultipartId,
part_idx: usize,
data: Bytes,
) -> Result<PartId> {
let mut storage = self.storage.write();
let upload = storage.upload_mut(id)?;
if part_idx >= upload.parts.len() {
upload.parts.resize(part_idx + 1, None);
}
upload.parts[part_idx] = Some(data);
Ok(PartId {
content_id: Default::default(),
})
}

async fn complete_multipart(
&self,
path: &Path,
id: &MultipartId,
_parts: Vec<PartId>,
) -> Result<PutResult> {
let mut storage = self.storage.write();
let upload = storage.remove_upload(id)?;

let mut cap = 0;
for (part, x) in upload.parts.iter().enumerate() {
cap += x.as_ref().context(MissingPartSnafu { part })?.len();
}
let mut buf = Vec::with_capacity(cap);
for x in &upload.parts {
buf.extend_from_slice(x.as_ref().unwrap())
}
let etag = storage.insert(path, buf.into());
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
})
}

async fn abort_multipart(&self, _path: &Path, id: &MultipartId) -> Result<()> {
self.storage.write().remove_upload(id)?;
Ok(())
}
}

impl InMemory {
/// Create new in-memory storage.
pub fn new() -> Self {
Expand Down Expand Up @@ -444,6 +533,7 @@ mod tests {
copy_if_not_exists(&integration).await;
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
}

#[tokio::test]
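For context, and not part of the diff above: a minimal end-to-end sketch of exercising the new MultiPartStore implementation on InMemory, assuming the crate's existing `ObjectStore::get` API for readback; the path `example/data` and the part contents are placeholders.

```rust
use bytes::Bytes;
use object_store::memory::InMemory;
use object_store::multipart::MultiPartStore;
use object_store::path::Path;
use object_store::ObjectStore;

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();
    let path = Path::from("example/data");

    // Start a multipart upload and stage two parts in order.
    let id = store.create_multipart(&path).await?;
    let p0 = store.put_part(&path, &id, 0, Bytes::from("hello ")).await?;
    let p1 = store.put_part(&path, &id, 1, Bytes::from("world")).await?;

    // Completing the upload concatenates the staged parts into one object.
    store.complete_multipart(&path, &id, vec![p0, p1]).await?;

    // Read back through the regular ObjectStore API.
    let data = store.get(&path).await?.bytes().await?;
    assert_eq!(data.as_ref(), b"hello world");
    Ok(())
}
```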