feat: move and update Optimize operation #1154

Merged: 8 commits, Feb 23, 2023
4 changes: 2 additions & 2 deletions python/Cargo.toml
@@ -18,7 +18,7 @@ doc = false
name = "deltalake._internal"

[dependencies]
arrow-schema = { version = "31", features = ["serde"] }
arrow-schema = { version = "32", features = ["serde"] }
chrono = "0"
env_logger = "0"
futures = "0.3"
@@ -33,7 +33,7 @@ tokio = { version = "1", features = ["rt-multi-thread"] }
reqwest = { version = "*", features = ["native-tls-vendored"] }

[dependencies.pyo3]
version = "0.17"
version = "0.18"
features = ["extension-module", "abi3", "abi3-py37"]

[dependencies.deltalake]
2 changes: 1 addition & 1 deletion python/deltalake/table.py
@@ -366,7 +366,7 @@ def to_pyarrow_dataset(
partition_expression=part_expression,
)
for file, part_expression in self._table.dataset_partitions(
-partitions, self.schema().to_pyarrow()
+self.schema().to_pyarrow(), partitions
)
]

20 changes: 11 additions & 9 deletions python/src/filesystem.rs
@@ -30,7 +30,7 @@ pub struct DeltaFileSystemHandler {
#[pymethods]
impl DeltaFileSystemHandler {
#[new]
-#[args(options = "None")]
+#[pyo3(signature = (table_uri, options = None))]
fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
let storage = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(options.clone().unwrap_or_default())
@@ -144,7 +144,7 @@ impl DeltaFileSystemHandler {
Ok(infos)
}

-#[args(allow_not_found = "false", recursive = "false")]
+#[pyo3(signature = (base_dir, allow_not_found = false, recursive = false))]
fn get_file_info_selector<'py>(
&self,
base_dir: String,
@@ -237,7 +237,7 @@ impl DeltaFileSystemHandler {
Ok(file)
}

-#[args(metadata = "None")]
+#[pyo3(signature = (path, metadata = None))]
fn open_output_stream(
&self,
path: String,
@@ -358,7 +358,7 @@ impl ObjectInputFile {
Ok(self.content_length)
}

-#[args(whence = "0")]
+#[pyo3(signature = (offset, whence = 0))]
fn seek(&mut self, offset: i64, whence: i64) -> PyResult<i64> {
self.check_closed()?;
self.check_position(offset, "seek")?;
@@ -384,7 +384,7 @@ impl ObjectInputFile {
Ok(self.pos)
}

-#[args(nbytes = "None")]
+#[pyo3(signature = (nbytes = None))]
fn read(&mut self, nbytes: Option<i64>) -> PyResult<Py<PyBytes>> {
self.check_closed()?;
let range = match nbytes {
@@ -516,14 +516,16 @@ impl ObjectOutputStream {
Err(PyNotImplementedError::new_err("'size' not implemented"))
}

-#[args(whence = "0")]
-fn seek(&mut self, _offset: i64, _whence: i64) -> PyResult<i64> {
+#[allow(unused_variables)]
+#[pyo3(signature = (offset, whence = 0))]
+fn seek(&mut self, offset: i64, whence: i64) -> PyResult<i64> {
self.check_closed()?;
Err(PyNotImplementedError::new_err("'seek' not implemented"))
}

-#[args(nbytes = "None")]
-fn read(&mut self, _nbytes: Option<i64>) -> PyResult<()> {
+#[allow(unused_variables)]
+#[pyo3(signature = (nbytes = None))]
+fn read(&mut self, nbytes: Option<i64>) -> PyResult<()> {
self.check_closed()?;
Err(PyNotImplementedError::new_err("'read' not implemented"))
}
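The common thread in the Python-binding changes above is the pyo3 0.17 to 0.18 upgrade: the deprecated `#[args(...)]` attribute, whose defaults were string literals, is replaced by `#[pyo3(signature = (...))]`, which names every parameter and takes plain Rust expressions as defaults. A minimal sketch of the mapping (the `Example` type and `list_files` method are hypothetical, not part of this PR):

```rust
use pyo3::prelude::*;

#[pyclass]
struct Example;

#[pymethods]
impl Example {
    // pyo3 0.17 style (deprecated in 0.18): #[args(limit = "None", recursive = "false")]
    // pyo3 0.18 style: list every parameter; defaults are ordinary Rust expressions.
    #[pyo3(signature = (path, limit = None, recursive = false))]
    fn list_files(&self, path: String, limit: Option<usize>, recursive: bool) -> PyResult<String> {
        // Echo the arguments back so the defaults are easy to verify from Python.
        Ok(format!("{path}: limit={limit:?}, recursive={recursive}"))
    }
}
```

The `#[allow(unused_variables)]` attributes on `ObjectOutputStream::seek` and `read` are presumably needed because the names in the signature attribute must match the Rust parameter names, so the leading underscores on the unused parameters were dropped.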
7 changes: 5 additions & 2 deletions python/src/lib.rs
@@ -108,6 +108,7 @@ struct RawDeltaTableMetaData {
#[pymethods]
impl RawDeltaTable {
#[new]
+#[pyo3(signature = (table_uri, version = None, storage_options = None, without_files = false))]
fn new(
table_uri: &str,
version: Option<deltalake::DeltaDataTypeLong>,
@@ -282,7 +283,9 @@ impl RawDeltaTable {
schema_to_pyobject(schema, py)
}

-/// Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
+/// Run the Vacuum command on the Delta Table: list and delete files no longer referenced
+/// by the Delta table and are older than the retention threshold.
+#[pyo3(signature = (dry_run, retention_hours = None, enforce_retention_duration = true))]
pub fn vacuum(
&mut self,
dry_run: bool,
@@ -334,8 +337,8 @@ impl RawDeltaTable {
pub fn dataset_partitions<'py>(
&mut self,
py: Python<'py>,
-partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
schema: PyArrowType<ArrowSchema>,
+partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
) -> PyResult<Vec<(String, Option<&'py PyAny>)>> {
let path_set = match partition_filters {
Some(filters) => Some(HashSet::<_>::from_iter(
6 changes: 3 additions & 3 deletions python/src/schema.rs
@@ -279,7 +279,7 @@ impl TryFrom<SchemaDataType> for ArrayType {
#[pymethods]
impl ArrayType {
#[new]
-#[args(contains_null = true)]
+#[pyo3(signature = (element_type, contains_null = true))]
fn new(element_type: PyObject, contains_null: bool, py: Python) -> PyResult<Self> {
let inner_type = SchemaTypeArray::new(
Box::new(python_type_to_schema(element_type, py)?),
@@ -444,7 +444,7 @@ impl TryFrom<SchemaDataType> for MapType {
#[pymethods]
impl MapType {
#[new]
-#[args(value_contains_null = true)]
+#[pyo3(signature = (key_type, value_type, value_contains_null = true))]
fn new(
key_type: PyObject,
value_type: PyObject,
@@ -608,7 +608,7 @@ pub struct Field {
#[pymethods]
impl Field {
#[new]
-#[args(nullable = true)]
+#[pyo3(signature = (name, ty, nullable = true, metadata = None))]
fn new(
name: String,
ty: PyObject,
20 changes: 10 additions & 10 deletions rust/Cargo.toml
@@ -13,7 +13,7 @@ readme = "README.md"
edition = "2021"

[dependencies]
arrow = { version = "31", optional = true }
arrow = { version = "32", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = { version = "0.4.22", default-features = false, features = ["clock"] }
@@ -29,7 +29,10 @@ num-traits = "0.2.15"
object_store = "0.5.3"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet = { version = "31", features = ["async"], optional = true }
parquet = { version = "32", features = [
"async",
"object_store",
], optional = true }
parquet2 = { version = "0.17", optional = true }
percent-encoding = "2"
serde = { version = "1", features = ["derive"] }
@@ -50,10 +53,10 @@ rusoto_dynamodb = { version = "0.47", default-features = false, optional = true
rusoto_glue = { version = "0.47", default-features = false, optional = true }

# Datafusion
datafusion = { version = "17", optional = true }
datafusion-expr = { version = "17", optional = true }
datafusion-common = { version = "17", optional = true }
datafusion-proto = { version = "17", optional = true }
datafusion = { version = "18", optional = true }
datafusion-expr = { version = "18", optional = true }
datafusion-common = { version = "18", optional = true }
datafusion-proto = { version = "18", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
@@ -113,10 +116,7 @@ s3 = [
"object_store/aws_profile",
]
glue-native-tls = ["s3-native-tls", "rusoto_glue"]
-glue = [
-"s3",
-"rusoto_glue/rustls"
-]
+glue = ["s3", "rusoto_glue/rustls"]
python = ["arrow/pyarrow"]

# used only for integration testing
2 changes: 1 addition & 1 deletion rust/examples/recordbatch-writer.rs
@@ -125,7 +125,7 @@ fn fetch_readings() -> Vec<WeatherRecord> {

for i in 1..=5 {
let mut wx = WeatherRecord::default();
-wx.temp = wx.temp - i;
+wx.temp -= i;
readings.push(wx);
}
readings
2 changes: 0 additions & 2 deletions rust/src/lib.rs
@@ -105,8 +105,6 @@ pub mod delta_arrow;
#[cfg(feature = "datafusion")]
pub mod delta_datafusion;
-#[cfg(all(feature = "arrow", feature = "parquet"))]
-pub mod optimize;
#[cfg(all(feature = "arrow", feature = "parquet"))]
pub mod writer;

pub use self::builder::*;
17 changes: 13 additions & 4 deletions rust/src/operations/mod.rs
@@ -15,6 +15,8 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};

pub mod create;
pub mod filesystem_check;
+#[cfg(all(feature = "arrow", feature = "parquet"))]
+pub mod optimize;
pub mod transaction;
pub mod vacuum;

@@ -24,15 +26,15 @@ use self::{load::LoadBuilder, write::WriteBuilder};
use arrow::record_batch::RecordBatch;
#[cfg(feature = "datafusion")]
pub use datafusion::physical_plan::common::collect as collect_sendable_stream;
+#[cfg(all(feature = "arrow", feature = "parquet"))]
+use optimize::OptimizeBuilder;

#[cfg(feature = "datafusion")]
mod load;
#[cfg(feature = "datafusion")]
pub mod write;
-// TODO the writer module does not actually depend on datafusion,
-// eventually we should consolidate with the record batch writer
-#[cfg(feature = "datafusion")]
-mod writer;
+#[cfg(all(feature = "arrow", feature = "parquet"))]
+pub mod writer;

/// Maximum supported writer version
pub const MAX_SUPPORTED_WRITER_VERSION: i32 = 1;
@@ -123,6 +125,13 @@ impl DeltaOps {
pub fn filesystem_check(self) -> FileSystemCheckBuilder {
FileSystemCheckBuilder::new(self.0.object_store(), self.0.state)
}

+/// Optimize the table by compacting small files into larger files
+#[cfg(all(feature = "arrow", feature = "parquet"))]
+#[must_use]
+pub fn optimize<'a>(self) -> OptimizeBuilder<'a> {
+OptimizeBuilder::new(self.0.object_store(), self.0.state)
+}
}

impl From<DeltaTable> for DeltaOps {
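The core of the PR is this last file: the optimize code moves out of the crate root and into the `operations` module, and is exposed through `DeltaOps::optimize`, gated on the `arrow` and `parquet` features. A minimal usage sketch, assuming the returned `OptimizeBuilder` can be awaited directly (via `IntoFuture`) and resolves to a `(DeltaTable, Metrics)` pair as in current delta-rs releases; the table path is a placeholder and tokio's `macros` feature is assumed:

```rust
use deltalake::{DeltaOps, DeltaTableError};

#[tokio::main]
async fn main() -> Result<(), DeltaTableError> {
    // Open an existing Delta table (placeholder path).
    let ops = DeltaOps::try_from_uri("./data/delta-table").await?;

    // Await the builder directly; it commits a compaction and returns the
    // updated table along with metrics about files added and removed.
    let (table, metrics) = ops.optimize().await?;

    println!("table is now at version {}", table.version());
    println!("optimize metrics: {:?}", metrics);
    Ok(())
}
```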