Skip to content

Commit

Permalink
refactor: post_commit_hook_properties
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 30, 2024
1 parent 62d4887 commit 4108a57
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 67 deletions.
26 changes: 13 additions & 13 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union
import pyarrow
import pyarrow.fs as fs

from deltalake.writer import AddAction, WriterProperties
from deltalake.writer import AddAction, PostCommitHookProperties, WriterProperties

__version__: str

Expand Down Expand Up @@ -58,7 +58,7 @@ class RawDeltaTable:
retention_hours: Optional[int],
enforce_retention_duration: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> List[str]: ...
def compact_optimize(
self,
Expand All @@ -68,7 +68,7 @@ class RawDeltaTable:
min_commit_interval: Optional[int],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def z_order_optimize(
self,
Expand All @@ -80,26 +80,26 @@ class RawDeltaTable:
min_commit_interval: Optional[int],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def add_columns(
self,
fields: List[Field],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def add_constraints(
self,
constraints: Dict[str, str],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def drop_constraints(
self,
name: str,
raise_if_not_exists: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def set_table_properties(
self,
Expand All @@ -126,13 +126,13 @@ class RawDeltaTable:
predicate: Optional[str],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def repair(
self,
dry_run: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def update(
self,
Expand All @@ -141,7 +141,7 @@ class RawDeltaTable:
writer_properties: Optional[WriterProperties],
safe_cast: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> str: ...
def create_merge_builder(
self,
Expand All @@ -151,7 +151,7 @@ class RawDeltaTable:
target_alias: Optional[str],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
safe_cast: bool,
) -> PyMergeBuilder: ...
def merge_execute(self, merge_builder: PyMergeBuilder) -> str: ...
Expand All @@ -166,7 +166,7 @@ class RawDeltaTable:
schema: pyarrow.Schema,
partitions_filters: Optional[FilterType],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def cleanup_metadata(self) -> None: ...
def check_can_write_timestamp_ntz(self, schema: pyarrow.Schema) -> None: ...
Expand Down Expand Up @@ -206,7 +206,7 @@ def write_to_deltalake(
storage_options: Optional[Dict[str, str]],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def convert_to_deltalake(
uri: str,
Expand Down
36 changes: 10 additions & 26 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ def vacuum(
retention_hours,
enforce_retention_duration,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
post_commithook_properties,
)

def update(
Expand Down Expand Up @@ -852,9 +852,7 @@ def update(
writer_properties,
safe_cast=not error_on_type_mismatch,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
return json.loads(metrics)

Expand Down Expand Up @@ -961,9 +959,7 @@ def merge(
safe_cast=not error_on_type_mismatch,
writer_properties=writer_properties,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
return TableMerger(py_merge_builder, self._table)

Expand Down Expand Up @@ -1246,10 +1242,7 @@ def delete(
the metrics from delete.
"""
metrics = self._table.delete(
predicate,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
predicate, writer_properties, custom_metadata, post_commithook_properties
)
return json.loads(metrics)

Expand Down Expand Up @@ -1286,9 +1279,7 @@ def repair(
```
"""
metrics = self._table.repair(
dry_run,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
dry_run, custom_metadata, post_commithook_properties
)
return json.loads(metrics)

Expand Down Expand Up @@ -1706,9 +1697,7 @@ def add_columns(
fields = [fields]

self.table._table.add_columns(
fields,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
fields, custom_metadata, post_commithook_properties
)

def add_constraint(
Expand Down Expand Up @@ -1747,9 +1736,7 @@ def add_constraint(
)

self.table._table.add_constraints(
constraints,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
constraints, custom_metadata, post_commithook_properties
)

def drop_constraint(
Expand Down Expand Up @@ -1788,10 +1775,7 @@ def drop_constraint(
```
"""
self.table._table.drop_constraints(
name,
raise_if_not_exists,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
name, raise_if_not_exists, custom_metadata, post_commithook_properties
)

def set_table_properties(
Expand Down Expand Up @@ -1897,7 +1881,7 @@ def compact(
min_commit_interval,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
post_commithook_properties,
)
self.table.update_incremental()
return json.loads(metrics)
Expand Down Expand Up @@ -1967,7 +1951,7 @@ def z_order(
min_commit_interval,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
post_commithook_properties,
)
self.table.update_incremental()
return json.loads(metrics)
8 changes: 2 additions & 6 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,7 @@ def write_deltalake(
storage_options=storage_options,
writer_properties=writer_properties,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
if table:
table.update_incremental()
Expand Down Expand Up @@ -549,9 +547,7 @@ def validate_batch(batch: pa.RecordBatch) -> pa.RecordBatch:
schema,
partition_filters,
custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
else None,
post_commithook_properties=post_commithook_properties,
)
table.update_incremental()
else:
Expand Down
44 changes: 24 additions & 20 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl RawDeltaTable {
retention_hours: Option<u64>,
enforce_retention_duration: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<Vec<String>> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = VacuumBuilder::new(
Expand Down Expand Up @@ -364,7 +364,7 @@ impl RawDeltaTable {
writer_properties: Option<PyWriterProperties>,
safe_cast: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = UpdateBuilder::new(
Expand Down Expand Up @@ -420,7 +420,7 @@ impl RawDeltaTable {
min_commit_interval: Option<u64>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = OptimizeBuilder::new(
Expand Down Expand Up @@ -480,7 +480,7 @@ impl RawDeltaTable {
min_commit_interval: Option<u64>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = OptimizeBuilder::new(
Expand Down Expand Up @@ -526,7 +526,7 @@ impl RawDeltaTable {
py: Python,
fields: Vec<Field>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
let table = py.allow_threads(|| {
let mut cmd = AddColumnBuilder::new(
Expand Down Expand Up @@ -559,7 +559,7 @@ impl RawDeltaTable {
py: Python,
constraints: HashMap<String, String>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
let table = py.allow_threads(|| {
let mut cmd = ConstraintBuilder::new(
Expand Down Expand Up @@ -590,7 +590,7 @@ impl RawDeltaTable {
name: String,
raise_if_not_exists: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
let table = py.allow_threads(|| {
let mut cmd = DropConstraintBuilder::new(
Expand Down Expand Up @@ -699,7 +699,7 @@ impl RawDeltaTable {
target_alias: Option<String>,
safe_cast: bool,
writer_properties: Option<PyWriterProperties>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
custom_metadata: Option<HashMap<String, String>>,
) -> PyResult<PyMergeBuilder> {
py.allow_threads(|| {
Expand Down Expand Up @@ -925,7 +925,7 @@ impl RawDeltaTable {
schema: PyArrowType<ArrowSchema>,
partitions_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
py.allow_threads(|| {
let mode = mode.parse().map_err(PythonError::from)?;
Expand Down Expand Up @@ -1095,7 +1095,7 @@ impl RawDeltaTable {
predicate: Option<String>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let (table, metrics) = py.allow_threads(|| {
let mut cmd = DeleteBuilder::new(
Expand Down Expand Up @@ -1154,7 +1154,7 @@ impl RawDeltaTable {
&mut self,
dry_run: bool,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<String> {
let mut cmd = FileSystemCheckBuilder::new(
self._table.log_store(),
Expand All @@ -1178,14 +1178,12 @@ impl RawDeltaTable {

fn set_post_commithook_properties(
mut commit_properties: CommitProperties,
post_commithook_properties: HashMap<String, Option<bool>>,
post_commithook_properties: PyPostCommitHookProperties,
) -> CommitProperties {
if let Some(Some(create_checkpoint)) = post_commithook_properties.get("create_checkpoint") {
commit_properties = commit_properties.with_create_checkpoint(*create_checkpoint)
}
if let Some(cleanup_expired_logs) = post_commithook_properties.get("cleanup_expired_logs") {
commit_properties = commit_properties.with_cleanup_expired_logs(*cleanup_expired_logs)
}
commit_properties =
commit_properties.with_create_checkpoint(post_commithook_properties.create_checkpoint);
commit_properties = commit_properties
.with_cleanup_expired_logs(post_commithook_properties.cleanup_expired_logs);
commit_properties
}

Expand Down Expand Up @@ -1305,7 +1303,7 @@ fn convert_partition_filters(

fn maybe_create_commit_properties(
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> Option<CommitProperties> {
if custom_metadata.is_none() && post_commithook_properties.is_none() {
return None;
Expand Down Expand Up @@ -1587,6 +1585,12 @@ pub struct PyWriterProperties {
column_properties: Option<HashMap<String, Option<ColumnProperties>>>,
}

#[derive(FromPyObject)]
pub struct PyPostCommitHookProperties {
create_checkpoint: bool,
cleanup_expired_logs: Option<bool>,
}

#[pyfunction]
#[allow(clippy::too_many_arguments)]
fn write_to_deltalake(
Expand All @@ -1604,7 +1608,7 @@ fn write_to_deltalake(
storage_options: Option<HashMap<String, String>>,
writer_properties: Option<PyWriterProperties>,
custom_metadata: Option<HashMap<String, String>>,
post_commithook_properties: Option<HashMap<String, Option<bool>>>,
post_commithook_properties: Option<PyPostCommitHookProperties>,
) -> PyResult<()> {
py.allow_threads(|| {
let batches = data.0.map(|batch| batch.unwrap()).collect::<Vec<_>>();
Expand Down
Loading

0 comments on commit 4108a57

Please sign in to comment.