From d5954008a88a8cd3936f0d6b5c2fc002c6f11619 Mon Sep 17 00:00:00 2001
From: Siddharth kumar
Date: Mon, 19 Aug 2024 21:36:55 +0530
Subject: [PATCH] add ColumnProperties and rework Python WriterProperties

---
 python/deltalake/__init__.py          |  10 +-
 python/deltalake/_internal.pyi        |  14 +--
 python/deltalake/table.py             |  82 ++++++++++++---
 python/deltalake/writer.py            |   4 +-
 python/src/lib.rs                     | 143 ++++++++++++++++++------
 python/tests/test_writerproperties.py |  57 +++++-----
 6 files changed, 230 insertions(+), 80 deletions(-)

diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py
index 60579a33a0..fda126d2e6 100644
--- a/python/deltalake/__init__.py
+++ b/python/deltalake/__init__.py
@@ -4,9 +4,17 @@
 from .schema import DataType as DataType
 from .schema import Field as Field
 from .schema import Schema as Schema
+from .table import (
+    BloomFilterProperties as BloomFilterProperties,
+)
+from .table import (
+    ColumnProperties as ColumnProperties,
+)
 from .table import DeltaTable as DeltaTable
 from .table import Metadata as Metadata
 from .table import PostCommitHookProperties as PostCommitHookProperties
-from .table import WriterProperties as WriterProperties
+from .table import (
+    WriterProperties as WriterProperties,
+)
 from .writer import convert_to_deltalake as convert_to_deltalake
 from .writer import write_deltalake as write_deltalake
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index 27033cb9d8..908a3d7807 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -3,7 +3,7 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union
 import pyarrow
 import pyarrow.fs as fs
 
-from deltalake.writer import AddAction
+from deltalake.writer import AddAction, WriterProperties
 
 __version__: str
 
@@ -67,7 +67,7 @@ class RawDeltaTable:
         target_size: Optional[int],
         max_concurrent_tasks: Optional[int],
         min_commit_interval: Optional[int],
-        writer_properties: Optional[Dict[str, Optional[str]]],
+        writer_properties: Optional[WriterProperties],
         custom_metadata: Optional[Dict[str, str]],
         post_commithook_properties: Optional[Dict[str, Optional[bool]]],
     ) -> str: ...
@@ -79,7 +79,7 @@ class RawDeltaTable:
         max_concurrent_tasks: Optional[int],
         max_spill_size: Optional[int],
         min_commit_interval: Optional[int],
-        writer_properties: Optional[Dict[str, Optional[str]]],
+        writer_properties: Optional[WriterProperties],
         custom_metadata: Optional[Dict[str, str]],
         post_commithook_properties: Optional[Dict[str, Optional[bool]]],
     ) -> str: ...
@@ -125,7 +125,7 @@ class RawDeltaTable:
     def delete(
         self,
         predicate: Optional[str],
-        writer_properties: Optional[Dict[str, Optional[str]]],
+        writer_properties: Optional[WriterProperties],
         custom_metadata: Optional[Dict[str, str]],
         post_commithook_properties: Optional[Dict[str, Optional[bool]]],
     ) -> str: ...
@@ -139,7 +139,7 @@ class RawDeltaTable:
         self,
         updates: Dict[str, str],
         predicate: Optional[str],
-        writer_properties: Optional[Dict[str, Optional[str]]],
+        writer_properties: Optional[WriterProperties],
         safe_cast: bool,
         custom_metadata: Optional[Dict[str, str]],
         post_commithook_properties: Optional[Dict[str, Optional[bool]]],
@@ -150,7 +150,7 @@ class RawDeltaTable:
         predicate: str,
         source_alias: Optional[str],
         target_alias: Optional[str],
-        writer_properties: Optional[Dict[str, Optional[str]]],
+        writer_properties: Optional[WriterProperties],
         custom_metadata: Optional[Dict[str, str]],
         post_commithook_properties: Optional[Dict[str, Optional[bool]]],
         safe_cast: bool,
