Skip to content

Commit

Permalink
feat(python): expose z-order in Python (#1443)
Browse files Browse the repository at this point in the history
# Description

Updated the API to:

```python
DeltaTable.optimize.compact()
DeltaTable.optimize.z_order()
```

The old API of `DeltaTable.optimize()` still works, but now issues a
deprecation warning.

# Related Issue(s)

- closes #1442


# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
wjones127 authored Jun 7, 2023
1 parent 197db59 commit 5dc89b3
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 33 deletions.
115 changes: 88 additions & 27 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
NamedTuple,
Optional,
Expand Down Expand Up @@ -437,35 +438,11 @@ def vacuum(
max_concurrent_requests,
)

@property
def optimize(
self,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
This operation is idempotent; if run twice on the same table (assuming it has
not been updated) it will do nothing the second time.
If this operation happens concurrently with any operations other than append,
it will fail.
:param partition_filters: the partition filters that will be used for getting the matched files
:param target_size: desired file size after bin-packing files, in bytes. If not
provided, will attempt to read the table configuration value ``delta.targetFileSize``.
If that value isn't set, will use default value of 256MB.
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:return: the metrics from optimize
"""
metrics = self._table.optimize(
partition_filters, target_size, max_concurrent_tasks
)
self.update_incremental()
return json.loads(metrics)
) -> "TableOptimizer":
return TableOptimizer(self)

def pyarrow_schema(self) -> pyarrow.Schema:
"""
Expand Down Expand Up @@ -638,3 +615,87 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
2 x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p... 565 1970-01-20 08:40:08.071 True 1 1 0 4 4
"""
return self._table.get_add_actions(flatten)


class TableOptimizer:
"""API for various table optimization commands."""

def __init__(self, table: DeltaTable):
self.table = table

def __call__(
self,
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
) -> Dict[str, Any]:
"""
.. deprecated:: 0.10.0
Use :meth:`compact` instead, which has the same signature.
"""

warnings.warn(
"Call to deprecated method DeltaTable.optimize. Use DeltaTable.optimize.compact() instead.",
category=DeprecationWarning,
stacklevel=2,
)

return self.compact(partition_filters, target_size, max_concurrent_tasks)

def compact(
self,
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
This operation is idempotent; if run twice on the same table (assuming it has
not been updated) it will do nothing the second time.
If this operation happens concurrently with any operations other than append,
it will fail.
:param partition_filters: the partition filters that will be used for getting the matched files
:param target_size: desired file size after bin-packing files, in bytes. If not
provided, will attempt to read the table configuration value ``delta.targetFileSize``.
If that value isn't set, will use default value of 256MB.
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:return: the metrics from optimize
"""
metrics = self.table._table.compact_optimize(
partition_filters, target_size, max_concurrent_tasks
)
self.table.update_incremental()
return json.loads(metrics)

def z_order(
self,
columns: Iterable[str],
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
) -> Dict[str, Any]:
"""
Reorders the data using a Z-order curve to improve data skipping.
This also performs compaction, so the same parameters as compact() apply.
:param columns: the columns to use for Z-ordering. There must be at least one column.
:param partition_filters: the partition filters that will be used for getting the matched files
:param target_size: desired file size after bin-packing files, in bytes. If not
provided, will attempt to read the table configuration value ``delta.targetFileSize``.
If that value isn't set, will use default value of 256MB.
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:return: the metrics from optimize
"""
metrics = self.table._table.z_order_optimize(
list(columns), partition_filters, target_size, max_concurrent_tasks
)
self.table.update_incremental()
return json.loads(metrics)
23 changes: 20 additions & 3 deletions python/docs/source/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -417,19 +417,36 @@ into a large file. Bin-packing reduces the number of API calls required for read
Optimizing will increments the table's version and creates remove actions for optimized files.
Optimize does not delete files from storage. To delete files that were removed, call :meth:`DeltaTable.vacuum`.

Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that this method will fail if a
concurrent writer performs an operation that removes any files (such as an overwrite).
``DeltaTable.optimize`` returns a :class:`TableOptimizer` object which provides
methods for optimizing the table. Note that these method will fail if a concurrent
writer performs an operation that removes any files (such as an overwrite).

For just file compaction, use the :meth:`TableOptimizer.compact` method:

.. code-block:: python
>>> dt = DeltaTable("../rust/tests/data/simple_table")
>>> dt.optimize()
>>> dt.optimize.compact()
{'numFilesAdded': 1, 'numFilesRemoved': 5,
'filesAdded': {'min': 555, 'max': 555, 'avg': 555.0, 'totalFiles': 1, 'totalSize': 555},
'filesRemoved': {'min': 262, 'max': 429, 'avg': 362.2, 'totalFiles': 5, 'totalSize': 1811},
'partitionsOptimized': 1, 'numBatches': 1, 'totalConsideredFiles': 5,
'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
For improved data skipping, use the :meth:`TableOptimizer.z_order` method. This
is slower than just file compaction, but can improve performance for queries that
filter on multiple columns at once.

.. code-block:: python
>>> dt = DeltaTable("../rust/tests/data/COVID-19_NYT")
>>> dt.optimize.z_order(["date", "county"])
{'numFilesAdded': 1, 'numFilesRemoved': 8,
'filesAdded': {'min': 2473439, 'max': 2473439, 'avg': 2473439.0, 'totalFiles': 1, 'totalSize': 2473439},
'filesRemoved': {'min': 325440, 'max': 895702, 'avg': 773810.625, 'totalFiles': 8, 'totalSize': 6190485},
'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 8,
'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
Writing Delta Tables
--------------------

Expand Down
30 changes: 28 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
Expand Down Expand Up @@ -268,7 +268,7 @@ impl RawDeltaTable {

/// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))]
pub fn optimize(
pub fn compact_optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
Expand All @@ -290,6 +290,32 @@ impl RawDeltaTable {
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Run z-order variation of optimize
#[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None))]
pub fn z_order_optimize(
&mut self,
z_order_columns: Vec<String>,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
.with_type(OptimizeType::ZOrder(z_order_columns));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);

let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
let history = rt()?
Expand Down
21 changes: 20 additions & 1 deletion python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,26 @@ def test_optimize_run_table(

dt = DeltaTable(table_path)
old_version = dt.version()
dt.optimize()
with pytest.warns(DeprecationWarning):
dt.optimize()
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
assert dt.version() == old_version + 1


def test_z_order_optimize(
tmp_path: pathlib.Path,
sample_data: pa.Table,
):
write_deltalake(tmp_path, sample_data, mode="append")
write_deltalake(tmp_path, sample_data, mode="append")
write_deltalake(tmp_path, sample_data, mode="append")

dt = DeltaTable(tmp_path)
old_version = dt.version()

dt.optimize.z_order(["date32", "timestamp"])

last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
assert dt.version() == old_version + 1

0 comments on commit 5dc89b3

Please sign in to comment.