Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(python): post_commit_hook_properties derive #2824

Merged: 2 commits merged on Aug 30, 2024
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union
import pyarrow
import pyarrow.fs as fs

from deltalake.writer import AddAction, WriterProperties
from deltalake.writer import AddAction, PostCommitHookProperties, WriterProperties

__version__: str

Expand Down Expand Up @@ -58,7 +58,7 @@ class RawDeltaTable:
retention_hours: Optional[int],
enforce_retention_duration: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> List[str]: ...
def compact_optimize(
self,
Expand All @@ -68,7 +68,7 @@ class RawDeltaTable:
min_commit_interval: Optional[int],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def z_order_optimize(
self,
Expand All @@ -80,26 +80,26 @@ class RawDeltaTable:
min_commit_interval: Optional[int],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def add_columns(
self,
fields: List[Field],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def add_constraints(
self,
constraints: Dict[str, str],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def drop_constraints(
self,
name: str,
raise_if_not_exists: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def set_table_properties(
self,
Expand All @@ -126,13 +126,13 @@ class RawDeltaTable:
predicate: Optional[str],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def repair(
self,
dry_run: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def update(
self,
Expand All @@ -141,7 +141,7 @@ class RawDeltaTable:
writer_properties: Optional[WriterProperties],
safe_cast: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def create_merge_builder(
self,
Expand All @@ -151,7 +151,7 @@ class RawDeltaTable:
target_alias: Optional[str],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
safe_cast: bool,
) -> PyMergeBuilder: ...
def merge_execute(self, merge_builder: PyMergeBuilder) -> str: ...
Expand All @@ -166,7 +166,7 @@ class RawDeltaTable:
schema: pyarrow.Schema,
partitions_filters: Optional[FilterType],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def cleanup_metadata(self) -> None: ...
def check_can_write_timestamp_ntz(self, schema: pyarrow.Schema) -> None: ...
Expand Down Expand Up @@ -206,7 +206,7 @@ def write_to_deltalake(
storage_options: Optional[Dict[str, str]],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def convert_to_deltalake(
uri: str,
Expand Down
36 changes: 10 additions & 26 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ def vacuum(
retention_hours,
enforce_retention_duration,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
post_commithook_properties,
)

def update(
Expand Down Expand Up @@ -852,9 +852,7 @@ def update(
writer_properties,
safe_cast=not error_on_type_mismatch,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
return json.loads(metrics)

Expand Down Expand Up @@ -961,9 +959,7 @@ def merge(
safe_cast=not error_on_type_mismatch,
writer_properties=writer_properties,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
return TableMerger(py_merge_builder, self._table)

Expand Down Expand Up @@ -1246,10 +1242,7 @@ def delete(
the metrics from delete.
"""
metrics = self._table.delete(
predicate,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
predicate, writer_properties, custom_metadata, post_commithook_properties
)
return json.loads(metrics)

Expand Down Expand Up @@ -1286,9 +1279,7 @@ def repair(
```
"""
metrics = self._table.repair(
dry_run,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
dry_run, custom_metadata, post_commithook_properties
)
return json.loads(metrics)

Expand Down Expand Up @@ -1706,9 +1697,7 @@ def add_columns(
fields = [fields]

self.table._table.add_columns(
fields,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
fields, custom_metadata, post_commithook_properties
)

def add_constraint(
Expand Down Expand Up @@ -1747,9 +1736,7 @@ def add_constraint(
)

self.table._table.add_constraints(
constraints,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
constraints, custom_metadata, post_commithook_properties
)

def drop_constraint(
Expand Down Expand Up @@ -1788,10 +1775,7 @@ def drop_constraint(
```
"""
self.table._table.drop_constraints(
name,
raise_if_not_exists,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
name, raise_if_not_exists, custom_metadata, post_commithook_properties
)

def set_table_properties(
Expand Down Expand Up @@ -1897,7 +1881,7 @@ def compact(
min_commit_interval,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
post_commithook_properties,
)
self.table.update_incremental()
return json.loads(metrics)
Expand Down Expand Up @@ -1967,7 +1951,7 @@ def z_order(
min_commit_interval,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
post_commithook_properties,
)
self.table.update_incremental()
return json.loads(metrics)
8 changes: 2 additions & 6 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ def write_deltalake(
storage_options=storage_options,
writer_properties=writer_properties,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
if table:
table.update_incremental()
Expand Down Expand Up @@ -549,9 +547,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
schema,
partition_filters,
custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
table.update_incremental()
else:
Expand Down
44 changes: 24 additions & 20 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl RawDeltaTable {
retention_hours: Option<u64>,
enforce_retention_duration: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<Vec<String>> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = VacuumBuilder::new(
Expand Down Expand Up @@ -364,7 +364,7 @@ impl RawDeltaTable {
writer_properties: Option<PyWriterProperties>,
safe_cast: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = UpdateBuilder::new(
Expand Down Expand Up @@ -420,7 +420,7 @@ impl RawDeltaTable {
min_commit_interval: Option<u64>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = OptimizeBuilder::new(
Expand Down Expand Up @@ -480,7 +480,7 @@ impl RawDeltaTable {
min_commit_interval: Option<u64>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = OptimizeBuilder::new(
Expand Down Expand Up @@ -526,7 +526,7 @@ impl RawDeltaTable {
py: Python,
fields: Vec<Field>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
let table = py.allow_threads(|| {
let mut cmd = AddColumnBuilder::new(
Expand Down Expand Up @@ -559,7 +559,7 @@ impl RawDeltaTable {
py: Python,
constraints: HashMap<String, String>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
let table = py.allow_threads(|| {
let mut cmd = ConstraintBuilder::new(
Expand Down Expand Up @@ -590,7 +590,7 @@ impl RawDeltaTable {
name: String,
raise_if_not_exists: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
let table = py.allow_threads(|| {
let mut cmd = DropConstraintBuilder::new(
Expand Down Expand Up @@ -699,7 +699,7 @@ impl RawDeltaTable {
target_alias: Option<String>,
safe_cast: bool,
writer_properties: Option<PyWriterProperties>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<PyMergeBuilder> {
py.allow_threads(|| {
Expand Down Expand Up @@ -925,7 +925,7 @@ impl RawDeltaTable {
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
py.allow_threads(|| {
let mode = mode.parse().map_err(PythonError::from)?;
Expand Down Expand Up @@ -1095,7 +1095,7 @@ impl RawDeltaTable {
predicate: Option<String>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = DeleteBuilder::new(
Expand Down Expand Up @@ -1154,7 +1154,7 @@ impl RawDeltaTable {
&mut self,
dry_run: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let mut cmd = FileSystemCheckBuilder::new(
self._table.log_store(),
Expand All @@ -1178,14 +1178,12 @@ impl RawDeltaTable {

fn set_post_commithook_properties(
mut commit_properties: CommitProperties,
post_commithook_properties: HashMap<String, Option<bool>>,
post_commithook_properties: PyPostCommitHookProperties,
) -> CommitProperties {
if let Some(Some(create_checkpoint)) = post_commithook_properties.get("create_checkpoint") {
commit_properties = commit_properties.with_create_checkpoint(*create_checkpoint)
}
if let Some(cleanup_expired_logs) = post_commithook_properties.get("cleanup_expired_logs") {
commit_properties = commit_properties.with_cleanup_expired_logs(*cleanup_expired_logs)
}
commit_properties =
commit_properties.with_create_checkpoint(post_commithook_properties.create_checkpoint);
commit_properties = commit_properties
.with_cleanup_expired_logs(post_commithook_properties.cleanup_expired_logs);
commit_properties
}

Expand Down Expand Up @@ -1305,7 +1303,7 @@ fn convert_partition_filters(

fn maybe_create_commit_properties(
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> Option<CommitProperties> {
if custom_metadata.is_none() && post_commithook_properties.is_none() {
return None;
Expand Down Expand Up @@ -1587,6 +1585,12 @@ pub struct PyWriterProperties {
column_properties: Option<HashMap<String, Option<ColumnProperties>>>,
}

/// Post-commit hook settings handed over from the Python bindings.
///
/// The derived `FromPyObject` lets binding functions accept this struct
/// directly as an (optional) argument.
// NOTE(review): presumably filled from the attributes of the Python
// `PostCommitHookProperties` object — confirm against the pyo3 derive rules.
#[derive(FromPyObject)]
pub struct PyPostCommitHookProperties {
/// Passed to `with_create_checkpoint` on the commit properties.
create_checkpoint: bool,
/// Passed as-is (including `None`) to `with_cleanup_expired_logs` on the
/// commit properties.
cleanup_expired_logs: Option<bool>,
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn write_to_deltalake(
Expand All @@ -1604,7 +1608,7 @@ fn write_to_deltalake(
storage_options: Option<HashMap<String, String>>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
py.allow_threads(|| {
let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<_>>();
Expand Down
Loading
Loading