Skip to content

Commit

Permalink
Add ColumnProperties and rework WriterProperties in Python
Browse files Browse the repository at this point in the history
  • Loading branch information
sherlockbeard authored and ion-elgreco committed Aug 19, 2024
1 parent b762cb2 commit d595400
Show file tree
Hide file tree
Showing 6 changed files with 230 additions and 80 deletions.
10 changes: 9 additions & 1 deletion python/deltalake/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,17 @@
from .schema import DataType as DataType
from .schema import Field as Field
from .schema import Schema as Schema
from .table import (
BloomFilterProperties as BloomFilterProperties,
)
from .table import (
ColumnProperties as ColumnProperties,
)
from .table import DeltaTable as DeltaTable
from .table import Metadata as Metadata
from .table import PostCommitHookProperties as PostCommitHookProperties
from .table import WriterProperties as WriterProperties
from .table import (
WriterProperties as WriterProperties,
)
from .writer import convert_to_deltalake as convert_to_deltalake
from .writer import write_deltalake as write_deltalake
14 changes: 7 additions & 7 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ from typing import Any, Dict, List, Literal, Mapping, Optional, Tuple, Union
import pyarrow
import pyarrow.fs as fs

from deltalake.writer import AddAction
from deltalake.writer import AddAction, WriterProperties

__version__: str

