Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
eeroel committed Sep 19, 2023
1 parent a21574f commit 17d918d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 34 deletions.
2 changes: 1 addition & 1 deletion python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class ObjectOutputStream:
class DeltaFileSystemHandler:
"""Implementation of pyarrow.fs.FileSystemHandler for use with pyarrow.fs.PyFileSystem"""

def __init__(self, root: str, options: dict[str, str] | None = None) -> None: ...
def __init__(self, root: str, options: dict[str, str] | None = None, known_sizes: dict[str, int] | None = None) -> None: ...
def get_type_name(self) -> str: ...
def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.
Expand Down
34 changes: 5 additions & 29 deletions python/deltalake/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,24 @@ class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
"""
DeltaStorageHandler is a concrete implementation of a PyArrow FileSystemHandler.
"""

known_sizes: Dict[str, int] = {}

def __new__( # type:ignore
cls,
table_uri: str,
storage_options: Optional[Dict[str, str]] = None,
known_sizes: Optional[Dict[str, int]] = None,
):
return super().__new__(
cls, table_uri=table_uri, options=storage_options # type:ignore
)

def __init__(
self,
table_uri: str,
storage_options: Optional[Dict[str, str]] = None,
known_sizes: Optional[Dict[str, int]] = None,
):
if known_sizes:
self.known_sizes = known_sizes
return

def open_input_file(self, path: str, size: Optional[int] = None) -> pa.PythonFile:

def open_input_file(self, path: str) -> pa.PythonFile:
"""
Open an input file for random access reading.
:param path: The path of the file to open for reading.
:return: NativeFile
"""
size = self.known_sizes.get(path)
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path, size))
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))

def open_input_stream(self, path: str, size: Optional[int] = None) -> pa.PythonFile:
def open_input_stream(self, path: str) -> pa.PythonFile:
"""
Open an input stream for sequential reading.
:param path: The path of the file to open for reading.
:return: NativeFile
"""
size = self.known_sizes.get(path)
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path, size))
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))

def open_output_stream(
self, path: str, metadata: Optional[Dict[str, str]] = None
Expand Down
19 changes: 15 additions & 4 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct DeltaFileSystemHandler {
pub(crate) inner: Arc<DynObjectStore>,
pub(crate) rt: Arc<Runtime>,
pub(crate) config: FsConfig,
pub(crate) known_sizes: Option<HashMap<String, i64>>,
}

impl DeltaFileSystemHandler {
Expand All @@ -41,8 +42,12 @@ impl DeltaFileSystemHandler {
#[pymethods]
impl DeltaFileSystemHandler {
#[new]
#[pyo3(signature = (table_uri, options = None))]
fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
#[pyo3(signature = (table_uri, options = None, known_sizes = None))]
fn new(
table_uri: &str,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
Expand All @@ -54,6 +59,7 @@ impl DeltaFileSystemHandler {
root_url: table_uri.into(),
options: options.unwrap_or_default(),
},
known_sizes: known_sizes,
})
}

Expand Down Expand Up @@ -236,15 +242,20 @@ impl DeltaFileSystemHandler {
Ok(())
}

fn open_input_file(&self, path: String, size: Option<i64>) -> PyResult<ObjectInputFile> {
fn open_input_file(&self, path: String) -> PyResult<ObjectInputFile> {
let size = match &self.known_sizes {
Some(sz) => sz.get(&path),
None => None,
};

let path = Self::parse_path(&path);
let file = self
.rt
.block_on(ObjectInputFile::try_new(
Arc::clone(&self.rt),
self.inner.clone(),
path,
size,
size.copied(),
))
.map_err(PythonError::from)?;
Ok(file)
Expand Down
1 change: 1 addition & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ impl RawDeltaTable {
inner: self._table.object_store(),
rt: Arc::new(rt()?),
config: self._config.clone(),
known_sizes: None
})
}

Expand Down

0 comments on commit 17d918d

Please sign in to comment.