Skip to content

Commit

Permalink
Add implementation for load_with_datetime in Python package.
Browse files Browse the repository at this point in the history
  • Loading branch information
zijie0 committed Aug 24, 2021
1 parent e7e549d commit 07216e1
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ env_logger = "0"
reqwest = { version = "*", features = ["native-tls-vendored"] }
serde_json = "1"
arrow = { version = "5" }
chrono = "0"

[dependencies.pyo3]
version = "0.14"
Expand Down
8 changes: 8 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ def load_version(self, version: int) -> None:
"""
self._table.load_version(version)

def load_with_datetime(self, ds: str) -> None:
"""
Time travel the Delta table to the latest version that was created at or before the provided `ds` argument.

:param ds: the point in time to load, given as an RFC 3339 datetime string that includes a UTC offset,
    e.g. "2020-05-01T00:47:31-07:00" (the native binding parses it with chrono's `parse_from_rfc3339`)
:raises: an error from the native binding if `ds` cannot be parsed as RFC 3339
"""
self._table.load_with_datetime(ds)

def schema(self) -> Schema:
"""
Get the current schema of the DeltaTable.
Expand Down
15 changes: 15 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ extern crate arrow;
extern crate pyo3;

use arrow::datatypes::Schema as ArrowSchema;
use chrono::{DateTime, FixedOffset, Utc};
use deltalake::partitions::PartitionFilter;
use pyo3::create_exception;
use pyo3::exceptions::PyException;
Expand All @@ -25,6 +26,10 @@ impl PyDeltaTableError {
/// Wraps a tokio I/O error in a `PyDeltaTableError`, using the error's
/// `to_string()` output as the exception message.
fn from_tokio(err: tokio::io::Error) -> pyo3::PyErr {
    let message = err.to_string();
    PyDeltaTableError::new_err(message)
}

/// Wraps a chrono parse failure in a `PyDeltaTableError`, using the error's
/// `to_string()` output as the exception message.
fn from_chrono(err: chrono::ParseError) -> pyo3::PyErr {
    let message = err.to_string();
    PyDeltaTableError::new_err(message)
}
}

#[inline]
Expand Down Expand Up @@ -100,6 +105,16 @@ impl RawDeltaTable {
.map_err(PyDeltaTableError::from_raw)
}

/// Loads the table state selected by the timestamp `ds`.
///
/// `ds` is parsed as an RFC 3339 datetime, converted to UTC, and passed to
/// the wrapped table's `load_with_datetime`, which is driven to completion
/// on the runtime returned by `rt()`.
///
/// # Errors
///
/// Returns a `PyDeltaTableError` when `ds` is not valid RFC 3339 (via
/// `from_chrono`) or when the underlying load fails (via `from_raw`); a
/// failure from `rt()` is propagated unchanged.
pub fn load_with_datetime(&mut self, ds: &str) -> PyResult<()> {
    // Parse first so an invalid timestamp is rejected before a runtime is requested.
    let with_offset = DateTime::<FixedOffset>::parse_from_rfc3339(ds)
        .map_err(PyDeltaTableError::from_chrono)?;
    let datetime = DateTime::<Utc>::from(with_offset);

    rt()?
        .block_on(self._table.load_with_datetime(datetime))
        .map_err(PyDeltaTableError::from_raw)
}

pub fn files_by_partitions(
&self,
partitions_filters: Vec<(&str, &str, PartitionFilterValue)>,
Expand Down
11 changes: 11 additions & 0 deletions python/tests/test_table_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ def test_read_simple_table_by_version_to_dict():
assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"value": [1, 2, 3]}


def test_load_with_datetime():
    """
    Load the simple_table fixture at three RFC 3339 timestamps in turn and
    check the table version reported after each load.
    """
    dt = DeltaTable("../rust/tests/data/simple_table")
    # (timestamp to load, version expected afterwards), applied in order.
    cases = [
        ("2020-05-01T00:47:31-07:00", 0),
        ("2020-05-02T22:47:31-07:00", 1),
        ("2020-05-25T22:47:31-07:00", 4),
    ]
    for ds, expected_version in cases:
        dt.load_with_datetime(ds)
        assert dt.version() == expected_version


def test_read_simple_table_update_incremental():
table_path = "../rust/tests/data/simple_table"
dt = DeltaTable(table_path, version=0)
Expand Down

0 comments on commit 07216e1

Please sign in to comment.