@@ -214,7 +214,7 @@ def write_to_deltalake(
     description: Optional[str],
     configuration: Optional[Mapping[str, Optional[str]]],
     storage_options: Optional[Dict[str, str]],
-    writer_properties: Optional[Dict[str, Optional[str]]],
+    writer_properties: Optional[WriterProperties],
     custom_metadata: Optional[Dict[str, str]],
     post_commithook_properties: Optional[Dict[str, Optional[bool]]],
 ) -> None: ...
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index c9a4600c52..7ec54f807c 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -139,6 +139,58 @@ def __init__(
         self.cleanup_expired_logs = cleanup_expired_logs
 
 
+@dataclass(init=True)
+class BloomFilterProperties:
+    """The Bloom Filter Properties instance for the Rust parquet writer."""
+
+    def __init__(
+        self,
+        set_bloom_filter_enabled: Optional[bool],
+        fpp: Optional[float] = None,
+        ndv: Optional[int] = None,
+    ):
+        """Create a Bloom Filter Properties instance for the Rust parquet writer:
+
+        Args:
+            set_bloom_filter_enabled: If True and no fpp or ndv are provided, the default values will be used.
+            fpp: The false positive probability for the bloom filter. Must be between 0 and 1 exclusive.
+            ndv: The number of distinct values for the bloom filter.
+        """
+        if fpp is not None and (fpp <= 0 or fpp >= 1):
+            raise ValueError("fpp must be between 0 and 1 exclusive")
+        self.set_bloom_filter_enabled = set_bloom_filter_enabled
+        self.fpp = fpp
+        self.ndv = ndv
+
+    def __str__(self) -> str:
+        return f"set_bloom_filter_enabled: {self.set_bloom_filter_enabled}, fpp: {self.fpp}, ndv: {self.ndv}"
+
+
+@dataclass(init=True)
+class ColumnProperties:
+    """The Column Properties instance for the Rust parquet writer."""
+
+    def __init__(
+        self,
+        dictionary_enabled: Optional[bool] = None,
+        max_statistics_size: Optional[int] = None,
+        bloom_filter_properties: Optional[BloomFilterProperties] = None,
+    ):
+        """Create a Column Properties instance for the Rust parquet writer:
+
+        Args:
+            dictionary_enabled: Enable dictionary encoding for the column.
+            max_statistics_size: Maximum size of statistics for the column.
+            bloom_filter_properties: Bloom Filter Properties for the column.
+ """ + self.dictionary_enabled = dictionary_enabled + self.max_statistics_size = max_statistics_size + self.bloom_filter_properties = bloom_filter_properties + + def __str__(self) -> str: + return f"dictionary_enabled: {self.dictionary_enabled}, max_statistics_size: {self.max_statistics_size}, bloom_filter_properties: {self.bloom_filter_properties}" + + @dataclass(init=True) class WriterProperties: """A Writer Properties instance for the Rust parquet writer.""" @@ -163,6 +215,8 @@ def __init__( ] = None, compression_level: Optional[int] = None, statistics_truncate_length: Optional[int] = None, + default_column_properties: Optional[ColumnProperties] = None, + column_properties: Optional[Dict[str, ColumnProperties]] = None, ): """Create a Writer Properties instance for the Rust parquet writer: @@ -178,6 +232,8 @@ def __init__( BROTLI: levels (1-11), ZSTD: levels (1-22), statistics_truncate_length: maximum length of truncated min/max values in statistics. + default_column_properties: Default Column Properties for the Rust parquet writer. + column_properties: Column Properties for the Rust parquet writer. """ self.data_page_size_limit = data_page_size_limit self.dictionary_page_size_limit = dictionary_page_size_limit @@ -186,6 +242,8 @@ def __init__( self.max_row_group_size = max_row_group_size self.compression = None self.statistics_truncate_length = statistics_truncate_length + self.default_column_properties = default_column_properties + self.column_properties = column_properties if compression_level is not None and compression is None: raise ValueError( @@ -211,18 +269,18 @@ def __init__( self.compression = parquet_compression def __str__(self) -> str: + column_properties_str = ( + ", ".join([f"column '{k}': {v}" for k, v in self.column_properties.items()]) + if self.column_properties + else None + ) return ( f"WriterProperties(data_page_size_limit: {self.data_page_size_limit}, dictionary_page_size_limit: {self.dictionary_page_size_limit}, " f"data_page_row_count_limit: {self.data_page_row_count_limit}, write_batch_size: {self.write_batch_size}, " - f"max_row_group_size: {self.max_row_group_size}, compression: {self.compression}, statistics_truncate_length: {self.statistics_truncate_length})" + f"max_row_group_size: {self.max_row_group_size}, compression: {self.compression}, statistics_truncate_length: {self.statistics_truncate_length}," + f"default_column_properties: {self.default_column_properties}, column_properties: {column_properties_str})" ) - def _to_dict(self) -> Dict[str, Optional[str]]: - values = {} - for key, value in self.__dict__.items(): - values[key] = str(value) if isinstance(value, int) else value - return values - @dataclass(init=False) class Metadata: @@ -833,7 +891,7 @@ def update( metrics = self._table.update( updates, predicate, - writer_properties._to_dict() if writer_properties else None, + writer_properties, safe_cast=not error_on_type_mismatch, custom_metadata=custom_metadata, post_commithook_properties=post_commithook_properties.__dict__ @@ -1229,7 +1287,7 @@ def delete( """ metrics = self._table.delete( predicate, - writer_properties._to_dict() if writer_properties else None, + writer_properties, custom_metadata, post_commithook_properties.__dict__ if post_commithook_properties else None, ) @@ -1773,7 +1831,7 @@ def execute(self) -> Dict[str, Any]: source_alias=self.source_alias, target_alias=self.target_alias, safe_cast=self.safe_cast, - writer_properties=self.writer_properties._to_dict() + writer_properties=self.writer_properties if self.writer_properties 
             else None,
             custom_metadata=self.custom_metadata,
@@ -2025,7 +2083,7 @@ def compact(
             target_size,
             max_concurrent_tasks,
             min_commit_interval,
-            writer_properties._to_dict() if writer_properties else None,
+            writer_properties,
             custom_metadata,
             post_commithook_properties.__dict__ if post_commithook_properties else None,
         )
@@ -2095,7 +2153,7 @@ def z_order(
             max_concurrent_tasks,
             max_spill_size,
             min_commit_interval,
-            writer_properties._to_dict() if writer_properties else None,
+            writer_properties,
             custom_metadata,
             post_commithook_properties.__dict__ if post_commithook_properties else None,
         )
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 50aa5841a0..99b915183f 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -312,9 +312,7 @@ def write_deltalake(
             description=description,
             configuration=configuration,
             storage_options=storage_options,
-            writer_properties=(
-                writer_properties._to_dict() if writer_properties else None
-            ),
+            writer_properties=writer_properties,
             custom_metadata=custom_metadata,
             post_commithook_properties=post_commithook_properties.__dict__
             if post_commithook_properties
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 0f2a8151db..41f751df33 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -382,7 +382,7 @@ impl RawDeltaTable {
         py: Python,
         updates: HashMap<String, String>,
         predicate: Option<String>,
-        writer_properties: Option<HashMap<String, Option<String>>>,
+        writer_properties: Option<PyWriterProperties>,
         safe_cast: bool,
         custom_metadata: Option<HashMap<String, String>>,
         post_commithook_properties: Option<HashMap<String, Option<bool>>>,
@@ -439,7 +439,7 @@ impl RawDeltaTable {
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
         min_commit_interval: Option<u64>,
-        writer_properties: Option<HashMap<String, Option<String>>>,
+        writer_properties: Option<PyWriterProperties>,
         custom_metadata: Option<HashMap<String, String>>,
         post_commithook_properties: Option<HashMap<String, Option<bool>>>,
     ) -> PyResult<String> {
@@ -499,7 +499,7 @@ impl RawDeltaTable {
         max_concurrent_tasks: Option<usize>,
         max_spill_size: usize,
         min_commit_interval: Option<u64>,
-        writer_properties: Option<HashMap<String, Option<String>>>,
+        writer_properties: Option<PyWriterProperties>,
         custom_metadata: Option<HashMap<String, String>>,
         post_commithook_properties: Option<HashMap<String, Option<bool>>>,
     ) -> PyResult<String> {
@@ -728,7 +728,7 @@ impl RawDeltaTable {
         source_alias: Option<String>,
         target_alias: Option<String>,
         safe_cast: bool,
-        writer_properties: Option<HashMap<String, Option<String>>>,
+        writer_properties: Option<PyWriterProperties>,
         post_commithook_properties: Option<HashMap<String, Option<bool>>>,
         custom_metadata: Option<HashMap<String, String>>,
         matched_update_updates: Option<Vec<HashMap<String, String>>>,
@@ -1253,7 +1253,7 @@ impl RawDeltaTable {
         &mut self,
         py: Python,
         predicate: Option<String>,
-        writer_properties: Option<HashMap<String, Option<String>>>,
+        writer_properties: Option<PyWriterProperties>,
         custom_metadata: Option<HashMap<String, String>>,
         post_commithook_properties: Option<HashMap<String, Option<bool>>>,
     ) -> PyResult<String> {
@@ -1349,47 +1349,95 @@ fn set_post_commithook_properties(
     commit_properties
 }
 
-fn set_writer_properties(
-    writer_properties: HashMap<String, Option<String>>,
-) -> DeltaResult<WriterProperties> {
+fn set_writer_properties(writer_properties: PyWriterProperties) -> DeltaResult<WriterProperties> {
     let mut properties = WriterProperties::builder();
-    let data_page_size_limit = writer_properties.get("data_page_size_limit");
-    let dictionary_page_size_limit = writer_properties.get("dictionary_page_size_limit");
-    let data_page_row_count_limit = writer_properties.get("data_page_row_count_limit");
-    let write_batch_size = writer_properties.get("write_batch_size");
-    let max_row_group_size = writer_properties.get("max_row_group_size");
-    let compression = writer_properties.get("compression");
-    let statistics_truncate_length = writer_properties.get("statistics_truncate_length");
-
-    if let Some(Some(data_page_size)) = data_page_size_limit {
-        properties = properties.set_data_page_size_limit(data_page_size.parse::<usize>().unwrap());
+    let data_page_size_limit = writer_properties.data_page_size_limit;
+    let dictionary_page_size_limit = writer_properties.dictionary_page_size_limit;
+    let data_page_row_count_limit = writer_properties.data_page_row_count_limit;
+    let write_batch_size = writer_properties.write_batch_size;
+    let max_row_group_size = writer_properties.max_row_group_size;
+    let compression = writer_properties.compression;
+    let statistics_truncate_length = writer_properties.statistics_truncate_length;
+    let default_column_properties = writer_properties.default_column_properties;
+    let column_properties = writer_properties.column_properties;
+
+    if let Some(data_page_size) = data_page_size_limit {
+        properties = properties.set_data_page_size_limit(data_page_size);
     }
-    if let Some(Some(dictionary_page_size)) = dictionary_page_size_limit {
-        properties = properties
-            .set_dictionary_page_size_limit(dictionary_page_size.parse::<usize>().unwrap());
+    if let Some(dictionary_page_size) = dictionary_page_size_limit {
+        properties = properties.set_dictionary_page_size_limit(dictionary_page_size);
     }
-    if let Some(Some(data_page_row_count)) = data_page_row_count_limit {
-        properties =
-            properties.set_data_page_row_count_limit(data_page_row_count.parse::<usize>().unwrap());
+    if let Some(data_page_row_count) = data_page_row_count_limit {
+        properties = properties.set_data_page_row_count_limit(data_page_row_count);
     }
-    if let Some(Some(batch_size)) = write_batch_size {
-        properties = properties.set_write_batch_size(batch_size.parse::<usize>().unwrap());
+    if let Some(batch_size) = write_batch_size {
+        properties = properties.set_write_batch_size(batch_size);
     }
-    if let Some(Some(row_group_size)) = max_row_group_size {
-        properties = properties.set_max_row_group_size(row_group_size.parse::<usize>().unwrap());
-    }
-    if let Some(Some(statistics_truncate_length)) = statistics_truncate_length {
-        properties = properties
-            .set_statistics_truncate_length(statistics_truncate_length.parse::<usize>().ok());
+    if let Some(row_group_size) = max_row_group_size {
+        properties = properties.set_max_row_group_size(row_group_size);
     }
+    properties = properties.set_statistics_truncate_length(statistics_truncate_length);
 
-    if let Some(Some(compression)) = compression {
+    if let Some(compression) = compression {
         let compress: Compression = compression
             .parse()
             .map_err(|err: ParquetError| DeltaTableError::Generic(err.to_string()))?;
         properties = properties.set_compression(compress);
     }
+
+    if let Some(default_column_properties) = default_column_properties {
+        if let Some(dictionary_enabled) = default_column_properties.dictionary_enabled {
+            properties = properties.set_dictionary_enabled(dictionary_enabled);
+        }
+        if let Some(max_statistics_size) = default_column_properties.max_statistics_size {
+            properties = properties.set_max_statistics_size(max_statistics_size);
+        }
+        if let Some(bloom_filter_properties) = default_column_properties.bloom_filter_properties {
+            if let Some(set_bloom_filter_enabled) = bloom_filter_properties.set_bloom_filter_enabled
+            {
+                properties = properties.set_bloom_filter_enabled(set_bloom_filter_enabled);
+            }
+            if let Some(bloom_filter_fpp) = bloom_filter_properties.fpp {
+                properties = properties.set_bloom_filter_fpp(bloom_filter_fpp);
+            }
+            if let Some(bloom_filter_ndv) = bloom_filter_properties.ndv {
+                properties = properties.set_bloom_filter_ndv(bloom_filter_ndv);
+            }
+        }
+    }
+    if let Some(column_properties) = column_properties {
+        for (column_name, column_prop) in column_properties {
+            if let Some(column_prop) = column_prop {
+                if let Some(dictionary_enabled) = column_prop.dictionary_enabled {
+                    properties = properties.set_column_dictionary_enabled(
+                        column_name.clone().into(),
+                        dictionary_enabled,
+                    );
+                }
+                if let Some(bloom_filter_properties) = column_prop.bloom_filter_properties {
+                    if let Some(set_bloom_filter_enabled) =
+                        bloom_filter_properties.set_bloom_filter_enabled
+                    {
+                        properties = properties.set_column_bloom_filter_enabled(
+                            column_name.clone().into(),
+                            set_bloom_filter_enabled,
+                        );
+                    }
+                    if let Some(bloom_filter_fpp) = bloom_filter_properties.fpp {
+                        properties = properties.set_column_bloom_filter_fpp(
+                            column_name.clone().into(),
+                            bloom_filter_fpp,
+                        );
+                    }
+                    if let Some(bloom_filter_ndv) = bloom_filter_properties.ndv {
+                        properties = properties
+                            .set_column_bloom_filter_ndv(column_name.into(), bloom_filter_ndv);
+                    }
+                }
+            }
+        }
+    }
     Ok(properties.build())
 }
 
@@ -1672,6 +1720,33 @@ impl From<&PyAddAction> for Add {
     }
 }
 
+#[derive(FromPyObject)]
+pub struct BloomFilterProperties {
+    pub set_bloom_filter_enabled: Option<bool>,
+    pub fpp: Option<f64>,
+    pub ndv: Option<u64>,
+}
+
+#[derive(FromPyObject)]
+pub struct ColumnProperties {
+    pub dictionary_enabled: Option<bool>,
+    pub max_statistics_size: Option<usize>,
+    pub bloom_filter_properties: Option<BloomFilterProperties>,
+}
+
+#[derive(FromPyObject)]
+pub struct PyWriterProperties {
+    data_page_size_limit: Option<usize>,
+    dictionary_page_size_limit: Option<usize>,
+    data_page_row_count_limit: Option<usize>,
+    write_batch_size: Option<usize>,
+    max_row_group_size: Option<usize>,
+    statistics_truncate_length: Option<usize>,
+    compression: Option<String>,
+    default_column_properties: Option<ColumnProperties>,
+    column_properties: Option<HashMap<String, Option<ColumnProperties>>>,
+}
+
 #[pyfunction]
 #[allow(clippy::too_many_arguments)]
 fn write_to_deltalake(
@@ -1687,7 +1762,7 @@ fn write_to_deltalake(
     description: Option<String>,
     configuration: Option<HashMap<String, Option<String>>>,
     storage_options: Option<HashMap<String, String>>,
-    writer_properties: Option<HashMap<String, Option<String>>>,
+    writer_properties: Option<PyWriterProperties>,
     custom_metadata: Option<HashMap<String, String>>,
     post_commithook_properties: Option<HashMap<String, Option<bool>>>,
 ) -> PyResult<()> {
diff --git a/python/tests/test_writerproperties.py b/python/tests/test_writerproperties.py
index 89f8e02690..30c25548ad 100644
--- a/python/tests/test_writerproperties.py
+++ b/python/tests/test_writerproperties.py
@@ -4,7 +4,13 @@
 import pyarrow.parquet as pq
 import pytest
 
-from deltalake import DeltaTable, WriterProperties, write_deltalake
+from deltalake import (
+    BloomFilterProperties,
+    ColumnProperties,
+    DeltaTable,
+    WriterProperties,
+    write_deltalake,
+)
 
 
 def test_writer_properties_all_filled():
@@ -16,35 +22,35 @@ def test_writer_properties_all_filled():
         max_row_group_size=500,
         compression="SNAPPY",
         statistics_truncate_length=600,
+        default_column_properties=ColumnProperties(
+            dictionary_enabled=False,
+        ),
+        column_properties={
+            "a": ColumnProperties(
+                dictionary_enabled=True,
+                max_statistics_size=40,
+                bloom_filter_properties=BloomFilterProperties(
+                    set_bloom_filter_enabled=True, fpp=0.2, ndv=30
+                ),
+            ),
+            "b": ColumnProperties(
+                dictionary_enabled=True,
+                max_statistics_size=400,
+                bloom_filter_properties=BloomFilterProperties(
+                    set_bloom_filter_enabled=False, fpp=0.2, ndv=30
+                ),
+            ),
+        },
     )
-    expected = {
-        "data_page_size_limit": "100",
-        "dictionary_page_size_limit": "200",
-        "data_page_row_count_limit": "300",
-        "write_batch_size": "400",
-        "max_row_group_size": "500",
-        "compression": "SNAPPY",
-        "statistics_truncate_length": "600",
-    }
-
-    assert wp._to_dict() == expected
+    assert wp.default_column_properties.bloom_filter_properties is None
+    assert wp.column_properties["a"].bloom_filter_properties.fpp == 0.2
 
 
 def test_writer_properties_lower_case_compression():
     wp = WriterProperties(compression="snappy")  # type: ignore
WriterProperties(compression="snappy") # type: ignore - expected = { - "data_page_size_limit": None, - "dictionary_page_size_limit": None, - "data_page_row_count_limit": None, - "write_batch_size": None, - "max_row_group_size": None, - "compression": "SNAPPY", - "statistics_truncate_length": None, - } - - assert wp._to_dict() == expected + assert wp.compression == "SNAPPY" @pytest.mark.parametrize( @@ -78,6 +84,11 @@ def test_writer_properties_no_compression(): WriterProperties(compression_level=10) +def test_invalid_fpp_value(): + with pytest.raises(ValueError): + BloomFilterProperties(set_bloom_filter_enabled=True, fpp=1.1, ndv=30) + + def test_write_with_writerproperties( tmp_path: pathlib.Path, sample_table: pa.Table, writer_properties: WriterProperties ):