Expand Down Expand Up @@ -67,7 +67,7 @@ class RawDeltaTable:
target_size: Optional[int],
max_concurrent_tasks: Optional[int],
min_commit_interval: Optional[int],
writer_properties: Optional[Dict[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> str: ...
Expand All @@ -79,7 +79,7 @@ class RawDeltaTable:
max_concurrent_tasks: Optional[int],
max_spill_size: Optional[int],
min_commit_interval: Optional[int],
writer_properties: Optional[Dict[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> str: ...
Expand Down Expand Up @@ -125,7 +125,7 @@ class RawDeltaTable:
def delete(
self,
predicate: Optional[str],
writer_properties: Optional[Dict[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> str: ...
Expand All @@ -139,7 +139,7 @@ class RawDeltaTable:
self,
updates: Dict[str, str],
predicate: Optional[str],
writer_properties: Optional[Dict[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
safe_cast: bool,
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
Expand All @@ -150,7 +150,7 @@ class RawDeltaTable:
predicate: str,
source_alias: Optional[str],
target_alias: Optional[str],
writer_properties: Optional[Dict[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
safe_cast: bool,
Expand Down Expand Up @@ -214,7 +214,7 @@ def write_to_deltalake(
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
storage_options: Optional[Dict[str, str]],
writer_properties: Optional[Dict[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
custom_metadata: Optional[Dict[str, str]],
post_commithook_properties: Optional[Dict[str, Optional[bool]]],
) -> None: ...
Expand Down
82 changes: 70 additions & 12 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,58 @@ def __init__(
self.cleanup_expired_logs = cleanup_expired_logs


@dataclass(init=True)
class BloomFilterProperties:
    """The Bloom Filter Properties instance for the Rust parquet writer.

    Args:
        set_bloom_filter_enabled: If True and no fpp or ndv are provided, the default values will be used.
        fpp: The false positive probability for the bloom filter. Must be between 0 and 1 exclusive.
        ndv: The number of distinct values for the bloom filter.
    """

    # Declared as annotated dataclass fields (instead of a hand-written
    # __init__) so the generated __init__ keeps the original signature and
    # the generated __eq__/__repr__ actually use the values. A dataclass
    # with no annotated fields would treat every instance as equal.
    set_bloom_filter_enabled: Optional[bool]
    fpp: Optional[float] = None
    ndv: Optional[int] = None

    def __post_init__(self) -> None:
        # Validate eagerly so an invalid probability fails at construction.
        if self.fpp is not None and (self.fpp <= 0 or self.fpp >= 1):
            raise ValueError("fpp must be between 0 and 1 exclusive")

    def __str__(self) -> str:
        return f"set_bloom_filter_enabled: {self.set_bloom_filter_enabled}, fpp: {self.fpp}, ndv: {self.ndv}"


@dataclass(init=True)
class ColumnProperties:
    """The Column Properties instance for the Rust parquet writer.

    Args:
        dictionary_enabled: Enable dictionary encoding for the column.
        max_statistics_size: Maximum size of statistics for the column.
        bloom_filter_properties: Bloom Filter Properties for the column.
    """

    # Annotated dataclass fields replace the hand-written __init__: the
    # generated __init__ keeps the identical signature and defaults, while
    # __eq__/__repr__ now reflect the stored values (a field-less dataclass
    # would make all instances compare equal). The string annotation is a
    # forward reference so this class does not depend on definition order.
    dictionary_enabled: Optional[bool] = None
    max_statistics_size: Optional[int] = None
    bloom_filter_properties: Optional["BloomFilterProperties"] = None

    def __str__(self) -> str:
        return f"dictionary_enabled: {self.dictionary_enabled}, max_statistics_size: {self.max_statistics_size}, bloom_filter_properties: {self.bloom_filter_properties}"


@dataclass(init=True)
class WriterProperties:
"""A Writer Properties instance for the Rust parquet writer."""
Expand All @@ -163,6 +215,8 @@ def __init__(
] = None,
compression_level: Optional[int] = None,
statistics_truncate_length: Optional[int] = None,
default_column_properties: Optional[ColumnProperties] = None,
column_properties: Optional[Dict[str, ColumnProperties]] = None,
):
"""Create a Writer Properties instance for the Rust parquet writer:
Expand All @@ -178,6 +232,8 @@ def __init__(
BROTLI: levels (1-11),
ZSTD: levels (1-22),
statistics_truncate_length: maximum length of truncated min/max values in statistics.
default_column_properties: Default Column Properties for the Rust parquet writer.
column_properties: Column Properties for the Rust parquet writer.
"""
self.data_page_size_limit = data_page_size_limit
self.dictionary_page_size_limit = dictionary_page_size_limit
Expand All @@ -186,6 +242,8 @@ def __init__(
self.max_row_group_size = max_row_group_size
self.compression = None
self.statistics_truncate_length = statistics_truncate_length
self.default_column_properties = default_column_properties
self.column_properties = column_properties

if compression_level is not None and compression is None:
raise ValueError(
Expand All @@ -211,18 +269,18 @@ def __init__(
self.compression = parquet_compression

def __str__(self) -> str:
column_properties_str = (
", ".join([f"column '{k}': {v}" for k, v in self.column_properties.items()])
if self.column_properties
else None
)
return (
f"WriterProperties(data_page_size_limit: {self.data_page_size_limit}, dictionary_page_size_limit: {self.dictionary_page_size_limit}, "
f"data_page_row_count_limit: {self.data_page_row_count_limit}, write_batch_size: {self.write_batch_size}, "
f"max_row_group_size: {self.max_row_group_size}, compression: {self.compression}, statistics_truncate_length: {self.statistics_truncate_length})"
f"max_row_group_size: {self.max_row_group_size}, compression: {self.compression}, statistics_truncate_length: {self.statistics_truncate_length},"
f"default_column_properties: {self.default_column_properties}, column_properties: {column_properties_str})"
)

def _to_dict(self) -> Dict[str, Optional[str]]:
values = {}
for key, value in self.__dict__.items():
values[key] = str(value) if isinstance(value, int) else value
return values


@dataclass(init=False)
class Metadata:
Expand Down Expand Up @@ -833,7 +891,7 @@ def update(
metrics = self._table.update(
updates,
predicate,
writer_properties._to_dict() if writer_properties else None,
writer_properties,
safe_cast=not error_on_type_mismatch,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
Expand Down Expand Up @@ -1229,7 +1287,7 @@ def delete(
"""
metrics = self._table.delete(
predicate,
writer_properties._to_dict() if writer_properties else None,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
)
Expand Down Expand Up @@ -1773,7 +1831,7 @@ def execute(self) -> Dict[str, Any]:
source_alias=self.source_alias,
target_alias=self.target_alias,
safe_cast=self.safe_cast,
writer_properties=self.writer_properties._to_dict()
writer_properties=self.writer_properties
if self.writer_properties
else None,
custom_metadata=self.custom_metadata,
Expand Down Expand Up @@ -2025,7 +2083,7 @@ def compact(
target_size,
max_concurrent_tasks,
min_commit_interval,
writer_properties._to_dict() if writer_properties else None,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
)
Expand Down Expand Up @@ -2095,7 +2153,7 @@ def z_order(
max_concurrent_tasks,
max_spill_size,
min_commit_interval,
writer_properties._to_dict() if writer_properties else None,
writer_properties,
custom_metadata,
post_commithook_properties.__dict__ if post_commithook_properties else None,
)
Expand Down
4 changes: 1 addition & 3 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,7 @@ def write_deltalake(
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=(
writer_properties._to_dict() if writer_properties else None
),
writer_properties=writer_properties,
custom_metadata=custom_metadata,
post_commithook_properties=post_commithook_properties.__dict__
if post_commithook_properties
Expand Down
Loading

0 comments on commit d595400

Please sign in to comment.