From 5dc89b389d830cf72f68ace54adab85c81c26a69 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 6 Jun 2023 21:47:50 -0700
Subject: [PATCH] feat(python): expose z-order in Python (#1443)

# Description

Updated the API to:

```python
DeltaTable.optimize.compact()
DeltaTable.optimize.z_order()
```

The old API of `DeltaTable.optimize()` still works, but now issues a deprecation warning.
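As a quick migration sketch (the table URI is illustrative, and the column names are borrowed from the docs example below):

```python
import warnings

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # illustrative table URI

# New entry points: compaction and Z-ordering hang off the `optimize` property.
dt.optimize.compact()
dt.optimize.z_order(["date", "county"])

# The old call form still works but now emits a DeprecationWarning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    dt.optimize()
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```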
# Related Issue(s)

- closes #1442

# Documentation

---
 python/deltalake/table.py     | 115 ++++++++++++++++++++++++++--------
 python/docs/source/usage.rst  |  23 ++++++-
 python/src/lib.rs             |  30 ++++++++-
 python/tests/test_optimize.py |  21 ++++++-
 4 files changed, 156 insertions(+), 33 deletions(-)

diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 846f11b58d..dc7e7a1214 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -8,6 +8,7 @@
     TYPE_CHECKING,
     Any,
     Dict,
+    Iterable,
     List,
     NamedTuple,
     Optional,
@@ -437,35 +438,11 @@ def vacuum(
             max_concurrent_requests,
         )
 
+    @property
     def optimize(
         self,
-        partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
-        target_size: Optional[int] = None,
-        max_concurrent_tasks: Optional[int] = None,
-    ) -> Dict[str, Any]:
-        """
-        Compacts small files to reduce the total number of files in the table.
-
-        This operation is idempotent; if run twice on the same table (assuming it has
-        not been updated) it will do nothing the second time.
-
-        If this operation happens concurrently with any operations other than append,
-        it will fail.
-
-        :param partition_filters: the partition filters that will be used for getting the matched files
-        :param target_size: desired file size after bin-packing files, in bytes. If not
-            provided, will attempt to read the table configuration value ``delta.targetFileSize``.
-            If that value isn't set, will use default value of 256MB.
-        :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
-            file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
-            faster, but will also use more memory.
-        :return: the metrics from optimize
-        """
-        metrics = self._table.optimize(
-            partition_filters, target_size, max_concurrent_tasks
-        )
-        self.update_incremental()
-        return json.loads(metrics)
+    ) -> "TableOptimizer":
+        return TableOptimizer(self)
 
     def pyarrow_schema(self) -> pyarrow.Schema:
         """
@@ -638,3 +615,87 @@ def get_add_actions(self, flatten: bool = False) -> pyarrow.RecordBatch:
     2  x=1/0-91820cbf-f698-45fb-886d-5d5f5669530b-0.p...  565  1970-01-20 08:40:08.071  True  1  1  0  4  4
         """
         return self._table.get_add_actions(flatten)
+
+
+class TableOptimizer:
+    """API for various table optimization commands."""
+
+    def __init__(self, table: DeltaTable):
+        self.table = table
+
+    def __call__(
+        self,
+        partition_filters: Optional[FilterType] = None,
+        target_size: Optional[int] = None,
+        max_concurrent_tasks: Optional[int] = None,
+    ) -> Dict[str, Any]:
+        """
+        .. deprecated:: 0.10.0
+            Use :meth:`compact` instead, which has the same signature.
+        """
+
+        warnings.warn(
+            "Call to deprecated method DeltaTable.optimize. Use DeltaTable.optimize.compact() instead.",
+            category=DeprecationWarning,
+            stacklevel=2,
+        )
+
+        return self.compact(partition_filters, target_size, max_concurrent_tasks)
+
+    def compact(
+        self,
+        partition_filters: Optional[FilterType] = None,
+        target_size: Optional[int] = None,
+        max_concurrent_tasks: Optional[int] = None,
+    ) -> Dict[str, Any]:
+        """
+        Compacts small files to reduce the total number of files in the table.
+
+        This operation is idempotent; if run twice on the same table (assuming it has
+        not been updated) it will do nothing the second time.
+
+        If this operation happens concurrently with any operations other than append,
+        it will fail.
+
+        :param partition_filters: the partition filters that will be used for getting the matched files
+        :param target_size: desired file size after bin-packing files, in bytes. If not
+            provided, will attempt to read the table configuration value ``delta.targetFileSize``.
+            If that value isn't set, will use default value of 256MB.
+        :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
+            file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
+            faster, but will also use more memory.
+        :return: the metrics from optimize
+        """
+        metrics = self.table._table.compact_optimize(
+            partition_filters, target_size, max_concurrent_tasks
+        )
+        self.table.update_incremental()
+        return json.loads(metrics)
+
+    def z_order(
+        self,
+        columns: Iterable[str],
+        partition_filters: Optional[FilterType] = None,
+        target_size: Optional[int] = None,
+        max_concurrent_tasks: Optional[int] = None,
+    ) -> Dict[str, Any]:
+        """
+        Reorders the data using a Z-order curve to improve data skipping.
+
+        This also performs compaction, so the same parameters as compact() apply.
+
+        :param columns: the columns to use for Z-ordering. There must be at least one column.
+        :param partition_filters: the partition filters that will be used for getting the matched files
+        :param target_size: desired file size after bin-packing files, in bytes. If not
+            provided, will attempt to read the table configuration value ``delta.targetFileSize``.
+            If that value isn't set, will use default value of 256MB.
+        :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
+            file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
+            faster, but will also use more memory.
+        :return: the metrics from optimize
+        """
+        metrics = self.table._table.z_order_optimize(
+            list(columns), partition_filters, target_size, max_concurrent_tasks
+        )
+        self.table.update_incremental()
+        return json.loads(metrics)
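Both `compact` and `z_order` share the same filtering and sizing parameters; here is a sketch of passing them (the path, the `date` partition column, and the filter value are hypothetical):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/partitioned_table")  # hypothetical path

# Rewrite only one partition's files, targeting ~128 MiB outputs and
# capping parallelism at four tasks.
metrics = dt.optimize.compact(
    partition_filters=[("date", "=", "2023-06-01")],  # hypothetical partition column
    target_size=128 * 1024 * 1024,
    max_concurrent_tasks=4,
)

# Metrics come back as a plain dict decoded from the JSON the Rust side returns.
print(metrics["numFilesAdded"], metrics["numFilesRemoved"])
```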
diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst
index 5390fb1c0f..fdecfdc1a1 100644
--- a/python/docs/source/usage.rst
+++ b/python/docs/source/usage.rst
@@ -417,19 +417,36 @@ into a large file. Bin-packing reduces the number of API calls required for read
 Optimizing will increment the table's version and create remove actions for
 optimized files. Optimize does not delete files from storage. To delete files
 that were removed, call :meth:`DeltaTable.vacuum`.
 
-Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that this method will fail if a
-concurrent writer performs an operation that removes any files (such as an overwrite).
+``DeltaTable.optimize`` returns a :class:`TableOptimizer` object, which provides
+methods for optimizing the table. Note that these methods will fail if a concurrent
+writer performs an operation that removes any files (such as an overwrite).
+
+For just file compaction, use the :meth:`TableOptimizer.compact` method:
 
 .. code-block:: python
 
     >>> dt = DeltaTable("../rust/tests/data/simple_table")
-    >>> dt.optimize()
+    >>> dt.optimize.compact()
     {'numFilesAdded': 1, 'numFilesRemoved': 5,
      'filesAdded': {'min': 555, 'max': 555, 'avg': 555.0, 'totalFiles': 1, 'totalSize': 555},
      'filesRemoved': {'min': 262, 'max': 429, 'avg': 362.2, 'totalFiles': 5, 'totalSize': 1811},
      'partitionsOptimized': 1, 'numBatches': 1, 'totalConsideredFiles': 5,
      'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
 
+For improved data skipping, use the :meth:`TableOptimizer.z_order` method. This
+is slower than just file compaction, but can improve performance for queries that
+filter on multiple columns at once.
+
+.. code-block:: python
+
+    >>> dt = DeltaTable("../rust/tests/data/COVID-19_NYT")
+    >>> dt.optimize.z_order(["date", "county"])
+    {'numFilesAdded': 1, 'numFilesRemoved': 8,
+     'filesAdded': {'min': 2473439, 'max': 2473439, 'avg': 2473439.0, 'totalFiles': 1, 'totalSize': 2473439},
+     'filesRemoved': {'min': 325440, 'max': 895702, 'avg': 773810.625, 'totalFiles': 8, 'totalSize': 6190485},
+     'partitionsOptimized': 0, 'numBatches': 1, 'totalConsideredFiles': 8,
+     'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
+
 Writing Delta Tables
 --------------------

diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8e2c4d282f..4b63567f26 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -24,7 +24,7 @@ use deltalake::checkpoints::create_checkpoint;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
 use deltalake::errors::DeltaTableError;
-use deltalake::operations::optimize::OptimizeBuilder;
+use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
 use deltalake::operations::transaction::commit;
 use deltalake::operations::vacuum::VacuumBuilder;
 use deltalake::partitions::PartitionFilter;
@@ -268,7 +268,7 @@ impl RawDeltaTable {
     /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
     #[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))]
-    pub fn optimize(
+    pub fn compact_optimize(
         &mut self,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
@@ -290,6 +290,32 @@ impl RawDeltaTable {
         Ok(serde_json::to_string(&metrics).unwrap())
     }
 
+    /// Run z-order variation of optimize
+    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None))]
+    pub fn z_order_optimize(
+        &mut self,
+        z_order_columns: Vec<String>,
+        partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
+        target_size: Option<i64>,
+        max_concurrent_tasks: Option<usize>,
+    ) -> PyResult<String> {
+        let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
+            .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
+            .with_type(OptimizeType::ZOrder(z_order_columns));
+        if let Some(size) = target_size {
+            cmd = cmd.with_target_size(size);
+        }
+        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
+            .map_err(PythonError::from)?;
+        cmd = cmd.with_filters(&converted_filters);
+
+        let (table, metrics) = rt()?
+            .block_on(cmd.into_future())
+            .map_err(PythonError::from)?;
+        self._table.state = table.state;
+        Ok(serde_json::to_string(&metrics).unwrap())
+    }
+
     /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
     pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
         let history = rt()?
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
index 31339d3110..665aaaec8f 100644
--- a/python/tests/test_optimize.py
+++ b/python/tests/test_optimize.py
@@ -23,7 +23,26 @@ def test_optimize_run_table(
     dt = DeltaTable(table_path)
     old_version = dt.version()
 
-    dt.optimize()
+    with pytest.warns(DeprecationWarning):
+        dt.optimize()
+    last_action = dt.history(1)[0]
+    assert last_action["operation"] == "OPTIMIZE"
+    assert dt.version() == old_version + 1
+
+
+def test_z_order_optimize(
+    tmp_path: pathlib.Path,
+    sample_data: pa.Table,
+):
+    write_deltalake(tmp_path, sample_data, mode="append")
+    write_deltalake(tmp_path, sample_data, mode="append")
+    write_deltalake(tmp_path, sample_data, mode="append")
+
+    dt = DeltaTable(tmp_path)
+    old_version = dt.version()
+
+    dt.optimize.z_order(["date32", "timestamp"])
+    last_action = dt.history(1)[0]
     assert last_action["operation"] == "OPTIMIZE"
     assert dt.version() == old_version + 1
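For reference outside the test suite, a minimal end-to-end sketch of the new API (the inline table below is an illustrative stand-in for the test suite's `sample_data` fixture):

```python
import tempfile
from datetime import date, datetime

import pyarrow as pa

from deltalake import DeltaTable, write_deltalake

# Illustrative stand-in for the `sample_data` pytest fixture.
data = pa.table(
    {
        "date32": pa.array([date(2023, 1, d) for d in (1, 2, 3)], type=pa.date32()),
        "timestamp": pa.array(
            [datetime(2023, 1, 1, h) for h in (1, 2, 3)], type=pa.timestamp("us")
        ),
    }
)

with tempfile.TemporaryDirectory() as tmp:
    # Three appends leave three small files for optimize to rewrite.
    for _ in range(3):
        write_deltalake(tmp, data, mode="append")

    dt = DeltaTable(tmp)
    metrics = dt.optimize.z_order(["date32", "timestamp"])

    last_action = dt.history(1)[0]
    assert last_action["operation"] == "OPTIMIZE"
    assert metrics["numFilesRemoved"] == 3
```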