feat: improve write performance of DeltaFileSystemHandler (#943)
# Description

This PR builds on top of the changes to runtime handling in #933. In
my local tests this fixed #915. Additionally, I added the runtime as a
property on the fs handler to avoid re-creating it on every call. In
some non-representative tests with a large number of very small
partitions, this cut the runtime roughly in half.

cc @wjones127 
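
The core of the change is the pattern sketched below. This is a minimal illustration only: `Handler`, `new`, and `do_blocking_work` are hypothetical stand-ins for `DeltaFileSystemHandler` and its methods, and it assumes tokio with the `rt-multi-thread` feature. The runtime is built once in the constructor and reused via `block_on`, rather than constructing a fresh runtime inside every call.

```rust
use std::sync::Arc;

use tokio::runtime::Runtime;

// Hypothetical stand-in for DeltaFileSystemHandler: the runtime is created
// once and stored, instead of being rebuilt on every filesystem call.
struct Handler {
    rt: Arc<Runtime>,
}

impl Handler {
    fn new() -> std::io::Result<Self> {
        // Runtime::new() is comparatively expensive; do it exactly once.
        Ok(Self {
            rt: Arc::new(Runtime::new()?),
        })
    }

    fn do_blocking_work(&self) -> u64 {
        // Every method reuses the stored runtime to drive its future.
        self.rt.block_on(async { 40 + 2 })
    }
}

fn main() -> std::io::Result<()> {
    let handler = Handler::new()?;
    assert_eq!(handler.do_blocking_work(), 42);
    Ok(())
}
```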

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->
roeap authored Nov 17, 2022
1 parent a8f2ff2 commit 7ef3663
Showing 3 changed files with 96 additions and 68 deletions.
143 changes: 87 additions & 56 deletions python/src/filesystem.rs
@@ -1,7 +1,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::utils::{delete_dir, wait_for_future, walk_tree};
use crate::utils::{delete_dir, rt, walk_tree};
use crate::PyDeltaTableError;

use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
@@ -10,11 +10,13 @@ use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBytes};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::runtime::Runtime;

#[pyclass(subclass)]
#[derive(Debug, Clone)]
pub struct DeltaFileSystemHandler {
pub(crate) inner: Arc<DynObjectStore>,
pub(crate) rt: Arc<Runtime>,
}

#[pymethods]
@@ -26,7 +28,10 @@ impl DeltaFileSystemHandler {
.with_storage_options(options.unwrap_or_default())
.build_storage()
.map_err(PyDeltaTableError::from_raw)?;
Ok(Self { inner: storage })
Ok(Self {
inner: storage,
rt: Arc::new(rt()?),
})
}

fn get_type_name(&self) -> String {
@@ -39,10 +44,11 @@ impl DeltaFileSystemHandler {
Ok(format!("{}{}", path, suffix))
}

fn copy_file(&self, src: String, dest: String, py: Python) -> PyResult<()> {
fn copy_file(&self, src: String, dest: String) -> PyResult<()> {
let from_path = Path::from(src);
let to_path = Path::from(dest);
wait_for_future(py, self.inner.copy(&from_path, &to_path))
self.rt
.block_on(self.inner.copy(&from_path, &to_path))
.map_err(PyDeltaTableError::from_object_store)?;
Ok(())
}
@@ -52,16 +58,18 @@ impl DeltaFileSystemHandler {
Ok(())
}

fn delete_dir(&self, path: String, py: Python) -> PyResult<()> {
fn delete_dir(&self, path: String) -> PyResult<()> {
let path = Path::from(path);
wait_for_future(py, delete_dir(self.inner.as_ref(), &path))
self.rt
.block_on(delete_dir(self.inner.as_ref(), &path))
.map_err(PyDeltaTableError::from_object_store)?;
Ok(())
}

fn delete_file(&self, path: String, py: Python) -> PyResult<()> {
fn delete_file(&self, path: String) -> PyResult<()> {
let path = Path::from(path);
wait_for_future(py, self.inner.delete(&path))
self.rt
.block_on(self.inner.delete(&path))
.map_err(PyDeltaTableError::from_object_store)?;
Ok(())
}
@@ -81,12 +89,14 @@ impl DeltaFileSystemHandler {
let mut infos = Vec::new();
for file_path in paths {
let path = Path::from(file_path);
let listed = wait_for_future(py, self.inner.list_with_delimiter(Some(&path)))
let listed = self
.rt
.block_on(self.inner.list_with_delimiter(Some(&path)))
.map_err(PyDeltaTableError::from_object_store)?;

// TODO is there a better way to figure out if we are in a directory?
if listed.objects.is_empty() && listed.common_prefixes.is_empty() {
let maybe_meta = wait_for_future(py, self.inner.head(&path));
let maybe_meta = self.rt.block_on(self.inner.head(&path));
match maybe_meta {
Ok(meta) => {
let kwargs = HashMap::from([
@@ -138,22 +148,24 @@ impl DeltaFileSystemHandler {
};

let path = Path::from(base_dir);
let list_result =
match wait_for_future(py, walk_tree(self.inner.clone(), &path, recursive)) {
Ok(res) => Ok(res),
Err(ObjectStoreError::NotFound { path, source }) => {
if allow_not_found {
Ok(ListResult {
common_prefixes: vec![],
objects: vec![],
})
} else {
Err(ObjectStoreError::NotFound { path, source })
}
let list_result = match self
.rt
.block_on(walk_tree(self.inner.clone(), &path, recursive))
{
Ok(res) => Ok(res),
Err(ObjectStoreError::NotFound { path, source }) => {
if allow_not_found {
Ok(ListResult {
common_prefixes: vec![],
objects: vec![],
})
} else {
Err(ObjectStoreError::NotFound { path, source })
}
Err(err) => Err(err),
}
.map_err(PyDeltaTableError::from_object_store)?;
Err(err) => Err(err),
}
.map_err(PyDeltaTableError::from_object_store)?;

let mut infos = vec![];
infos.extend(
@@ -190,18 +202,25 @@ impl DeltaFileSystemHandler {
Ok(infos)
}

fn move_file(&self, src: String, dest: String, py: Python) -> PyResult<()> {
fn move_file(&self, src: String, dest: String) -> PyResult<()> {
let from_path = Path::from(src);
let to_path = Path::from(dest);
// TODO check the if not exists semantics
wait_for_future(py, self.inner.rename(&from_path, &to_path))
self.rt
.block_on(self.inner.rename(&from_path, &to_path))
.map_err(PyDeltaTableError::from_object_store)?;
Ok(())
}

fn open_input_file(&self, path: String, py: Python) -> PyResult<ObjectInputFile> {
fn open_input_file(&self, path: String) -> PyResult<ObjectInputFile> {
let path = Path::from(path);
let file = wait_for_future(py, ObjectInputFile::try_new(self.inner.clone(), path))
let file = self
.rt
.block_on(ObjectInputFile::try_new(
self.rt.clone(),
self.inner.clone(),
path,
))
.map_err(PyDeltaTableError::from_object_store)?;
Ok(file)
}
@@ -211,10 +230,15 @@ impl DeltaFileSystemHandler {
&self,
path: String,
#[allow(unused)] metadata: Option<HashMap<String, String>>,
py: Python,
) -> PyResult<ObjectOutputStream> {
let path = Path::from(path);
let file = wait_for_future(py, ObjectOutputStream::try_new(self.inner.clone(), path))
let file = self
.rt
.block_on(ObjectOutputStream::try_new(
self.rt.clone(),
self.inner.clone(),
path,
))
.map_err(PyDeltaTableError::from_object_store)?;
Ok(file)
}
@@ -226,6 +250,7 @@ impl DeltaFileSystemHandler {
#[derive(Debug, Clone)]
pub struct ObjectInputFile {
store: Arc<DynObjectStore>,
rt: Arc<Runtime>,
path: Path,
content_length: i64,
#[pyo3(get)]
@@ -236,7 +261,11 @@ pub struct ObjectInputFile {
}

impl ObjectInputFile {
pub async fn try_new(store: Arc<DynObjectStore>, path: Path) -> Result<Self, ObjectStoreError> {
pub async fn try_new(
rt: Arc<Runtime>,
store: Arc<DynObjectStore>,
path: Path,
) -> Result<Self, ObjectStoreError> {
// Issue a HEAD Object to get the content-length and ensure any
// errors (e.g. file not found) don't wait until the first read() call.
let meta = store.head(&path).await?;
@@ -245,6 +274,7 @@ impl ObjectInputFile {
// https://github.com/apache/arrow/blob/f184255cbb9bf911ea2a04910f711e1a924b12b8/cpp/src/arrow/filesystem/s3fs.cc#L1083
Ok(Self {
store,
rt,
path,
content_length,
closed: false,
@@ -338,7 +368,7 @@ impl ObjectInputFile {
}

#[args(nbytes = "None")]
fn read<'py>(&mut self, nbytes: Option<i64>, py: Python<'py>) -> PyResult<&'py PyBytes> {
fn read(&mut self, nbytes: Option<i64>) -> PyResult<Py<PyBytes>> {
self.check_closed()?;
let range = match nbytes {
Some(len) => {
@@ -356,13 +386,14 @@ impl ObjectInputFile {
let nbytes = (range.end - range.start) as i64;
self.pos += nbytes;
let obj = if nbytes > 0 {
wait_for_future(py, self.store.get_range(&self.path, range))
self.rt
.block_on(self.store.get_range(&self.path, range))
.map_err(PyDeltaTableError::from_object_store)?
.to_vec()
} else {
Vec::new()
};
Ok(PyBytes::new(py, &obj))
Python::with_gil(|py| Ok(PyBytes::new(py, &obj).into_py(py)))
}

fn fileno(&self) -> PyResult<()> {
@@ -389,6 +420,7 @@ impl ObjectInputFile {
#[pyclass(weakref)]
pub struct ObjectOutputStream {
store: Arc<DynObjectStore>,
rt: Arc<Runtime>,
path: Path,
writer: Box<dyn AsyncWrite + Send + Unpin>,
multipart_id: MultipartId,
@@ -400,10 +432,15 @@ pub struct ObjectOutputStream {
}

impl ObjectOutputStream {
pub async fn try_new(store: Arc<DynObjectStore>, path: Path) -> Result<Self, ObjectStoreError> {
pub async fn try_new(
rt: Arc<Runtime>,
store: Arc<DynObjectStore>,
path: Path,
) -> Result<Self, ObjectStoreError> {
let (multipart_id, writer) = store.put_multipart(&path).await?;
Ok(Self {
store,
rt,
path,
writer,
multipart_id,
@@ -424,16 +461,14 @@ impl ObjectOutputStream {

#[pymethods]
impl ObjectOutputStream {
fn close(&mut self, py: Python) -> PyResult<()> {
fn close(&mut self) -> PyResult<()> {
self.closed = true;
match wait_for_future(py, self.writer.shutdown()) {
match self.rt.block_on(self.writer.shutdown()) {
Ok(_) => Ok(()),
Err(err) => {
wait_for_future(
py,
self.store.abort_multipart(&self.path, &self.multipart_id),
)
.map_err(PyDeltaTableError::from_object_store)?;
self.rt
.block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
.map_err(PyDeltaTableError::from_object_store)?;
Err(PyDeltaTableError::from_io(err))
}
}
@@ -477,31 +512,27 @@ impl ObjectOutputStream {
Err(PyNotImplementedError::new_err("'read' not implemented"))
}

fn write(&mut self, data: Vec<u8>, py: Python) -> PyResult<i64> {
fn write(&mut self, data: Vec<u8>) -> PyResult<i64> {
self.check_closed()?;
let len = data.len() as i64;
match wait_for_future(py, self.writer.write_all(&data)) {
match self.rt.block_on(self.writer.write_all(&data)) {
Ok(_) => Ok(len),
Err(err) => {
wait_for_future(
py,
self.store.abort_multipart(&self.path, &self.multipart_id),
)
.map_err(PyDeltaTableError::from_object_store)?;
self.rt
.block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
.map_err(PyDeltaTableError::from_object_store)?;
Err(PyDeltaTableError::from_io(err))
}
}
}

fn flush(&mut self, py: Python) -> PyResult<()> {
match wait_for_future(py, self.writer.flush()) {
fn flush(&mut self) -> PyResult<()> {
match self.rt.block_on(self.writer.flush()) {
Ok(_) => Ok(()),
Err(err) => {
wait_for_future(
py,
self.store.abort_multipart(&self.path, &self.multipart_id),
)
.map_err(PyDeltaTableError::from_object_store)?;
self.rt
.block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
.map_err(PyDeltaTableError::from_object_store)?;
Err(PyDeltaTableError::from_io(err))
}
}
8 changes: 5 additions & 3 deletions python/src/lib.rs
@@ -27,6 +27,7 @@ use pyo3::types::PyType;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;

@@ -394,10 +395,11 @@ impl RawDeltaTable {
Ok(())
}

pub fn get_py_storage_backend(&self) -> filesystem::DeltaFileSystemHandler {
filesystem::DeltaFileSystemHandler {
pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {
Ok(filesystem::DeltaFileSystemHandler {
inner: self._table.object_store(),
}
rt: Arc::new(rt()?),
})
}
}

13 changes: 4 additions & 9 deletions python/src/utils.rs
@@ -1,20 +1,15 @@
use std::future::Future;
use std::sync::Arc;

use deltalake::storage::{ListResult, ObjectStore, ObjectStoreError, ObjectStoreResult, Path};
use futures::future::{join_all, BoxFuture, FutureExt};
use futures::StreamExt;
use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use tokio::runtime::Runtime;

/// Utility to collect rust futures with GIL released
pub fn wait_for_future<F: Future>(py: Python, f: F) -> F::Output
where
F: Send,
F::Output: Send,
{
let rt = Runtime::new().unwrap();
py.allow_threads(|| rt.block_on(f))
#[inline]
pub fn rt() -> PyResult<tokio::runtime::Runtime> {
Runtime::new().map_err(|_| PyRuntimeError::new_err("Couldn't start a new tokio runtime."))
}

/// walk the "directory" tree along common prefixes in object store
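
For context, the removed wait_for_future helper wrapped block_on in py.allow_threads so the GIL was released while a future ran. A minimal sketch of combining that same GIL release with a stored runtime is shown below; this is illustrative only, and block_on_without_gil is a hypothetical helper rather than code from this PR.

```rust
use std::future::Future;
use std::sync::Arc;

use pyo3::prelude::*;
use tokio::runtime::Runtime;

// Hypothetical helper: release the GIL (as the old wait_for_future did) while
// driving a future on a shared, pre-built runtime.
fn block_on_without_gil<F>(py: Python<'_>, rt: &Arc<Runtime>, fut: F) -> F::Output
where
    F: Future + Send,
    F::Output: Send,
{
    py.allow_threads(|| rt.block_on(fut))
}

fn main() {
    // Embedding-style setup so the sketch runs standalone.
    pyo3::prepare_freethreaded_python();
    let rt = Arc::new(Runtime::new().expect("failed to build tokio runtime"));
    Python::with_gil(|py| {
        assert_eq!(block_on_without_gil(py, &rt, async { 2 + 2 }), 4);
    });
}
```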
