Skip to content

Commit

Permalink
chore: fix final python errors, cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
abhiaagarwal committed Jun 9, 2024
1 parent 639b3dd commit af5997c
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 31 deletions.
5 changes: 1 addition & 4 deletions crates/aws/tests/repair_s3_rename_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,7 @@ impl ObjectStore for DelayedObjectStore {
self.inner.rename_if_not_exists(from, to).await
}

async fn put_multipart(
&self,
location: &Path,
) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
async fn put_multipart(&self, location: &Path) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
self.inner.put_multipart(location).await
}

Expand Down
5 changes: 4 additions & 1 deletion crates/azure/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ async fn read_write_test_onelake(context: &IntegrationContext, path: &Path) -> T

let expected = Bytes::from_static(b"test world from delta-rs on friday");

delta_store.put(path, expected.clone().into()).await.unwrap();
delta_store
.put(path, expected.clone().into())
.await
.unwrap();
let fetched = delta_store.get(path).await.unwrap().bytes().await.unwrap();
assert_eq!(expected, fetched);

Expand Down
44 changes: 19 additions & 25 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};
use crate::RawDeltaTable;
use deltalake::storage::object_store::MultipartUpload;
use deltalake::storage::object_store::{MultipartUpload, PutPayloadMut};
use deltalake::storage::{DynObjectStore, ListResult, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
Expand All @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;

const DEFAULT_MAX_BUFFER_SIZE: i64 = 4 * 1024 * 1024;
const DEFAULT_MAX_BUFFER_SIZE: usize = 4 * 1024 * 1024;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FsConfig {
Expand Down Expand Up @@ -292,7 +292,7 @@ impl DeltaFileSystemHandler {
.options
.get("max_buffer_size")
.map_or(DEFAULT_MAX_BUFFER_SIZE, |v| {
v.parse::<i64>().unwrap_or(DEFAULT_MAX_BUFFER_SIZE)
v.parse::<usize>().unwrap_or(DEFAULT_MAX_BUFFER_SIZE)
});
let file = rt()
.block_on(ObjectOutputStream::try_new(
Expand Down Expand Up @@ -489,7 +489,6 @@ impl ObjectInputFile {
}

// TODO the C++ implementation track an internal lock on all random access files, DO we need this here?
// TODO add buffer to store data ...
#[pyclass(weakref, module = "deltalake._internal")]
pub struct ObjectOutputStream {
upload: Box<dyn MultipartUpload>,
Expand All @@ -498,24 +497,24 @@ pub struct ObjectOutputStream {
closed: bool,
#[pyo3(get)]
mode: String,
max_buffer_size: i64,
buffer_size: i64,
max_buffer_size: usize,
buffer: PutPayloadMut,
}

impl ObjectOutputStream {
pub async fn try_new(
store: Arc<DynObjectStore>,
path: Path,
max_buffer_size: i64,
max_buffer_size: usize,
) -> Result<Self, ObjectStoreError> {
let upload = store.put_multipart(&path).await?;
Ok(Self {
upload,
pos: 0,
closed: false,
mode: "wb".into(),
buffer: PutPayloadMut::default(),
max_buffer_size,
buffer_size: 0,
})
}

Expand All @@ -532,7 +531,8 @@ impl ObjectOutputStream {
impl ObjectOutputStream {
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
self.closed = true;
py.allow_threads(|| match rt().block_on(self.upload.abort()) {
self.flush(py)?;
py.allow_threads(|| match rt().block_on(self.upload.complete()) {
Ok(_) => Ok(()),
Err(err) => Err(PyIOError::new_err(err.to_string())),
})
Expand Down Expand Up @@ -581,28 +581,22 @@ impl ObjectOutputStream {
fn write(&mut self, data: &PyBytes) -> PyResult<i64> {
self.check_closed()?;
let py = data.py();
let bytes = data.as_bytes().to_owned();
let len = bytes.len() as i64;
let res = py.allow_threads(|| match rt().block_on(self.upload.put_part(bytes.into())) {
Ok(_) => Ok(len),
Err(err) => {
rt().block_on(self.upload.abort())
.map_err(PythonError::from)?;
Err(PyIOError::new_err(err.to_string()))
}
})?;
self.buffer_size += len;
if self.buffer_size >= self.max_buffer_size {
let _ = self.flush(py);
self.buffer_size = 0;
let bytes = data.as_bytes();
self.buffer.extend_from_slice(bytes);
let len = bytes.len();
if self.max_buffer_size >= self.buffer.content_length() {
self.flush(py)?;
}
Ok(res)
Ok(len as i64)
}

fn flush(&mut self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| match rt().block_on(self.upload.abort()) {
let payload = std::mem::take(&mut self.buffer).freeze();
py.allow_threads(|| match rt().block_on(self.upload.put_part(payload)) {
Ok(_) => Ok(()),
Err(err) => {
rt().block_on(self.upload.abort())
.map_err(|err| PythonError::from(err))?;
Err(PyIOError::new_err(err.to_string()))
}
})
Expand Down
2 changes: 1 addition & 1 deletion python/tests/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def test_update_wrong_types_cast(tmp_path: pathlib.Path, sample_table: pa.Table)

assert (
str(excinfo.value)
== "Cast error: Cannot cast value 'hello_world' to value of Boolean type"
== "Generic DeltaTable error: Error during planning: Failed to coerce then ([Utf8]) and else (Some(Boolean)) to common types in CASE WHEN expression"
)


Expand Down

0 comments on commit af5997c

Please sign in to comment.