fix(object_store,aws,gcp): multipart upload enforce size limit of 5 MiB not 5MB #3234

Merged · 4 commits · Dec 1, 2022
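For context on the title: the old constant enforced 5 MB in the decimal sense (5,000,000 bytes), while the S3 and GCS documentation linked in this PR specify a minimum part size of 5 MiB (5,242,880 bytes). A minimal sketch of the arithmetic, with illustrative constant names (the crate itself stores this value as `min_part_size`):

```rust
// Decimal megabytes vs. binary mebibytes for the minimum multipart part size.
// OLD_/NEW_ names are illustrative; they are not constants from object_store.
const OLD_MIN_PART_SIZE: usize = 5_000_000; // 5 MB, what the code enforced before
const NEW_MIN_PART_SIZE: usize = 5 * 1024 * 1024; // 5 MiB, the documented minimum

fn main() {
    assert_eq!(NEW_MIN_PART_SIZE, 5_242_880);
    // The old value undershot the documented minimum by 242,880 bytes.
    assert_eq!(NEW_MIN_PART_SIZE - OLD_MIN_PART_SIZE, 242_880);
}
```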
6 changes: 3 additions & 3 deletions object_store/CONTRIBUTING.md
@@ -46,9 +46,9 @@ Setup environment

```
 export TEST_INTEGRATION=1
-export AWS_DEFAULT_REGION=us-east-1
-export AWS_ACCESS_KEY_ID=test
-export AWS_SECRET_ACCESS_KEY=test
+export OBJECT_STORE_AWS_DEFAULT_REGION=us-east-1
+export OBJECT_STORE_AWS_ACCESS_KEY_ID=test
+export OBJECT_STORE_AWS_SECRET_ACCESS_KEY=test
 export AWS_ENDPOINT=http://127.0.0.1:4566
 export OBJECT_STORE_BUCKET=test-bucket
```
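The variables above configure the LocalStack-backed integration tests. As a hedged illustration only, a test harness might pick up the renamed `OBJECT_STORE_AWS_*` variables along these lines (this helper is hypothetical, not the crate's actual configuration code):

```rust
use std::env;

// Hypothetical helper: read the renamed variables and return None so callers
// can skip integration tests when the environment is not configured.
fn aws_test_config() -> Option<(String, String, String)> {
    let region = env::var("OBJECT_STORE_AWS_DEFAULT_REGION").ok()?;
    let key_id = env::var("OBJECT_STORE_AWS_ACCESS_KEY_ID").ok()?;
    let secret = env::var("OBJECT_STORE_AWS_SECRET_ACCESS_KEY").ok()?;
    Some((region, key_id, secret))
}
```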
3 changes: 2 additions & 1 deletion object_store/src/lib.rs
@@ -769,7 +769,8 @@ mod tests {
 assert_eq!(bytes_expected, bytes_written);

 // Can overwrite some storage
-let data = get_vec_of_bytes(5_000, 5);
+// Sizes carefully chosen to exactly hit min limit of 5 MiB
+let data = get_vec_of_bytes(242_880, 22);
 let bytes_expected = data.concat();
 let (_, mut writer) = storage.put_multipart(&location).await.unwrap();
 for chunk in &data {
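`get_vec_of_bytes` is a test helper defined elsewhere in `lib.rs`. A rough, self-contained sketch of what such a helper can look like (the fixed byte pattern is an assumption made here; the real helper may generate its data differently):

```rust
use bytes::Bytes;

// Sketch only: produce `num_chunks` chunks of `chunk_length` bytes each, so the
// total payload written by the test is chunk_length * num_chunks bytes.
fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
    (0..num_chunks)
        .map(|i| Bytes::from(vec![(i % 256) as u8; chunk_length]))
        .collect()
}
```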
15 changes: 10 additions & 5 deletions object_store/src/multipart.rs
@@ -81,7 +81,11 @@ where
 current_buffer: Vec::new(),
 // TODO: Should this vary by provider?
 // TODO: Should we automatically increase this when the part index gets large?
-min_part_size: 5_000_000,
+
+// Minimum size of 5 MiB
+// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+// https://cloud.google.com/storage/quotas#requests
+min_part_size: 5_242_880,
 current_part_idx: 0,
 completion_task: None,
 }
@@ -113,13 +117,14 @@ where
 mut self: Pin<&mut Self>,
 cx: &mut std::task::Context<'_>,
 buf: &[u8],
-) -> std::task::Poll<Result<usize, io::Error>> {
+) -> Poll<Result<usize, io::Error>> {
 // Poll current tasks
 self.as_mut().poll_tasks(cx)?;

 // If adding buf to pending buffer would trigger send, check
 // whether we have capacity for another task.
-let enough_to_send = (buf.len() + self.current_buffer.len()) > self.min_part_size;
+let enough_to_send =
+    (buf.len() + self.current_buffer.len()) >= self.min_part_size;
 if enough_to_send && self.tasks.len() < self.max_concurrency {
 // If we do, copy into the buffer and submit the task, and return ready.
 self.current_buffer.extend_from_slice(buf);
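The substantive change in this hunk is the comparison: `>` became `>=`, so a write that brings the buffered data exactly to `min_part_size` now triggers a part upload. A small standalone sketch of that boundary condition (names are local to the sketch, not the crate's API):

```rust
// With `>=`, reaching the minimum exactly is enough to send a part;
// the previous strict `>` required strictly more than the minimum.
fn enough_to_send(buffered: usize, incoming: usize, min_part_size: usize) -> bool {
    buffered + incoming >= min_part_size
}

fn main() {
    let min = 5 * 1024 * 1024; // 5 MiB
    assert!(enough_to_send(min, 0, min)); // exactly at the limit: send
    assert!(!(min + 0 > min));            // the old `>` check would have waited
}
```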
@@ -149,7 +154,7 @@ where
 fn poll_flush(
 mut self: Pin<&mut Self>,
 cx: &mut std::task::Context<'_>,
-) -> std::task::Poll<Result<(), io::Error>> {
+) -> Poll<Result<(), io::Error>> {
 // Poll current tasks
 self.as_mut().poll_tasks(cx)?;

@@ -177,7 +182,7 @@ where
 fn poll_shutdown(
 mut self: Pin<&mut Self>,
 cx: &mut std::task::Context<'_>,
-) -> std::task::Poll<Result<(), io::Error>> {
+) -> Poll<Result<(), io::Error>> {
 // First, poll flush
 match self.as_mut().poll_flush(cx) {
 Poll::Pending => return Poll::Pending,
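The `poll_shutdown` hunk applies the same `std::task::Poll` to `Poll` tidy-up and shows the existing flush-before-shutdown flow. As a hedged, self-contained illustration of that flow only (a toy `AsyncWrite`, not the multipart writer from object_store):

```rust
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;

// Toy writer used only to illustrate delegating shutdown to flush first.
struct NoopWriter;

impl AsyncWrite for NoopWriter {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        // Pretend the whole buffer was accepted.
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        // Nothing buffered, so flushing always completes immediately.
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        // First drive flush to completion, mirroring the pattern in the hunk above.
        match self.as_mut().poll_flush(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(res) => Poll::Ready(res),
        }
    }
}
```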