From 2b2f5e3f175d01dbd82a917ca9f0a97d1ea87a3f Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 29 Nov 2022 20:15:27 -0800 Subject: [PATCH 1/4] fix: use better minimum part size --- object_store/CONTRIBUTING.md | 6 +++--- object_store/src/lib.rs | 2 +- object_store/src/multipart.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/object_store/CONTRIBUTING.md b/object_store/CONTRIBUTING.md index e780ec5c9b09..4e6b3afe3859 100644 --- a/object_store/CONTRIBUTING.md +++ b/object_store/CONTRIBUTING.md @@ -46,9 +46,9 @@ Setup environment ``` export TEST_INTEGRATION=1 -export AWS_DEFAULT_REGION=us-east-1 -export AWS_ACCESS_KEY_ID=test -export AWS_SECRET_ACCESS_KEY=test +export OBJECT_STORE_AWS_DEFAULT_REGION=us-east-1 +export OBJECT_STORE_AWS_ACCESS_KEY_ID=test +export OBJECT_STORE_AWS_SECRET_ACCESS_KEY=test export AWS_ENDPOINT=http://128.0.0.1:4566 export OBJECT_STORE_BUCKET=test-bucket ``` diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 6278d827b0c7..560af2fa2aac 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -769,7 +769,7 @@ mod tests { assert_eq!(bytes_expected, bytes_written); // Can overwrite some storage - let data = get_vec_of_bytes(5_000, 5); + let data = get_vec_of_bytes(5_123_000, 5); let bytes_expected = data.concat(); let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); for chunk in &data { diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 102d8bedaa46..01cb524e8708 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -81,7 +81,7 @@ where current_buffer: Vec::new(), // TODO: Should self vary by provider? // TODO: Should we automatically increase then when part index gets large? - min_part_size: 5_000_000, + min_part_size: 6_000_000, current_part_idx: 0, completion_task: None, } From eb4bfca894d13829740497c40ef1958353b09442 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 30 Nov 2022 20:03:47 -0800 Subject: [PATCH 2/4] test: don't make the test larger than necessary --- object_store/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 560af2fa2aac..8d434df6481f 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -769,7 +769,7 @@ mod tests { assert_eq!(bytes_expected, bytes_written); // Can overwrite some storage - let data = get_vec_of_bytes(5_123_000, 5); + let data = get_vec_of_bytes(1_123_000, 5); let bytes_expected = data.concat(); let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); for chunk in &data { From d58372ad6849bae115c5f48d12b0650d51e2965c Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 1 Dec 2022 13:38:03 +0000 Subject: [PATCH 3/4] Further tweaks --- object_store/src/lib.rs | 3 ++- object_store/src/multipart.rs | 14 +++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 8d434df6481f..a36bb5fb8de4 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -769,7 +769,8 @@ mod tests { assert_eq!(bytes_expected, bytes_written); // Can overwrite some storage - let data = get_vec_of_bytes(1_123_000, 5); + // Sizes carefully chosen to exactly hit min limit of 5 MiB + let data = get_vec_of_bytes(242_880, 22); let bytes_expected = data.concat(); let (_, mut writer) = storage.put_multipart(&location).await.unwrap(); for chunk in &data { diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 01cb524e8708..1027699b20af 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -81,7 +81,11 @@ where current_buffer: Vec::new(), // TODO: Should self vary by provider? // TODO: Should we automatically increase then when part index gets large? - min_part_size: 6_000_000, + + // Minimum size of 5 MiB + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + // https://cloud.google.com/storage/quotas#requests + min_part_size: 5_242_880, current_part_idx: 0, completion_task: None, } @@ -113,13 +117,13 @@ where mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8], - ) -> std::task::Poll> { + ) -> Poll> { // Poll current tasks self.as_mut().poll_tasks(cx)?; // If adding buf to pending buffer would trigger send, check // whether we have capacity for another task. - let enough_to_send = (buf.len() + self.current_buffer.len()) > self.min_part_size; + let enough_to_send = (buf.len() + self.current_buffer.len()) >= self.min_part_size; if enough_to_send && self.tasks.len() < self.max_concurrency { // If we do, copy into the buffer and submit the task, and return ready. self.current_buffer.extend_from_slice(buf); @@ -149,7 +153,7 @@ where fn poll_flush( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { // Poll current tasks self.as_mut().poll_tasks(cx)?; @@ -177,7 +181,7 @@ where fn poll_shutdown( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { // First, poll flush match self.as_mut().poll_flush(cx) { Poll::Pending => return Poll::Pending, From 3302216c21b831938c48c6555e1a0411f420c57b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 1 Dec 2022 13:44:55 +0000 Subject: [PATCH 4/4] Format --- object_store/src/multipart.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/object_store/src/multipart.rs b/object_store/src/multipart.rs index 1027699b20af..de8591462500 100644 --- a/object_store/src/multipart.rs +++ b/object_store/src/multipart.rs @@ -123,7 +123,8 @@ where // If adding buf to pending buffer would trigger send, check // whether we have capacity for another task. - let enough_to_send = (buf.len() + self.current_buffer.len()) >= self.min_part_size; + let enough_to_send = + (buf.len() + self.current_buffer.len()) >= self.min_part_size; if enough_to_send && self.tasks.len() < self.max_concurrency { // If we do, copy into the buffer and submit the task, and return ready. self.current_buffer.extend_from_slice(buf);