From af5997cf7dd8cce97b07b5c8c78e23cb7092549f Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sun, 9 Jun 2024 15:28:42 -0400 Subject: [PATCH] chore: fix final python errors, cargo fmt --- crates/aws/tests/repair_s3_rename_test.rs | 5 +-- crates/azure/tests/integration.rs | 5 ++- python/src/filesystem.rs | 44 ++++++++++------------- python/tests/test_update.py | 2 +- 4 files changed, 25 insertions(+), 31 deletions(-) diff --git a/crates/aws/tests/repair_s3_rename_test.rs b/crates/aws/tests/repair_s3_rename_test.rs index b7e11a22df..d9e19de7b7 100644 --- a/crates/aws/tests/repair_s3_rename_test.rs +++ b/crates/aws/tests/repair_s3_rename_test.rs @@ -228,10 +228,7 @@ impl ObjectStore for DelayedObjectStore { self.inner.rename_if_not_exists(from, to).await } - async fn put_multipart( - &self, - location: &Path, - ) -> ObjectStoreResult> { + async fn put_multipart(&self, location: &Path) -> ObjectStoreResult> { self.inner.put_multipart(location).await } diff --git a/crates/azure/tests/integration.rs b/crates/azure/tests/integration.rs index 732c5aaa8e..3ffaa00cc5 100644 --- a/crates/azure/tests/integration.rs +++ b/crates/azure/tests/integration.rs @@ -75,7 +75,10 @@ async fn read_write_test_onelake(context: &IntegrationContext, path: &Path) -> T let expected = Bytes::from_static(b"test world from delta-rs on friday"); - delta_store.put(path, expected.clone().into()).await.unwrap(); + delta_store + .put(path, expected.clone().into()) + .await + .unwrap(); let fetched = delta_store.get(path).await.unwrap().bytes().await.unwrap(); assert_eq!(expected, fetched); diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index 1fa07b0165..bdf3596d78 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -1,7 +1,7 @@ use crate::error::PythonError; use crate::utils::{delete_dir, rt, walk_tree}; use crate::RawDeltaTable; -use deltalake::storage::object_store::MultipartUpload; +use deltalake::storage::object_store::{MultipartUpload, PutPayloadMut}; use deltalake::storage::{DynObjectStore, ListResult, ObjectStoreError, Path}; use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; -const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024; +const DEFAULT_MAX_BUFFER_SIZE: usize = 4 * 1024 * 1024; #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct FsConfig { @@ -292,7 +292,7 @@ impl DeltaFileSystemHandler { .options .get("max_buffer_size") .map_or(DEFAULT_MAX_BUFFER_SIZE, |v| { - v.parse::().unwrap_or(DEFAULT_MAX_BUFFER_SIZE) + v.parse::().unwrap_or(DEFAULT_MAX_BUFFER_SIZE) }); let file = rt() .block_on(ObjectOutputStream::try_new( @@ -489,7 +489,6 @@ impl ObjectInputFile { } // TODO the C++ implementation track an internal lock on all random access files, DO we need this here? -// TODO add buffer to store data ... #[pyclass(weakref, module = "deltalake._internal")] pub struct ObjectOutputStream { upload: Box, @@ -498,15 +497,15 @@ pub struct ObjectOutputStream { closed: bool, #[pyo3(get)] mode: String, - max_buffer_size: i64, - buffer_size: i64, + max_buffer_size: usize, + buffer: PutPayloadMut, } impl ObjectOutputStream { pub async fn try_new( store: Arc, path: Path, - max_buffer_size: i64, + max_buffer_size: usize, ) -> Result { let upload = store.put_multipart(&path).await?; Ok(Self { @@ -514,8 +513,8 @@ impl ObjectOutputStream { pos: 0, closed: false, mode: "wb".into(), + buffer: PutPayloadMut::default(), max_buffer_size, - buffer_size: 0, }) } @@ -532,7 +531,8 @@ impl ObjectOutputStream { impl ObjectOutputStream { fn close(&mut self, py: Python<'_>) -> PyResult<()> { self.closed = true; - py.allow_threads(|| match rt().block_on(self.upload.abort()) { + self.flush(py)?; + py.allow_threads(|| match rt().block_on(self.upload.complete()) { Ok(_) => Ok(()), Err(err) => Err(PyIOError::new_err(err.to_string())), }) @@ -581,28 +581,22 @@ impl ObjectOutputStream { fn write(&mut self, data: &PyBytes) -> PyResult { self.check_closed()?; let py = data.py(); - let bytes = data.as_bytes().to_owned(); - let len = bytes.len() as i64; - let res = py.allow_threads(|| match rt().block_on(self.upload.put_part(bytes.into())) { - Ok(_) => Ok(len), - Err(err) => { - rt().block_on(self.upload.abort()) - .map_err(PythonError::from)?; - Err(PyIOError::new_err(err.to_string())) - } - })?; - self.buffer_size += len; - if self.buffer_size >= self.max_buffer_size { - let _ = self.flush(py); - self.buffer_size = 0; + let bytes = data.as_bytes(); + self.buffer.extend_from_slice(bytes); + let len = bytes.len(); + if self.max_buffer_size >= self.buffer.content_length() { + self.flush(py)?; } - Ok(res) + Ok(len as i64) } fn flush(&mut self, py: Python<'_>) -> PyResult<()> { - py.allow_threads(|| match rt().block_on(self.upload.abort()) { + let payload = std::mem::take(&mut self.buffer).freeze(); + py.allow_threads(|| match rt().block_on(self.upload.put_part(payload)) { Ok(_) => Ok(()), Err(err) => { + rt().block_on(self.upload.abort()) + .map_err(|err| PythonError::from(err))?; Err(PyIOError::new_err(err.to_string())) } }) diff --git a/python/tests/test_update.py b/python/tests/test_update.py index 74ae130224..85e3fe38ec 100644 --- a/python/tests/test_update.py +++ b/python/tests/test_update.py @@ -119,7 +119,7 @@ def test_update_wrong_types_cast(tmp_path: pathlib.Path, sample_table: pa.Table) assert ( str(excinfo.value) - == "Cast error: Cannot cast value 'hello_world' to value of Boolean type" + == "Generic DeltaTable error: Error during planning: Failed to coerce then ([Utf8]) and else (Some(Boolean)) to common types in CASE WHEN expression" )